summaryrefslogtreecommitdiff
path: root/kvmd/plugins/ugpio/gpio.py
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd/plugins/ugpio/gpio.py')
-rw-r--r--kvmd/plugins/ugpio/gpio.py62
1 files changed, 29 insertions, 33 deletions
diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py
index 90426e13..4b951057 100644
--- a/kvmd/plugins/ugpio/gpio.py
+++ b/kvmd/plugins/ugpio/gpio.py
@@ -21,15 +21,12 @@
from typing import Dict
-from typing import Set
from typing import Optional
-from ... import aiotools
-from ... import gpio
-
-from ...yamlconf import Option
+import gpiod
-from ...validators.basic import valid_float_f01
+from ... import aiotools
+from ... import aiogp
from . import BaseUserGpioDriver
@@ -40,59 +37,58 @@ class Plugin(BaseUserGpioDriver):
self,
instance_name: str,
notifier: aiotools.AioNotifier,
-
- state_poll: float,
) -> None:
super().__init__(instance_name, notifier)
- self.__state_poll = state_poll
-
- self.__input_pins: Set[int] = set()
+ self.__input_pins: Dict[int, aiogp.AioReaderPinParams] = {}
self.__output_pins: Dict[int, Optional[bool]] = {}
- self.__reader: Optional[gpio.BatchReader] = None
+ self.__reader: Optional[aiogp.AioReader] = None
- @classmethod
- def get_plugin_options(cls) -> Dict:
- return {
- "state_poll": Option(0.1, type=valid_float_f01),
- }
+ self.__chip: Optional[gpiod.Chip] = None
+ self.__output_lines: Dict[int, gpiod.Line] = {}
- def register_input(self, pin: int) -> None:
- self.__input_pins.add(pin)
+ def register_input(self, pin: int, debounce: float) -> None:
+ self.__input_pins[pin] = aiogp.AioReaderPinParams(False, debounce)
def register_output(self, pin: int, initial: Optional[bool]) -> None:
self.__output_pins[pin] = initial
def prepare(self) -> None:
assert self.__reader is None
- self.__reader = gpio.BatchReader(
- pins=set([
- *map(gpio.set_input, self.__input_pins),
- *[
- gpio.set_output(pin, initial)
- for (pin, initial) in self.__output_pins.items()
- ],
- ]),
- interval=self.__state_poll,
+ self.__reader = aiogp.AioReader(
+ path=aiogp.DEVICE_PATH,
+ consumer="kvmd/ugpio-gpio/inputs",
+ pins=self.__input_pins,
notifier=self._notifier,
)
+ self.__chip = gpiod.Chip(aiogp.DEVICE_PATH)
+ for (pin, initial) in self.__output_pins.items():
+ line = self.__chip.get_line(pin)
+ line.request("kvmd/ugpio-gpio/outputs", gpiod.LINE_REQ_DIR_OUT, default_val=int(initial or False))
+ self.__output_lines[pin] = line
+
async def run(self) -> None:
assert self.__reader
await self.__reader.poll()
def cleanup(self) -> None:
- for (pin, initial) in self.__output_pins.items():
- if initial is not None:
- gpio.write(pin, initial)
+ if self.__chip:
+ try:
+ self.__chip.close()
+ except Exception:
+ pass
def read(self, pin: int) -> bool:
- return gpio.read(pin)
+ assert self.__reader
+ if pin in self.__input_pins:
+ return self.__reader.get(pin)
+ return bool(self.__output_lines[pin].get_value())
def write(self, pin: int, state: bool) -> None:
- gpio.write(pin, state)
+ self.__output_lines[pin].set_value(int(state))
def __str__(self) -> str:
return f"GPIO({self._instance_name})"