diff options
Diffstat (limited to 'kvmd/plugins/hid')
-rw-r--r-- | kvmd/plugins/hid/__init__.py | 44 | ||||
-rw-r--r-- | kvmd/plugins/hid/_mcu/__init__.py | 20 | ||||
-rw-r--r-- | kvmd/plugins/hid/bt/__init__.py | 24 | ||||
-rw-r--r-- | kvmd/plugins/hid/ch9329/__init__.py | 23 | ||||
-rw-r--r-- | kvmd/plugins/hid/otg/__init__.py | 25 |
5 files changed, 127 insertions, 9 deletions
diff --git a/kvmd/plugins/hid/__init__.py b/kvmd/plugins/hid/__init__.py index 2ef7495f..32f7aef1 100644 --- a/kvmd/plugins/hid/__init__.py +++ b/kvmd/plugins/hid/__init__.py @@ -20,6 +20,9 @@ # ========================================================================== # +import asyncio +import time + from typing import Iterable from typing import AsyncGenerator @@ -29,6 +32,11 @@ from .. import get_plugin_class # ===== class BaseHid(BasePlugin): + def __init__(self) -> None: + self.__jiggler_enabled = False + self.__jiggler_absolute = True + self.__activity_ts = 0 + def sysprep(self) -> None: raise NotImplementedError @@ -64,9 +72,14 @@ class BaseHid(BasePlugin): def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None: raise NotImplementedError - def set_params(self, keyboard_output: (str | None)=None, mouse_output: (str | None)=None) -> None: - _ = keyboard_output - _ = mouse_output + def set_params( + self, + keyboard_output: (str | None)=None, + mouse_output: (str | None)=None, + jiggler: (bool | None)=None, + ) -> None: + + raise NotImplementedError def set_connected(self, connected: bool) -> None: _ = connected @@ -74,6 +87,31 @@ class BaseHid(BasePlugin): def clear_events(self) -> None: raise NotImplementedError + # ===== + + async def systask(self) -> None: + factor = 1 + while True: + if self.__jiggler_enabled and (self.__activity_ts + 60 < int(time.monotonic())): + if self.__jiggler_absolute: + self.send_mouse_move_event(100 * factor, 100 * factor) + else: + self.send_mouse_relative_event(10 * factor, 10 * factor) + factor *= -1 + await asyncio.sleep(1) + + def _bump_activity(self) -> None: + self.__activity_ts = int(time.monotonic()) + + def _set_jiggler_absolute(self, absolute: bool) -> None: + self.__jiggler_absolute = absolute + + def _set_jiggler_enabled(self, enabled: bool) -> None: + self.__jiggler_enabled = enabled + + def _get_jiggler_state(self) -> dict: + return {"enabled": self.__jiggler_enabled} + # ===== def get_hid_class(name: str) -> type[BaseHid]: diff --git a/kvmd/plugins/hid/_mcu/__init__.py b/kvmd/plugins/hid/_mcu/__init__.py index f475a880..bdd2ee32 100644 --- a/kvmd/plugins/hid/_mcu/__init__.py +++ b/kvmd/plugins/hid/_mcu/__init__.py @@ -117,6 +117,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- **gpio_kwargs: Any, ) -> None: + BaseHid.__init__(self) multiprocessing.Process.__init__(self, daemon=True) self.__read_retries = read_retries @@ -177,6 +178,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- active_mouse = get_active_mouse(outputs1) if online and active_mouse in ["usb_rel", "ps2"]: absolute = False + self._set_jiggler_absolute(absolute) keyboard_outputs: dict = {"available": [], "active": ""} mouse_outputs: dict = {"available": [], "active": ""} @@ -224,6 +226,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- "absolute": absolute, "outputs": mouse_outputs, }, + "jiggler": self._get_jiggler_state(), } async def poll_state(self) -> AsyncGenerator[dict, None]: @@ -251,20 +254,31 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None: for (key, state) in keys: self.__queue_event(KeyEvent(key, state)) + self._bump_activity() def send_mouse_button_event(self, button: str, state: bool) -> None: self.__queue_event(MouseButtonEvent(button, state)) + self._bump_activity() def send_mouse_move_event(self, to_x: int, to_y: int) -> None: self.__queue_event(MouseMoveEvent(to_x, to_y)) + self._bump_activity() def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None: self.__queue_event(MouseRelativeEvent(delta_x, delta_y)) + self._bump_activity() def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None: self.__queue_event(MouseWheelEvent(delta_x, delta_y)) + self._bump_activity() + + def set_params( + self, + keyboard_output: (str | None)=None, + mouse_output: (str | None)=None, + jiggler: (bool | None)=None, + ) -> None: - def set_params(self, keyboard_output: (str | None)=None, mouse_output: (str | None)=None) -> None: events: list[BaseEvent] = [] if keyboard_output is not None: events.append(SetKeyboardOutputEvent(keyboard_output)) @@ -272,12 +286,16 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- events.append(SetMouseOutputEvent(mouse_output)) for (index, event) in enumerate(events, 1): self.__queue_event(event, clear=(index == len(events))) + if jiggler is not None: + self._set_jiggler_enabled(jiggler) + self.__notifier.notify() def set_connected(self, connected: bool) -> None: self.__queue_event(SetConnectedEvent(connected), clear=True) def clear_events(self) -> None: self.__queue_event(ClearEvent(), clear=True) + self._bump_activity() def __queue_event(self, event: BaseEvent, clear: bool=False) -> None: if not self.__stop_event.is_set(): diff --git a/kvmd/plugins/hid/bt/__init__.py b/kvmd/plugins/hid/bt/__init__.py index 8139986f..d8065c92 100644 --- a/kvmd/plugins/hid/bt/__init__.py +++ b/kvmd/plugins/hid/bt/__init__.py @@ -59,7 +59,7 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes # https://gist.github.com/whitelynx/9f9bd4cb266b3924c64dfdff14bce2e8 # https://archlinuxarm.org/forum/viewtopic.php?f=67&t=14244 - def __init__( # pylint: disable=too-many-arguments,too-many-locals,super-init-not-called + def __init__( # pylint: disable=too-many-arguments,too-many-locals self, manufacturer: str, product: str, @@ -78,6 +78,9 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes select_timeout: float, ) -> None: + super().__init__() + self._set_jiggler_absolute(False) + self.__proc: (multiprocessing.Process | None) = None self.__stop_event = multiprocessing.Event() @@ -146,6 +149,7 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes "absolute": False, "outputs": outputs, }, + "jiggler": self._get_jiggler_state(), } async def poll_state(self) -> AsyncGenerator[dict, None]: @@ -175,18 +179,36 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None: for (key, state) in keys: self.__server.queue_event(make_keyboard_event(key, state)) + self._bump_activity() def send_mouse_button_event(self, button: str, state: bool) -> None: self.__server.queue_event(MouseButtonEvent(button, state)) + self._bump_activity() def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None: self.__server.queue_event(MouseRelativeEvent(delta_x, delta_y)) + self._bump_activity() def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None: self.__server.queue_event(MouseWheelEvent(delta_x, delta_y)) + self._bump_activity() def clear_events(self) -> None: self.__server.clear_events() + self._bump_activity() + + def set_params( + self, + keyboard_output: (str | None)=None, + mouse_output: (str | None)=None, + jiggler: (bool | None)=None, + ) -> None: + + _ = keyboard_output + _ = mouse_output + if jiggler is not None: + self._set_jiggler_enabled(jiggler) + self.__notifier.notify() # ===== diff --git a/kvmd/plugins/hid/ch9329/__init__.py b/kvmd/plugins/hid/ch9329/__init__.py index e7f518d5..7f846b2f 100644 --- a/kvmd/plugins/hid/ch9329/__init__.py +++ b/kvmd/plugins/hid/ch9329/__init__.py @@ -58,6 +58,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst read_timeout: float, ) -> None: + BaseHid.__init__(self) multiprocessing.Process.__init__(self, daemon=True) self.__device_path = device_path @@ -112,6 +113,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst "active": ("usb" if absolute else "usb_rel"), }, }, + "jiggler": self._get_jiggler_state(), } async def poll_state(self) -> AsyncGenerator[dict, None]: @@ -139,23 +141,40 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None: for (key, state) in keys: self.__queue_cmd(self.__keyboard.process_key(key, state)) + self._bump_activity() def send_mouse_button_event(self, button: str, state: bool) -> None: self.__queue_cmd(self.__mouse.process_button(button, state)) + self._bump_activity() def send_mouse_move_event(self, to_x: int, to_y: int) -> None: self.__queue_cmd(self.__mouse.process_move(to_x, to_y)) + self._bump_activity() def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None: self.__queue_cmd(self.__mouse.process_wheel(delta_x, delta_y)) + self._bump_activity() def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None: self.__queue_cmd(self.__mouse.process_relative(delta_x, delta_y)) + self._bump_activity() - def set_params(self, keyboard_output: (str | None)=None, mouse_output: (str | None)=None) -> None: + def set_params( + self, + keyboard_output: (str | None)=None, + mouse_output: (str | None)=None, + jiggler: (bool | None)=None, + ) -> None: + + _ = keyboard_output if mouse_output is not None: get_logger(0).info("HID : mouse output = %s", mouse_output) - self.__mouse.set_absolute(mouse_output == "usb") + absolute = (mouse_output == "usb") + self.__mouse.set_absolute(absolute) + self._set_jiggler_absolute(absolute) + self.__notifier.notify() + if jiggler is not None: + self._set_jiggler_enabled(jiggler) self.__notifier.notify() def set_connected(self, connected: bool) -> None: diff --git a/kvmd/plugins/hid/otg/__init__.py b/kvmd/plugins/hid/otg/__init__.py index 0090496c..30357c9a 100644 --- a/kvmd/plugins/hid/otg/__init__.py +++ b/kvmd/plugins/hid/otg/__init__.py @@ -44,7 +44,7 @@ from .mouse import MouseProcess # ===== class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes - def __init__( # pylint: disable=super-init-not-called + def __init__( self, keyboard: dict[str, Any], mouse: dict[str, Any], @@ -53,6 +53,8 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes udc: str, # XXX: Not from options, see /kvmd/apps/kvmd/__init__.py for details ) -> None: + super().__init__() + self.__udc = udc self.__notifier = aiomulti.AioProcessNotifier() @@ -80,6 +82,8 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes # но так было проще реализовать переключение режимов self.__mouses["usb_win98"] = self.__mouses["usb"] + self._set_jiggler_absolute(self.__mouse_current.is_absolute()) + @classmethod def get_plugin_options(cls) -> dict: return { @@ -141,6 +145,7 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes }, **mouse_state, }, + "jiggler": self._get_jiggler_state(), } async def poll_state(self) -> AsyncGenerator[dict, None]: @@ -172,25 +177,40 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None: self.__keyboard_proc.send_key_events(keys) + self._bump_activity() def send_mouse_button_event(self, button: str, state: bool) -> None: self.__mouse_current.send_button_event(button, state) + self._bump_activity() def send_mouse_move_event(self, to_x: int, to_y: int) -> None: self.__mouse_current.send_move_event(to_x, to_y) + self._bump_activity() def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None: self.__mouse_current.send_relative_event(delta_x, delta_y) + self._bump_activity() def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None: self.__mouse_current.send_wheel_event(delta_x, delta_y) + self._bump_activity() + + def set_params( + self, + keyboard_output: (str | None)=None, + mouse_output: (str | None)=None, + jiggler: (bool | None)=None, + ) -> None: - def set_params(self, keyboard_output: (str | None)=None, mouse_output: (str | None)=None) -> None: _ = keyboard_output if mouse_output in self.__mouses and mouse_output != self.__get_current_mouse_mode(): self.__mouse_current.send_clear_event() self.__mouse_current = self.__mouses[mouse_output] self.__mouse_current.set_win98_fix(mouse_output == "usb_win98") + self._set_jiggler_absolute(self.__mouse_current.is_absolute()) + self.__notifier.notify() + if jiggler is not None: + self._set_jiggler_enabled(jiggler) self.__notifier.notify() def clear_events(self) -> None: @@ -198,6 +218,7 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes self.__mouse_proc.send_clear_event() if self.__mouse_alt_proc: self.__mouse_alt_proc.send_clear_event() + self._bump_activity() # ===== |