diff options
Diffstat (limited to 'kvmd/plugins/hid/_mcu/__init__.py')
-rw-r--r-- | kvmd/plugins/hid/_mcu/__init__.py | 251 |
1 files changed, 92 insertions, 159 deletions
diff --git a/kvmd/plugins/hid/_mcu/__init__.py b/kvmd/plugins/hid/_mcu/__init__.py index b46c876d..6e0f902e 100644 --- a/kvmd/plugins/hid/_mcu/__init__.py +++ b/kvmd/plugins/hid/_mcu/__init__.py @@ -22,10 +22,8 @@ import os import multiprocessing -import dataclasses import contextlib import queue -import struct import time from typing import Tuple @@ -37,8 +35,6 @@ from typing import AsyncGenerator from ....logging import get_logger -from ....keyboard.mappings import KEYMAP - from .... import tools from .... import aiotools from .... import aiomulti @@ -56,6 +52,22 @@ from .. import BaseHid from .gpio import Gpio +from .proto import REQUEST_PING +from .proto import REQUEST_REPEAT +from .proto import RESPONSE_LEGACY_OK +from .proto import BaseEvent +from .proto import SetKeyboardOutputEvent +from .proto import SetMouseOutputEvent +from .proto import ClearEvent +from .proto import KeyEvent +from .proto import MouseButtonEvent +from .proto import MouseMoveEvent +from .proto import MouseRelativeEvent +from .proto import MouseWheelEvent +from .proto import get_active_keyboard +from .proto import get_active_mouse +from .proto import check_response + # ===== class _RequestError(Exception): @@ -73,84 +85,6 @@ class _TempRequestError(_RequestError): # ===== -class _BaseEvent: - def make_command(self) -> bytes: - raise NotImplementedError - - -class _ClearEvent(_BaseEvent): - def make_command(self) -> bytes: - return b"\x10\x00\x00\x00\x00" - - [email protected](frozen=True) -class _KeyEvent(_BaseEvent): - name: str - state: bool - - def __post_init__(self) -> None: - assert self.name in KEYMAP - - def make_command(self) -> bytes: - code = KEYMAP[self.name].mcu.code - return struct.pack(">BBBxx", 0x11, code, int(self.state)) - - [email protected](frozen=True) -class _MouseButtonEvent(_BaseEvent): - name: str - state: bool - - def __post_init__(self) -> None: - assert self.name in ["left", "right", "middle", "up", "down"] - - def make_command(self) -> bytes: - (code, state_pressed, is_main) = { - "left": (0b10000000, 0b00001000, True), - "right": (0b01000000, 0b00000100, True), - "middle": (0b00100000, 0b00000010, True), - "up": (0b10000000, 0b00001000, False), # Back - "down": (0b01000000, 0b00000100, False), # Forward - }[self.name] - if self.state: - code |= state_pressed - if is_main: - main_code = code - extra_code = 0 - else: - main_code = 0 - extra_code = code - return struct.pack(">BBBxx", 0x13, main_code, extra_code) - - [email protected](frozen=True) -class _MouseMoveEvent(_BaseEvent): - to_x: int - to_y: int - - def __post_init__(self) -> None: - assert -32768 <= self.to_x <= 32767 - assert -32768 <= self.to_y <= 32767 - - def make_command(self) -> bytes: - return struct.pack(">Bhh", 0x12, self.to_x, self.to_y) - - [email protected](frozen=True) -class _MouseWheelEvent(_BaseEvent): - delta_x: int - delta_y: int - - def __post_init__(self) -> None: - assert -127 <= self.delta_x <= 127 - assert -127 <= self.delta_y <= 127 - - def make_command(self) -> bytes: - # Горизонтальная прокрутка пока не поддерживается - return struct.pack(">Bxbxx", 0x14, self.delta_y) - - -# ===== class BasePhyConnection: def send(self, request: bytes) -> bytes: raise NotImplementedError @@ -192,16 +126,13 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- self.__phy = phy self.__gpio = Gpio(reset_pin, reset_inverted, reset_delay) - self.__events_queue: "multiprocessing.Queue[_BaseEvent]" = multiprocessing.Queue() + self.__events_queue: "multiprocessing.Queue[BaseEvent]" = multiprocessing.Queue() self.__notifier = aiomulti.AioProcessNotifier() self.__state_flags = aiomulti.AioSharedFlags({ - "keyboard_online": True, - "mouse_online": True, - "caps": False, - "scroll": False, - "num": False, - }, self.__notifier) + "online": 0, + "status": 0, + }, self.__notifier, type=int) self.__stop_event = multiprocessing.Event() @@ -226,19 +157,51 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- async def get_state(self) -> Dict: state = await self.__state_flags.get() + online = bool(state["online"]) + pong = (state["status"] >> 16) & 0xFF + outputs = (state["status"] >> 8) & 0xFF + features = state["status"] & 0xFF + + absolute = True + active_mouse = get_active_mouse(outputs) + if online and active_mouse in ["usb_rel", "ps2"]: + absolute = False + + keyboard_outputs: Dict = {"available": {}, "active": ""} + mouse_outputs: Dict = {"available": {}, "active": ""} + + if outputs & 0b10000000: # Dynamic + if features & 0b00000001: # USB + keyboard_outputs["available"]["usb"] = {"name": "USB"} + mouse_outputs["available"]["usb"] = {"name": "USB", "absolute": True} + mouse_outputs["available"]["usb_rel"] = {"name": "USB Relative", "absolute": False} + + if features & 0b00000010: # PS/2 + keyboard_outputs["available"]["ps2"] = {"name": "PS/2"} + mouse_outputs["available"]["ps2"] = {"name": "PS/2"} + + active_keyboard = get_active_keyboard(outputs) + if active_keyboard in keyboard_outputs["available"]: + keyboard_outputs["active"] = active_keyboard + + if active_mouse in mouse_outputs["available"]: + mouse_outputs["active"] = active_mouse + return { - "online": (state["keyboard_online"] and state["mouse_online"]), + "online": online, "keyboard": { - "online": state["keyboard_online"], + "online": (online and not (pong & 0b00001000)), "leds": { - "caps": state["caps"], - "scroll": state["scroll"], - "num": state["num"], + "caps": bool(pong & 0b00000001), + "scroll": bool(pong & 0b00000010), + "num": bool(pong & 0b00000100), }, + "outputs": keyboard_outputs, }, "mouse": { - "online": state["mouse_online"], - "absolute": True, + "online": (online and not (pong & 0b00010000)), + "absolute": absolute, + "outputs": mouse_outputs, }, } @@ -268,7 +231,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- get_logger().info("Clearing HID events ...") try: with self.__phy.connected() as conn: - self.__process_command(conn, b"\x10\x00\x00\x00\x00") + self.__process_request(conn, ClearEvent().make_request()) except Exception: logger.exception("Can't clear HID events") finally: @@ -278,30 +241,36 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- def send_key_events(self, keys: Iterable[Tuple[str, bool]]) -> None: for (key, state) in keys: - self.__queue_event(_KeyEvent(key, state)) + self.__queue_event(KeyEvent(key, state)) def send_mouse_button_event(self, button: str, state: bool) -> None: - self.__queue_event(_MouseButtonEvent(button, state)) + self.__queue_event(MouseButtonEvent(button, state)) def send_mouse_move_event(self, to_x: int, to_y: int) -> None: - self.__queue_event(_MouseMoveEvent(to_x, to_y)) + self.__queue_event(MouseMoveEvent(to_x, to_y)) def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None: - _ = delta_x # No relative events yet - _ = delta_y + self.__queue_event(MouseRelativeEvent(delta_x, delta_y)) def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None: - self.__queue_event(_MouseWheelEvent(delta_x, delta_y)) + self.__queue_event(MouseWheelEvent(delta_x, delta_y)) + + def set_keyboard_output(self, output: str) -> None: + self.__queue_event(SetKeyboardOutputEvent(output), clear=True) + + def set_mouse_output(self, output: str) -> None: + self.__queue_event(SetMouseOutputEvent(output), clear=True) def clear_events(self) -> None: - # FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между - # очисткой и добавлением события _ClearEvent. Неприятно, но не смертельно. - # Починить блокировкой после перехода на асинхронные очереди. - tools.clear_queue(self.__events_queue) - self.__queue_event(_ClearEvent()) + self.__queue_event(ClearEvent(), clear=True) - def __queue_event(self, event: _BaseEvent) -> None: + def __queue_event(self, event: BaseEvent, clear: bool=False) -> None: if not self.__stop_event.is_set(): + if clear: + # FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между + # очисткой и добавлением нового события. Неприятно, но не смертельно. + # Починить блокировкой после перехода на асинхронные очереди. + tools.clear_queue(self.__events_queue) self.__events_queue.put_nowait(event) def run(self) -> None: # pylint: disable=too-many-branches @@ -319,9 +288,9 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- try: event = self.__events_queue.get(timeout=0.1) except queue.Empty: - self.__process_command(conn, b"\x01\x00\x00\x00\x00") # Ping + self.__process_request(conn, REQUEST_PING) else: - if not self.__process_command(conn, event.make_command()): + if not self.__process_request(conn, event.make_request()): self.clear_events() else: logger.error("Missing HID device") @@ -331,9 +300,6 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- logger.exception("Unexpected HID error") time.sleep(1) - def __process_command(self, conn: BasePhyConnection, command: bytes) -> bool: - return self.__process_request(conn, self.__make_request(command)) - def __process_request(self, conn: BasePhyConnection, request: bytes) -> bool: # pylint: disable=too-many-branches logger = get_logger() error_messages: List[str] = [] @@ -344,15 +310,14 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- error_retval = False while common_retries and read_retries: - response = self.__send_request(conn, request) + response = (RESPONSE_LEGACY_OK if self.__noop else conn.send(request)) try: if len(response) < 4: read_retries -= 1 raise _TempRequestError(f"No response from HID: request={request!r}") - assert len(response) == 4, response - if self.__make_crc16(response[-4:-2]) != struct.unpack(">H", response[-2:])[0]: - request = self.__make_request(b"\x02\x00\x00\x00\x00") # Repeat an answer + if not check_response(response): + request = REQUEST_REPEAT raise _TempRequestError("Invalid response CRC; requesting response again ...") code = response[1] @@ -368,9 +333,9 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- self.__set_state_online(True) return True elif code & 0x80: # Pong/Done with state - self.__set_state_code(code) + self.__set_state_pong(response) return True - raise _TempRequestError(f"Invalid response from HID: request={request!r}; code=0x{code:02X}") + raise _TempRequestError(f"Invalid response from HID: request={request!r}, response=0x{response!r}") except _RequestError as err: common_retries -= 1 @@ -401,42 +366,10 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- return error_retval def __set_state_online(self, online: bool) -> None: - self.__state_flags.update( - keyboard_online=online, - mouse_online=online, - ) - - def __set_state_code(self, code: int) -> None: - self.__state_flags.update( - keyboard_online=(not (code & 0b00001000)), - mouse_online=(not (code & 0b00010000)), - caps=bool(code & 0b00000001), - scroll=bool(code & 0b00000010), - num=bool(code & 0b00000100), - ) - - def __send_request(self, conn: BasePhyConnection, request: bytes) -> bytes: - if not self.__noop: - response = conn.send(request) - else: - response = b"\x33\x20" # Magic + OK - response += struct.pack(">H", self.__make_crc16(response)) - return response - - def __make_request(self, command: bytes) -> bytes: - request = b"\x33" + command - request += struct.pack(">H", self.__make_crc16(request)) - assert len(request) == 8, (request, command) - return request - - def __make_crc16(self, data: bytes) -> int: - crc = 0xFFFF - for byte in data: - crc = crc ^ byte - for _ in range(8): - if crc & 0x0001 == 0: - crc = crc >> 1 - else: - crc = crc >> 1 - crc = crc ^ 0xA001 - return crc + self.__state_flags.update(online=int(online)) + + def __set_state_pong(self, response: bytes) -> None: + status = response[1] << 16 + if len(response) > 4: + status |= (response[2] << 8) | response[3] + self.__state_flags.update(online=1, status=status) |