summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJendrik Weise <[email protected]>2024-05-18 13:35:49 +0200
committerGitHub <[email protected]>2024-05-18 14:35:49 +0300
commit2d3e4dbd5ecea8ad1024f4c68ad20b072960a2d9 (patch)
tree3fbe24c9704f904d173ac998203f8e47fe8a3260
parent01fff2c7a9404c963800b2df43debb816ad89874 (diff)
Introduce multi type HID (#166)multihid
-rw-r--r--kvmd/apps/kvmd/__init__.py5
-rw-r--r--kvmd/apps/otg/__init__.py17
-rw-r--r--kvmd/plugins/hid/multi.py156
3 files changed, 173 insertions, 5 deletions
diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py
index 7cf8430c..86931c09 100644
--- a/kvmd/apps/kvmd/__init__.py
+++ b/kvmd/apps/kvmd/__init__.py
@@ -59,6 +59,11 @@ def main(argv: (list[str] | None)=None) -> None:
hid_kwargs = config.kvmd.hid._unpack(ignore=["type", "keymap", "ignore_keys", "mouse_x_range", "mouse_y_range"])
if config.kvmd.hid.type == "otg":
hid_kwargs["udc"] = config.otg.udc # XXX: Small crutch to pass UDC to the plugin
+ if config.kvmd.hid.type == "multi":
+ if config.kvmd.hid.keyboard_device.type == "otg":
+ hid_kwargs["keyboard_device"]["udc"] = config.otg.udc
+ if config.kvmd.hid.mouse_device.type == "otg":
+ hid_kwargs["mouse_device"]["udc"] = config.otg.udc
global_config = config
config = config.kvmd
diff --git a/kvmd/apps/otg/__init__.py b/kvmd/apps/otg/__init__.py
index 1da23c67..372d70b7 100644
--- a/kvmd/apps/otg/__init__.py
+++ b/kvmd/apps/otg/__init__.py
@@ -90,12 +90,17 @@ def _write_bytes(path: str, data: bytes) -> None:
file.write(data)
+def _check_hid_otg(config: Section) -> bool:
+ return config.kvmd.hid.type == "otg" or (config.kvmd.hid.type == "multi"
+ and (config.kvmd.hid.keyboard_device["type"] == "otg" or config.kvmd.hid.mouse_device["type"] == "otg"))
+
+
def _check_config(config: Section) -> None:
if (
not config.otg.devices.serial.enabled
and not config.otg.devices.ethernet.enabled
- and config.kvmd.hid.type != "otg"
and config.kvmd.msd.type != "otg"
+ and not _check_hid_otg(config)
):
raise RuntimeError("Nothing to do")
@@ -255,14 +260,16 @@ def _cmd_start(config: Section) -> None: # pylint: disable=too-many-statements,
logger.info("===== Ethernet =====")
gc.add_ethernet(**cod.ethernet._unpack(ignore=["enabled"]))
- if config.kvmd.hid.type == "otg":
+ if _check_hid_otg(config):
logger.info("===== HID-Keyboard =====")
gc.add_keyboard(cod.hid.keyboard.start, config.otg.remote_wakeup)
logger.info("===== HID-Mouse =====")
- gc.add_mouse(cod.hid.mouse.start, config.otg.remote_wakeup, config.kvmd.hid.mouse.absolute, config.kvmd.hid.mouse.horizontal_wheel)
- if config.kvmd.hid.mouse_alt.device:
+ hid_mouse = config.kvmd.hid.get("mouse") or config.kvmd.hid.get("mouse_device", {}).get("mouse")
+ gc.add_mouse(cod.hid.mouse.start, config.otg.remote_wakeup, hid_mouse["absolute"], hid_mouse["horizontal_wheel"])
+ hid_mouse_alt = config.kvmd.hid.get("mouse_alt") or config.kvmd.hid.get("mouse_device", {}).get("mouse_alt")
+ if hid_mouse_alt["device"]:
logger.info("===== HID-Mouse-Alt =====")
- gc.add_mouse(cod.hid.mouse.start, config.otg.remote_wakeup, (not config.kvmd.hid.mouse.absolute), config.kvmd.hid.mouse.horizontal_wheel)
+ gc.add_mouse(cod.hid.mouse.start, config.otg.remote_wakeup, (not hid_mouse["absolute"]), hid_mouse["horizontal_wheel"])
if config.kvmd.msd.type == "otg":
logger.info("===== MSD =====")
diff --git a/kvmd/plugins/hid/multi.py b/kvmd/plugins/hid/multi.py
new file mode 100644
index 00000000..dfa198a5
--- /dev/null
+++ b/kvmd/plugins/hid/multi.py
@@ -0,0 +1,156 @@
+# ========================================================================== #
+# #
+# KVMD - The main PiKVM daemon. #
+# #
+# Copyright (C) 2018-2024 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+import asyncio
+import functools
+import pathlib
+
+from typing import Any
+from typing import AsyncGenerator
+from typing import Iterable
+from pkgutil import iter_modules
+
+from ...yamlconf import Option
+from ...yamlconf import make_config
+
+from . import get_hid_class
+from . import BaseHid
+
+
+def all_hid_plugins() -> list[str]:
+ current_directory = pathlib.Path(__file__).parent.resolve()
+ return [name for _, name, _ in iter_modules([str(current_directory)]) if name != "multi"]
+
+
+def valid_hid_plugin(key: str, value: list) -> list:
+ all_plugins = all_hid_plugins()
+ if not isinstance(value, dict) or "type" not in value or value["type"] not in all_plugins:
+ raise ValueError("Invalid hid plugin")
+ device_class = get_hid_class(value["type"])
+ opts = {
+ **device_class.get_plugin_options(),
+ "type": Option("", type=str)
+ }
+ return make_config(value, opts, ("kvmd", "hid", key))
+
+
+async def merge_generators(gens: Iterable[AsyncGenerator[Any, None]]) -> AsyncGenerator[Any, None]:
+ pending_tasks = {asyncio.ensure_future(anext(g)): g for g in gens}
+ while len(pending_tasks) > 0:
+ done, _ = await asyncio.wait(pending_tasks.keys(), return_when="FIRST_COMPLETED")
+ for completed_task in done:
+ try:
+ result = completed_task.result()
+ yield result
+ dg = pending_tasks[completed_task]
+ pending_tasks[asyncio.ensure_future(anext(dg))] = dg
+ except StopAsyncIteration as sai:
+ print("Exception in getting result", sai)
+ finally:
+ del pending_tasks[completed_task]
+
+
+class Plugin(BaseHid):
+ def __init__(
+ self,
+ keyboard_device: dict[str, Any],
+ mouse_device: dict[str, Any]
+ ):
+ super().__init__(False, False, 0)
+
+ keyboard_type = keyboard_device["type"]
+ mouse_type = mouse_device["type"]
+ del keyboard_device["type"]
+ del mouse_device["type"]
+ self.__keyboard = get_hid_class(keyboard_type)(**keyboard_device)
+ self.__mouse = get_hid_class(mouse_type)(**mouse_device)
+
+ @classmethod
+ def get_plugin_options(cls) -> dict:
+ return {
+ "keyboard_device": Option({
+ "type": "otg"
+ }, type=functools.partial(valid_hid_plugin, "keyboard_device")),
+ "mouse_device": Option({
+ "type": "otg"
+ }, type=functools.partial(valid_hid_plugin, "mouse_device")),
+ }
+
+ def sysprep(self) -> None:
+ self.__keyboard.sysprep()
+ self.__mouse.sysprep()
+
+ async def get_state(self) -> dict:
+ keyboard_state, mouse_state = await asyncio.gather(self.__keyboard.get_state(), self.__mouse.get_state())
+ return {
+ "online": keyboard_state["online"] and mouse_state["online"],
+ "busy": keyboard_state["busy"] or mouse_state["busy"],
+ "connected": keyboard_state["connected"] or mouse_state["connected"],
+ "keyboard": keyboard_state["keyboard"],
+ "mouse": mouse_state["mouse"],
+ "jiggler": mouse_state["jiggler"]
+ }
+
+ async def poll_state(self) -> AsyncGenerator[dict, None]:
+ async for state in merge_generators([self.__keyboard.poll_state(), self.__mouse.poll_state()]):
+ yield state
+
+ async def reset(self) -> None:
+ async with asyncio.TaskGroup() as tg:
+ tg.create_task(self.__keyboard.reset())
+ tg.create_task(self.__mouse.reset())
+
+ async def cleanup(self) -> None:
+ async with asyncio.TaskGroup() as tg:
+ tg.create_task(self.__keyboard.cleanup())
+ tg.create_task(self.__mouse.cleanup())
+
+ def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None:
+ self.__keyboard.send_key_events(keys)
+
+ def send_mouse_button_event(self, button: str, state: bool) -> None:
+ self.__mouse.send_mouse_button_event(button, state)
+
+ def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
+ self.__mouse.send_mouse_move_event(to_x, to_y)
+
+ def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
+ self.__mouse.send_mouse_relative_event(delta_x, delta_y)
+
+ def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
+ self.__mouse.send_mouse_wheel_event(delta_x, delta_y)
+
+ def set_params(
+ self,
+ keyboard_output: (str | None)=None,
+ mouse_output: (str | None)=None,
+ jiggler: (bool | None)=None,
+ ) -> None:
+ self.__keyboard.set_params(keyboard_output, mouse_output, jiggler)
+ self.__mouse.set_params(keyboard_output, mouse_output, jiggler)
+
+ def set_connected(self, connected: bool) -> None:
+ self.__keyboard.set_connected(connected)
+ self.__mouse.set_connected(connected)
+
+ def clear_events(self) -> None:
+ self.__keyboard.clear_events()
+ self.__mouse.clear_events()