summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMaxim Devaev <[email protected]>2023-05-29 17:34:24 +0300
committerMaxim Devaev <[email protected]>2023-07-10 03:02:28 +0300
commitbe4269fe61963fce556d5cf633694529adddde1d (patch)
tree0a10b914a6942cc72ba8d28493c481ffd60423d6
parentc58430258758db06c198b3c456449eb05eee017d (diff)
refactoring
-rw-r--r--kvmd/plugins/hid/ch9329/__init__.py51
-rw-r--r--kvmd/plugins/hid/ch9329/chip.py (renamed from kvmd/plugins/hid/ch9329/tty.py)55
2 files changed, 44 insertions, 62 deletions
diff --git a/kvmd/plugins/hid/ch9329/__init__.py b/kvmd/plugins/hid/ch9329/__init__.py
index c419a72d..ba4f8b3b 100644
--- a/kvmd/plugins/hid/ch9329/__init__.py
+++ b/kvmd/plugins/hid/ch9329/__init__.py
@@ -45,18 +45,11 @@ from ....validators.hw import valid_tty_speed
from .. import BaseHid
-from .tty import TTY
+from .chip import ChipResponseError
+from .chip import Chip
from .mouse import Mouse
from .keyboard import Keyboard
-from .tty import get_info
-
-
-class _ResError(Exception):
- def __init__(self, msg: str) -> None:
- super().__init__(msg)
- self.msg = msg
-
# =====
class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes
@@ -94,7 +87,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
}, self.__notifier, type=int)
self.__stop_event = multiprocessing.Event()
- self.__tty = TTY(device_path, speed, read_timeout)
+ self.__chip = Chip(device_path, speed, read_timeout)
self.__keyboard = Keyboard()
self.__mouse = Mouse()
@@ -201,7 +194,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
logger = aioproc.settle("HID", "hid")
while not self.__stop_event.is_set():
try:
- # self.__tty.connect()
+ # self.__chip.connect()
self.__hid_loop()
except Exception:
logger.exception("Unexpected error in the run loop")
@@ -222,7 +215,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
cmd = self.__cmd_queue.get(timeout=0.1)
# get_logger(0).info(f"HID : cmd = {cmd}")
except queue.Empty:
- self.__process_cmd(get_info())
+ self.__process_cmd([])
else:
self.__process_cmd(cmd)
except Exception:
@@ -231,35 +224,19 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
time.sleep(2)
def __process_cmd(self, cmd: list[int]) -> bool: # pylint: disable=too-many-branches
- error_retval = False
try:
- res = self.__tty.send(cmd)
- # get_logger(0).info(f"HID response = {res}")
- if len(res) < 4:
- raise _ResError("No response from HID - might be disconnected")
-
- if not self.__tty.check_res(res):
- raise _ResError("Invalid response checksum ...")
-
- # Response Error
- if res[4] == 1 and res[5] != 0:
- raise _ResError("Command error code = " + str(res[5]))
-
- # get_info response
- if res[3] == 0x81:
- self.__keyboard.set_leds(res[7])
- self.__notifier.notify()
-
- self.__set_state_online(True)
- return True
-
- except _ResError as err:
+ led_byte = self.__chip.xfer(cmd)
+ except ChipResponseError as err:
self.__set_state_online(False)
get_logger(0).info(err)
- error_retval = False
time.sleep(2)
-
- return error_retval
+ else:
+ if led_byte >= 0:
+ self.__keyboard.set_leds(led_byte)
+ self.__notifier.notify()
+ self.__set_state_online(True)
+ return True
+ return False
def __set_state_online(self, online: bool) -> None:
self.__state_flags.update(online=int(online))
diff --git a/kvmd/plugins/hid/ch9329/tty.py b/kvmd/plugins/hid/ch9329/chip.py
index 2e8cb22e..df6292ad 100644
--- a/kvmd/plugins/hid/ch9329/tty.py
+++ b/kvmd/plugins/hid/ch9329/chip.py
@@ -20,43 +20,48 @@
# ========================================================================== #
-import os
import serial
-class TTY:
+# =====
+class ChipResponseError(Exception):
+ pass
+
+
+# =====
+class Chip:
def __init__(self, device_path: str, speed: int, read_timeout: float) -> None:
self.__tty = serial.Serial(device_path, speed, timeout=read_timeout)
self.__device_path = device_path
- def has_device(self) -> bool:
- return os.path.exists(self.__device_path)
+ def xfer(self, cmd: list[int]) -> int:
+ self.__send(cmd)
+ return self.__recv()
- def send(self, cmd: list[int]) -> list[int]:
- cmd = self.__wrap_cmd(cmd)
+ def __send(self, cmd: list[int]) -> None:
+ # RESET = [0x00,0x0F,0x00]
+ # GET_INFO = [0x00,0x01,0x00]
+ if len(cmd) == 0:
+ cmd = [0x00, 0x01, 0x00]
+ cmd = [0x57, 0xAB, *cmd, self.__make_checksum(cmd)]
self.__tty.write(serial.to_bytes(cmd))
- data = list(self.__tty.read(5))
- if data and data[4]:
- more_data = list(self.__tty.read(data[4] + 1))
- data.extend(more_data)
- return data
- def check_res(self, res: list[int]) -> bool:
- res_sum = res.pop()
- return (self.__checksum(res) == res_sum)
+ def __recv(self) -> int:
+ data = self.__tty.read(5)
+ if len(data) < 5:
+ raise ChipResponseError("Too short response, HID might be disconnected")
- def __wrap_cmd(self, cmd: list[int]) -> list[int]:
- cmd.insert(0, 0xAB)
- cmd.insert(0, 0x57)
- cmd.append(self.__checksum(cmd))
- return cmd
+ if data and data[4]:
+ data += self.__tty.read(data[4] + 1)
- def __checksum(self, cmd: list[int]) -> int:
- return sum(cmd) % 256
+ if self.__make_checksum(data[:-1]) != data[-1]:
+ raise ChipResponseError("Invalid response checksum")
+ if data[4] == 1 and data[5] != 0:
+ raise ChipResponseError(f"Response error code = {data[5]!r}")
-def get_info() -> list[int]:
- return [0x00, 0x01, 0x00]
+ # led_byte (info) response
+ return (data[7] if data[3] == 0x81 else -1)
-# RESET = [0x00,0x0F,0x00]
-# GET_INFO = [0x00,0x01,0x00]
+ def __make_checksum(self, cmd: (list[int] | bytes)) -> int:
+ return (sum(cmd) % 256)