diff options
author | Maxim Devaev <[email protected]> | 2023-05-29 17:34:24 +0300 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2023-07-10 03:02:28 +0300 |
commit | be4269fe61963fce556d5cf633694529adddde1d (patch) | |
tree | 0a10b914a6942cc72ba8d28493c481ffd60423d6 | |
parent | c58430258758db06c198b3c456449eb05eee017d (diff) |
refactoring
-rw-r--r-- | kvmd/plugins/hid/ch9329/__init__.py | 51 | ||||
-rw-r--r-- | kvmd/plugins/hid/ch9329/chip.py (renamed from kvmd/plugins/hid/ch9329/tty.py) | 55 |
2 files changed, 44 insertions, 62 deletions
diff --git a/kvmd/plugins/hid/ch9329/__init__.py b/kvmd/plugins/hid/ch9329/__init__.py index c419a72d..ba4f8b3b 100644 --- a/kvmd/plugins/hid/ch9329/__init__.py +++ b/kvmd/plugins/hid/ch9329/__init__.py @@ -45,18 +45,11 @@ from ....validators.hw import valid_tty_speed from .. import BaseHid -from .tty import TTY +from .chip import ChipResponseError +from .chip import Chip from .mouse import Mouse from .keyboard import Keyboard -from .tty import get_info - - -class _ResError(Exception): - def __init__(self, msg: str) -> None: - super().__init__(msg) - self.msg = msg - # ===== class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes @@ -94,7 +87,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst }, self.__notifier, type=int) self.__stop_event = multiprocessing.Event() - self.__tty = TTY(device_path, speed, read_timeout) + self.__chip = Chip(device_path, speed, read_timeout) self.__keyboard = Keyboard() self.__mouse = Mouse() @@ -201,7 +194,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst logger = aioproc.settle("HID", "hid") while not self.__stop_event.is_set(): try: - # self.__tty.connect() + # self.__chip.connect() self.__hid_loop() except Exception: logger.exception("Unexpected error in the run loop") @@ -222,7 +215,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst cmd = self.__cmd_queue.get(timeout=0.1) # get_logger(0).info(f"HID : cmd = {cmd}") except queue.Empty: - self.__process_cmd(get_info()) + self.__process_cmd([]) else: self.__process_cmd(cmd) except Exception: @@ -231,35 +224,19 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst time.sleep(2) def __process_cmd(self, cmd: list[int]) -> bool: # pylint: disable=too-many-branches - error_retval = False try: - res = self.__tty.send(cmd) - # get_logger(0).info(f"HID response = {res}") - if len(res) < 4: - raise _ResError("No response from HID - might be disconnected") - - if not self.__tty.check_res(res): - raise _ResError("Invalid response checksum ...") - - # Response Error - if res[4] == 1 and res[5] != 0: - raise _ResError("Command error code = " + str(res[5])) - - # get_info response - if res[3] == 0x81: - self.__keyboard.set_leds(res[7]) - self.__notifier.notify() - - self.__set_state_online(True) - return True - - except _ResError as err: + led_byte = self.__chip.xfer(cmd) + except ChipResponseError as err: self.__set_state_online(False) get_logger(0).info(err) - error_retval = False time.sleep(2) - - return error_retval + else: + if led_byte >= 0: + self.__keyboard.set_leds(led_byte) + self.__notifier.notify() + self.__set_state_online(True) + return True + return False def __set_state_online(self, online: bool) -> None: self.__state_flags.update(online=int(online)) diff --git a/kvmd/plugins/hid/ch9329/tty.py b/kvmd/plugins/hid/ch9329/chip.py index 2e8cb22e..df6292ad 100644 --- a/kvmd/plugins/hid/ch9329/tty.py +++ b/kvmd/plugins/hid/ch9329/chip.py @@ -20,43 +20,48 @@ # ========================================================================== # -import os import serial -class TTY: +# ===== +class ChipResponseError(Exception): + pass + + +# ===== +class Chip: def __init__(self, device_path: str, speed: int, read_timeout: float) -> None: self.__tty = serial.Serial(device_path, speed, timeout=read_timeout) self.__device_path = device_path - def has_device(self) -> bool: - return os.path.exists(self.__device_path) + def xfer(self, cmd: list[int]) -> int: + self.__send(cmd) + return self.__recv() - def send(self, cmd: list[int]) -> list[int]: - cmd = self.__wrap_cmd(cmd) + def __send(self, cmd: list[int]) -> None: + # RESET = [0x00,0x0F,0x00] + # GET_INFO = [0x00,0x01,0x00] + if len(cmd) == 0: + cmd = [0x00, 0x01, 0x00] + cmd = [0x57, 0xAB, *cmd, self.__make_checksum(cmd)] self.__tty.write(serial.to_bytes(cmd)) - data = list(self.__tty.read(5)) - if data and data[4]: - more_data = list(self.__tty.read(data[4] + 1)) - data.extend(more_data) - return data - def check_res(self, res: list[int]) -> bool: - res_sum = res.pop() - return (self.__checksum(res) == res_sum) + def __recv(self) -> int: + data = self.__tty.read(5) + if len(data) < 5: + raise ChipResponseError("Too short response, HID might be disconnected") - def __wrap_cmd(self, cmd: list[int]) -> list[int]: - cmd.insert(0, 0xAB) - cmd.insert(0, 0x57) - cmd.append(self.__checksum(cmd)) - return cmd + if data and data[4]: + data += self.__tty.read(data[4] + 1) - def __checksum(self, cmd: list[int]) -> int: - return sum(cmd) % 256 + if self.__make_checksum(data[:-1]) != data[-1]: + raise ChipResponseError("Invalid response checksum") + if data[4] == 1 and data[5] != 0: + raise ChipResponseError(f"Response error code = {data[5]!r}") -def get_info() -> list[int]: - return [0x00, 0x01, 0x00] + # led_byte (info) response + return (data[7] if data[3] == 0x81 else -1) -# RESET = [0x00,0x0F,0x00] -# GET_INFO = [0x00,0x01,0x00] + def __make_checksum(self, cmd: (list[int] | bytes)) -> int: + return (sum(cmd) % 256) |