diff options
author | Maxim Devaev <[email protected]> | 2021-09-08 05:43:36 +0300 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2021-09-08 05:43:36 +0300 |
commit | 98ad1145a8782d3b82516dd9f81c86d9e80391c5 (patch) | |
tree | b8f348f0815e8ee72236bf9879af779c95727755 /kvmd/plugins/ugpio/gpio.py | |
parent | 939c63fe7daf2ddc9a05c9e0fbeab63cb5c6f0c1 (diff) |
string pins
Diffstat (limited to 'kvmd/plugins/ugpio/gpio.py')
-rw-r--r-- | kvmd/plugins/ugpio/gpio.py | 28 |
1 files changed, 18 insertions, 10 deletions
diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py index 28f0d5cb..315de6f1 100644 --- a/kvmd/plugins/ugpio/gpio.py +++ b/kvmd/plugins/ugpio/gpio.py @@ -21,7 +21,9 @@ from typing import Dict +from typing import Callable from typing import Optional +from typing import Any import gpiod @@ -31,6 +33,7 @@ from ... import aiogp from ...yamlconf import Option from ...validators.os import valid_abs_path +from ...validators.hw import valid_gpio_pin from . import BaseUserGpioDriver @@ -63,11 +66,15 @@ class Plugin(BaseUserGpioDriver): "device": Option("/dev/gpiochip0", type=valid_abs_path, unpack_as="device_path"), } - def register_input(self, pin: int, debounce: float) -> None: - self.__input_pins[pin] = aiogp.AioReaderPinParams(False, debounce) + @classmethod + def get_pin_validator(cls) -> Callable[[Any], str]: + return (lambda arg: str(valid_gpio_pin(arg))) + + def register_input(self, pin: str, debounce: float) -> None: + self.__input_pins[int(pin)] = aiogp.AioReaderPinParams(False, debounce) - def register_output(self, pin: int, initial: Optional[bool]) -> None: - self.__output_pins[pin] = initial + def register_output(self, pin: str, initial: Optional[bool]) -> None: + self.__output_pins[int(pin)] = initial def prepare(self) -> None: assert self.__reader is None @@ -95,14 +102,15 @@ class Plugin(BaseUserGpioDriver): except Exception: pass - async def read(self, pin: int) -> bool: + async def read(self, pin: str) -> bool: assert self.__reader - if pin in self.__input_pins: - return self.__reader.get(pin) - return bool(self.__output_lines[pin].get_value()) + pin_int = int(pin) + if pin_int in self.__input_pins: + return self.__reader.get(pin_int) + return bool(self.__output_lines[pin_int].get_value()) - async def write(self, pin: int, state: bool) -> None: - self.__output_lines[pin].set_value(int(state)) + async def write(self, pin: str, state: bool) -> None: + self.__output_lines[int(pin)].set_value(int(state)) def __str__(self) -> str: return f"GPIO({self._instance_name})" |