From 2d3e4dbd5ecea8ad1024f4c68ad20b072960a2d9 Mon Sep 17 00:00:00 2001 From: Jendrik Weise Date: Sat, 18 May 2024 13:35:49 +0200 Subject: Introduce multi type HID (#166) --- kvmd/apps/kvmd/__init__.py | 5 ++ kvmd/apps/otg/__init__.py | 17 +++-- kvmd/plugins/hid/multi.py | 156 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 173 insertions(+), 5 deletions(-) create mode 100644 kvmd/plugins/hid/multi.py diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py index 7cf8430c..86931c09 100644 --- a/kvmd/apps/kvmd/__init__.py +++ b/kvmd/apps/kvmd/__init__.py @@ -59,6 +59,11 @@ def main(argv: (list[str] | None)=None) -> None: hid_kwargs = config.kvmd.hid._unpack(ignore=["type", "keymap", "ignore_keys", "mouse_x_range", "mouse_y_range"]) if config.kvmd.hid.type == "otg": hid_kwargs["udc"] = config.otg.udc # XXX: Small crutch to pass UDC to the plugin + if config.kvmd.hid.type == "multi": + if config.kvmd.hid.keyboard_device.type == "otg": + hid_kwargs["keyboard_device"]["udc"] = config.otg.udc + if config.kvmd.hid.mouse_device.type == "otg": + hid_kwargs["mouse_device"]["udc"] = config.otg.udc global_config = config config = config.kvmd diff --git a/kvmd/apps/otg/__init__.py b/kvmd/apps/otg/__init__.py index 1da23c67..372d70b7 100644 --- a/kvmd/apps/otg/__init__.py +++ b/kvmd/apps/otg/__init__.py @@ -90,12 +90,17 @@ def _write_bytes(path: str, data: bytes) -> None: file.write(data) +def _check_hid_otg(config: Section) -> bool: + return config.kvmd.hid.type == "otg" or (config.kvmd.hid.type == "multi" + and (config.kvmd.hid.keyboard_device["type"] == "otg" or config.kvmd.hid.mouse_device["type"] == "otg")) + + def _check_config(config: Section) -> None: if ( not config.otg.devices.serial.enabled and not config.otg.devices.ethernet.enabled - and config.kvmd.hid.type != "otg" and config.kvmd.msd.type != "otg" + and not _check_hid_otg(config) ): raise RuntimeError("Nothing to do") @@ -255,14 +260,16 @@ def _cmd_start(config: Section) -> None: # pylint: disable=too-many-statements, logger.info("===== Ethernet =====") gc.add_ethernet(**cod.ethernet._unpack(ignore=["enabled"])) - if config.kvmd.hid.type == "otg": + if _check_hid_otg(config): logger.info("===== HID-Keyboard =====") gc.add_keyboard(cod.hid.keyboard.start, config.otg.remote_wakeup) logger.info("===== HID-Mouse =====") - gc.add_mouse(cod.hid.mouse.start, config.otg.remote_wakeup, config.kvmd.hid.mouse.absolute, config.kvmd.hid.mouse.horizontal_wheel) - if config.kvmd.hid.mouse_alt.device: + hid_mouse = config.kvmd.hid.get("mouse") or config.kvmd.hid.get("mouse_device", {}).get("mouse") + gc.add_mouse(cod.hid.mouse.start, config.otg.remote_wakeup, hid_mouse["absolute"], hid_mouse["horizontal_wheel"]) + hid_mouse_alt = config.kvmd.hid.get("mouse_alt") or config.kvmd.hid.get("mouse_device", {}).get("mouse_alt") + if hid_mouse_alt["device"]: logger.info("===== HID-Mouse-Alt =====") - gc.add_mouse(cod.hid.mouse.start, config.otg.remote_wakeup, (not config.kvmd.hid.mouse.absolute), config.kvmd.hid.mouse.horizontal_wheel) + gc.add_mouse(cod.hid.mouse.start, config.otg.remote_wakeup, (not hid_mouse["absolute"]), hid_mouse["horizontal_wheel"]) if config.kvmd.msd.type == "otg": logger.info("===== MSD =====") diff --git a/kvmd/plugins/hid/multi.py b/kvmd/plugins/hid/multi.py new file mode 100644 index 00000000..dfa198a5 --- /dev/null +++ b/kvmd/plugins/hid/multi.py @@ -0,0 +1,156 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2018-2024 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + +import asyncio +import functools +import pathlib + +from typing import Any +from typing import AsyncGenerator +from typing import Iterable +from pkgutil import iter_modules + +from ...yamlconf import Option +from ...yamlconf import make_config + +from . import get_hid_class +from . import BaseHid + + +def all_hid_plugins() -> list[str]: + current_directory = pathlib.Path(__file__).parent.resolve() + return [name for _, name, _ in iter_modules([str(current_directory)]) if name != "multi"] + + +def valid_hid_plugin(key: str, value: list) -> list: + all_plugins = all_hid_plugins() + if not isinstance(value, dict) or "type" not in value or value["type"] not in all_plugins: + raise ValueError("Invalid hid plugin") + device_class = get_hid_class(value["type"]) + opts = { + **device_class.get_plugin_options(), + "type": Option("", type=str) + } + return make_config(value, opts, ("kvmd", "hid", key)) + + +async def merge_generators(gens: Iterable[AsyncGenerator[Any, None]]) -> AsyncGenerator[Any, None]: + pending_tasks = {asyncio.ensure_future(anext(g)): g for g in gens} + while len(pending_tasks) > 0: + done, _ = await asyncio.wait(pending_tasks.keys(), return_when="FIRST_COMPLETED") + for completed_task in done: + try: + result = completed_task.result() + yield result + dg = pending_tasks[completed_task] + pending_tasks[asyncio.ensure_future(anext(dg))] = dg + except StopAsyncIteration as sai: + print("Exception in getting result", sai) + finally: + del pending_tasks[completed_task] + + +class Plugin(BaseHid): + def __init__( + self, + keyboard_device: dict[str, Any], + mouse_device: dict[str, Any] + ): + super().__init__(False, False, 0) + + keyboard_type = keyboard_device["type"] + mouse_type = mouse_device["type"] + del keyboard_device["type"] + del mouse_device["type"] + self.__keyboard = get_hid_class(keyboard_type)(**keyboard_device) + self.__mouse = get_hid_class(mouse_type)(**mouse_device) + + @classmethod + def get_plugin_options(cls) -> dict: + return { + "keyboard_device": Option({ + "type": "otg" + }, type=functools.partial(valid_hid_plugin, "keyboard_device")), + "mouse_device": Option({ + "type": "otg" + }, type=functools.partial(valid_hid_plugin, "mouse_device")), + } + + def sysprep(self) -> None: + self.__keyboard.sysprep() + self.__mouse.sysprep() + + async def get_state(self) -> dict: + keyboard_state, mouse_state = await asyncio.gather(self.__keyboard.get_state(), self.__mouse.get_state()) + return { + "online": keyboard_state["online"] and mouse_state["online"], + "busy": keyboard_state["busy"] or mouse_state["busy"], + "connected": keyboard_state["connected"] or mouse_state["connected"], + "keyboard": keyboard_state["keyboard"], + "mouse": mouse_state["mouse"], + "jiggler": mouse_state["jiggler"] + } + + async def poll_state(self) -> AsyncGenerator[dict, None]: + async for state in merge_generators([self.__keyboard.poll_state(), self.__mouse.poll_state()]): + yield state + + async def reset(self) -> None: + async with asyncio.TaskGroup() as tg: + tg.create_task(self.__keyboard.reset()) + tg.create_task(self.__mouse.reset()) + + async def cleanup(self) -> None: + async with asyncio.TaskGroup() as tg: + tg.create_task(self.__keyboard.cleanup()) + tg.create_task(self.__mouse.cleanup()) + + def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None: + self.__keyboard.send_key_events(keys) + + def send_mouse_button_event(self, button: str, state: bool) -> None: + self.__mouse.send_mouse_button_event(button, state) + + def send_mouse_move_event(self, to_x: int, to_y: int) -> None: + self.__mouse.send_mouse_move_event(to_x, to_y) + + def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None: + self.__mouse.send_mouse_relative_event(delta_x, delta_y) + + def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None: + self.__mouse.send_mouse_wheel_event(delta_x, delta_y) + + def set_params( + self, + keyboard_output: (str | None)=None, + mouse_output: (str | None)=None, + jiggler: (bool | None)=None, + ) -> None: + self.__keyboard.set_params(keyboard_output, mouse_output, jiggler) + self.__mouse.set_params(keyboard_output, mouse_output, jiggler) + + def set_connected(self, connected: bool) -> None: + self.__keyboard.set_connected(connected) + self.__mouse.set_connected(connected) + + def clear_events(self) -> None: + self.__keyboard.clear_events() + self.__mouse.clear_events() -- cgit v1.2.3