summaryrefslogtreecommitdiff
path: root/kvmd/plugins/ugpio/gpio.py
diff options
context:
space:
mode:
authorMaxim Devaev <[email protected]>2024-01-14 22:25:09 +0200
committerMaxim Devaev <[email protected]>2024-01-14 22:25:09 +0200
commite28dec4e333657182f3b7faceebabb28889b3403 (patch)
tree9d23324ef29bab1141525bcedae885070729427d /kvmd/plugins/ugpio/gpio.py
parente1c6d1a99099562fbf42b917d83f4e00de785fa0 (diff)
libgpiod 2.x api
Diffstat (limited to 'kvmd/plugins/ugpio/gpio.py')
-rw-r--r--kvmd/plugins/ugpio/gpio.py36
1 files changed, 23 insertions, 13 deletions
diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py
index 39c379f3..b86799a1 100644
--- a/kvmd/plugins/ugpio/gpio.py
+++ b/kvmd/plugins/ugpio/gpio.py
@@ -54,9 +54,7 @@ class Plugin(BaseUserGpioDriver):
self.__output_pins: dict[int, (bool | None)] = {}
self.__reader: (aiogp.AioReader | None) = None
-
- self.__chip: (gpiod.Chip | None) = None
- self.__output_lines: dict[int, gpiod.Line] = {}
+ self.__outputs_request: (gpiod.LineRequest | None) = None
@classmethod
def get_plugin_options(cls) -> dict:
@@ -76,27 +74,34 @@ class Plugin(BaseUserGpioDriver):
def prepare(self) -> None:
assert self.__reader is None
+ assert self.__outputs_request is None
self.__reader = aiogp.AioReader(
path=self.__device_path,
consumer="kvmd::gpio::inputs",
pins=self.__input_pins,
notifier=self._notifier,
)
-
- self.__chip = gpiod.Chip(self.__device_path)
- for (pin, initial) in self.__output_pins.items():
- line = self.__chip.get_line(pin)
- line.request("kvmd::gpio::outputs", gpiod.LINE_REQ_DIR_OUT, default_vals=[int(initial or False)])
- self.__output_lines[pin] = line
+ if self.__output_pins:
+ self.__outputs_request = gpiod.request_lines(
+ self.__device_path,
+ consumer="kvmd::gpiod::outputs",
+ config={
+ pin: gpiod.LineSettings(
+ direction=gpiod.line.Direction.OUTPUT,
+ output_value=gpiod.line.Value(initial or False),
+ )
+ for (pin, initial) in self.__output_pins.items()
+ },
+ )
async def run(self) -> None:
assert self.__reader
await self.__reader.poll()
async def cleanup(self) -> None:
- if self.__chip:
+ if self.__outputs_request:
try:
- self.__chip.close()
+ self.__outputs_request.release()
except Exception:
pass
@@ -105,10 +110,15 @@ class Plugin(BaseUserGpioDriver):
pin_int = int(pin)
if pin_int in self.__input_pins:
return self.__reader.get(pin_int)
- return bool(self.__output_lines[pin_int].get_value())
+ assert self.__outputs_request
+ assert pin_int in self.__output_pins
+ return bool(self.__outputs_request.get_value(pin_int).value)
async def write(self, pin: str, state: bool) -> None:
- self.__output_lines[int(pin)].set_value(int(state))
+ assert self.__outputs_request
+ pin_int = int(pin)
+ assert pin_int in self.__output_pins
+ self.__outputs_request.set_value(pin_int, gpiod.line.Value(state))
def __str__(self) -> str:
return f"GPIO({self._instance_name})"