summaryrefslogtreecommitdiff
path: root/kvmd/plugins/ugpio/locator.py
diff options
context:
space:
mode:
authorMaxim Devaev <[email protected]>2024-01-14 22:25:09 +0200
committerMaxim Devaev <[email protected]>2024-01-14 22:25:09 +0200
commite28dec4e333657182f3b7faceebabb28889b3403 (patch)
tree9d23324ef29bab1141525bcedae885070729427d /kvmd/plugins/ugpio/locator.py
parente1c6d1a99099562fbf42b917d83f4e00de785fa0 (diff)
libgpiod 2.x api
Diffstat (limited to 'kvmd/plugins/ugpio/locator.py')
-rw-r--r--kvmd/plugins/ugpio/locator.py34
1 files changed, 19 insertions, 15 deletions
diff --git a/kvmd/plugins/ugpio/locator.py b/kvmd/plugins/ugpio/locator.py
index b30c0530..79c10df7 100644
--- a/kvmd/plugins/ugpio/locator.py
+++ b/kvmd/plugins/ugpio/locator.py
@@ -53,9 +53,7 @@ class Plugin(BaseUserGpioDriver):
self.__device_path = device_path
self.__tasks: dict[int, (asyncio.Task | None)] = {}
-
- self.__chip: (gpiod.Chip | None) = None
- self.__lines: dict[int, gpiod.Line] = {}
+ self.__line_request: (gpiod.LineRequest | None) = None
@classmethod
def get_plugin_options(cls) -> dict:
@@ -76,11 +74,16 @@ class Plugin(BaseUserGpioDriver):
self.__tasks[int(pin)] = None
def prepare(self) -> None:
- self.__chip = gpiod.Chip(self.__device_path)
- for pin in self.__tasks:
- line = self.__chip.get_line(pin)
- line.request("kvmd::locator::outputs", gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
- self.__lines[pin] = line
+ self.__line_request = gpiod.request_lines(
+ self.__device_path,
+ consumer="kvmd::locator",
+ config={
+ tuple(self.__tasks): gpiod.LineSettings(
+ direction=gpiod.line.Direction.OUTPUT,
+ output_value=gpiod.line.Value(False),
+ ),
+ },
+ )
async def cleanup(self) -> None:
tasks = [
@@ -91,9 +94,9 @@ class Plugin(BaseUserGpioDriver):
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
- if self.__chip:
+ if self.__line_request:
try:
- self.__chip.close()
+ self.__line_request.release()
except Exception:
pass
@@ -111,17 +114,18 @@ class Plugin(BaseUserGpioDriver):
self.__tasks[pin_int] = None
async def __blink(self, pin: int) -> None:
- line = self.__lines[pin]
+ assert pin in self.__tasks
+ assert self.__line_request
try:
- state = 1
+ state = True
while True:
- line.set_value(state)
- state = int(not state)
+ self.__line_request.set_value(pin, gpiod.line.Value(state))
+ state = (not state)
await asyncio.sleep(0.1)
except asyncio.CancelledError:
pass
finally:
- line.set_value(0)
+ self.__line_request.set_value(pin, gpiod.line.Value(False))
def __str__(self) -> str:
return f"Locator({self._instance_name})"