summaryrefslogtreecommitdiff
path: root/kvmd/plugins/ugpio/hidrelay.py
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd/plugins/ugpio/hidrelay.py')
-rw-r--r--kvmd/plugins/ugpio/hidrelay.py43
1 files changed, 20 insertions, 23 deletions
diff --git a/kvmd/plugins/ugpio/hidrelay.py b/kvmd/plugins/ugpio/hidrelay.py
index 7e18c72a..d4bdda48 100644
--- a/kvmd/plugins/ugpio/hidrelay.py
+++ b/kvmd/plugins/ugpio/hidrelay.py
@@ -25,7 +25,9 @@ import contextlib
from typing import Dict
from typing import Set
+from typing import Callable
from typing import Optional
+from typing import Any
import hid
@@ -36,6 +38,7 @@ from ... import aiotools
from ...yamlconf import Option
+from ...validators.basic import valid_number
from ...validators.basic import valid_float_f01
from ...validators.os import valid_abs_path
@@ -79,11 +82,12 @@ class Plugin(BaseUserGpioDriver):
def get_modes(cls) -> Set[str]:
return set([UserGpioModes.OUTPUT])
- def register_input(self, pin: int, debounce: float) -> None:
- raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}")
+ @classmethod
+ def get_pin_validator(cls) -> Callable[[Any], str]:
+ return (lambda arg: str(valid_number(arg, min=0, max=7, name="HID relay channel")))
- def register_output(self, pin: int, initial: Optional[bool]) -> None:
- self.__initials[pin] = initial
+ def register_output(self, pin: str, initial: Optional[bool]) -> None:
+ self.__initials[int(pin)] = initial
def prepare(self) -> None:
logger = get_logger(0)
@@ -113,15 +117,15 @@ class Plugin(BaseUserGpioDriver):
self.__close_device()
self.__stop = True
- async def read(self, pin: int) -> bool:
+ async def read(self, pin: str) -> bool:
try:
- return self.__inner_read(pin)
+ return self.__inner_read(int(pin))
except Exception:
raise GpioDriverOfflineError(self)
- async def write(self, pin: int, state: bool) -> None:
+ async def write(self, pin: str, state: bool) -> None:
try:
- return self.__inner_write(pin, state)
+ return self.__inner_write(int(pin), state)
except Exception:
raise GpioDriverOfflineError(self)
@@ -140,27 +144,20 @@ class Plugin(BaseUserGpioDriver):
pin, self, self.__device_path, tools.efmt(err))
def __inner_read(self, pin: int) -> bool:
- if self.__check_pin(pin):
- return bool(self.__inner_read_raw() & (1 << pin))
- return False
+ assert 0 <= pin <= 7
+ return bool(self.__inner_read_raw() & (1 << pin))
def __inner_read_raw(self) -> int:
with self.__ensure_device("reading") as device:
return device.get_feature_report(1, 8)[7]
def __inner_write(self, pin: int, state: bool) -> None:
- if self.__check_pin(pin):
- with self.__ensure_device("writing") as device:
- report = [(0xFF if state else 0xFD), pin + 1] # Pin numeration starts from 0
- result = device.send_feature_report(report)
- if result < 0:
- raise RuntimeError(f"Retval of send_feature_report() < 0: {result}")
-
- def __check_pin(self, pin: int) -> bool:
- ok = (0 <= pin <= 7)
- if not ok:
- get_logger(0).warning("Unsupported pin=%d for %s on %s", pin, self, self.__device_path)
- return ok
+ assert 0 <= pin <= 7
+ with self.__ensure_device("writing") as device:
+ report = [(0xFF if state else 0xFD), pin + 1] # Pin numeration starts from 0
+ result = device.send_feature_report(report)
+ if result < 0:
+ raise RuntimeError(f"Retval of send_feature_report() < 0: {result}")
@contextlib.contextmanager
def __ensure_device(self, context: str) -> hid.device: