summaryrefslogtreecommitdiff
path: root/kvmd/plugins/ugpio/gpio.py
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd/plugins/ugpio/gpio.py')
-rw-r--r--kvmd/plugins/ugpio/gpio.py28
1 files changed, 18 insertions, 10 deletions
diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py
index 28f0d5cb..315de6f1 100644
--- a/kvmd/plugins/ugpio/gpio.py
+++ b/kvmd/plugins/ugpio/gpio.py
@@ -21,7 +21,9 @@
from typing import Dict
+from typing import Callable
from typing import Optional
+from typing import Any
import gpiod
@@ -31,6 +33,7 @@ from ... import aiogp
from ...yamlconf import Option
from ...validators.os import valid_abs_path
+from ...validators.hw import valid_gpio_pin
from . import BaseUserGpioDriver
@@ -63,11 +66,15 @@ class Plugin(BaseUserGpioDriver):
"device": Option("/dev/gpiochip0", type=valid_abs_path, unpack_as="device_path"),
}
- def register_input(self, pin: int, debounce: float) -> None:
- self.__input_pins[pin] = aiogp.AioReaderPinParams(False, debounce)
+ @classmethod
+ def get_pin_validator(cls) -> Callable[[Any], str]:
+ return (lambda arg: str(valid_gpio_pin(arg)))
+
+ def register_input(self, pin: str, debounce: float) -> None:
+ self.__input_pins[int(pin)] = aiogp.AioReaderPinParams(False, debounce)
- def register_output(self, pin: int, initial: Optional[bool]) -> None:
- self.__output_pins[pin] = initial
+ def register_output(self, pin: str, initial: Optional[bool]) -> None:
+ self.__output_pins[int(pin)] = initial
def prepare(self) -> None:
assert self.__reader is None
@@ -95,14 +102,15 @@ class Plugin(BaseUserGpioDriver):
except Exception:
pass
- async def read(self, pin: int) -> bool:
+ async def read(self, pin: str) -> bool:
assert self.__reader
- if pin in self.__input_pins:
- return self.__reader.get(pin)
- return bool(self.__output_lines[pin].get_value())
+ pin_int = int(pin)
+ if pin_int in self.__input_pins:
+ return self.__reader.get(pin_int)
+ return bool(self.__output_lines[pin_int].get_value())
- async def write(self, pin: int, state: bool) -> None:
- self.__output_lines[pin].set_value(int(state))
+ async def write(self, pin: str, state: bool) -> None:
+ self.__output_lines[int(pin)].set_value(int(state))
def __str__(self) -> str:
return f"GPIO({self._instance_name})"