summaryrefslogtreecommitdiff
path: root/kvmd/plugins/ugpio/ezcoo.py
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd/plugins/ugpio/ezcoo.py')
-rw-r--r--kvmd/plugins/ugpio/ezcoo.py25
1 files changed, 17 insertions, 8 deletions
diff --git a/kvmd/plugins/ugpio/ezcoo.py b/kvmd/plugins/ugpio/ezcoo.py
index 81e199e8..a20aed32 100644
--- a/kvmd/plugins/ugpio/ezcoo.py
+++ b/kvmd/plugins/ugpio/ezcoo.py
@@ -28,7 +28,9 @@ import time
from typing import Tuple
from typing import Dict
+from typing import Callable
from typing import Optional
+from typing import Any
import serial
@@ -85,6 +87,10 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
"protocol": Option(1, type=functools.partial(valid_number, min=1, max=2)),
}
+ @classmethod
+ def get_pin_validator(cls) -> Callable[[Any], str]:
+ return (lambda arg: str(valid_number(arg, min=0, max=3, name="Ezcoo channel")))
+
def prepare(self) -> None:
assert self.__proc is None
self.__proc = multiprocessing.Process(target=self.__serial_worker, daemon=True)
@@ -105,16 +111,16 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
if self.__proc.is_alive() or self.__proc.exitcode is not None:
self.__proc.join()
- async def read(self, pin: int) -> bool:
+ async def read(self, pin: str) -> bool:
if not self.__is_online():
raise GpioDriverOfflineError(self)
- return (self.__channel == pin)
+ return (self.__channel == int(pin))
- async def write(self, pin: int, state: bool) -> None:
+ async def write(self, pin: str, state: bool) -> None:
if not self.__is_online():
raise GpioDriverOfflineError(self)
- if state and (0 <= pin <= 3):
- self.__ctl_queue.put_nowait(pin)
+ if state:
+ self.__ctl_queue.put_nowait(int(pin))
# =====
@@ -174,9 +180,12 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
return (channel, data)
def __send_channel(self, tty: serial.Serial, channel: int) -> None:
- # Twice because of ezcoo bugs
- cmd = (b"SET" if self.__protocol == 1 else b"EZS")
- tty.write((b"%s OUT1 VS IN%d\n" % (cmd, channel + 1)) * 2)
+ assert 0 <= channel <= 3
+ cmd = b"%s OUT1 VS IN%d\n" % (
+ (b"SET" if self.__protocol == 1 else b"EZS"),
+ channel + 1,
+ )
+ tty.write(cmd * 2) # Twice because of ezcoo bugs
tty.flush()
def __str__(self) -> str: