diff options
author | Maxim Devaev <[email protected]> | 2023-07-31 01:55:05 +0300 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2023-07-31 01:55:05 +0300 |
commit | cf44668af998b114fbddc8fa41b47193b606c064 (patch) | |
tree | fa34fad3601f55303503a750187e808e4e1a0654 | |
parent | e93a5c968f2846d1cf4673e2633b1ecebb5919e3 (diff) | |
parent | 8e2a5284183977d15c537ae5d26efd8fd7833cd6 (diff) |
Merge branch 'ch9329'
-rw-r--r-- | kvmd/plugins/hid/ch9329/__init__.py | 227 | ||||
-rw-r--r-- | kvmd/plugins/hid/ch9329/chip.py | 82 | ||||
-rw-r--r-- | kvmd/plugins/hid/ch9329/keyboard.py | 68 | ||||
-rw-r--r-- | kvmd/plugins/hid/ch9329/mouse.py | 115 | ||||
-rwxr-xr-x | setup.py | 1 |
5 files changed, 493 insertions, 0 deletions
diff --git a/kvmd/plugins/hid/ch9329/__init__.py b/kvmd/plugins/hid/ch9329/__init__.py new file mode 100644 index 00000000..e7f518d5 --- /dev/null +++ b/kvmd/plugins/hid/ch9329/__init__.py @@ -0,0 +1,227 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2018-2022 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import multiprocessing +import queue +import time + +from typing import Iterable +from typing import AsyncGenerator + +from ....logging import get_logger + +from .... import tools +from .... import aiotools +from .... import aiomulti +from .... import aioproc + +from ....yamlconf import Option + +from ....validators.basic import valid_float_f01 +from ....validators.os import valid_abs_path +from ....validators.hw import valid_tty_speed + +from .. import BaseHid + +from .chip import ChipResponseError +from .chip import ChipConnection +from .chip import Chip +from .mouse import Mouse +from .keyboard import Keyboard + + +# ===== +class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes + def __init__( # pylint: disable=too-many-arguments,super-init-not-called + self, + device_path: str, + speed: int, + read_timeout: float, + ) -> None: + + multiprocessing.Process.__init__(self, daemon=True) + + self.__device_path = device_path + self.__speed = speed + self.__read_timeout = read_timeout + + self.__reset_required_event = multiprocessing.Event() + self.__cmd_queue: "multiprocessing.Queue[bytes]" = multiprocessing.Queue() + + self.__notifier = aiomulti.AioProcessNotifier() + self.__state_flags = aiomulti.AioSharedFlags({ + "online": 0, + "busy": 0, + "status": 0, + }, self.__notifier, type=int) + + self.__stop_event = multiprocessing.Event() + self.__chip = Chip(device_path, speed, read_timeout) + self.__keyboard = Keyboard() + self.__mouse = Mouse() + + @classmethod + def get_plugin_options(cls) -> dict: + return { + "device": Option("/dev/kvmd-hid", type=valid_abs_path, unpack_as="device_path"), + "speed": Option(9600, type=valid_tty_speed), + "read_timeout": Option(0.3, type=valid_float_f01), + } + + def sysprep(self) -> None: + get_logger(0).info("Starting HID daemon ...") + self.start() + + async def get_state(self) -> dict: + state = await self.__state_flags.get() + absolute = self.__mouse.is_absolute() + leds = await self.__keyboard.get_leds() + return { + "online": state["online"], + "busy": False, + "connected": None, + "keyboard": { + "online": state["online"], + "leds": leds, + "outputs": {"available": [], "active": ""}, + }, + "mouse": { + "online": state["online"], + "absolute": absolute, + "outputs": { + "available": ["usb", "usb_rel"], + "active": ("usb" if absolute else "usb_rel"), + }, + }, + } + + async def poll_state(self) -> AsyncGenerator[dict, None]: + prev_state: dict = {} + while True: + state = await self.get_state() + if state != prev_state: + yield state + prev_state = state + await self.__notifier.wait() + + async def reset(self) -> None: + self.__reset_required_event.set() + + @aiotools.atomic_fg + async def cleanup(self) -> None: + if self.is_alive(): + get_logger(0).info("Stopping HID daemon ...") + self.__stop_event.set() + if self.is_alive() or self.exitcode is not None: + self.join() + + # ===== + + def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None: + for (key, state) in keys: + self.__queue_cmd(self.__keyboard.process_key(key, state)) + + def send_mouse_button_event(self, button: str, state: bool) -> None: + self.__queue_cmd(self.__mouse.process_button(button, state)) + + def send_mouse_move_event(self, to_x: int, to_y: int) -> None: + self.__queue_cmd(self.__mouse.process_move(to_x, to_y)) + + def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None: + self.__queue_cmd(self.__mouse.process_wheel(delta_x, delta_y)) + + def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None: + self.__queue_cmd(self.__mouse.process_relative(delta_x, delta_y)) + + def set_params(self, keyboard_output: (str | None)=None, mouse_output: (str | None)=None) -> None: + if mouse_output is not None: + get_logger(0).info("HID : mouse output = %s", mouse_output) + self.__mouse.set_absolute(mouse_output == "usb") + self.__notifier.notify() + + def set_connected(self, connected: bool) -> None: + pass + + def clear_events(self) -> None: + tools.clear_queue(self.__cmd_queue) + + def __queue_cmd(self, cmd: bytes, clear: bool=False) -> None: + if not self.__stop_event.is_set(): + if clear: + # FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между + # очисткой и добавлением нового события. Неприятно, но не смертельно. + # Починить блокировкой после перехода на асинхронные очереди. + tools.clear_queue(self.__cmd_queue) + self.__cmd_queue.put_nowait(cmd) + + def run(self) -> None: # pylint: disable=too-many-branches + logger = aioproc.settle("HID", "hid") + while not self.__stop_event.is_set(): + try: + self.__hid_loop() + except Exception: + logger.exception("Unexpected error in the run loop") + time.sleep(1) + + def __hid_loop(self) -> None: + while not self.__stop_event.is_set(): + try: + with self.__chip.connected() as conn: + while not (self.__stop_event.is_set() and self.__cmd_queue.qsize() == 0): + if self.__reset_required_event.is_set(): + try: + self.__set_state_busy(True) + # self.__process_request(conn, RESET) + finally: + self.__reset_required_event.clear() + try: + cmd = self.__cmd_queue.get(timeout=0.1) + # get_logger(0).info(f"HID : cmd = {cmd}") + except queue.Empty: + self.__process_cmd(conn, b"") + else: + self.__process_cmd(conn, cmd) + except Exception: + self.clear_events() + get_logger(0).exception("Unexpected error in the HID loop") + time.sleep(2) + + def __process_cmd(self, conn: ChipConnection, cmd: bytes) -> bool: # pylint: disable=too-many-branches + try: + led_byte = conn.xfer(cmd) + except ChipResponseError as err: + self.__set_state_online(False) + get_logger(0).info(err) + time.sleep(2) + else: + if led_byte >= 0: + self.__keyboard.set_leds(led_byte) + self.__notifier.notify() + self.__set_state_online(True) + return True + return False + + def __set_state_online(self, online: bool) -> None: + self.__state_flags.update(online=int(online)) + + def __set_state_busy(self, busy: bool) -> None: + self.__state_flags.update(busy=int(busy)) diff --git a/kvmd/plugins/hid/ch9329/chip.py b/kvmd/plugins/hid/ch9329/chip.py new file mode 100644 index 00000000..b8631ec0 --- /dev/null +++ b/kvmd/plugins/hid/ch9329/chip.py @@ -0,0 +1,82 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2018-2022 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import serial +import contextlib + +from typing import Generator + + +# ===== +class ChipResponseError(Exception): + pass + + +# ===== +class ChipConnection: + def __init__(self, tty: serial.Serial) -> None: + self.__tty = tty + + def xfer(self, cmd: bytes) -> int: + self.__send(cmd) + return self.__recv() + + def __send(self, cmd: bytes) -> None: + # RESET = [0x00,0x0F,0x00] + # GET_INFO = [0x00,0x01,0x00] + if len(cmd) == 0: + cmd = b"\x00\x01\x00" + cmd = b"\x57\xAB" + cmd + cmd += self.__make_checksum(cmd).to_bytes(1, "big") + self.__tty.write(cmd) + + def __recv(self) -> int: + data = self.__tty.read(5) + if len(data) < 5: + raise ChipResponseError("Too short response, HID might be disconnected") + + if data and data[4]: + data += self.__tty.read(data[4] + 1) + + if self.__make_checksum(data[:-1]) != data[-1]: + raise ChipResponseError("Invalid response checksum") + + if data[4] == 1 and data[5] != 0: + raise ChipResponseError(f"Response error code = {data[5]!r}") + + # led_byte (info) response + return (data[7] if data[3] == 0x81 else -1) + + def __make_checksum(self, cmd: bytes) -> int: + return (sum(cmd) % 256) + + +class Chip: + def __init__(self, device_path: str, speed: int, read_timeout: float) -> None: + self.__device_path = device_path + self.__speed = speed + self.__read_timeout = read_timeout + + @contextlib.contextmanager + def connected(self) -> Generator[ChipConnection, None, None]: # type: ignore + with serial.Serial(self.__device_path, self.__speed, timeout=self.__read_timeout) as tty: + yield ChipConnection(tty) diff --git a/kvmd/plugins/hid/ch9329/keyboard.py b/kvmd/plugins/hid/ch9329/keyboard.py new file mode 100644 index 00000000..9aa16d9d --- /dev/null +++ b/kvmd/plugins/hid/ch9329/keyboard.py @@ -0,0 +1,68 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2018-2022 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from .... import aiomulti + +from ....keyboard.mappings import KEYMAP + + +# ===== +class Keyboard: + def __init__(self) -> None: + self.__leds = aiomulti.AioSharedFlags({ + "num": False, + "caps": False, + "scroll": False, + }, aiomulti.AioProcessNotifier(), bool) + self.__modifiers = 0 + self.__active_keys: list[int] = [] + + def set_leds(self, led_byte: int) -> None: + self.__leds.update( + num=bool(led_byte & 1), + caps=bool((led_byte >> 1) & 1), + scroll=bool((led_byte >> 2) & 1), + ) + + async def get_leds(self) -> dict[str, bool]: + return (await self.__leds.get()) + + def process_key(self, key: str, state: bool) -> bytes: + code = KEYMAP[key].usb.code + is_modifier = KEYMAP[key].usb.is_modifier + if state: + if is_modifier: + self.__modifiers |= code + elif len(self.__active_keys) < 6 and code not in self.__active_keys: + self.__active_keys.append(code) + else: + if is_modifier: + self.__modifiers &= ~code + elif code in self.__active_keys: + self.__active_keys.remove(code) + cmd = [ + 0, 0x02, 0x08, self.__modifiers, 0, + 0, 0, 0, 0, 0, 0, + ] + for (index, code) in enumerate(self.__active_keys): + cmd[index + 5] = code + return bytes(cmd) diff --git a/kvmd/plugins/hid/ch9329/mouse.py b/kvmd/plugins/hid/ch9329/mouse.py new file mode 100644 index 00000000..0a0cfbcc --- /dev/null +++ b/kvmd/plugins/hid/ch9329/mouse.py @@ -0,0 +1,115 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2018-2022 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import math + +from ....mouse import MouseRange + + +# ===== +class Mouse: # pylint: disable=too-many-instance-attributes + def __init__(self) -> None: + self.__absolute = True + self.__buttons = 0 + self.__to_x = (0, 0) + self.__to_y = (0, 0) + self.__delta_x = 0 + self.__delta_y = 0 + self.__wheel_y = 0 + + def set_absolute(self, flag: bool) -> None: + self.__absolute = flag + + def is_absolute(self) -> bool: + return self.__absolute + + def process_button(self, button: str, state: bool) -> bytes: + code = 0x00 + match button: + case "left": + code = 0x01 + case "right": + code = 0x02 + case "middle": + code = 0x04 + case "up": + code = 0x08 + case "down": + code = 0x10 + if code: + if state: + self.__buttons |= code + else: + self.__buttons &= ~code + self.__wheel_y = 0 + if not self.__absolute: + return self.__make_relative_cmd() + else: + return self.__make_absolute_cmd() + + def process_move(self, to_x: int, to_y: int) -> bytes: + self.__to_x = self.__fix_absolute(to_x) + self.__to_y = self.__fix_absolute(to_y) + self.__wheel_y = 0 + return self.__make_absolute_cmd() + + def __fix_absolute(self, value: int) -> tuple[int, int]: + assert MouseRange.MIN <= value <= MouseRange.MAX + to_fixed = math.ceil(MouseRange.remap(value, 0, MouseRange.MAX) / 8) + return (to_fixed >> 8, to_fixed & 0xFF) + + def process_wheel(self, delta_x: int, delta_y: int) -> bytes: + _ = delta_x + assert -127 <= delta_y <= 127 + self.__wheel_y = (1 if delta_y > 0 else 255) + if not self.__absolute: + return self.__make_relative_cmd() + else: + return self.__make_absolute_cmd() + + def process_relative(self, delta_x: int, delta_y: int) -> bytes: + self.__delta_x = self.__fix_relative(delta_x) + self.__delta_y = self.__fix_relative(delta_y) + self.__wheel_y = 0 + return self.__make_relative_cmd() + + def __make_absolute_cmd(self) -> bytes: + return bytes([ + 0, 0x04, 0x07, 0x02, + self.__buttons, + self.__to_x[1], self.__to_x[0], + self.__to_y[1], self.__to_y[0], + self.__wheel_y, + ]) + + def __make_relative_cmd(self) -> bytes: + return bytes([ + 0, 0x05, 0x05, 0x01, + self.__buttons, + self.__delta_x, self.__delta_y, + self.__wheel_y, + ]) + + def __fix_relative(self, value: int) -> int: + assert -127 <= value <= 127 + value = math.ceil(value / 3) + return (value if value >= 0 else (255 + value)) @@ -75,6 +75,7 @@ def main() -> None: "kvmd.plugins.hid._mcu", "kvmd.plugins.hid.otg", "kvmd.plugins.hid.bt", + "kvmd.plugins.hid.ch9329", "kvmd.plugins.atx", "kvmd.plugins.msd", "kvmd.plugins.msd.otg", |