diff options
Diffstat (limited to 'kvmd/plugins/ugpio/gpio.py')
-rw-r--r-- | kvmd/plugins/ugpio/gpio.py | 62 |
1 files changed, 29 insertions, 33 deletions
diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py index 90426e13..4b951057 100644 --- a/kvmd/plugins/ugpio/gpio.py +++ b/kvmd/plugins/ugpio/gpio.py @@ -21,15 +21,12 @@ from typing import Dict -from typing import Set from typing import Optional -from ... import aiotools -from ... import gpio - -from ...yamlconf import Option +import gpiod -from ...validators.basic import valid_float_f01 +from ... import aiotools +from ... import aiogp from . import BaseUserGpioDriver @@ -40,59 +37,58 @@ class Plugin(BaseUserGpioDriver): self, instance_name: str, notifier: aiotools.AioNotifier, - - state_poll: float, ) -> None: super().__init__(instance_name, notifier) - self.__state_poll = state_poll - - self.__input_pins: Set[int] = set() + self.__input_pins: Dict[int, aiogp.AioReaderPinParams] = {} self.__output_pins: Dict[int, Optional[bool]] = {} - self.__reader: Optional[gpio.BatchReader] = None + self.__reader: Optional[aiogp.AioReader] = None - @classmethod - def get_plugin_options(cls) -> Dict: - return { - "state_poll": Option(0.1, type=valid_float_f01), - } + self.__chip: Optional[gpiod.Chip] = None + self.__output_lines: Dict[int, gpiod.Line] = {} - def register_input(self, pin: int) -> None: - self.__input_pins.add(pin) + def register_input(self, pin: int, debounce: float) -> None: + self.__input_pins[pin] = aiogp.AioReaderPinParams(False, debounce) def register_output(self, pin: int, initial: Optional[bool]) -> None: self.__output_pins[pin] = initial def prepare(self) -> None: assert self.__reader is None - self.__reader = gpio.BatchReader( - pins=set([ - *map(gpio.set_input, self.__input_pins), - *[ - gpio.set_output(pin, initial) - for (pin, initial) in self.__output_pins.items() - ], - ]), - interval=self.__state_poll, + self.__reader = aiogp.AioReader( + path=aiogp.DEVICE_PATH, + consumer="kvmd/ugpio-gpio/inputs", + pins=self.__input_pins, notifier=self._notifier, ) + self.__chip = gpiod.Chip(aiogp.DEVICE_PATH) + for (pin, initial) in self.__output_pins.items(): + line = self.__chip.get_line(pin) + line.request("kvmd/ugpio-gpio/outputs", gpiod.LINE_REQ_DIR_OUT, default_val=int(initial or False)) + self.__output_lines[pin] = line + async def run(self) -> None: assert self.__reader await self.__reader.poll() def cleanup(self) -> None: - for (pin, initial) in self.__output_pins.items(): - if initial is not None: - gpio.write(pin, initial) + if self.__chip: + try: + self.__chip.close() + except Exception: + pass def read(self, pin: int) -> bool: - return gpio.read(pin) + assert self.__reader + if pin in self.__input_pins: + return self.__reader.get(pin) + return bool(self.__output_lines[pin].get_value()) def write(self, pin: int, state: bool) -> None: - gpio.write(pin, state) + self.__output_lines[pin].set_value(int(state)) def __str__(self) -> str: return f"GPIO({self._instance_name})" |