diff options
Diffstat (limited to 'kvmd/plugins')
-rw-r--r-- | kvmd/plugins/hid/serial.py | 127 |
1 files changed, 48 insertions, 79 deletions
diff --git a/kvmd/plugins/hid/serial.py b/kvmd/plugins/hid/serial.py index 6f8132e4..4a4346dc 100644 --- a/kvmd/plugins/hid/serial.py +++ b/kvmd/plugins/hid/serial.py @@ -32,7 +32,6 @@ import errno import time from typing import Dict -from typing import Set from typing import AsyncGenerator import serial @@ -64,67 +63,68 @@ class _BaseEvent: raise NotImplementedError [email protected](frozen=True) # pylint: disable=abstract-method -class _BoolEvent(_BaseEvent): - name: str - state: bool - - [email protected](frozen=True) # pylint: disable=abstract-method -class _IntEvent(_BaseEvent): - x: int - y: int +class _ClearEvent(_BaseEvent): + def make_command(self) -> bytes: + return b"\x10\x00\x00\x00\x00" @dataclasses.dataclass(frozen=True) -class _KeyEvent(_BoolEvent): +class _KeyEvent(_BaseEvent): + name: str + state: bool + def __post_init__(self) -> None: assert self.name in keymap.KEYMAP def make_command(self) -> bytes: code = keymap.KEYMAP[self.name].serial.code - key_bytes = bytes([code]) - assert len(key_bytes) == 1, (self, key_bytes, code) - state_bytes = (b"\x01" if self.state else b"\x00") - return b"\x11" + key_bytes + state_bytes + b"\x00\x00" + return struct.pack(">BBBxx", 0x11, code, int(self.state)) @dataclasses.dataclass(frozen=True) -class _MouseMoveEvent(_IntEvent): +class _MouseMoveEvent(_BaseEvent): + to_x: int + to_y: int + def __post_init__(self) -> None: - assert -32768 <= self.x <= 32767 - assert -32768 <= self.y <= 32767 + assert -32768 <= self.to_x <= 32767 + assert -32768 <= self.to_y <= 32767 def make_command(self) -> bytes: - return b"\x12" + struct.pack(">hh", self.x, self.y) + return struct.pack(">Bhh", 0x12, self.to_x, self.to_y) @dataclasses.dataclass(frozen=True) -class _MouseButtonEvent(_BoolEvent): +class _MouseButtonEvent(_BaseEvent): + name: str + state: bool + def __post_init__(self) -> None: assert self.name in ["left", "right", "middle"] def make_command(self) -> bytes: - code = 0 - if self.name == "left": - code = (0b10000000 | (0b00001000 if self.state else 0)) - elif self.name == "right": - code = (0b01000000 | (0b00000100 if self.state else 0)) - elif self.name == "middle": - code = (0b00100000 | (0b00000010 if self.state else 0)) - assert code, self - return b"\x13" + bytes([code]) + b"\x00\x00\x00" + (code, state_pressed) = { + "left": (0b10000000, 0b00001000), + "right": (0b01000000, 0b00000100), + "middle": (0b00100000, 0b00000010), + }[self.name] + if self.state: + code |= state_pressed + return struct.pack(">BBxxx", 0x13, code) @dataclasses.dataclass(frozen=True) -class _MouseWheelEvent(_IntEvent): +class _MouseWheelEvent(_BaseEvent): + delta_x: int + delta_y: int + def __post_init__(self) -> None: - assert -127 <= self.x <= 127 - assert -127 <= self.y <= 127 + assert -127 <= self.delta_x <= 127 + assert -127 <= self.delta_y <= 127 def make_command(self) -> bytes: # Горизонтальная прокрутка пока не поддерживается - return b"\x14\x00" + struct.pack(">b", self.y) + b"\x00\x00" + return struct.pack(">Bxbxx", 0x14, self.delta_y) # ===== @@ -162,10 +162,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst self.__lock = asyncio.Lock() - self.__pressed_keys: Set[str] = set() - self.__pressed_mouse_buttons: Set[str] = set() self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue() - self.__online_shared = multiprocessing.Value("i", 1) self.__stop_event = multiprocessing.Event() @@ -232,71 +229,42 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst async with self.__lock: try: if self.is_alive(): - self.__unsafe_clear_events() logger.info("Stopping HID daemon ...") self.__stop_event.set() - else: - logger.warning("Emergency cleaning up HID events ...") - self.__emergency_clear_events() if self.exitcode is not None: self.join() + if os.path.exists(self.__device_path): + get_logger().info("Clearing HID events ...") + try: + with self.__get_serial() as tty: + self.__process_command(tty, b"\x10\x00\x00\x00\x00") + except Exception: + logger.exception("Can't clear HID events") finally: gpio.write(self.__reset_pin, False) # ===== async def send_key_event(self, key: str, state: bool) -> None: - await self.__send_bool_event(_KeyEvent(key, state), self.__pressed_keys) + await self.__queue_event(_KeyEvent(key, state)) async def send_mouse_move_event(self, to_x: int, to_y: int) -> None: - await self.__send_int_event(_MouseMoveEvent(to_x, to_y)) + await self.__queue_event(_MouseMoveEvent(to_x, to_y)) async def send_mouse_button_event(self, button: str, state: bool) -> None: - await self.__send_bool_event(_MouseButtonEvent(button, state), self.__pressed_mouse_buttons) + await self.__queue_event(_MouseButtonEvent(button, state)) async def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None: - await self.__send_int_event(_MouseWheelEvent(delta_x, delta_y)) + await self.__queue_event(_MouseWheelEvent(delta_x, delta_y)) async def clear_events(self) -> None: - if not self.__stop_event.is_set(): - async with self.__lock: - self.__unsafe_clear_events() + await self.__queue_event(_ClearEvent()) - async def __send_bool_event(self, event: _BoolEvent, pressed: Set[str]) -> None: - if not self.__stop_event.is_set(): - async with self.__lock: - if ( - (event.state and (event.name not in pressed)) # Если еще не нажато - or (not event.state and (event.name in pressed)) # ... Или еще не отжато - ): - if event.state: - pressed.add(event.name) - else: - pressed.remove(event.name) - self.__events_queue.put(event) - - async def __send_int_event(self, event: _IntEvent) -> None: + async def __queue_event(self, event: _BaseEvent) -> None: if not self.__stop_event.is_set(): async with self.__lock: self.__events_queue.put(event) - def __unsafe_clear_events(self) -> None: - for (cls, pressed) in [ - (_MouseButtonEvent, self.__pressed_mouse_buttons), - (_KeyEvent, self.__pressed_keys), - ]: - for name in pressed: - self.__events_queue.put(cls(name, False)) - pressed.clear() - - def __emergency_clear_events(self) -> None: - if os.path.exists(self.__device_path): - try: - with self.__get_serial() as tty: - self.__process_command(tty, b"\x10\x00\x00\x00\x00") - except Exception: - get_logger().exception("Can't execute emergency clear HID events") - def run(self) -> None: # pylint: disable=too-many-branches logger = get_logger(0) @@ -388,6 +356,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst logger.error("Invalid response from HID: request=%r; code=0x%x", request, code) common_retries -= 1 + error_occured = True self.__online_shared.value = 0 |