summaryrefslogtreecommitdiff
path: root/kvmd/plugins/hid
diff options
context:
space:
mode:
authorMaxim Devaev <[email protected]>2023-10-23 08:10:19 +0300
committerMaxim Devaev <[email protected]>2023-10-23 08:10:19 +0300
commit4038754c375e38d5379a701ea5d65411bfb19034 (patch)
tree16e501286e95aa88228624747bd805c652c97bc8 /kvmd/plugins/hid
parent73f96fa0c7fb9cda167521349c646a7b9a91253b (diff)
pikvm/pikvm#57: Mouse jiggler
Diffstat (limited to 'kvmd/plugins/hid')
-rw-r--r--kvmd/plugins/hid/__init__.py44
-rw-r--r--kvmd/plugins/hid/_mcu/__init__.py20
-rw-r--r--kvmd/plugins/hid/bt/__init__.py24
-rw-r--r--kvmd/plugins/hid/ch9329/__init__.py23
-rw-r--r--kvmd/plugins/hid/otg/__init__.py25
5 files changed, 127 insertions, 9 deletions
diff --git a/kvmd/plugins/hid/__init__.py b/kvmd/plugins/hid/__init__.py
index 2ef7495f..32f7aef1 100644
--- a/kvmd/plugins/hid/__init__.py
+++ b/kvmd/plugins/hid/__init__.py
@@ -20,6 +20,9 @@
# ========================================================================== #
+import asyncio
+import time
+
from typing import Iterable
from typing import AsyncGenerator
@@ -29,6 +32,11 @@ from .. import get_plugin_class
# =====
class BaseHid(BasePlugin):
+ def __init__(self) -> None:
+ self.__jiggler_enabled = False
+ self.__jiggler_absolute = True
+ self.__activity_ts = 0
+
def sysprep(self) -> None:
raise NotImplementedError
@@ -64,9 +72,14 @@ class BaseHid(BasePlugin):
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
raise NotImplementedError
- def set_params(self, keyboard_output: (str | None)=None, mouse_output: (str | None)=None) -> None:
- _ = keyboard_output
- _ = mouse_output
+ def set_params(
+ self,
+ keyboard_output: (str | None)=None,
+ mouse_output: (str | None)=None,
+ jiggler: (bool | None)=None,
+ ) -> None:
+
+ raise NotImplementedError
def set_connected(self, connected: bool) -> None:
_ = connected
@@ -74,6 +87,31 @@ class BaseHid(BasePlugin):
def clear_events(self) -> None:
raise NotImplementedError
+ # =====
+
+ async def systask(self) -> None:
+ factor = 1
+ while True:
+ if self.__jiggler_enabled and (self.__activity_ts + 60 < int(time.monotonic())):
+ if self.__jiggler_absolute:
+ self.send_mouse_move_event(100 * factor, 100 * factor)
+ else:
+ self.send_mouse_relative_event(10 * factor, 10 * factor)
+ factor *= -1
+ await asyncio.sleep(1)
+
+ def _bump_activity(self) -> None:
+ self.__activity_ts = int(time.monotonic())
+
+ def _set_jiggler_absolute(self, absolute: bool) -> None:
+ self.__jiggler_absolute = absolute
+
+ def _set_jiggler_enabled(self, enabled: bool) -> None:
+ self.__jiggler_enabled = enabled
+
+ def _get_jiggler_state(self) -> dict:
+ return {"enabled": self.__jiggler_enabled}
+
# =====
def get_hid_class(name: str) -> type[BaseHid]:
diff --git a/kvmd/plugins/hid/_mcu/__init__.py b/kvmd/plugins/hid/_mcu/__init__.py
index f475a880..bdd2ee32 100644
--- a/kvmd/plugins/hid/_mcu/__init__.py
+++ b/kvmd/plugins/hid/_mcu/__init__.py
@@ -117,6 +117,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
**gpio_kwargs: Any,
) -> None:
+ BaseHid.__init__(self)
multiprocessing.Process.__init__(self, daemon=True)
self.__read_retries = read_retries
@@ -177,6 +178,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
active_mouse = get_active_mouse(outputs1)
if online and active_mouse in ["usb_rel", "ps2"]:
absolute = False
+ self._set_jiggler_absolute(absolute)
keyboard_outputs: dict = {"available": [], "active": ""}
mouse_outputs: dict = {"available": [], "active": ""}
@@ -224,6 +226,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
"absolute": absolute,
"outputs": mouse_outputs,
},
+ "jiggler": self._get_jiggler_state(),
}
async def poll_state(self) -> AsyncGenerator[dict, None]:
@@ -251,20 +254,31 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None:
for (key, state) in keys:
self.__queue_event(KeyEvent(key, state))
+ self._bump_activity()
def send_mouse_button_event(self, button: str, state: bool) -> None:
self.__queue_event(MouseButtonEvent(button, state))
+ self._bump_activity()
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
self.__queue_event(MouseMoveEvent(to_x, to_y))
+ self._bump_activity()
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_event(MouseRelativeEvent(delta_x, delta_y))
+ self._bump_activity()
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_event(MouseWheelEvent(delta_x, delta_y))
+ self._bump_activity()
+
+ def set_params(
+ self,
+ keyboard_output: (str | None)=None,
+ mouse_output: (str | None)=None,
+ jiggler: (bool | None)=None,
+ ) -> None:
- def set_params(self, keyboard_output: (str | None)=None, mouse_output: (str | None)=None) -> None:
events: list[BaseEvent] = []
if keyboard_output is not None:
events.append(SetKeyboardOutputEvent(keyboard_output))
@@ -272,12 +286,16 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
events.append(SetMouseOutputEvent(mouse_output))
for (index, event) in enumerate(events, 1):
self.__queue_event(event, clear=(index == len(events)))
+ if jiggler is not None:
+ self._set_jiggler_enabled(jiggler)
+ self.__notifier.notify()
def set_connected(self, connected: bool) -> None:
self.__queue_event(SetConnectedEvent(connected), clear=True)
def clear_events(self) -> None:
self.__queue_event(ClearEvent(), clear=True)
+ self._bump_activity()
def __queue_event(self, event: BaseEvent, clear: bool=False) -> None:
if not self.__stop_event.is_set():
diff --git a/kvmd/plugins/hid/bt/__init__.py b/kvmd/plugins/hid/bt/__init__.py
index 8139986f..d8065c92 100644
--- a/kvmd/plugins/hid/bt/__init__.py
+++ b/kvmd/plugins/hid/bt/__init__.py
@@ -59,7 +59,7 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
# https://gist.github.com/whitelynx/9f9bd4cb266b3924c64dfdff14bce2e8
# https://archlinuxarm.org/forum/viewtopic.php?f=67&t=14244
- def __init__( # pylint: disable=too-many-arguments,too-many-locals,super-init-not-called
+ def __init__( # pylint: disable=too-many-arguments,too-many-locals
self,
manufacturer: str,
product: str,
@@ -78,6 +78,9 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
select_timeout: float,
) -> None:
+ super().__init__()
+ self._set_jiggler_absolute(False)
+
self.__proc: (multiprocessing.Process | None) = None
self.__stop_event = multiprocessing.Event()
@@ -146,6 +149,7 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
"absolute": False,
"outputs": outputs,
},
+ "jiggler": self._get_jiggler_state(),
}
async def poll_state(self) -> AsyncGenerator[dict, None]:
@@ -175,18 +179,36 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None:
for (key, state) in keys:
self.__server.queue_event(make_keyboard_event(key, state))
+ self._bump_activity()
def send_mouse_button_event(self, button: str, state: bool) -> None:
self.__server.queue_event(MouseButtonEvent(button, state))
+ self._bump_activity()
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
self.__server.queue_event(MouseRelativeEvent(delta_x, delta_y))
+ self._bump_activity()
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__server.queue_event(MouseWheelEvent(delta_x, delta_y))
+ self._bump_activity()
def clear_events(self) -> None:
self.__server.clear_events()
+ self._bump_activity()
+
+ def set_params(
+ self,
+ keyboard_output: (str | None)=None,
+ mouse_output: (str | None)=None,
+ jiggler: (bool | None)=None,
+ ) -> None:
+
+ _ = keyboard_output
+ _ = mouse_output
+ if jiggler is not None:
+ self._set_jiggler_enabled(jiggler)
+ self.__notifier.notify()
# =====
diff --git a/kvmd/plugins/hid/ch9329/__init__.py b/kvmd/plugins/hid/ch9329/__init__.py
index e7f518d5..7f846b2f 100644
--- a/kvmd/plugins/hid/ch9329/__init__.py
+++ b/kvmd/plugins/hid/ch9329/__init__.py
@@ -58,6 +58,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
read_timeout: float,
) -> None:
+ BaseHid.__init__(self)
multiprocessing.Process.__init__(self, daemon=True)
self.__device_path = device_path
@@ -112,6 +113,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
"active": ("usb" if absolute else "usb_rel"),
},
},
+ "jiggler": self._get_jiggler_state(),
}
async def poll_state(self) -> AsyncGenerator[dict, None]:
@@ -139,23 +141,40 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None:
for (key, state) in keys:
self.__queue_cmd(self.__keyboard.process_key(key, state))
+ self._bump_activity()
def send_mouse_button_event(self, button: str, state: bool) -> None:
self.__queue_cmd(self.__mouse.process_button(button, state))
+ self._bump_activity()
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
self.__queue_cmd(self.__mouse.process_move(to_x, to_y))
+ self._bump_activity()
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_cmd(self.__mouse.process_wheel(delta_x, delta_y))
+ self._bump_activity()
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_cmd(self.__mouse.process_relative(delta_x, delta_y))
+ self._bump_activity()
- def set_params(self, keyboard_output: (str | None)=None, mouse_output: (str | None)=None) -> None:
+ def set_params(
+ self,
+ keyboard_output: (str | None)=None,
+ mouse_output: (str | None)=None,
+ jiggler: (bool | None)=None,
+ ) -> None:
+
+ _ = keyboard_output
if mouse_output is not None:
get_logger(0).info("HID : mouse output = %s", mouse_output)
- self.__mouse.set_absolute(mouse_output == "usb")
+ absolute = (mouse_output == "usb")
+ self.__mouse.set_absolute(absolute)
+ self._set_jiggler_absolute(absolute)
+ self.__notifier.notify()
+ if jiggler is not None:
+ self._set_jiggler_enabled(jiggler)
self.__notifier.notify()
def set_connected(self, connected: bool) -> None:
diff --git a/kvmd/plugins/hid/otg/__init__.py b/kvmd/plugins/hid/otg/__init__.py
index 0090496c..30357c9a 100644
--- a/kvmd/plugins/hid/otg/__init__.py
+++ b/kvmd/plugins/hid/otg/__init__.py
@@ -44,7 +44,7 @@ from .mouse import MouseProcess
# =====
class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
- def __init__( # pylint: disable=super-init-not-called
+ def __init__(
self,
keyboard: dict[str, Any],
mouse: dict[str, Any],
@@ -53,6 +53,8 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
udc: str, # XXX: Not from options, see /kvmd/apps/kvmd/__init__.py for details
) -> None:
+ super().__init__()
+
self.__udc = udc
self.__notifier = aiomulti.AioProcessNotifier()
@@ -80,6 +82,8 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
# но так было проще реализовать переключение режимов
self.__mouses["usb_win98"] = self.__mouses["usb"]
+ self._set_jiggler_absolute(self.__mouse_current.is_absolute())
+
@classmethod
def get_plugin_options(cls) -> dict:
return {
@@ -141,6 +145,7 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
},
**mouse_state,
},
+ "jiggler": self._get_jiggler_state(),
}
async def poll_state(self) -> AsyncGenerator[dict, None]:
@@ -172,25 +177,40 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None:
self.__keyboard_proc.send_key_events(keys)
+ self._bump_activity()
def send_mouse_button_event(self, button: str, state: bool) -> None:
self.__mouse_current.send_button_event(button, state)
+ self._bump_activity()
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
self.__mouse_current.send_move_event(to_x, to_y)
+ self._bump_activity()
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
self.__mouse_current.send_relative_event(delta_x, delta_y)
+ self._bump_activity()
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__mouse_current.send_wheel_event(delta_x, delta_y)
+ self._bump_activity()
+
+ def set_params(
+ self,
+ keyboard_output: (str | None)=None,
+ mouse_output: (str | None)=None,
+ jiggler: (bool | None)=None,
+ ) -> None:
- def set_params(self, keyboard_output: (str | None)=None, mouse_output: (str | None)=None) -> None:
_ = keyboard_output
if mouse_output in self.__mouses and mouse_output != self.__get_current_mouse_mode():
self.__mouse_current.send_clear_event()
self.__mouse_current = self.__mouses[mouse_output]
self.__mouse_current.set_win98_fix(mouse_output == "usb_win98")
+ self._set_jiggler_absolute(self.__mouse_current.is_absolute())
+ self.__notifier.notify()
+ if jiggler is not None:
+ self._set_jiggler_enabled(jiggler)
self.__notifier.notify()
def clear_events(self) -> None:
@@ -198,6 +218,7 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
self.__mouse_proc.send_clear_event()
if self.__mouse_alt_proc:
self.__mouse_alt_proc.send_clear_event()
+ self._bump_activity()
# =====