From e28dec4e333657182f3b7faceebabb28889b3403 Mon Sep 17 00:00:00 2001 From: Maxim Devaev Date: Sun, 14 Jan 2024 22:25:09 +0200 Subject: libgpiod 2.x api --- kvmd/plugins/ugpio/gpio.py | 36 +++++++++++++++++++++++------------- kvmd/plugins/ugpio/locator.py | 34 +++++++++++++++++++--------------- 2 files changed, 42 insertions(+), 28 deletions(-) (limited to 'kvmd/plugins/ugpio') diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py index 39c379f3..b86799a1 100644 --- a/kvmd/plugins/ugpio/gpio.py +++ b/kvmd/plugins/ugpio/gpio.py @@ -54,9 +54,7 @@ class Plugin(BaseUserGpioDriver): self.__output_pins: dict[int, (bool | None)] = {} self.__reader: (aiogp.AioReader | None) = None - - self.__chip: (gpiod.Chip | None) = None - self.__output_lines: dict[int, gpiod.Line] = {} + self.__outputs_request: (gpiod.LineRequest | None) = None @classmethod def get_plugin_options(cls) -> dict: @@ -76,27 +74,34 @@ class Plugin(BaseUserGpioDriver): def prepare(self) -> None: assert self.__reader is None + assert self.__outputs_request is None self.__reader = aiogp.AioReader( path=self.__device_path, consumer="kvmd::gpio::inputs", pins=self.__input_pins, notifier=self._notifier, ) - - self.__chip = gpiod.Chip(self.__device_path) - for (pin, initial) in self.__output_pins.items(): - line = self.__chip.get_line(pin) - line.request("kvmd::gpio::outputs", gpiod.LINE_REQ_DIR_OUT, default_vals=[int(initial or False)]) - self.__output_lines[pin] = line + if self.__output_pins: + self.__outputs_request = gpiod.request_lines( + self.__device_path, + consumer="kvmd::gpiod::outputs", + config={ + pin: gpiod.LineSettings( + direction=gpiod.line.Direction.OUTPUT, + output_value=gpiod.line.Value(initial or False), + ) + for (pin, initial) in self.__output_pins.items() + }, + ) async def run(self) -> None: assert self.__reader await self.__reader.poll() async def cleanup(self) -> None: - if self.__chip: + if self.__outputs_request: try: - self.__chip.close() + self.__outputs_request.release() except Exception: pass @@ -105,10 +110,15 @@ class Plugin(BaseUserGpioDriver): pin_int = int(pin) if pin_int in self.__input_pins: return self.__reader.get(pin_int) - return bool(self.__output_lines[pin_int].get_value()) + assert self.__outputs_request + assert pin_int in self.__output_pins + return bool(self.__outputs_request.get_value(pin_int).value) async def write(self, pin: str, state: bool) -> None: - self.__output_lines[int(pin)].set_value(int(state)) + assert self.__outputs_request + pin_int = int(pin) + assert pin_int in self.__output_pins + self.__outputs_request.set_value(pin_int, gpiod.line.Value(state)) def __str__(self) -> str: return f"GPIO({self._instance_name})" diff --git a/kvmd/plugins/ugpio/locator.py b/kvmd/plugins/ugpio/locator.py index b30c0530..79c10df7 100644 --- a/kvmd/plugins/ugpio/locator.py +++ b/kvmd/plugins/ugpio/locator.py @@ -53,9 +53,7 @@ class Plugin(BaseUserGpioDriver): self.__device_path = device_path self.__tasks: dict[int, (asyncio.Task | None)] = {} - - self.__chip: (gpiod.Chip | None) = None - self.__lines: dict[int, gpiod.Line] = {} + self.__line_request: (gpiod.LineRequest | None) = None @classmethod def get_plugin_options(cls) -> dict: @@ -76,11 +74,16 @@ class Plugin(BaseUserGpioDriver): self.__tasks[int(pin)] = None def prepare(self) -> None: - self.__chip = gpiod.Chip(self.__device_path) - for pin in self.__tasks: - line = self.__chip.get_line(pin) - line.request("kvmd::locator::outputs", gpiod.LINE_REQ_DIR_OUT, default_vals=[0]) - self.__lines[pin] = line + self.__line_request = gpiod.request_lines( + self.__device_path, + consumer="kvmd::locator", + config={ + tuple(self.__tasks): gpiod.LineSettings( + direction=gpiod.line.Direction.OUTPUT, + output_value=gpiod.line.Value(False), + ), + }, + ) async def cleanup(self) -> None: tasks = [ @@ -91,9 +94,9 @@ class Plugin(BaseUserGpioDriver): for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True) - if self.__chip: + if self.__line_request: try: - self.__chip.close() + self.__line_request.release() except Exception: pass @@ -111,17 +114,18 @@ class Plugin(BaseUserGpioDriver): self.__tasks[pin_int] = None async def __blink(self, pin: int) -> None: - line = self.__lines[pin] + assert pin in self.__tasks + assert self.__line_request try: - state = 1 + state = True while True: - line.set_value(state) - state = int(not state) + self.__line_request.set_value(pin, gpiod.line.Value(state)) + state = (not state) await asyncio.sleep(0.1) except asyncio.CancelledError: pass finally: - line.set_value(0) + self.__line_request.set_value(pin, gpiod.line.Value(False)) def __str__(self) -> str: return f"Locator({self._instance_name})" -- cgit v1.2.3