summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2019-04-08 04:58:32 +0300
committerDevaev Maxim <[email protected]>2019-04-08 04:58:32 +0300
commit9243d2a00c86aa9b5df2b1df20d2ba57be0bed47 (patch)
tree4d86bf39d432af121fff66a863864719c885d84d
parent7eca51f17b7c2c95b54d4c2b77caa6c8554ef4f4 (diff)
refactoring
-rw-r--r--Makefile2
-rw-r--r--kvmd/apps/kvmd/hid.py104
-rw-r--r--kvmd/apps/kvmd/server.py32
-rw-r--r--kvmd/keymap.py36
-rw-r--r--kvmd/validators/__init__.py7
-rw-r--r--kvmd/validators/kvm.py21
-rw-r--r--tests/test_keymap.py36
-rw-r--r--tests/test_validators_kvm.py71
8 files changed, 245 insertions, 64 deletions
diff --git a/Makefile b/Makefile
index 52fe4a05..3770e5e8 100644
--- a/Makefile
+++ b/Makefile
@@ -13,7 +13,7 @@ all:
tox: _testenv
- docker run --rm \
+ time docker run --rm \
--volume `pwd`:/src:ro \
--volume `pwd`/testenv:/src/testenv:rw \
--volume `pwd`/extras:/usr/share/kvmd/extras:ro \
diff --git a/kvmd/apps/kvmd/hid.py b/kvmd/apps/kvmd/hid.py
index c308c71b..3cdc48fc 100644
--- a/kvmd/apps/kvmd/hid.py
+++ b/kvmd/apps/kvmd/hid.py
@@ -23,87 +23,91 @@
import os
import signal
import asyncio
+import dataclasses
import multiprocessing
import multiprocessing.queues
import queue
import struct
-import pkgutil
import errno
import time
from typing import Dict
from typing import Set
-from typing import NamedTuple
from typing import AsyncGenerator
-from typing import Any
-import yaml
import serial
import setproctitle
from ...logging import get_logger
from ... import gpio
+from ... import keymap
# =====
-def _get_keymap() -> Dict[str, int]:
- return yaml.safe_load(pkgutil.get_data("kvmd", "data/keymap.yaml").decode()) # type: ignore
+class _BaseEvent:
+ def make_command(self) -> bytes:
+ raise NotImplementedError
-_KEYMAP = _get_keymap()
[email protected] # pylint: disable=abstract-method
+class _BoolEvent(_BaseEvent):
+ name: str
+ state: bool
-class _KeyEvent(NamedTuple):
- key: str
- state: bool
[email protected] # pylint: disable=abstract-method
+class _IntEvent(_BaseEvent):
+ x: int
+ y: int
- @staticmethod
- def is_valid(key: str) -> bool:
- return (key in _KEYMAP)
+
+class _KeyEvent(_BoolEvent):
+ def __post_init__(self) -> None:
+ assert self.name in keymap.KEYMAP
def make_command(self) -> bytes:
- code = _KEYMAP[self.key]
+ code = keymap.KEYMAP[self.name]
key_bytes = bytes([code])
assert len(key_bytes) == 1, (self, key_bytes, code)
state_bytes = (b"\x01" if self.state else b"\x00")
return b"\x11" + key_bytes + state_bytes + b"\x00\x00"
-class _MouseMoveEvent(NamedTuple):
- to_x: int
- to_y: int
+class _MouseMoveEvent(_IntEvent):
+ def __post_init__(self) -> None:
+ assert -32768 <= self.x <= 32767
+ assert -32768 <= self.y <= 32767
def make_command(self) -> bytes:
- to_x = min(max(-32768, self.to_x), 32767)
- to_y = min(max(-32768, self.to_y), 32767)
- return b"\x12" + struct.pack(">hh", to_x, to_y)
-
+ return b"\x12" + struct.pack(">hh", self.x, self.y)
-class _MouseButtonEvent(NamedTuple):
- button: str
- state: bool
- @staticmethod
- def is_valid(button: str) -> bool:
- return (button in ["left", "right"])
+class _MouseButtonEvent(_BoolEvent):
+ def __post_init__(self) -> None:
+ assert self.name in ["left", "right"]
def make_command(self) -> bytes:
code = 0
- if self.button == "left":
+ if self.name == "left":
code = (0b10000000 | (0b00001000 if self.state else 0))
- elif self.button == "right":
+ elif self.name == "right":
code = (0b01000000 | (0b00000100 if self.state else 0))
assert code, self
return b"\x13" + bytes([code]) + b"\x00\x00\x00"
-class _MouseWheelEvent(NamedTuple):
- delta_y: int
+class _MouseWheelEvent(_IntEvent):
+ def __post_init__(self) -> None:
+ assert self.x == 0 # Горизонтальная прокрутка пока не поддерживается
+ assert -128 <= self.y <= 127
def make_command(self) -> bytes:
- delta_y = min(max(-128, self.delta_y), 127)
- return b"\x14\x00" + struct.pack(">b", delta_y) + b"\x00\x00"
+ return b"\x14\x00" + struct.pack(">b", self.y) + b"\x00\x00"
# =====
@@ -139,13 +143,13 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
self.__state_poll = state_poll
+ self.__lock = asyncio.Lock()
+
self.__pressed_keys: Set[str] = set()
self.__pressed_mouse_buttons: Set[str] = set()
- self.__lock = asyncio.Lock()
self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
self.__online_shared = multiprocessing.Value("i", 1)
-
self.__stop_event = multiprocessing.Event()
def start(self) -> None:
@@ -167,16 +171,16 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
gpio.write(self.__reset_pin, False)
async def send_key_event(self, key: str, state: bool) -> None:
- await self.__send_bool_event(_KeyEvent, self.__pressed_keys, key, state)
+ await self.__send_bool_event(_KeyEvent(key, state), self.__pressed_keys)
async def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
- await self.__send_int_event(_MouseMoveEvent, to_x, to_y)
+ await self.__send_int_event(_MouseMoveEvent(to_x, to_y))
async def send_mouse_button_event(self, button: str, state: bool) -> None:
- await self.__send_bool_event(_MouseButtonEvent, self.__pressed_mouse_buttons, button, state)
+ await self.__send_bool_event(_MouseButtonEvent(button, state), self.__pressed_mouse_buttons)
async def send_mouse_wheel_event(self, delta_y: int) -> None:
- await self.__send_int_event(_MouseWheelEvent, delta_y)
+ await self.__send_int_event(_MouseWheelEvent(0, delta_y))
async def clear_events(self) -> None:
if not self.__stop_event.is_set():
@@ -196,23 +200,23 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
self.join()
gpio.write(self.__reset_pin, False)
- async def __send_bool_event(self, cls: Any, pressed: Set[str], name: str, state: bool) -> None:
+ async def __send_bool_event(self, event: _BoolEvent, pressed: Set[str]) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
- if cls.is_valid(name) and (
- (state and (name not in pressed)) # Если еще не нажато
- or (not state and (name in pressed)) # ... Или еще не отжато
+ if (
+ (event.state and (event.name not in pressed)) # Если еще не нажато
+ or (not event.state and (event.name in pressed)) # ... Или еще не отжато
):
- if state:
- pressed.add(name)
+ if event.state:
+ pressed.add(event.name)
else:
- pressed.remove(name)
- self.__events_queue.put(cls(name, state))
+ pressed.remove(event.name)
+ self.__events_queue.put(event)
- async def __send_int_event(self, cls: Any, *args: int) -> None:
+ async def __send_int_event(self, event: _IntEvent) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
- self.__events_queue.put(cls(*args))
+ self.__events_queue.put(event)
def __unsafe_clear_events(self) -> None:
for (cls, pressed) in [
@@ -244,7 +248,7 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
passed = 0
while not (self.__stop_event.is_set() and self.__events_queue.qsize() == 0):
try:
- event = self.__events_queue.get(timeout=0.05)
+ event: _BaseEvent = self.__events_queue.get(timeout=0.05)
except queue.Empty:
if passed >= 20: # 20 * 0.05 = 1 sec
self.__process_command(tty, b"\x01\x00\x00\x00\x00") # Ping
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
index cfe0a2bc..3a531484 100644
--- a/kvmd/apps/kvmd/server.py
+++ b/kvmd/apps/kvmd/server.py
@@ -47,14 +47,20 @@ from ...aioregion import RegionIsBusyError
from ...validators import ValidatorError
from ...validators.basic import valid_bool
+
from ...validators.auth import valid_user
from ...validators.auth import valid_passwd
from ...validators.auth import valid_auth_token
+
from ...validators.kvm import valid_atx_button
from ...validators.kvm import valid_kvm_target
from ...validators.kvm import valid_log_seek
from ...validators.kvm import valid_stream_quality
from ...validators.kvm import valid_stream_fps
+from ...validators.kvm import valid_hid_key
+from ...validators.kvm import valid_hid_mouse_move
+from ...validators.kvm import valid_hid_mouse_button
+from ...validators.kvm import valid_hid_mouse_wheel
from ... import __version__
@@ -384,28 +390,32 @@ class Server: # pylint: disable=too-many-instance-attributes
return ws
async def __handle_ws_key_event(self, event: Dict) -> None:
- key = str(event.get("key", ""))[:64].strip()
- state = event.get("state")
- if key and state in [True, False]:
- await self.__hid.send_key_event(key, state)
+ try:
+ key = valid_hid_key(event["key"])
+ state = valid_bool(event["state"])
+ except Exception:
+ return
+ await self.__hid.send_key_event(key, state)
async def __handle_ws_mouse_move_event(self, event: Dict) -> None:
try:
- to_x = int(event["to"]["x"])
- to_y = int(event["to"]["y"])
+ to_x = valid_hid_mouse_move(event["to"]["x"])
+ to_y = valid_hid_mouse_move(event["to"]["y"])
except Exception:
return
await self.__hid.send_mouse_move_event(to_x, to_y)
async def __handle_ws_mouse_button_event(self, event: Dict) -> None:
- button = str(event.get("button", ""))[:64].strip()
- state = event.get("state")
- if button and state in [True, False]:
- await self.__hid.send_mouse_button_event(button, state)
+ try:
+ button = valid_hid_mouse_button(event["button"])
+ state = valid_bool(event["state"])
+ except Exception:
+ return
+ await self.__hid.send_mouse_button_event(button, state)
async def __handle_ws_mouse_wheel_event(self, event: Dict) -> None:
try:
- delta_y = int(event["delta"]["y"])
+ delta_y = valid_hid_mouse_wheel(event["delta"]["y"])
except Exception:
return
await self.__hid.send_mouse_wheel_event(delta_y)
diff --git a/kvmd/keymap.py b/kvmd/keymap.py
new file mode 100644
index 00000000..e6dfecbb
--- /dev/null
+++ b/kvmd/keymap.py
@@ -0,0 +1,36 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import pkgutil
+
+from typing import Dict
+
+import yaml
+
+
+# =====
+def _get_keymap() -> Dict[str, int]:
+ return yaml.safe_load(pkgutil.get_data("kvmd", "data/keymap.yaml").decode()) # type: ignore
+
+
+# =====
+KEYMAP = _get_keymap()
diff --git a/kvmd/validators/__init__.py b/kvmd/validators/__init__.py
index f4b0f1fa..5556f564 100644
--- a/kvmd/validators/__init__.py
+++ b/kvmd/validators/__init__.py
@@ -23,8 +23,11 @@
import re
from typing import List
+from typing import Mapping
+from typing import Sequence
from typing import Callable
from typing import NoReturn
+from typing import Union
from typing import Any
@@ -54,13 +57,13 @@ def check_not_none_string(arg: Any, name: str, strip: bool=True) -> str:
return arg
-def check_in_list(arg: Any, name: str, variants: List) -> Any:
+def check_in_list(arg: Any, name: str, variants: Union[Sequence, Mapping]) -> Any:
if arg not in variants:
raise_error(arg, name)
return arg
-def check_string_in_list(arg: Any, name: str, variants: List[str], lower: bool=True) -> Any:
+def check_string_in_list(arg: Any, name: str, variants: Union[Sequence[str], Mapping[str, Any]], lower: bool=True) -> Any:
arg = check_not_none_string(arg, name)
if lower:
arg = arg.lower()
diff --git a/kvmd/validators/kvm.py b/kvmd/validators/kvm.py
index 034587ef..40f5ced1 100644
--- a/kvmd/validators/kvm.py
+++ b/kvmd/validators/kvm.py
@@ -22,6 +22,8 @@
from typing import Any
+from .. import keymap
+
from . import check_string_in_list
from .basic import valid_number
@@ -46,3 +48,22 @@ def valid_stream_quality(arg: Any) -> int:
def valid_stream_fps(arg: Any) -> int:
return int(valid_number(arg, min=0, max=30, name="stream FPS"))
+
+
+# =====
+def valid_hid_key(arg: Any) -> str:
+ return check_string_in_list(arg, "HID key", keymap.KEYMAP, lower=False)
+
+
+def valid_hid_mouse_move(arg: Any) -> int:
+ arg = valid_number(arg, name="HID mouse move")
+ return min(max(-32768, arg), 32767)
+
+
+def valid_hid_mouse_button(arg: Any) -> str:
+ return check_string_in_list(arg, "HID mouse button", ["left", "right"])
+
+
+def valid_hid_mouse_wheel(arg: Any) -> int:
+ arg = valid_number(arg, name="HID mouse wheel")
+ return min(max(-128, arg), 127)
diff --git a/tests/test_keymap.py b/tests/test_keymap.py
new file mode 100644
index 00000000..26251fdb
--- /dev/null
+++ b/tests/test_keymap.py
@@ -0,0 +1,36 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import pytest
+
+from kvmd.keymap import KEYMAP
+
+
+# =====
+def test_keymap__ok() -> None:
+ assert type(KEYMAP["KeyA"]) == int # pylint: disable=unidiomatic-typecheck
+ assert KEYMAP["KeyA"] == 1
+
+
+def test_keymap__fail() -> None:
+ with pytest.raises(KeyError):
+ print(KEYMAP["keya"])
diff --git a/tests/test_validators_kvm.py b/tests/test_validators_kvm.py
index fad34986..f1050b36 100644
--- a/tests/test_validators_kvm.py
+++ b/tests/test_validators_kvm.py
@@ -24,12 +24,18 @@ from typing import Any
import pytest
+from kvmd.keymap import KEYMAP
+
from kvmd.validators import ValidatorError
from kvmd.validators.kvm import valid_atx_button
from kvmd.validators.kvm import valid_kvm_target
from kvmd.validators.kvm import valid_log_seek
from kvmd.validators.kvm import valid_stream_quality
from kvmd.validators.kvm import valid_stream_fps
+from kvmd.validators.kvm import valid_hid_key
+from kvmd.validators.kvm import valid_hid_mouse_move
+from kvmd.validators.kvm import valid_hid_mouse_button
+from kvmd.validators.kvm import valid_hid_mouse_wheel
# =====
@@ -96,3 +102,68 @@ def test_ok__valid_stream_fps(arg: Any) -> None:
def test_fail__valid_stream_fps(arg: Any) -> None:
with pytest.raises(ValidatorError):
print(valid_stream_fps(arg))
+
+
+# =====
+def test_ok__valid_hid_key() -> None:
+ for key in KEYMAP:
+ print(valid_hid_key(key))
+ print(valid_hid_key(key + " "))
+
+
[email protected]("arg", ["test", "", None, "keya"])
+def test_fail__valid_hid_key(arg: Any) -> None:
+ with pytest.raises(ValidatorError):
+ print(valid_hid_key(arg))
+
+
+# =====
[email protected]("arg", [-20000, "1 ", "-1", 1, -1, 0, "20000 "])
+def test_ok__valid_hid_mouse_move(arg: Any) -> None:
+ assert valid_hid_mouse_move(arg) == int(str(arg).strip())
+
+
+def test_ok__valid_hid_mouse_move__m50000() -> None:
+ assert valid_hid_mouse_move(-50000) == -32768
+
+
+def test_ok__valid_hid_mouse_move__p50000() -> None:
+ assert valid_hid_mouse_move(50000) == 32767
+
+
[email protected]("arg", ["test", "", None, 1.1])
+def test_fail__valid_hid_mouse_move(arg: Any) -> None:
+ with pytest.raises(ValidatorError):
+ print(valid_hid_mouse_move(arg))
+
+
+# =====
[email protected]("arg", ["LEFT ", "RIGHT "])
+def test_ok__valid_hid_mouse_button(arg: Any) -> None:
+ assert valid_hid_mouse_button(arg) == arg.strip().lower()
+
+
[email protected]("arg", ["test", "", None])
+def test_fail__valid_hid_mouse_button(arg: Any) -> None:
+ with pytest.raises(ValidatorError):
+ print(valid_hid_mouse_button(arg))
+
+
+# =====
[email protected]("arg", [-100, "1 ", "-1", 1, -1, 0, "100 "])
+def test_ok__valid_hid_mouse_wheel(arg: Any) -> None:
+ assert valid_hid_mouse_wheel(arg) == int(str(arg).strip())
+
+
+def test_ok__valid_hid_mouse_wheel__m200() -> None:
+ assert valid_hid_mouse_wheel(-200) == -128
+
+
+def test_ok__valid_hid_mouse_wheel__p200() -> None:
+ assert valid_hid_mouse_wheel(200) == 127
+
+
[email protected]("arg", ["test", "", None, 1.1])
+def test_fail__valid_hid_mouse_wheel(arg: Any) -> None:
+ with pytest.raises(ValidatorError):
+ print(valid_hid_mouse_wheel(arg))