diff options
-rw-r--r-- | kvmd/plugins/ugpio/gpio.py | 55 |
1 files changed, 26 insertions, 29 deletions
diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py index 90426e13..aa43caf1 100644 --- a/kvmd/plugins/ugpio/gpio.py +++ b/kvmd/plugins/ugpio/gpio.py @@ -24,12 +24,10 @@ from typing import Dict from typing import Set from typing import Optional -from ... import aiotools -from ... import gpio - -from ...yamlconf import Option +import gpiod -from ...validators.basic import valid_float_f01 +from ... import aiotools +from ... import aiogp from . import BaseUserGpioDriver @@ -40,24 +38,17 @@ class Plugin(BaseUserGpioDriver): self, instance_name: str, notifier: aiotools.AioNotifier, - - state_poll: float, ) -> None: super().__init__(instance_name, notifier) - self.__state_poll = state_poll - self.__input_pins: Set[int] = set() self.__output_pins: Dict[int, Optional[bool]] = {} - self.__reader: Optional[gpio.BatchReader] = None + self.__reader: Optional[aiogp.AioPinsReader] = None - @classmethod - def get_plugin_options(cls) -> Dict: - return { - "state_poll": Option(0.1, type=valid_float_f01), - } + self.__chip: Optional[gpiod.Chip] = None + self.__output_lines: Dict[int, gpiod.Line] = {} def register_input(self, pin: int) -> None: self.__input_pins.add(pin) @@ -67,32 +58,38 @@ class Plugin(BaseUserGpioDriver): def prepare(self) -> None: assert self.__reader is None - self.__reader = gpio.BatchReader( - pins=set([ - *map(gpio.set_input, self.__input_pins), - *[ - gpio.set_output(pin, initial) - for (pin, initial) in self.__output_pins.items() - ], - ]), - interval=self.__state_poll, + self.__reader = aiogp.AioPinsReader( + path="/dev/gpiochip0", + consumer="kvmd/ugpio-gpio/inputs", + pins=dict.fromkeys(self.__input_pins, False), notifier=self._notifier, ) + self.__chip = gpiod.Chip("/dev/gpiochip0") + for (pin, initial) in self.__output_pins.items(): + line = self.__chip.get_line(pin) + line.request("kvmd/ugpio-gpio/outputs", gpiod.LINE_REQ_DIR_OUT, default_val=int(initial or False)) + self.__output_lines[pin] = line + async def run(self) -> None: assert self.__reader await self.__reader.poll() def cleanup(self) -> None: - for (pin, initial) in self.__output_pins.items(): - if initial is not None: - gpio.write(pin, initial) + if self.__chip: + try: + self.__chip.close() + except Exception: + pass def read(self, pin: int) -> bool: - return gpio.read(pin) + assert self.__reader + if pin in self.__input_pins: + return self.__reader.get(pin) + return bool(self.__output_lines[pin].get_value()) def write(self, pin: int, state: bool) -> None: - gpio.write(pin, state) + self.__output_lines[pin].set_value(int(state)) def __str__(self) -> str: return f"GPIO({self._instance_name})" |