summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kvmd/plugins/ugpio/pway.py38
1 files changed, 17 insertions, 21 deletions
diff --git a/kvmd/plugins/ugpio/pway.py b/kvmd/plugins/ugpio/pway.py
index 1c5a71da..86c67411 100644
--- a/kvmd/plugins/ugpio/pway.py
+++ b/kvmd/plugins/ugpio/pway.py
@@ -2,7 +2,6 @@
# #
# KVMD - The main Pi-KVM daemon. #
# #
-# #
# Copyright (C) 2018-2021 Maxim Devaev <[email protected]> #
# #
# Modified by SppokHCK September 2021 <Find me on Discord spook#8911> #
@@ -31,7 +30,9 @@ import time
from typing import Tuple
from typing import Dict
+from typing import Callable
from typing import Optional
+from typing import Any
import serial
@@ -82,20 +83,15 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
@classmethod
def get_plugin_options(cls) -> Dict:
return {
- "device": Option("", type=valid_abs_path, unpack_as="device_path"),
+ "device": Option("", type=valid_abs_path, unpack_as="device_path"),
"speed": Option(19200, type=valid_tty_speed),
- # speed for pWAY 16-Port 19200 8n1.
- "read_timeout": Option(2.0, type=valid_float_f01),
- "protocol": Option(1, type=functools.partial(valid_number, min=1, max=2)),
+ "read_timeout": Option(2.0, type=valid_float_f01),
+ "protocol": Option(1, type=functools.partial(valid_number, min=1, max=2)),
}
- def register_input(self, pin: int, debounce: float) -> None:
- _ = pin
- _ = debounce
-
- def register_output(self, pin: int, initial: Optional[bool]) -> None:
- _ = pin
- _ = initial
+ @classmethod
+ def get_pin_validator(cls) -> Callable[[Any], Any]:
+ return functools.partial(valid_number, min=0, max=15, name="PWAY channel")
def prepare(self) -> None:
assert self.__proc is None
@@ -114,19 +110,19 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
if self.__proc.is_alive():
get_logger(0).info("Stopping %s daemon ...", self)
self.__stop_event.set()
- if self.__proc.exitcode is not None:
+ if self.__proc.is_alive() or self.__proc.exitcode is not None:
self.__proc.join()
- async def read(self, pin: int) -> bool:
+ async def read(self, pin: str) -> bool:
if not self.__is_online():
raise GpioDriverOfflineError(self)
- return (self.__channel == pin)
+ return (self.__channel == int(pin))
- async def write(self, pin: int, state: bool) -> None:
+ async def write(self, pin: str, state: bool) -> None:
if not self.__is_online():
raise GpioDriverOfflineError(self)
- if state and (0 <= pin <= 16):
- self.__ctl_queue.put_nowait(pin)
+ if state:
+ self.__ctl_queue.put_nowait(int(pin))
# =====
@@ -178,7 +174,7 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
# When you switch ports you see something like "VGA_SWITCH_CONTROL=[0-15]" for ports 1-16
found = re.findall(b"VGA_SWITCH_CONTROL=[0-9]*", data)
if found:
- channel = int(found[0].decode().split('=')[1]) + 1
+ channel = int(found[0].decode().split("=")[1])
data = data[-8:]
return (channel, data)
@@ -190,11 +186,11 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
tty.write(b"%s\r" % (cmd))
tty.flush()
else:
- # Basically send `PS [1-16]` that switches the port
+ # Basically send `PS [1-15]` that switches the port
tty.write(b"%s %d\r" % (cmd, channel))
tty.flush()
def __str__(self) -> str:
- return f"pway({self._instance_name})"
+ return f"PWAY({self._instance_name})"
__repr__ = __str__