diff options
author | Devaev Maxim <[email protected]> | 2019-09-28 05:21:09 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2019-09-28 05:21:09 +0300 |
commit | c16e4c953ca197951c77a1cbab14e435d282c72b (patch) | |
tree | 894f32193893147dd63613d980871af64b15ba98 /kvmd/plugins | |
parent | 31c17bb583717bf99fa5990d6a4507bebe8086ce (diff) |
otg keyboard hid
Diffstat (limited to 'kvmd/plugins')
-rw-r--r-- | kvmd/plugins/hid/otg/__init__.py | 107 | ||||
-rw-r--r-- | kvmd/plugins/hid/otg/hid.py | 197 | ||||
-rw-r--r-- | kvmd/plugins/hid/otg/keyboard.py | 151 |
3 files changed, 455 insertions, 0 deletions
diff --git a/kvmd/plugins/hid/otg/__init__.py b/kvmd/plugins/hid/otg/__init__.py new file mode 100644 index 00000000..566bb0ec --- /dev/null +++ b/kvmd/plugins/hid/otg/__init__.py @@ -0,0 +1,107 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import asyncio + +from typing import Dict +from typing import AsyncGenerator +from typing import Any + +from ....yamlconf import Option + +from ....validators.basic import valid_bool +from ....validators.basic import valid_int_f1 +from ....validators.basic import valid_float_f01 + +from ....validators.os import valid_abs_path + +from .. import BaseHid + +from .keyboard import KeyboardProcess + + +# ===== +class Plugin(BaseHid): + def __init__( # pylint: disable=super-init-not-called + self, + keyboard: Dict[str, Any], + noop: bool, + state_poll: float, + ) -> None: + + self.__keyboard_proc = KeyboardProcess(noop=noop, **keyboard) + + self.__state_poll = state_poll + + self.__lock = asyncio.Lock() + + @classmethod + def get_plugin_options(cls) -> Dict[str, Option]: + return { + "keyboard": { + "device": Option("", type=valid_abs_path, unpack_as="device_path"), + "timeout": Option(1.0, type=valid_float_f01), + "retries": Option(5, type=valid_int_f1), + "retries_delay": Option(1.0, type=valid_float_f01), + }, + + "noop": Option(False, type=valid_bool), + "state_poll": Option(0.1, type=valid_float_f01), + } + + def start(self) -> None: + self.__keyboard_proc.start() + + def get_state(self) -> Dict: + return {"online": self.__keyboard_proc.is_online()} + + async def poll_state(self) -> AsyncGenerator[Dict, None]: + prev_state: Dict = {} + while self.__keyboard_proc.is_alive(): + state = self.get_state() + if state != prev_state: + yield self.get_state() + prev_state = state + await asyncio.sleep(self.__state_poll) + + async def reset(self) -> None: + self.__keyboard_proc.send_reset_event() + + async def cleanup(self) -> None: + self.__keyboard_proc.cleanup() + + # ===== + + async def send_key_event(self, key: str, state: bool) -> None: + self.__keyboard_proc.send_key_event(key, state) + + async def send_mouse_move_event(self, to_x: int, to_y: int) -> None: + pass + + async def send_mouse_button_event(self, button: str, state: bool) -> None: + pass + + async def send_mouse_wheel_event(self, delta_y: int) -> None: + pass + + async def clear_events(self) -> None: + self.__keyboard_proc.send_clear_event() diff --git a/kvmd/plugins/hid/otg/hid.py b/kvmd/plugins/hid/otg/hid.py new file mode 100644 index 00000000..c8376149 --- /dev/null +++ b/kvmd/plugins/hid/otg/hid.py @@ -0,0 +1,197 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import os +import select +import signal +import multiprocessing +import multiprocessing.queues +import queue +import errno +import time + +import setproctitle + +from ....logging import get_logger + + +# ===== +class BaseEvent: + pass + + +class DeviceProcess(multiprocessing.Process): # pylint: disable=too-many-instance-attributes + def __init__( + self, + name: str, + device_path: str, + timeout: float, + retries: int, + retries_delay: float, + noop: bool, + ) -> None: + + super().__init__(daemon=True) + + self.__name = name + + self.__device_path = device_path + self.__timeout = timeout + self.__retries = retries + self.__retries_delay = retries_delay + self.__noop = noop + + self.__fd = -1 + self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue() + self.__online_shared = multiprocessing.Value("i", 1) + self.__stop_event = multiprocessing.Event() + + def run(self) -> None: + logger = get_logger(0) + + logger.info("Started HID-%s pid=%d", self.__name, os.getpid()) + signal.signal(signal.SIGINT, signal.SIG_IGN) + setproctitle.setproctitle(f"[hid-{self.__name}] {setproctitle.getproctitle()}") + + while not self.__stop_event.is_set(): + try: + while not self.__stop_event.is_set(): + passed = 0 + try: + event: BaseEvent = self.__events_queue.get(timeout=0.05) + except queue.Empty: + if passed >= 20: # 20 * 0.05 = 1 sec + self._ensure_device() # Check device + passed = 0 + else: + passed += 1 + else: + self._process_event(event) + passed = 0 + except Exception: + logger.error("Unexpected HID-%s error", self.__name) + self._close_device() + finally: + time.sleep(1) + + self._close_device() + + def is_online(self) -> bool: + return bool(self.__online_shared.value) + + def _stop(self) -> None: + if self.is_alive(): + get_logger().info("Stopping HID-%s daemon ...", self.__name) + self.__stop_event.set() + if self.exitcode is not None: + self.join() + + def _process_event(self, event: BaseEvent) -> None: + raise NotImplementedError + + def _queue_event(self, event: BaseEvent) -> None: + self.__events_queue.put(event) + + def _write_report(self, report: bytes) -> bool: + if self.__noop: + return True + + assert self.__fd >= 0 + logger = get_logger() + + retries = self.__retries + while retries: + try: + written = os.write(self.__fd, report) + if written == len(report): + self.__online_shared.value = 1 + return True + else: + logger.error("HID-%s write error: written (%s) != report length (%d)", + self.__name, written, len(report)) + self._close_device() + except Exception as err: + if isinstance(err, OSError) and errno == errno.EAGAIN: + msg = "Can't write report to HID-%s {}: %s: %s" + msg.format(" (maybe unplugged)" if retries == 1 else "") + logger.error(msg, self.__name, type(err).__name__, err) # TODO: debug + else: + logger.exception("Can't write report to HID-%s", self.__name) + self._close_device() + + retries -= 1 + self.__online_shared.value = 0 + + if retries: + logger.error("Retries left (HID-%s, write_report): %d", self.__name, retries) + time.sleep(self.__retries_delay) + + return False + + def _ensure_device(self) -> bool: + if self.__noop: + return True + + logger = get_logger() + + if self.__fd < 0: + try: + self.__fd = os.open(self.__device_path, os.O_WRONLY|os.O_NONBLOCK) + except FileNotFoundError: + logger.error("Missing HID-%s device: %s", self.__name, self.__device_path) + except Exception: + logger.exception("Can't open HID-%s device: %s", self.__name, self.__device_path) + + if self.__fd >= 0: + retries = self.__retries + while retries: + try: + if select.select([], [self.__fd], [], self.__timeout)[1]: + self.__online_shared.value = 1 + return True + else: + msg = "HID-%s is unavailable for writing" + if retries == 1: + msg += " (maybe unplugged)" + logger.error(msg, self.__name) # TODO: debug + except Exception as err: + logger.error("Can't select() HID-%s: %s: %s", self.__name, type(err).__name__, err) + + retries -= 1 + self.__online_shared.value = 0 + + if retries: + logger.error("Retries left (HID-%s, ensure_device): %d", self.__name, retries) + time.sleep(self.__retries_delay) + + self._close_device() + + return False + + def _close_device(self) -> None: + if self.__fd >= 0: + try: + os.close(self.__fd) + except Exception: + pass + finally: + self.__fd = -1 diff --git a/kvmd/plugins/hid/otg/keyboard.py b/kvmd/plugins/hid/otg/keyboard.py new file mode 100644 index 00000000..ed94ac19 --- /dev/null +++ b/kvmd/plugins/hid/otg/keyboard.py @@ -0,0 +1,151 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import dataclasses + +from typing import List +from typing import Set +from typing import Optional +from typing import Any + +from ....logging import get_logger + +from .... import keymap + +from .hid import BaseEvent +from .hid import DeviceProcess + + +# ===== +class _ClearEvent(BaseEvent): + pass + + +class _ResetEvent(BaseEvent): + pass + + [email protected](frozen=True) +class _KeyEvent(BaseEvent): + key: keymap.OtgKey + state: bool + + +# ===== +class KeyboardProcess(DeviceProcess): + def __init__(self, **kwargs: Any) -> None: + super().__init__(name="keyboard", **kwargs) + + self.__pressed_modifiers: Set[keymap.OtgKey] = set() + self.__pressed_keys: List[Optional[keymap.OtgKey]] = [None] * 6 + + def cleanup(self) -> None: + self._stop() + get_logger().info("Clearing HID-keyboard events ...") + if self._ensure_device(): + try: + self._write_report(b"\x00" * 8) # Release all keys and modifiers + finally: + self._close_device() + + def send_clear_event(self) -> None: + self._queue_event(_ClearEvent()) + + def send_reset_event(self) -> None: + self._queue_event(_ResetEvent()) + + def send_key_event(self, key: str, state: bool) -> None: + assert key in keymap.KEYMAP + self._queue_event(_KeyEvent(key=keymap.KEYMAP[key].otg, state=state)) + + # ===== + + def _process_event(self, event: BaseEvent) -> None: + if isinstance(event, _ClearEvent): + self.__process_clear_event() + elif isinstance(event, _ResetEvent): + self.__process_clear_event(reopen=True) + elif isinstance(event, _KeyEvent): + self.__process_key_event(event) + + def __process_clear_event(self, reopen: bool=False) -> None: + self.__clear_modifiers() + self.__clear_keys() + if reopen: + self._close_device() + self.__send_current_state() + + def __process_key_event(self, event: _KeyEvent) -> None: + if event.key.is_modifier: + if event.key in self.__pressed_modifiers: + # Ранее нажатый модификатор отжимаем + self.__pressed_modifiers.remove(event.key) + if not self.__send_current_state(): + return + if event.state: + # Нажимаем если нужно + self.__pressed_modifiers.add(event.key) + self.__send_current_state() + + else: # regular key + if event.key in self.__pressed_keys: + # Ранее нажатую клавишу отжимаем + self.__pressed_keys[self.__pressed_keys.index(event.key)] = None + if not self.__send_current_state(): + return + elif event.state and None not in self.__pressed_keys: + # Если нужно нажать что-то новое, но свободных слотов нет - отжимаем всё + self.__clear_keys() + if not self.__send_current_state(): + return + if event.state: + # Нажимаем если нужно + self.__pressed_keys[self.__pressed_keys.index(None)] = event.key + self.__send_current_state() + + def __send_current_state(self) -> bool: + ok = False + if self._ensure_device(): + modifiers = 0 + for key in self.__pressed_modifiers: + assert key.is_modifier + modifiers |= key.code + + assert len(self.__pressed_keys) == 6 + keys = [ + (0 if key is None else key.code) + for key in self.__pressed_keys + ] + + print(self.__pressed_modifiers, self.__pressed_keys) + ok = self._write_report(bytes([modifiers, 0] + keys)) + + if not ok: + self.__clear_modifiers() + self.__clear_keys() + return ok + + def __clear_modifiers(self) -> None: + self.__pressed_modifiers.clear() + + def __clear_keys(self) -> None: + self.__pressed_keys = [None] * 6 |