diff options
author | Maxim Devaev <[email protected]> | 2024-11-02 14:26:39 +0200 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2024-11-02 14:26:39 +0200 |
commit | d6b61cb407a729169af6d678de667dae842a86f5 (patch) | |
tree | ffd4185ae16a96795aecbb292fe0e36b2d4d3df1 /kvmd/plugins/hid/__init__.py | |
parent | 8192b1fa95e263d41d1f9be4909dc58517b8da37 (diff) |
refactoring
Diffstat (limited to 'kvmd/plugins/hid/__init__.py')
-rw-r--r-- | kvmd/plugins/hid/__init__.py | 163 |
1 files changed, 134 insertions, 29 deletions
diff --git a/kvmd/plugins/hid/__init__.py b/kvmd/plugins/hid/__init__.py index 3cba01f4..f7debe1d 100644 --- a/kvmd/plugins/hid/__init__.py +++ b/kvmd/plugins/hid/__init__.py @@ -21,9 +21,11 @@ import asyncio +import functools import time from typing import Iterable +from typing import Callable from typing import AsyncGenerator from typing import Any @@ -31,14 +33,37 @@ from ...yamlconf import Option from ...validators.basic import valid_bool from ...validators.basic import valid_int_f1 +from ...validators.basic import valid_string_list +from ...validators.hid import valid_hid_key +from ...validators.hid import valid_hid_mouse_move + +from ...mouse import MouseRange from .. import BasePlugin from .. import get_plugin_class # ===== -class BaseHid(BasePlugin): - def __init__(self, jiggler_enabled: bool, jiggler_active: bool, jiggler_interval: int) -> None: +class BaseHid(BasePlugin): # pylint: disable=too-many-instance-attributes + def __init__( + self, + ignore_keys: list[str], + + mouse_x_min: int, + mouse_x_max: int, + mouse_y_min: int, + mouse_y_max: int, + + jiggler_enabled: bool, + jiggler_active: bool, + jiggler_interval: int, + ) -> None: + + self.__ignore_keys = ignore_keys + + self.__mouse_x_range = (mouse_x_min, mouse_x_max) + self.__mouse_y_range = (mouse_y_min, mouse_y_max) + self.__jiggler_enabled = jiggler_enabled self.__jiggler_active = jiggler_active self.__jiggler_interval = jiggler_interval @@ -46,8 +71,17 @@ class BaseHid(BasePlugin): self.__activity_ts = 0 @classmethod - def _get_jiggler_options(cls) -> dict[str, Any]: + def _get_base_options(cls) -> dict[str, Any]: return { + "ignore_keys": Option([], type=functools.partial(valid_string_list, subval=valid_hid_key)), + "mouse_x_range": { + "min": Option(MouseRange.MIN, type=valid_hid_mouse_move, unpack_as="mouse_x_min"), + "max": Option(MouseRange.MAX, type=valid_hid_mouse_move, unpack_as="mouse_x_max"), + }, + "mouse_y_range": { + "min": Option(MouseRange.MIN, type=valid_hid_mouse_move, unpack_as="mouse_y_min"), + "max": Option(MouseRange.MAX, type=valid_hid_mouse_move, unpack_as="mouse_y_max"), + }, "jiggler": { "enabled": Option(False, type=valid_bool, unpack_as="jiggler_enabled"), "active": Option(False, type=valid_bool, unpack_as="jiggler_active"), @@ -76,56 +110,112 @@ class BaseHid(BasePlugin): async def cleanup(self) -> None: pass + def set_params( + self, + keyboard_output: (str | None)=None, + mouse_output: (str | None)=None, + jiggler: (bool | None)=None, + ) -> None: + + raise NotImplementedError + + def set_connected(self, connected: bool) -> None: + _ = connected + # ===== - def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None: + def send_key_events(self, keys: Iterable[tuple[str, bool]], no_ignore_keys: bool=False) -> None: + for (key, state) in keys: + if no_ignore_keys or key not in self.__ignore_keys: + self.send_key_event(key, state) + + def send_key_event(self, key: str, state: bool) -> None: + self._send_key_event(key, state) + self.__bump_activity() + + def _send_key_event(self, key: str, state: bool) -> None: raise NotImplementedError + # ===== + def send_mouse_button_event(self, button: str, state: bool) -> None: + self._send_mouse_button_event(button, state) + self.__bump_activity() + + def _send_mouse_button_event(self, button: str, state: bool) -> None: raise NotImplementedError + # ===== + def send_mouse_move_event(self, to_x: int, to_y: int) -> None: - _ = to_x + if self.__mouse_x_range != MouseRange.RANGE: + to_x = MouseRange.remap(to_x, *self.__mouse_x_range) + if self.__mouse_y_range != MouseRange.RANGE: + to_y = MouseRange.remap(to_y, *self.__mouse_y_range) + self._send_mouse_move_event(to_x, to_y) + self.__bump_activity() + + def _send_mouse_move_event(self, to_x: int, to_y: int) -> None: + _ = to_x # XXX: NotImplementedError _ = to_y + # ===== + + def send_mouse_relative_events(self, deltas: Iterable[tuple[int, int]], squash: bool) -> None: + self.__process_mouse_delta_event(deltas, squash, self.send_mouse_relative_event) + def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None: - _ = delta_x + self._send_mouse_relative_event(delta_x, delta_y) + self.__bump_activity() + + def _send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None: + _ = delta_x # XXX: NotImplementedError _ = delta_y - def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None: - raise NotImplementedError + # ===== - def set_params( - self, - keyboard_output: (str | None)=None, - mouse_output: (str | None)=None, - jiggler: (bool | None)=None, - ) -> None: + def send_mouse_wheel_events(self, deltas: Iterable[tuple[int, int]], squash: bool) -> None: + self.__process_mouse_delta_event(deltas, squash, self.send_mouse_wheel_event) + + def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None: + self._send_mouse_wheel_event(delta_x, delta_y) + self.__bump_activity() + def _send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None: raise NotImplementedError - def set_connected(self, connected: bool) -> None: - _ = connected + # ===== def clear_events(self) -> None: + self._clear_events() # Don't bump activity here + + def _clear_events(self) -> None: raise NotImplementedError # ===== - async def systask(self) -> None: - factor = 1 - while True: - if self.__jiggler_active and (self.__activity_ts + self.__jiggler_interval < int(time.monotonic())): - for _ in range(5): - if self.__jiggler_absolute: - self.send_mouse_move_event(100 * factor, 100 * factor) - else: - self.send_mouse_relative_event(10 * factor, 10 * factor) - factor *= -1 - await asyncio.sleep(0.1) - await asyncio.sleep(1) + def __process_mouse_delta_event( + self, + deltas: Iterable[tuple[int, int]], + squash: bool, + handler: Callable[[int, int], None], + ) -> None: - def _bump_activity(self) -> None: + if squash: + prev = (0, 0) + for cur in deltas: + if abs(prev[0] + cur[0]) > 127 or abs(prev[1] + cur[1]) > 127: + handler(*prev) + prev = cur + else: + prev = (prev[0] + cur[0], prev[1] + cur[1]) + if prev[0] or prev[1]: + handler(*prev) + else: + for xy in deltas: + handler(*xy) + + def __bump_activity(self) -> None: self.__activity_ts = int(time.monotonic()) def _set_jiggler_absolute(self, absolute: bool) -> None: @@ -144,6 +234,21 @@ class BaseHid(BasePlugin): }, } + # ===== + + async def systask(self) -> None: + factor = 1 + while True: + if self.__jiggler_active and (self.__activity_ts + self.__jiggler_interval < int(time.monotonic())): + for _ in range(5): + if self.__jiggler_absolute: + self.send_mouse_move_event(100 * factor, 100 * factor) + else: + self.send_mouse_relative_event(10 * factor, 10 * factor) + factor *= -1 + await asyncio.sleep(0.1) + await asyncio.sleep(1) + # ===== def get_hid_class(name: str) -> type[BaseHid]: |