diff options
Diffstat (limited to 'kvmd/plugins/ugpio/ezcoo.py')
-rw-r--r-- | kvmd/plugins/ugpio/ezcoo.py | 25 |
1 files changed, 17 insertions, 8 deletions
diff --git a/kvmd/plugins/ugpio/ezcoo.py b/kvmd/plugins/ugpio/ezcoo.py index 81e199e8..a20aed32 100644 --- a/kvmd/plugins/ugpio/ezcoo.py +++ b/kvmd/plugins/ugpio/ezcoo.py @@ -28,7 +28,9 @@ import time from typing import Tuple from typing import Dict +from typing import Callable from typing import Optional +from typing import Any import serial @@ -85,6 +87,10 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute "protocol": Option(1, type=functools.partial(valid_number, min=1, max=2)), } + @classmethod + def get_pin_validator(cls) -> Callable[[Any], str]: + return (lambda arg: str(valid_number(arg, min=0, max=3, name="Ezcoo channel"))) + def prepare(self) -> None: assert self.__proc is None self.__proc = multiprocessing.Process(target=self.__serial_worker, daemon=True) @@ -105,16 +111,16 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute if self.__proc.is_alive() or self.__proc.exitcode is not None: self.__proc.join() - async def read(self, pin: int) -> bool: + async def read(self, pin: str) -> bool: if not self.__is_online(): raise GpioDriverOfflineError(self) - return (self.__channel == pin) + return (self.__channel == int(pin)) - async def write(self, pin: int, state: bool) -> None: + async def write(self, pin: str, state: bool) -> None: if not self.__is_online(): raise GpioDriverOfflineError(self) - if state and (0 <= pin <= 3): - self.__ctl_queue.put_nowait(pin) + if state: + self.__ctl_queue.put_nowait(int(pin)) # ===== @@ -174,9 +180,12 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute return (channel, data) def __send_channel(self, tty: serial.Serial, channel: int) -> None: - # Twice because of ezcoo bugs - cmd = (b"SET" if self.__protocol == 1 else b"EZS") - tty.write((b"%s OUT1 VS IN%d\n" % (cmd, channel + 1)) * 2) + assert 0 <= channel <= 3 + cmd = b"%s OUT1 VS IN%d\n" % ( + (b"SET" if self.__protocol == 1 else b"EZS"), + channel + 1, + ) + tty.write(cmd * 2) # Twice because of ezcoo bugs tty.flush() def __str__(self) -> str: |