summaryrefslogtreecommitdiff
path: root/kvmd/plugins/hid/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd/plugins/hid/__init__.py')
-rw-r--r--kvmd/plugins/hid/__init__.py163
1 files changed, 134 insertions, 29 deletions
diff --git a/kvmd/plugins/hid/__init__.py b/kvmd/plugins/hid/__init__.py
index 3cba01f4..f7debe1d 100644
--- a/kvmd/plugins/hid/__init__.py
+++ b/kvmd/plugins/hid/__init__.py
@@ -21,9 +21,11 @@
import asyncio
+import functools
import time
from typing import Iterable
+from typing import Callable
from typing import AsyncGenerator
from typing import Any
@@ -31,14 +33,37 @@ from ...yamlconf import Option
from ...validators.basic import valid_bool
from ...validators.basic import valid_int_f1
+from ...validators.basic import valid_string_list
+from ...validators.hid import valid_hid_key
+from ...validators.hid import valid_hid_mouse_move
+
+from ...mouse import MouseRange
from .. import BasePlugin
from .. import get_plugin_class
# =====
-class BaseHid(BasePlugin):
- def __init__(self, jiggler_enabled: bool, jiggler_active: bool, jiggler_interval: int) -> None:
+class BaseHid(BasePlugin): # pylint: disable=too-many-instance-attributes
+ def __init__(
+ self,
+ ignore_keys: list[str],
+
+ mouse_x_min: int,
+ mouse_x_max: int,
+ mouse_y_min: int,
+ mouse_y_max: int,
+
+ jiggler_enabled: bool,
+ jiggler_active: bool,
+ jiggler_interval: int,
+ ) -> None:
+
+ self.__ignore_keys = ignore_keys
+
+ self.__mouse_x_range = (mouse_x_min, mouse_x_max)
+ self.__mouse_y_range = (mouse_y_min, mouse_y_max)
+
self.__jiggler_enabled = jiggler_enabled
self.__jiggler_active = jiggler_active
self.__jiggler_interval = jiggler_interval
@@ -46,8 +71,17 @@ class BaseHid(BasePlugin):
self.__activity_ts = 0
@classmethod
- def _get_jiggler_options(cls) -> dict[str, Any]:
+ def _get_base_options(cls) -> dict[str, Any]:
return {
+ "ignore_keys": Option([], type=functools.partial(valid_string_list, subval=valid_hid_key)),
+ "mouse_x_range": {
+ "min": Option(MouseRange.MIN, type=valid_hid_mouse_move, unpack_as="mouse_x_min"),
+ "max": Option(MouseRange.MAX, type=valid_hid_mouse_move, unpack_as="mouse_x_max"),
+ },
+ "mouse_y_range": {
+ "min": Option(MouseRange.MIN, type=valid_hid_mouse_move, unpack_as="mouse_y_min"),
+ "max": Option(MouseRange.MAX, type=valid_hid_mouse_move, unpack_as="mouse_y_max"),
+ },
"jiggler": {
"enabled": Option(False, type=valid_bool, unpack_as="jiggler_enabled"),
"active": Option(False, type=valid_bool, unpack_as="jiggler_active"),
@@ -76,56 +110,112 @@ class BaseHid(BasePlugin):
async def cleanup(self) -> None:
pass
+ def set_params(
+ self,
+ keyboard_output: (str | None)=None,
+ mouse_output: (str | None)=None,
+ jiggler: (bool | None)=None,
+ ) -> None:
+
+ raise NotImplementedError
+
+ def set_connected(self, connected: bool) -> None:
+ _ = connected
+
# =====
- def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None:
+ def send_key_events(self, keys: Iterable[tuple[str, bool]], no_ignore_keys: bool=False) -> None:
+ for (key, state) in keys:
+ if no_ignore_keys or key not in self.__ignore_keys:
+ self.send_key_event(key, state)
+
+ def send_key_event(self, key: str, state: bool) -> None:
+ self._send_key_event(key, state)
+ self.__bump_activity()
+
+ def _send_key_event(self, key: str, state: bool) -> None:
raise NotImplementedError
+ # =====
+
def send_mouse_button_event(self, button: str, state: bool) -> None:
+ self._send_mouse_button_event(button, state)
+ self.__bump_activity()
+
+ def _send_mouse_button_event(self, button: str, state: bool) -> None:
raise NotImplementedError
+ # =====
+
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
- _ = to_x
+ if self.__mouse_x_range != MouseRange.RANGE:
+ to_x = MouseRange.remap(to_x, *self.__mouse_x_range)
+ if self.__mouse_y_range != MouseRange.RANGE:
+ to_y = MouseRange.remap(to_y, *self.__mouse_y_range)
+ self._send_mouse_move_event(to_x, to_y)
+ self.__bump_activity()
+
+ def _send_mouse_move_event(self, to_x: int, to_y: int) -> None:
+ _ = to_x # XXX: NotImplementedError
_ = to_y
+ # =====
+
+ def send_mouse_relative_events(self, deltas: Iterable[tuple[int, int]], squash: bool) -> None:
+ self.__process_mouse_delta_event(deltas, squash, self.send_mouse_relative_event)
+
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
- _ = delta_x
+ self._send_mouse_relative_event(delta_x, delta_y)
+ self.__bump_activity()
+
+ def _send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
+ _ = delta_x # XXX: NotImplementedError
_ = delta_y
- def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
- raise NotImplementedError
+ # =====
- def set_params(
- self,
- keyboard_output: (str | None)=None,
- mouse_output: (str | None)=None,
- jiggler: (bool | None)=None,
- ) -> None:
+ def send_mouse_wheel_events(self, deltas: Iterable[tuple[int, int]], squash: bool) -> None:
+ self.__process_mouse_delta_event(deltas, squash, self.send_mouse_wheel_event)
+
+ def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
+ self._send_mouse_wheel_event(delta_x, delta_y)
+ self.__bump_activity()
+ def _send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
raise NotImplementedError
- def set_connected(self, connected: bool) -> None:
- _ = connected
+ # =====
def clear_events(self) -> None:
+ self._clear_events() # Don't bump activity here
+
+ def _clear_events(self) -> None:
raise NotImplementedError
# =====
- async def systask(self) -> None:
- factor = 1
- while True:
- if self.__jiggler_active and (self.__activity_ts + self.__jiggler_interval < int(time.monotonic())):
- for _ in range(5):
- if self.__jiggler_absolute:
- self.send_mouse_move_event(100 * factor, 100 * factor)
- else:
- self.send_mouse_relative_event(10 * factor, 10 * factor)
- factor *= -1
- await asyncio.sleep(0.1)
- await asyncio.sleep(1)
+ def __process_mouse_delta_event(
+ self,
+ deltas: Iterable[tuple[int, int]],
+ squash: bool,
+ handler: Callable[[int, int], None],
+ ) -> None:
- def _bump_activity(self) -> None:
+ if squash:
+ prev = (0, 0)
+ for cur in deltas:
+ if abs(prev[0] + cur[0]) > 127 or abs(prev[1] + cur[1]) > 127:
+ handler(*prev)
+ prev = cur
+ else:
+ prev = (prev[0] + cur[0], prev[1] + cur[1])
+ if prev[0] or prev[1]:
+ handler(*prev)
+ else:
+ for xy in deltas:
+ handler(*xy)
+
+ def __bump_activity(self) -> None:
self.__activity_ts = int(time.monotonic())
def _set_jiggler_absolute(self, absolute: bool) -> None:
@@ -144,6 +234,21 @@ class BaseHid(BasePlugin):
},
}
+ # =====
+
+ async def systask(self) -> None:
+ factor = 1
+ while True:
+ if self.__jiggler_active and (self.__activity_ts + self.__jiggler_interval < int(time.monotonic())):
+ for _ in range(5):
+ if self.__jiggler_absolute:
+ self.send_mouse_move_event(100 * factor, 100 * factor)
+ else:
+ self.send_mouse_relative_event(10 * factor, 10 * factor)
+ factor *= -1
+ await asyncio.sleep(0.1)
+ await asyncio.sleep(1)
+
# =====
def get_hid_class(name: str) -> type[BaseHid]: