diff options
Diffstat (limited to 'kvmd/plugins/hid/otg')
-rw-r--r-- | kvmd/plugins/hid/otg/device.py | 20 | ||||
-rw-r--r-- | kvmd/plugins/hid/otg/keyboard.py | 31 | ||||
-rw-r--r-- | kvmd/plugins/hid/otg/mouse.py | 36 |
3 files changed, 45 insertions, 42 deletions
diff --git a/kvmd/plugins/hid/otg/device.py b/kvmd/plugins/hid/otg/device.py index 625aeef5..43d324b1 100644 --- a/kvmd/plugins/hid/otg/device.py +++ b/kvmd/plugins/hid/otg/device.py @@ -23,7 +23,6 @@ import os import select import multiprocessing -import multiprocessing.queues import queue import errno import time @@ -32,6 +31,7 @@ from typing import Dict from ....logging import get_logger +from .... import tools from .... import aiomulti from .... import aioproc @@ -76,7 +76,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in self.__noop = noop self.__fd = -1 - self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue() + self.__events_queue: "multiprocessing.Queue[BaseEvent]" = multiprocessing.Queue() self.__state_flags = aiomulti.AioSharedFlags({"online": True, **initial_state}, notifier) self.__stop_event = multiprocessing.Event() @@ -93,16 +93,18 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in if self.__ensure_device(): # Check device and process reports if needed self.__read_all_reports() try: - event: BaseEvent = self.__events_queue.get(timeout=0.1) + event = self.__events_queue.get(timeout=0.1) except queue.Empty: if not self.__udc.can_operate(): + self._clear_queue() self.__close_device() else: - self._process_event(event) + if not self._process_event(event): + self._clear_queue() except Exception: logger.exception("Unexpected HID-%s error", self.__name) + self._clear_queue() self.__close_device() - finally: time.sleep(1) self.__close_device() @@ -112,7 +114,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in # ===== - def _process_event(self, event: BaseEvent) -> None: + def _process_event(self, event: BaseEvent) -> bool: raise NotImplementedError def _process_read_report(self, report: bytes) -> None: @@ -135,11 +137,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in self.__events_queue.put_nowait(event) def _clear_queue(self) -> None: - while not self.__events_queue.empty(): - try: - self.__events_queue.get_nowait() - except queue.Empty: - break + tools.clear_queue(self.__events_queue) def _ensure_write(self, report: bytes, reopen: bool=False, close: bool=False) -> bool: if reopen: diff --git a/kvmd/plugins/hid/otg/keyboard.py b/kvmd/plugins/hid/otg/keyboard.py index e0243798..c497f7c8 100644 --- a/kvmd/plugins/hid/otg/keyboard.py +++ b/kvmd/plugins/hid/otg/keyboard.py @@ -112,47 +112,50 @@ class KeyboardProcess(BaseDeviceProcess): # ===== - def _process_event(self, event: BaseEvent) -> None: + def _process_event(self, event: BaseEvent) -> bool: if isinstance(event, _ClearEvent): - self.__process_clear_event() + return self.__process_clear_event() elif isinstance(event, _ResetEvent): - self.__process_clear_event(reopen=True) + return self.__process_clear_event(reopen=True) elif isinstance(event, _ModifierEvent): - self.__process_modifier_event(event) + return self.__process_modifier_event(event) elif isinstance(event, _KeyEvent): - self.__process_key_event(event) + return self.__process_key_event(event) + raise RuntimeError(f"Not implemented event: {event}") - def __process_clear_event(self, reopen: bool=False) -> None: + def __process_clear_event(self, reopen: bool=False) -> bool: self.__clear_modifiers() self.__clear_keys() - self.__send_current_state(reopen=reopen) + return self.__send_current_state(reopen=reopen) - def __process_modifier_event(self, event: _ModifierEvent) -> None: + def __process_modifier_event(self, event: _ModifierEvent) -> bool: if event.modifier in self.__pressed_modifiers: # Ранее нажатый модификатор отжимаем self.__pressed_modifiers.remove(event.modifier) if not self.__send_current_state(): - return + return False if event.state: # Нажимаем если нужно self.__pressed_modifiers.add(event.modifier) - self.__send_current_state() + return self.__send_current_state() + return True - def __process_key_event(self, event: _KeyEvent) -> None: + def __process_key_event(self, event: _KeyEvent) -> bool: if event.key in self.__pressed_keys: # Ранее нажатую клавишу отжимаем self.__pressed_keys[self.__pressed_keys.index(event.key)] = None if not self.__send_current_state(): - return + return False elif event.state and None not in self.__pressed_keys: # Если нужно нажать что-то новое, но свободных слотов нет - отжимаем всё self.__clear_keys() if not self.__send_current_state(): - return + return False if event.state: # Нажимаем если нужно self.__pressed_keys[self.__pressed_keys.index(None)] = event.key - self.__send_current_state() + return self.__send_current_state() + return True # ===== diff --git a/kvmd/plugins/hid/otg/mouse.py b/kvmd/plugins/hid/otg/mouse.py index adb2469f..3e0cf0e8 100644 --- a/kvmd/plugins/hid/otg/mouse.py +++ b/kvmd/plugins/hid/otg/mouse.py @@ -110,44 +110,46 @@ class MouseProcess(BaseDeviceProcess): # ===== - def _process_event(self, event: BaseEvent) -> None: + def _process_event(self, event: BaseEvent) -> bool: if isinstance(event, _ClearEvent): - self.__process_clear_event() + return self.__process_clear_event() elif isinstance(event, _ResetEvent): - self.__process_clear_event(reopen=True) + return self.__process_clear_event(reopen=True) elif isinstance(event, _ButtonEvent): - self.__process_button_event(event) + return self.__process_button_event(event) elif isinstance(event, _MoveEvent): - self.__process_move_event(event) + return self.__process_move_event(event) elif isinstance(event, _WheelEvent): - self.__process_wheel_event(event) + return self.__process_wheel_event(event) + raise RuntimeError(f"Not implemented event: {event}") - def __process_clear_event(self, reopen: bool=False) -> None: + def __process_clear_event(self, reopen: bool=False) -> bool: self.__clear_state() - self.__send_current_state(0, 0, reopen=reopen) + return self.__send_current_state(reopen=reopen) - def __process_button_event(self, event: _ButtonEvent) -> None: + def __process_button_event(self, event: _ButtonEvent) -> bool: if event.code & self.__pressed_buttons: # Ранее нажатую кнопку отжимаем self.__pressed_buttons &= ~event.code - if not self.__send_current_state(0, 0): - return + if not self.__send_current_state(): + return False if event.state: # Нажимаем если нужно self.__pressed_buttons |= event.code - self.__send_current_state(0, 0) + return self.__send_current_state() + return True - def __process_move_event(self, event: _MoveEvent) -> None: + def __process_move_event(self, event: _MoveEvent) -> bool: self.__x = event.to_x self.__y = event.to_y - self.__send_current_state(0, 0) + return self.__send_current_state() - def __process_wheel_event(self, event: _WheelEvent) -> None: - self.__send_current_state(event.delta_x, event.delta_y) + def __process_wheel_event(self, event: _WheelEvent) -> bool: + return self.__send_current_state(event.delta_x, event.delta_y) # ===== - def __send_current_state(self, delta_x: int, delta_y: int, reopen: bool=False) -> bool: + def __send_current_state(self, delta_x: int=0, delta_y: int=0, reopen: bool=False) -> bool: report = self.__make_report( buttons=self.__pressed_buttons, to_x=self.__x, |