diff options
Diffstat (limited to 'kvmd/plugins/ugpio/pway.py')
-rw-r--r-- | kvmd/plugins/ugpio/pway.py | 38 |
1 files changed, 17 insertions, 21 deletions
diff --git a/kvmd/plugins/ugpio/pway.py b/kvmd/plugins/ugpio/pway.py index 1c5a71da..86c67411 100644 --- a/kvmd/plugins/ugpio/pway.py +++ b/kvmd/plugins/ugpio/pway.py @@ -2,7 +2,6 @@ # # # KVMD - The main Pi-KVM daemon. # # # -# # # Copyright (C) 2018-2021 Maxim Devaev <[email protected]> # # # # Modified by SppokHCK September 2021 <Find me on Discord spook#8911> # @@ -31,7 +30,9 @@ import time from typing import Tuple from typing import Dict +from typing import Callable from typing import Optional +from typing import Any import serial @@ -82,20 +83,15 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute @classmethod def get_plugin_options(cls) -> Dict: return { - "device": Option("", type=valid_abs_path, unpack_as="device_path"), + "device": Option("", type=valid_abs_path, unpack_as="device_path"), "speed": Option(19200, type=valid_tty_speed), - # speed for pWAY 16-Port 19200 8n1. - "read_timeout": Option(2.0, type=valid_float_f01), - "protocol": Option(1, type=functools.partial(valid_number, min=1, max=2)), + "read_timeout": Option(2.0, type=valid_float_f01), + "protocol": Option(1, type=functools.partial(valid_number, min=1, max=2)), } - def register_input(self, pin: int, debounce: float) -> None: - _ = pin - _ = debounce - - def register_output(self, pin: int, initial: Optional[bool]) -> None: - _ = pin - _ = initial + @classmethod + def get_pin_validator(cls) -> Callable[[Any], Any]: + return functools.partial(valid_number, min=0, max=15, name="PWAY channel") def prepare(self) -> None: assert self.__proc is None @@ -114,19 +110,19 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute if self.__proc.is_alive(): get_logger(0).info("Stopping %s daemon ...", self) self.__stop_event.set() - if self.__proc.exitcode is not None: + if self.__proc.is_alive() or self.__proc.exitcode is not None: self.__proc.join() - async def read(self, pin: int) -> bool: + async def read(self, pin: str) -> bool: if not self.__is_online(): raise GpioDriverOfflineError(self) - return (self.__channel == pin) + return (self.__channel == int(pin)) - async def write(self, pin: int, state: bool) -> None: + async def write(self, pin: str, state: bool) -> None: if not self.__is_online(): raise GpioDriverOfflineError(self) - if state and (0 <= pin <= 16): - self.__ctl_queue.put_nowait(pin) + if state: + self.__ctl_queue.put_nowait(int(pin)) # ===== @@ -178,7 +174,7 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute # When you switch ports you see something like "VGA_SWITCH_CONTROL=[0-15]" for ports 1-16 found = re.findall(b"VGA_SWITCH_CONTROL=[0-9]*", data) if found: - channel = int(found[0].decode().split('=')[1]) + 1 + channel = int(found[0].decode().split("=")[1]) data = data[-8:] return (channel, data) @@ -190,11 +186,11 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute tty.write(b"%s\r" % (cmd)) tty.flush() else: - # Basically send `PS [1-16]` that switches the port + # Basically send `PS [1-15]` that switches the port tty.write(b"%s %d\r" % (cmd, channel)) tty.flush() def __str__(self) -> str: - return f"pway({self._instance_name})" + return f"PWAY({self._instance_name})" __repr__ = __str__ |