diff options
author | Maxim Devaev <[email protected]> | 2024-01-14 22:25:09 +0200 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2024-01-14 22:25:09 +0200 |
commit | e28dec4e333657182f3b7faceebabb28889b3403 (patch) | |
tree | 9d23324ef29bab1141525bcedae885070729427d /kvmd/plugins/ugpio/locator.py | |
parent | e1c6d1a99099562fbf42b917d83f4e00de785fa0 (diff) |
libgpiod 2.x api
Diffstat (limited to 'kvmd/plugins/ugpio/locator.py')
-rw-r--r-- | kvmd/plugins/ugpio/locator.py | 34 |
1 files changed, 19 insertions, 15 deletions
diff --git a/kvmd/plugins/ugpio/locator.py b/kvmd/plugins/ugpio/locator.py index b30c0530..79c10df7 100644 --- a/kvmd/plugins/ugpio/locator.py +++ b/kvmd/plugins/ugpio/locator.py @@ -53,9 +53,7 @@ class Plugin(BaseUserGpioDriver): self.__device_path = device_path self.__tasks: dict[int, (asyncio.Task | None)] = {} - - self.__chip: (gpiod.Chip | None) = None - self.__lines: dict[int, gpiod.Line] = {} + self.__line_request: (gpiod.LineRequest | None) = None @classmethod def get_plugin_options(cls) -> dict: @@ -76,11 +74,16 @@ class Plugin(BaseUserGpioDriver): self.__tasks[int(pin)] = None def prepare(self) -> None: - self.__chip = gpiod.Chip(self.__device_path) - for pin in self.__tasks: - line = self.__chip.get_line(pin) - line.request("kvmd::locator::outputs", gpiod.LINE_REQ_DIR_OUT, default_vals=[0]) - self.__lines[pin] = line + self.__line_request = gpiod.request_lines( + self.__device_path, + consumer="kvmd::locator", + config={ + tuple(self.__tasks): gpiod.LineSettings( + direction=gpiod.line.Direction.OUTPUT, + output_value=gpiod.line.Value(False), + ), + }, + ) async def cleanup(self) -> None: tasks = [ @@ -91,9 +94,9 @@ class Plugin(BaseUserGpioDriver): for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True) - if self.__chip: + if self.__line_request: try: - self.__chip.close() + self.__line_request.release() except Exception: pass @@ -111,17 +114,18 @@ class Plugin(BaseUserGpioDriver): self.__tasks[pin_int] = None async def __blink(self, pin: int) -> None: - line = self.__lines[pin] + assert pin in self.__tasks + assert self.__line_request try: - state = 1 + state = True while True: - line.set_value(state) - state = int(not state) + self.__line_request.set_value(pin, gpiod.line.Value(state)) + state = (not state) await asyncio.sleep(0.1) except asyncio.CancelledError: pass finally: - line.set_value(0) + self.__line_request.set_value(pin, gpiod.line.Value(False)) def __str__(self) -> str: return f"Locator({self._instance_name})" |