diff options
-rw-r--r-- | kvmd/plugins/hid/ch9329/__init__.py | 271 | ||||
-rw-r--r-- | kvmd/plugins/hid/ch9329/keyboard.py | 71 | ||||
-rw-r--r-- | kvmd/plugins/hid/ch9329/mouse.py | 115 | ||||
-rw-r--r-- | kvmd/plugins/hid/ch9329/tty.py | 62 |
4 files changed, 519 insertions, 0 deletions
diff --git a/kvmd/plugins/hid/ch9329/__init__.py b/kvmd/plugins/hid/ch9329/__init__.py new file mode 100644 index 00000000..0b4d4a14 --- /dev/null +++ b/kvmd/plugins/hid/ch9329/__init__.py @@ -0,0 +1,271 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2018-2022 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import multiprocessing +import queue +import time + +from typing import Iterable +from typing import AsyncGenerator + +from ....logging import get_logger + +from .... import tools +from .... import aiotools +from .... import aiomulti +from .... import aioproc + +from ....yamlconf import Option + +from ....validators.basic import valid_bool +from ....validators.basic import valid_int_f0 +from ....validators.basic import valid_int_f1 +from ....validators.basic import valid_float_f01 +from ....validators.os import valid_abs_path +from ....validators.hw import valid_tty_speed + +from .. import BaseHid + +from .tty import TTY +from .mouse import Mouse +from .keyboard import Keyboard + +from .tty import get_info + + +class _ResError(Exception): + def __init__(self, msg: str) -> None: + super().__init__(msg) + self.msg = msg + + +# ===== +class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes + def __init__( # pylint: disable=too-many-arguments,super-init-not-called + self, + device_path: str, + speed: int, + read_timeout: float, + read_retries: int, + common_retries: int, + retries_delay: float, + errors_threshold: int, + noop: bool, + ) -> None: + + multiprocessing.Process.__init__(self, daemon=True) + + self.__device_path = device_path + self.__speed = speed + self.__read_timeout = read_timeout + self.__read_retries = read_retries + self.__common_retries = common_retries + self.__retries_delay = retries_delay + self.__errors_threshold = errors_threshold + self.__noop = noop + + self.__reset_required_event = multiprocessing.Event() + self.__cmd_queue: "multiprocessing.Queue[list]" = multiprocessing.Queue() + + self.__notifier = aiomulti.AioProcessNotifier() + self.__state_flags = aiomulti.AioSharedFlags({ + "online": 0, + "busy": 0, + "status": 0, + }, self.__notifier, type=int) + + self.__stop_event = multiprocessing.Event() + self.__tty = TTY(device_path, speed, read_timeout) + self.__keyboard = Keyboard() + self.__mouse = Mouse() + + @classmethod + def get_plugin_options(cls) -> dict: + return { + "device": Option("/dev/kvmd-hid", type=valid_abs_path, unpack_as="device_path"), + "speed": Option(9600, type=valid_tty_speed), + "read_timeout": Option(0.3, type=valid_float_f01), + "read_retries": Option(5, type=valid_int_f1), + "common_retries": Option(5, type=valid_int_f1), + "retries_delay": Option(0.5, type=valid_float_f01), + "errors_threshold": Option(5, type=valid_int_f0), + "noop": Option(False, type=valid_bool), + } + + def sysprep(self) -> None: + get_logger(0).info("Starting HID daemon ...") + self.start() + + async def get_state(self) -> dict: + state = await self.__state_flags.get() + online = bool(state["online"]) + active_mouse = self.__mouse.active() + absolute = (active_mouse == "usb") + keyboard_leds = await self.__keyboard.leds() + + return { + "online": online, + "busy": False, + "connected": None, + "keyboard": { + "online": True, + "leds": keyboard_leds, + "outputs": {"available": [], "active": ""}, + }, + "mouse": { + "online": True, + "absolute": absolute, + "outputs": { + "available": ["usb", "usb_rel"], + "active": active_mouse + }, + }, + } + + async def poll_state(self) -> AsyncGenerator[dict, None]: + prev_state: dict = {} + while True: + state = await self.get_state() + if state != prev_state: + yield state + prev_state = state + await self.__notifier.wait() + + async def reset(self) -> None: + self.__reset_required_event.set() + + @aiotools.atomic_fg + async def cleanup(self) -> None: + if self.is_alive(): + get_logger(0).info("Stopping HID daemon ...") + self.__stop_event.set() + if self.is_alive() or self.exitcode is not None: + self.join() + + # ===== + + def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None: + for (key, state) in keys: + self.__queue_cmd(self.__keyboard.key(key, state)) + + def send_mouse_button_event(self, button: str, state: bool) -> None: + self.__queue_cmd(self.__mouse.button(button, state)) + + def send_mouse_move_event(self, to_x: int, to_y: int) -> None: + self.__queue_cmd(self.__mouse.move(to_x, to_y)) + + def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None: + self.__queue_cmd(self.__mouse.wheel(delta_x, delta_y)) + + def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None: + self.__queue_cmd(self.__mouse.relative(delta_x, delta_y)) + + def set_params(self, keyboard_output: (str | None)=None, mouse_output: (str | None)=None) -> None: + if mouse_output is not None: + get_logger(0).info("HID : mouse output = %s", mouse_output) + self.__mouse.set_active(mouse_output) + self.__notifier.notify() + + def set_connected(self, connected: bool) -> None: + pass + + def clear_events(self) -> None: + tools.clear_queue(self.__cmd_queue) + + def __queue_cmd(self, cmd: list[int], clear: bool=False) -> None: + if not self.__stop_event.is_set(): + if clear: + # FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между + # очисткой и добавлением нового события. Неприятно, но не смертельно. + # Починить блокировкой после перехода на асинхронные очереди. + tools.clear_queue(self.__cmd_queue) + self.__cmd_queue.put_nowait(cmd) + + def run(self) -> None: # pylint: disable=too-many-branches + logger = aioproc.settle("HID", "hid") + while not self.__stop_event.is_set(): + try: + # self.__tty.connect() + self.__hid_loop() + except Exception: + logger.exception("Unexpected error in the run loop") + time.sleep(1) + + def __hid_loop(self) -> None: + while not self.__stop_event.is_set(): + try: + + while not (self.__stop_event.is_set() and self.__cmd_queue.qsize() == 0): + if self.__reset_required_event.is_set(): + try: + self.__set_state_busy(True) + # self.__process_request(conn, RESET) + finally: + self.__reset_required_event.clear() + try: + cmd = self.__cmd_queue.get(timeout=0.1) + # get_logger(0).info(f"HID : cmd = {cmd}") + except queue.Empty: + self.__process_cmd(get_info()) + else: + self.__process_cmd(cmd) + except Exception: + self.clear_events() + get_logger(0).exception("Unexpected error in the HID loop") + time.sleep(2) + + def __process_cmd(self, cmd: list[int]) -> bool: # pylint: disable=too-many-branches + error_retval = False + try: + res = self.__tty.send(cmd) + # get_logger(0).info(f"HID response = {res}") + if len(res) < 4: + raise _ResError("No response from HID - might be disconnected") + + if not self.__tty.check_res(res): + raise _ResError("Invalid response checksum ...") + + # Response Error + if res[4] == 1 and res[5] != 0: + raise _ResError("Command error code = " + str(res[5])) + + # get_info response + if res[3] == 0x81: + self.__keyboard.set_leds(res[7]) + self.__notifier.notify() + + self.__set_state_online(True) + return True + + except Exception as err: + self.__set_state_online(False) + get_logger(0).info(err) + time.sleep(2) + error_retval = False + + return error_retval + + def __set_state_online(self, online: bool) -> None: + self.__state_flags.update(online=int(online)) + + def __set_state_busy(self, busy: bool) -> None: + self.__state_flags.update(busy=int(busy)) diff --git a/kvmd/plugins/hid/ch9329/keyboard.py b/kvmd/plugins/hid/ch9329/keyboard.py new file mode 100644 index 00000000..70ccf547 --- /dev/null +++ b/kvmd/plugins/hid/ch9329/keyboard.py @@ -0,0 +1,71 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2018-2022 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + +from .... import aiomulti + +from ....keyboard.mappings import KEYMAP + + +class Keyboard: + def __init__(self) -> None: + + self.__notifier = aiomulti.AioProcessNotifier() + self.__leds = aiomulti.AioSharedFlags({ + "num": False, + "caps": False, + "scroll": False, + }, self.__notifier, type=bool) + + self.__active_keys: list[list] = [] + + def key(self, key: str, state: bool) -> list[int]: + if state: + self.__active_keys.append([key, self.__is_modifier(key)]) + else: + self.__active_keys.remove([key, self.__is_modifier(key)]) + return self.__key() + + async def leds(self) -> dict: + leds = await self.__leds.get() + return leds + + def set_leds(self, led_byte: int) -> None: + num = bool(led_byte & 1) + caps = bool((led_byte >> 1) & 1) + scroll = bool((led_byte >> 2) & 1) + self.__leds.update(num=num, caps=caps, scroll=scroll) + + def __key(self) -> list[int]: + cmd = [0x00, 0x02, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] + counter = 0 + for key in self.__active_keys: + if key[1]: + cmd[3 + counter] = self.__keycode(key[0]) + else: + cmd[5 + counter] = self.__keycode(key[0]) + counter += 1 + return cmd + + def __keycode(self, key: str) -> int: + return KEYMAP[key].usb.code + + def __is_modifier(self, key: str) -> bool: + return KEYMAP[key].usb.is_modifier diff --git a/kvmd/plugins/hid/ch9329/mouse.py b/kvmd/plugins/hid/ch9329/mouse.py new file mode 100644 index 00000000..034aff92 --- /dev/null +++ b/kvmd/plugins/hid/ch9329/mouse.py @@ -0,0 +1,115 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2018-2022 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + +import math + +from ....mouse import MouseRange + + +class Mouse: # pylint: disable=too-many-instance-attributes + def __init__(self) -> None: + self.__active = "usb" + self.__button = "" + self.__clicked = False + self.__to_x = [0, 0] + self.__to_y = [0, 0] + self.__wheel_y = 0 + self.__delta_x = 0 + self.__delta_y = 0 + + def button(self, button: str, clicked: bool) -> list[int]: + self.__button = button + self.__clicked = clicked + self.__wheel_y = 0 + if self.__active != "usb": + self.__to_x = [0, 0] + self.__to_y = [0, 0] + return self.__absolute() + + def move(self, to_x: int, to_y: int) -> list[int]: + assert MouseRange.MIN <= to_x <= MouseRange.MAX + assert MouseRange.MIN <= to_y <= MouseRange.MAX + self.__to_x = self.__to_fixed(to_x) + self.__to_y = self.__to_fixed(to_y) + self.__wheel_y = 0 + return self.__absolute() + + def wheel(self, delta_x: int, delta_y: int) -> list[int]: + assert -127 <= delta_y <= 127 + _ = delta_x + self.__wheel_y = 1 if delta_y > 0 else 255 + return self.__absolute() + + def relative(self, delta_x: int, delta_y: int) -> list[int]: + assert -127 <= delta_x <= 127 + assert -127 <= delta_y <= 127 + delta_x = math.ceil(delta_x / 3) + delta_y = math.ceil(delta_y / 3) + self.__delta_x = delta_x if delta_x >= 0 else 255 + delta_x + self.__delta_y = delta_y if delta_y >= 0 else 255 + delta_y + return self.__relative() + + def active(self) -> str: + return self.__active + + def set_active(self, name: str) -> None: + self.__active = name + + def __absolute(self) -> list[int]: + code = 0x00 + if self.__clicked: + code = self.__button_code(self.__button) + cmd = [0x00, 0x04, 0x07, 0x02, code, 0x00, 0x00, 0x00, 0x00, 0x00] + if len(self.__to_x) == 2: + cmd[6] = self.__to_x[0] + cmd[5] = self.__to_x[1] + if len(self.__to_y) == 2: + cmd[8] = self.__to_y[0] + cmd[7] = self.__to_y[1] + if self.__wheel_y: + cmd[9] = self.__wheel_y + return cmd + + def __relative(self) -> list[int]: + code = 0x00 + if self.__clicked: + code = self.__button_code(self.__button) + cmd = [0x00, 0x05, 0x05, 0x01, code, self.__delta_x, self.__delta_y, 0x00] + return cmd + + def __button_code(self, name: str) -> int: + code = 0x00 + match name: + case "left": + code = 0x01 + case "right": + code = 0x02 + case "middle": + code = 0x04 + case "up": + code = 0x08 + case "down": + code = 0x10 + return code + + def __to_fixed(self, num: int) -> list[int]: + to_fixed = math.ceil(MouseRange.remap(num, 0, MouseRange.MAX) / 8) + return [to_fixed >> 8, to_fixed & 0xFF] diff --git a/kvmd/plugins/hid/ch9329/tty.py b/kvmd/plugins/hid/ch9329/tty.py new file mode 100644 index 00000000..2e8cb22e --- /dev/null +++ b/kvmd/plugins/hid/ch9329/tty.py @@ -0,0 +1,62 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2018-2022 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import os +import serial + + +class TTY: + def __init__(self, device_path: str, speed: int, read_timeout: float) -> None: + self.__tty = serial.Serial(device_path, speed, timeout=read_timeout) + self.__device_path = device_path + + def has_device(self) -> bool: + return os.path.exists(self.__device_path) + + def send(self, cmd: list[int]) -> list[int]: + cmd = self.__wrap_cmd(cmd) + self.__tty.write(serial.to_bytes(cmd)) + data = list(self.__tty.read(5)) + if data and data[4]: + more_data = list(self.__tty.read(data[4] + 1)) + data.extend(more_data) + return data + + def check_res(self, res: list[int]) -> bool: + res_sum = res.pop() + return (self.__checksum(res) == res_sum) + + def __wrap_cmd(self, cmd: list[int]) -> list[int]: + cmd.insert(0, 0xAB) + cmd.insert(0, 0x57) + cmd.append(self.__checksum(cmd)) + return cmd + + def __checksum(self, cmd: list[int]) -> int: + return sum(cmd) % 256 + + +def get_info() -> list[int]: + return [0x00, 0x01, 0x00] + +# RESET = [0x00,0x0F,0x00] +# GET_INFO = [0x00,0x01,0x00] |