From be4269fe61963fce556d5cf633694529adddde1d Mon Sep 17 00:00:00 2001 From: Maxim Devaev Date: Mon, 29 May 2023 17:34:24 +0300 Subject: refactoring --- kvmd/plugins/hid/ch9329/__init__.py | 51 ++++++++-------------------- kvmd/plugins/hid/ch9329/chip.py | 67 +++++++++++++++++++++++++++++++++++++ kvmd/plugins/hid/ch9329/tty.py | 62 ---------------------------------- 3 files changed, 81 insertions(+), 99 deletions(-) create mode 100644 kvmd/plugins/hid/ch9329/chip.py delete mode 100644 kvmd/plugins/hid/ch9329/tty.py (limited to 'kvmd/plugins/hid') diff --git a/kvmd/plugins/hid/ch9329/__init__.py b/kvmd/plugins/hid/ch9329/__init__.py index c419a72d..ba4f8b3b 100644 --- a/kvmd/plugins/hid/ch9329/__init__.py +++ b/kvmd/plugins/hid/ch9329/__init__.py @@ -45,18 +45,11 @@ from ....validators.hw import valid_tty_speed from .. import BaseHid -from .tty import TTY +from .chip import ChipResponseError +from .chip import Chip from .mouse import Mouse from .keyboard import Keyboard -from .tty import get_info - - -class _ResError(Exception): - def __init__(self, msg: str) -> None: - super().__init__(msg) - self.msg = msg - # ===== class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes @@ -94,7 +87,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst }, self.__notifier, type=int) self.__stop_event = multiprocessing.Event() - self.__tty = TTY(device_path, speed, read_timeout) + self.__chip = Chip(device_path, speed, read_timeout) self.__keyboard = Keyboard() self.__mouse = Mouse() @@ -201,7 +194,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst logger = aioproc.settle("HID", "hid") while not self.__stop_event.is_set(): try: - # self.__tty.connect() + # self.__chip.connect() self.__hid_loop() except Exception: logger.exception("Unexpected error in the run loop") @@ -222,7 +215,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst cmd = self.__cmd_queue.get(timeout=0.1) # get_logger(0).info(f"HID : cmd = {cmd}") except queue.Empty: - self.__process_cmd(get_info()) + self.__process_cmd([]) else: self.__process_cmd(cmd) except Exception: @@ -231,35 +224,19 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst time.sleep(2) def __process_cmd(self, cmd: list[int]) -> bool: # pylint: disable=too-many-branches - error_retval = False try: - res = self.__tty.send(cmd) - # get_logger(0).info(f"HID response = {res}") - if len(res) < 4: - raise _ResError("No response from HID - might be disconnected") - - if not self.__tty.check_res(res): - raise _ResError("Invalid response checksum ...") - - # Response Error - if res[4] == 1 and res[5] != 0: - raise _ResError("Command error code = " + str(res[5])) - - # get_info response - if res[3] == 0x81: - self.__keyboard.set_leds(res[7]) - self.__notifier.notify() - - self.__set_state_online(True) - return True - - except _ResError as err: + led_byte = self.__chip.xfer(cmd) + except ChipResponseError as err: self.__set_state_online(False) get_logger(0).info(err) - error_retval = False time.sleep(2) - - return error_retval + else: + if led_byte >= 0: + self.__keyboard.set_leds(led_byte) + self.__notifier.notify() + self.__set_state_online(True) + return True + return False def __set_state_online(self, online: bool) -> None: self.__state_flags.update(online=int(online)) diff --git a/kvmd/plugins/hid/ch9329/chip.py b/kvmd/plugins/hid/ch9329/chip.py new file mode 100644 index 00000000..df6292ad --- /dev/null +++ b/kvmd/plugins/hid/ch9329/chip.py @@ -0,0 +1,67 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2018-2022 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import serial + + +# ===== +class ChipResponseError(Exception): + pass + + +# ===== +class Chip: + def __init__(self, device_path: str, speed: int, read_timeout: float) -> None: + self.__tty = serial.Serial(device_path, speed, timeout=read_timeout) + self.__device_path = device_path + + def xfer(self, cmd: list[int]) -> int: + self.__send(cmd) + return self.__recv() + + def __send(self, cmd: list[int]) -> None: + # RESET = [0x00,0x0F,0x00] + # GET_INFO = [0x00,0x01,0x00] + if len(cmd) == 0: + cmd = [0x00, 0x01, 0x00] + cmd = [0x57, 0xAB, *cmd, self.__make_checksum(cmd)] + self.__tty.write(serial.to_bytes(cmd)) + + def __recv(self) -> int: + data = self.__tty.read(5) + if len(data) < 5: + raise ChipResponseError("Too short response, HID might be disconnected") + + if data and data[4]: + data += self.__tty.read(data[4] + 1) + + if self.__make_checksum(data[:-1]) != data[-1]: + raise ChipResponseError("Invalid response checksum") + + if data[4] == 1 and data[5] != 0: + raise ChipResponseError(f"Response error code = {data[5]!r}") + + # led_byte (info) response + return (data[7] if data[3] == 0x81 else -1) + + def __make_checksum(self, cmd: (list[int] | bytes)) -> int: + return (sum(cmd) % 256) diff --git a/kvmd/plugins/hid/ch9329/tty.py b/kvmd/plugins/hid/ch9329/tty.py deleted file mode 100644 index 2e8cb22e..00000000 --- a/kvmd/plugins/hid/ch9329/tty.py +++ /dev/null @@ -1,62 +0,0 @@ -# ========================================================================== # -# # -# KVMD - The main PiKVM daemon. # -# # -# Copyright (C) 2018-2022 Maxim Devaev # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see . # -# # -# ========================================================================== # - - -import os -import serial - - -class TTY: - def __init__(self, device_path: str, speed: int, read_timeout: float) -> None: - self.__tty = serial.Serial(device_path, speed, timeout=read_timeout) - self.__device_path = device_path - - def has_device(self) -> bool: - return os.path.exists(self.__device_path) - - def send(self, cmd: list[int]) -> list[int]: - cmd = self.__wrap_cmd(cmd) - self.__tty.write(serial.to_bytes(cmd)) - data = list(self.__tty.read(5)) - if data and data[4]: - more_data = list(self.__tty.read(data[4] + 1)) - data.extend(more_data) - return data - - def check_res(self, res: list[int]) -> bool: - res_sum = res.pop() - return (self.__checksum(res) == res_sum) - - def __wrap_cmd(self, cmd: list[int]) -> list[int]: - cmd.insert(0, 0xAB) - cmd.insert(0, 0x57) - cmd.append(self.__checksum(cmd)) - return cmd - - def __checksum(self, cmd: list[int]) -> int: - return sum(cmd) % 256 - - -def get_info() -> list[int]: - return [0x00, 0x01, 0x00] - -# RESET = [0x00,0x0F,0x00] -# GET_INFO = [0x00,0x01,0x00] -- cgit v1.2.3