diff options
Diffstat (limited to 'kvmd/plugins/ugpio/hidrelay.py')
-rw-r--r-- | kvmd/plugins/ugpio/hidrelay.py | 43 |
1 files changed, 20 insertions, 23 deletions
diff --git a/kvmd/plugins/ugpio/hidrelay.py b/kvmd/plugins/ugpio/hidrelay.py index 7e18c72a..d4bdda48 100644 --- a/kvmd/plugins/ugpio/hidrelay.py +++ b/kvmd/plugins/ugpio/hidrelay.py @@ -25,7 +25,9 @@ import contextlib from typing import Dict from typing import Set +from typing import Callable from typing import Optional +from typing import Any import hid @@ -36,6 +38,7 @@ from ... import aiotools from ...yamlconf import Option +from ...validators.basic import valid_number from ...validators.basic import valid_float_f01 from ...validators.os import valid_abs_path @@ -79,11 +82,12 @@ class Plugin(BaseUserGpioDriver): def get_modes(cls) -> Set[str]: return set([UserGpioModes.OUTPUT]) - def register_input(self, pin: int, debounce: float) -> None: - raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}") + @classmethod + def get_pin_validator(cls) -> Callable[[Any], str]: + return (lambda arg: str(valid_number(arg, min=0, max=7, name="HID relay channel"))) - def register_output(self, pin: int, initial: Optional[bool]) -> None: - self.__initials[pin] = initial + def register_output(self, pin: str, initial: Optional[bool]) -> None: + self.__initials[int(pin)] = initial def prepare(self) -> None: logger = get_logger(0) @@ -113,15 +117,15 @@ class Plugin(BaseUserGpioDriver): self.__close_device() self.__stop = True - async def read(self, pin: int) -> bool: + async def read(self, pin: str) -> bool: try: - return self.__inner_read(pin) + return self.__inner_read(int(pin)) except Exception: raise GpioDriverOfflineError(self) - async def write(self, pin: int, state: bool) -> None: + async def write(self, pin: str, state: bool) -> None: try: - return self.__inner_write(pin, state) + return self.__inner_write(int(pin), state) except Exception: raise GpioDriverOfflineError(self) @@ -140,27 +144,20 @@ class Plugin(BaseUserGpioDriver): pin, self, self.__device_path, tools.efmt(err)) def __inner_read(self, pin: int) -> bool: - if self.__check_pin(pin): - return bool(self.__inner_read_raw() & (1 << pin)) - return False + assert 0 <= pin <= 7 + return bool(self.__inner_read_raw() & (1 << pin)) def __inner_read_raw(self) -> int: with self.__ensure_device("reading") as device: return device.get_feature_report(1, 8)[7] def __inner_write(self, pin: int, state: bool) -> None: - if self.__check_pin(pin): - with self.__ensure_device("writing") as device: - report = [(0xFF if state else 0xFD), pin + 1] # Pin numeration starts from 0 - result = device.send_feature_report(report) - if result < 0: - raise RuntimeError(f"Retval of send_feature_report() < 0: {result}") - - def __check_pin(self, pin: int) -> bool: - ok = (0 <= pin <= 7) - if not ok: - get_logger(0).warning("Unsupported pin=%d for %s on %s", pin, self, self.__device_path) - return ok + assert 0 <= pin <= 7 + with self.__ensure_device("writing") as device: + report = [(0xFF if state else 0xFD), pin + 1] # Pin numeration starts from 0 + result = device.send_feature_report(report) + if result < 0: + raise RuntimeError(f"Retval of send_feature_report() < 0: {result}") @contextlib.contextmanager def __ensure_device(self, context: str) -> hid.device: |