diff options
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | kvmd/apps/kvmd/hid.py | 104 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 32 | ||||
-rw-r--r-- | kvmd/keymap.py | 36 | ||||
-rw-r--r-- | kvmd/validators/__init__.py | 7 | ||||
-rw-r--r-- | kvmd/validators/kvm.py | 21 | ||||
-rw-r--r-- | tests/test_keymap.py | 36 | ||||
-rw-r--r-- | tests/test_validators_kvm.py | 71 |
8 files changed, 245 insertions, 64 deletions
@@ -13,7 +13,7 @@ all: tox: _testenv - docker run --rm \ + time docker run --rm \ --volume `pwd`:/src:ro \ --volume `pwd`/testenv:/src/testenv:rw \ --volume `pwd`/extras:/usr/share/kvmd/extras:ro \ diff --git a/kvmd/apps/kvmd/hid.py b/kvmd/apps/kvmd/hid.py index c308c71b..3cdc48fc 100644 --- a/kvmd/apps/kvmd/hid.py +++ b/kvmd/apps/kvmd/hid.py @@ -23,87 +23,91 @@ import os import signal import asyncio +import dataclasses import multiprocessing import multiprocessing.queues import queue import struct -import pkgutil import errno import time from typing import Dict from typing import Set -from typing import NamedTuple from typing import AsyncGenerator -from typing import Any -import yaml import serial import setproctitle from ...logging import get_logger from ... import gpio +from ... import keymap # ===== -def _get_keymap() -> Dict[str, int]: - return yaml.safe_load(pkgutil.get_data("kvmd", "data/keymap.yaml").decode()) # type: ignore +class _BaseEvent: + def make_command(self) -> bytes: + raise NotImplementedError -_KEYMAP = _get_keymap() [email protected] # pylint: disable=abstract-method +class _BoolEvent(_BaseEvent): + name: str + state: bool -class _KeyEvent(NamedTuple): - key: str - state: bool [email protected] # pylint: disable=abstract-method +class _IntEvent(_BaseEvent): + x: int + y: int - @staticmethod - def is_valid(key: str) -> bool: - return (key in _KEYMAP) + +class _KeyEvent(_BoolEvent): + def __post_init__(self) -> None: + assert self.name in keymap.KEYMAP def make_command(self) -> bytes: - code = _KEYMAP[self.key] + code = keymap.KEYMAP[self.name] key_bytes = bytes([code]) assert len(key_bytes) == 1, (self, key_bytes, code) state_bytes = (b"\x01" if self.state else b"\x00") return b"\x11" + key_bytes + state_bytes + b"\x00\x00" -class _MouseMoveEvent(NamedTuple): - to_x: int - to_y: int +class _MouseMoveEvent(_IntEvent): + def __post_init__(self) -> None: + assert -32768 <= self.x <= 32767 + assert -32768 <= self.y <= 32767 def make_command(self) -> bytes: - to_x = min(max(-32768, self.to_x), 32767) - to_y = min(max(-32768, self.to_y), 32767) - return b"\x12" + struct.pack(">hh", to_x, to_y) - + return b"\x12" + struct.pack(">hh", self.x, self.y) -class _MouseButtonEvent(NamedTuple): - button: str - state: bool - @staticmethod - def is_valid(button: str) -> bool: - return (button in ["left", "right"]) +class _MouseButtonEvent(_BoolEvent): + def __post_init__(self) -> None: + assert self.name in ["left", "right"] def make_command(self) -> bytes: code = 0 - if self.button == "left": + if self.name == "left": code = (0b10000000 | (0b00001000 if self.state else 0)) - elif self.button == "right": + elif self.name == "right": code = (0b01000000 | (0b00000100 if self.state else 0)) assert code, self return b"\x13" + bytes([code]) + b"\x00\x00\x00" -class _MouseWheelEvent(NamedTuple): - delta_y: int +class _MouseWheelEvent(_IntEvent): + def __post_init__(self) -> None: + assert self.x == 0 # Горизонтальная прокрутка пока не поддерживается + assert -128 <= self.y <= 127 def make_command(self) -> bytes: - delta_y = min(max(-128, self.delta_y), 127) - return b"\x14\x00" + struct.pack(">b", delta_y) + b"\x00\x00" + return b"\x14\x00" + struct.pack(">b", self.y) + b"\x00\x00" # ===== @@ -139,13 +143,13 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu self.__state_poll = state_poll + self.__lock = asyncio.Lock() + self.__pressed_keys: Set[str] = set() self.__pressed_mouse_buttons: Set[str] = set() - self.__lock = asyncio.Lock() self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue() self.__online_shared = multiprocessing.Value("i", 1) - self.__stop_event = multiprocessing.Event() def start(self) -> None: @@ -167,16 +171,16 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu gpio.write(self.__reset_pin, False) async def send_key_event(self, key: str, state: bool) -> None: - await self.__send_bool_event(_KeyEvent, self.__pressed_keys, key, state) + await self.__send_bool_event(_KeyEvent(key, state), self.__pressed_keys) async def send_mouse_move_event(self, to_x: int, to_y: int) -> None: - await self.__send_int_event(_MouseMoveEvent, to_x, to_y) + await self.__send_int_event(_MouseMoveEvent(to_x, to_y)) async def send_mouse_button_event(self, button: str, state: bool) -> None: - await self.__send_bool_event(_MouseButtonEvent, self.__pressed_mouse_buttons, button, state) + await self.__send_bool_event(_MouseButtonEvent(button, state), self.__pressed_mouse_buttons) async def send_mouse_wheel_event(self, delta_y: int) -> None: - await self.__send_int_event(_MouseWheelEvent, delta_y) + await self.__send_int_event(_MouseWheelEvent(0, delta_y)) async def clear_events(self) -> None: if not self.__stop_event.is_set(): @@ -196,23 +200,23 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu self.join() gpio.write(self.__reset_pin, False) - async def __send_bool_event(self, cls: Any, pressed: Set[str], name: str, state: bool) -> None: + async def __send_bool_event(self, event: _BoolEvent, pressed: Set[str]) -> None: if not self.__stop_event.is_set(): async with self.__lock: - if cls.is_valid(name) and ( - (state and (name not in pressed)) # Если еще не нажато - or (not state and (name in pressed)) # ... Или еще не отжато + if ( + (event.state and (event.name not in pressed)) # Если еще не нажато + or (not event.state and (event.name in pressed)) # ... Или еще не отжато ): - if state: - pressed.add(name) + if event.state: + pressed.add(event.name) else: - pressed.remove(name) - self.__events_queue.put(cls(name, state)) + pressed.remove(event.name) + self.__events_queue.put(event) - async def __send_int_event(self, cls: Any, *args: int) -> None: + async def __send_int_event(self, event: _IntEvent) -> None: if not self.__stop_event.is_set(): async with self.__lock: - self.__events_queue.put(cls(*args)) + self.__events_queue.put(event) def __unsafe_clear_events(self) -> None: for (cls, pressed) in [ @@ -244,7 +248,7 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu passed = 0 while not (self.__stop_event.is_set() and self.__events_queue.qsize() == 0): try: - event = self.__events_queue.get(timeout=0.05) + event: _BaseEvent = self.__events_queue.get(timeout=0.05) except queue.Empty: if passed >= 20: # 20 * 0.05 = 1 sec self.__process_command(tty, b"\x01\x00\x00\x00\x00") # Ping diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index cfe0a2bc..3a531484 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -47,14 +47,20 @@ from ...aioregion import RegionIsBusyError from ...validators import ValidatorError from ...validators.basic import valid_bool + from ...validators.auth import valid_user from ...validators.auth import valid_passwd from ...validators.auth import valid_auth_token + from ...validators.kvm import valid_atx_button from ...validators.kvm import valid_kvm_target from ...validators.kvm import valid_log_seek from ...validators.kvm import valid_stream_quality from ...validators.kvm import valid_stream_fps +from ...validators.kvm import valid_hid_key +from ...validators.kvm import valid_hid_mouse_move +from ...validators.kvm import valid_hid_mouse_button +from ...validators.kvm import valid_hid_mouse_wheel from ... import __version__ @@ -384,28 +390,32 @@ class Server: # pylint: disable=too-many-instance-attributes return ws async def __handle_ws_key_event(self, event: Dict) -> None: - key = str(event.get("key", ""))[:64].strip() - state = event.get("state") - if key and state in [True, False]: - await self.__hid.send_key_event(key, state) + try: + key = valid_hid_key(event["key"]) + state = valid_bool(event["state"]) + except Exception: + return + await self.__hid.send_key_event(key, state) async def __handle_ws_mouse_move_event(self, event: Dict) -> None: try: - to_x = int(event["to"]["x"]) - to_y = int(event["to"]["y"]) + to_x = valid_hid_mouse_move(event["to"]["x"]) + to_y = valid_hid_mouse_move(event["to"]["y"]) except Exception: return await self.__hid.send_mouse_move_event(to_x, to_y) async def __handle_ws_mouse_button_event(self, event: Dict) -> None: - button = str(event.get("button", ""))[:64].strip() - state = event.get("state") - if button and state in [True, False]: - await self.__hid.send_mouse_button_event(button, state) + try: + button = valid_hid_mouse_button(event["button"]) + state = valid_bool(event["state"]) + except Exception: + return + await self.__hid.send_mouse_button_event(button, state) async def __handle_ws_mouse_wheel_event(self, event: Dict) -> None: try: - delta_y = int(event["delta"]["y"]) + delta_y = valid_hid_mouse_wheel(event["delta"]["y"]) except Exception: return await self.__hid.send_mouse_wheel_event(delta_y) diff --git a/kvmd/keymap.py b/kvmd/keymap.py new file mode 100644 index 00000000..e6dfecbb --- /dev/null +++ b/kvmd/keymap.py @@ -0,0 +1,36 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import pkgutil + +from typing import Dict + +import yaml + + +# ===== +def _get_keymap() -> Dict[str, int]: + return yaml.safe_load(pkgutil.get_data("kvmd", "data/keymap.yaml").decode()) # type: ignore + + +# ===== +KEYMAP = _get_keymap() diff --git a/kvmd/validators/__init__.py b/kvmd/validators/__init__.py index f4b0f1fa..5556f564 100644 --- a/kvmd/validators/__init__.py +++ b/kvmd/validators/__init__.py @@ -23,8 +23,11 @@ import re from typing import List +from typing import Mapping +from typing import Sequence from typing import Callable from typing import NoReturn +from typing import Union from typing import Any @@ -54,13 +57,13 @@ def check_not_none_string(arg: Any, name: str, strip: bool=True) -> str: return arg -def check_in_list(arg: Any, name: str, variants: List) -> Any: +def check_in_list(arg: Any, name: str, variants: Union[Sequence, Mapping]) -> Any: if arg not in variants: raise_error(arg, name) return arg -def check_string_in_list(arg: Any, name: str, variants: List[str], lower: bool=True) -> Any: +def check_string_in_list(arg: Any, name: str, variants: Union[Sequence[str], Mapping[str, Any]], lower: bool=True) -> Any: arg = check_not_none_string(arg, name) if lower: arg = arg.lower() diff --git a/kvmd/validators/kvm.py b/kvmd/validators/kvm.py index 034587ef..40f5ced1 100644 --- a/kvmd/validators/kvm.py +++ b/kvmd/validators/kvm.py @@ -22,6 +22,8 @@ from typing import Any +from .. import keymap + from . import check_string_in_list from .basic import valid_number @@ -46,3 +48,22 @@ def valid_stream_quality(arg: Any) -> int: def valid_stream_fps(arg: Any) -> int: return int(valid_number(arg, min=0, max=30, name="stream FPS")) + + +# ===== +def valid_hid_key(arg: Any) -> str: + return check_string_in_list(arg, "HID key", keymap.KEYMAP, lower=False) + + +def valid_hid_mouse_move(arg: Any) -> int: + arg = valid_number(arg, name="HID mouse move") + return min(max(-32768, arg), 32767) + + +def valid_hid_mouse_button(arg: Any) -> str: + return check_string_in_list(arg, "HID mouse button", ["left", "right"]) + + +def valid_hid_mouse_wheel(arg: Any) -> int: + arg = valid_number(arg, name="HID mouse wheel") + return min(max(-128, arg), 127) diff --git a/tests/test_keymap.py b/tests/test_keymap.py new file mode 100644 index 00000000..26251fdb --- /dev/null +++ b/tests/test_keymap.py @@ -0,0 +1,36 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import pytest + +from kvmd.keymap import KEYMAP + + +# ===== +def test_keymap__ok() -> None: + assert type(KEYMAP["KeyA"]) == int # pylint: disable=unidiomatic-typecheck + assert KEYMAP["KeyA"] == 1 + + +def test_keymap__fail() -> None: + with pytest.raises(KeyError): + print(KEYMAP["keya"]) diff --git a/tests/test_validators_kvm.py b/tests/test_validators_kvm.py index fad34986..f1050b36 100644 --- a/tests/test_validators_kvm.py +++ b/tests/test_validators_kvm.py @@ -24,12 +24,18 @@ from typing import Any import pytest +from kvmd.keymap import KEYMAP + from kvmd.validators import ValidatorError from kvmd.validators.kvm import valid_atx_button from kvmd.validators.kvm import valid_kvm_target from kvmd.validators.kvm import valid_log_seek from kvmd.validators.kvm import valid_stream_quality from kvmd.validators.kvm import valid_stream_fps +from kvmd.validators.kvm import valid_hid_key +from kvmd.validators.kvm import valid_hid_mouse_move +from kvmd.validators.kvm import valid_hid_mouse_button +from kvmd.validators.kvm import valid_hid_mouse_wheel # ===== @@ -96,3 +102,68 @@ def test_ok__valid_stream_fps(arg: Any) -> None: def test_fail__valid_stream_fps(arg: Any) -> None: with pytest.raises(ValidatorError): print(valid_stream_fps(arg)) + + +# ===== +def test_ok__valid_hid_key() -> None: + for key in KEYMAP: + print(valid_hid_key(key)) + print(valid_hid_key(key + " ")) + + [email protected]("arg", ["test", "", None, "keya"]) +def test_fail__valid_hid_key(arg: Any) -> None: + with pytest.raises(ValidatorError): + print(valid_hid_key(arg)) + + +# ===== [email protected]("arg", [-20000, "1 ", "-1", 1, -1, 0, "20000 "]) +def test_ok__valid_hid_mouse_move(arg: Any) -> None: + assert valid_hid_mouse_move(arg) == int(str(arg).strip()) + + +def test_ok__valid_hid_mouse_move__m50000() -> None: + assert valid_hid_mouse_move(-50000) == -32768 + + +def test_ok__valid_hid_mouse_move__p50000() -> None: + assert valid_hid_mouse_move(50000) == 32767 + + [email protected]("arg", ["test", "", None, 1.1]) +def test_fail__valid_hid_mouse_move(arg: Any) -> None: + with pytest.raises(ValidatorError): + print(valid_hid_mouse_move(arg)) + + +# ===== [email protected]("arg", ["LEFT ", "RIGHT "]) +def test_ok__valid_hid_mouse_button(arg: Any) -> None: + assert valid_hid_mouse_button(arg) == arg.strip().lower() + + [email protected]("arg", ["test", "", None]) +def test_fail__valid_hid_mouse_button(arg: Any) -> None: + with pytest.raises(ValidatorError): + print(valid_hid_mouse_button(arg)) + + +# ===== [email protected]("arg", [-100, "1 ", "-1", 1, -1, 0, "100 "]) +def test_ok__valid_hid_mouse_wheel(arg: Any) -> None: + assert valid_hid_mouse_wheel(arg) == int(str(arg).strip()) + + +def test_ok__valid_hid_mouse_wheel__m200() -> None: + assert valid_hid_mouse_wheel(-200) == -128 + + +def test_ok__valid_hid_mouse_wheel__p200() -> None: + assert valid_hid_mouse_wheel(200) == 127 + + [email protected]("arg", ["test", "", None, 1.1]) +def test_fail__valid_hid_mouse_wheel(arg: Any) -> None: + with pytest.raises(ValidatorError): + print(valid_hid_mouse_wheel(arg)) |