summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kvmd/plugins/ugpio/gpio.py55
1 files changed, 26 insertions, 29 deletions
diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py
index 90426e13..aa43caf1 100644
--- a/kvmd/plugins/ugpio/gpio.py
+++ b/kvmd/plugins/ugpio/gpio.py
@@ -24,12 +24,10 @@ from typing import Dict
from typing import Set
from typing import Optional
-from ... import aiotools
-from ... import gpio
-
-from ...yamlconf import Option
+import gpiod
-from ...validators.basic import valid_float_f01
+from ... import aiotools
+from ... import aiogp
from . import BaseUserGpioDriver
@@ -40,24 +38,17 @@ class Plugin(BaseUserGpioDriver):
self,
instance_name: str,
notifier: aiotools.AioNotifier,
-
- state_poll: float,
) -> None:
super().__init__(instance_name, notifier)
- self.__state_poll = state_poll
-
self.__input_pins: Set[int] = set()
self.__output_pins: Dict[int, Optional[bool]] = {}
- self.__reader: Optional[gpio.BatchReader] = None
+ self.__reader: Optional[aiogp.AioPinsReader] = None
- @classmethod
- def get_plugin_options(cls) -> Dict:
- return {
- "state_poll": Option(0.1, type=valid_float_f01),
- }
+ self.__chip: Optional[gpiod.Chip] = None
+ self.__output_lines: Dict[int, gpiod.Line] = {}
def register_input(self, pin: int) -> None:
self.__input_pins.add(pin)
@@ -67,32 +58,38 @@ class Plugin(BaseUserGpioDriver):
def prepare(self) -> None:
assert self.__reader is None
- self.__reader = gpio.BatchReader(
- pins=set([
- *map(gpio.set_input, self.__input_pins),
- *[
- gpio.set_output(pin, initial)
- for (pin, initial) in self.__output_pins.items()
- ],
- ]),
- interval=self.__state_poll,
+ self.__reader = aiogp.AioPinsReader(
+ path="/dev/gpiochip0",
+ consumer="kvmd/ugpio-gpio/inputs",
+ pins=dict.fromkeys(self.__input_pins, False),
notifier=self._notifier,
)
+ self.__chip = gpiod.Chip("/dev/gpiochip0")
+ for (pin, initial) in self.__output_pins.items():
+ line = self.__chip.get_line(pin)
+ line.request("kvmd/ugpio-gpio/outputs", gpiod.LINE_REQ_DIR_OUT, default_val=int(initial or False))
+ self.__output_lines[pin] = line
+
async def run(self) -> None:
assert self.__reader
await self.__reader.poll()
def cleanup(self) -> None:
- for (pin, initial) in self.__output_pins.items():
- if initial is not None:
- gpio.write(pin, initial)
+ if self.__chip:
+ try:
+ self.__chip.close()
+ except Exception:
+ pass
def read(self, pin: int) -> bool:
- return gpio.read(pin)
+ assert self.__reader
+ if pin in self.__input_pins:
+ return self.__reader.get(pin)
+ return bool(self.__output_lines[pin].get_value())
def write(self, pin: int, state: bool) -> None:
- gpio.write(pin, state)
+ self.__output_lines[pin].set_value(int(state))
def __str__(self) -> str:
return f"GPIO({self._instance_name})"