summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/apps/kvmd/hid.py104
-rw-r--r--kvmd/apps/kvmd/server.py32
-rw-r--r--kvmd/keymap.py36
-rw-r--r--kvmd/validators/__init__.py7
-rw-r--r--kvmd/validators/kvm.py21
5 files changed, 137 insertions, 63 deletions
diff --git a/kvmd/apps/kvmd/hid.py b/kvmd/apps/kvmd/hid.py
index c308c71b..3cdc48fc 100644
--- a/kvmd/apps/kvmd/hid.py
+++ b/kvmd/apps/kvmd/hid.py
@@ -23,87 +23,91 @@
import os
import signal
import asyncio
+import dataclasses
import multiprocessing
import multiprocessing.queues
import queue
import struct
-import pkgutil
import errno
import time
from typing import Dict
from typing import Set
-from typing import NamedTuple
from typing import AsyncGenerator
-from typing import Any
-import yaml
import serial
import setproctitle
from ...logging import get_logger
from ... import gpio
+from ... import keymap
# =====
-def _get_keymap() -> Dict[str, int]:
- return yaml.safe_load(pkgutil.get_data("kvmd", "data/keymap.yaml").decode()) # type: ignore
+class _BaseEvent:
+ def make_command(self) -> bytes:
+ raise NotImplementedError
-_KEYMAP = _get_keymap()
[email protected] # pylint: disable=abstract-method
+class _BoolEvent(_BaseEvent):
+ name: str
+ state: bool
-class _KeyEvent(NamedTuple):
- key: str
- state: bool
[email protected] # pylint: disable=abstract-method
+class _IntEvent(_BaseEvent):
+ x: int
+ y: int
- @staticmethod
- def is_valid(key: str) -> bool:
- return (key in _KEYMAP)
+
+class _KeyEvent(_BoolEvent):
+ def __post_init__(self) -> None:
+ assert self.name in keymap.KEYMAP
def make_command(self) -> bytes:
- code = _KEYMAP[self.key]
+ code = keymap.KEYMAP[self.name]
key_bytes = bytes([code])
assert len(key_bytes) == 1, (self, key_bytes, code)
state_bytes = (b"\x01" if self.state else b"\x00")
return b"\x11" + key_bytes + state_bytes + b"\x00\x00"
-class _MouseMoveEvent(NamedTuple):
- to_x: int
- to_y: int
+class _MouseMoveEvent(_IntEvent):
+ def __post_init__(self) -> None:
+ assert -32768 <= self.x <= 32767
+ assert -32768 <= self.y <= 32767
def make_command(self) -> bytes:
- to_x = min(max(-32768, self.to_x), 32767)
- to_y = min(max(-32768, self.to_y), 32767)
- return b"\x12" + struct.pack(">hh", to_x, to_y)
-
+ return b"\x12" + struct.pack(">hh", self.x, self.y)
-class _MouseButtonEvent(NamedTuple):
- button: str
- state: bool
- @staticmethod
- def is_valid(button: str) -> bool:
- return (button in ["left", "right"])
+class _MouseButtonEvent(_BoolEvent):
+ def __post_init__(self) -> None:
+ assert self.name in ["left", "right"]
def make_command(self) -> bytes:
code = 0
- if self.button == "left":
+ if self.name == "left":
code = (0b10000000 | (0b00001000 if self.state else 0))
- elif self.button == "right":
+ elif self.name == "right":
code = (0b01000000 | (0b00000100 if self.state else 0))
assert code, self
return b"\x13" + bytes([code]) + b"\x00\x00\x00"
-class _MouseWheelEvent(NamedTuple):
- delta_y: int
+class _MouseWheelEvent(_IntEvent):
+ def __post_init__(self) -> None:
+ assert self.x == 0 # Горизонтальная прокрутка пока не поддерживается
+ assert -128 <= self.y <= 127
def make_command(self) -> bytes:
- delta_y = min(max(-128, self.delta_y), 127)
- return b"\x14\x00" + struct.pack(">b", delta_y) + b"\x00\x00"
+ return b"\x14\x00" + struct.pack(">b", self.y) + b"\x00\x00"
# =====
@@ -139,13 +143,13 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
self.__state_poll = state_poll
+ self.__lock = asyncio.Lock()
+
self.__pressed_keys: Set[str] = set()
self.__pressed_mouse_buttons: Set[str] = set()
- self.__lock = asyncio.Lock()
self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
self.__online_shared = multiprocessing.Value("i", 1)
-
self.__stop_event = multiprocessing.Event()
def start(self) -> None:
@@ -167,16 +171,16 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
gpio.write(self.__reset_pin, False)
async def send_key_event(self, key: str, state: bool) -> None:
- await self.__send_bool_event(_KeyEvent, self.__pressed_keys, key, state)
+ await self.__send_bool_event(_KeyEvent(key, state), self.__pressed_keys)
async def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
- await self.__send_int_event(_MouseMoveEvent, to_x, to_y)
+ await self.__send_int_event(_MouseMoveEvent(to_x, to_y))
async def send_mouse_button_event(self, button: str, state: bool) -> None:
- await self.__send_bool_event(_MouseButtonEvent, self.__pressed_mouse_buttons, button, state)
+ await self.__send_bool_event(_MouseButtonEvent(button, state), self.__pressed_mouse_buttons)
async def send_mouse_wheel_event(self, delta_y: int) -> None:
- await self.__send_int_event(_MouseWheelEvent, delta_y)
+ await self.__send_int_event(_MouseWheelEvent(0, delta_y))
async def clear_events(self) -> None:
if not self.__stop_event.is_set():
@@ -196,23 +200,23 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
self.join()
gpio.write(self.__reset_pin, False)
- async def __send_bool_event(self, cls: Any, pressed: Set[str], name: str, state: bool) -> None:
+ async def __send_bool_event(self, event: _BoolEvent, pressed: Set[str]) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
- if cls.is_valid(name) and (
- (state and (name not in pressed)) # Если еще не нажато
- or (not state and (name in pressed)) # ... Или еще не отжато
+ if (
+ (event.state and (event.name not in pressed)) # Если еще не нажато
+ or (not event.state and (event.name in pressed)) # ... Или еще не отжато
):
- if state:
- pressed.add(name)
+ if event.state:
+ pressed.add(event.name)
else:
- pressed.remove(name)
- self.__events_queue.put(cls(name, state))
+ pressed.remove(event.name)
+ self.__events_queue.put(event)
- async def __send_int_event(self, cls: Any, *args: int) -> None:
+ async def __send_int_event(self, event: _IntEvent) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
- self.__events_queue.put(cls(*args))
+ self.__events_queue.put(event)
def __unsafe_clear_events(self) -> None:
for (cls, pressed) in [
@@ -244,7 +248,7 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
passed = 0
while not (self.__stop_event.is_set() and self.__events_queue.qsize() == 0):
try:
- event = self.__events_queue.get(timeout=0.05)
+ event: _BaseEvent = self.__events_queue.get(timeout=0.05)
except queue.Empty:
if passed >= 20: # 20 * 0.05 = 1 sec
self.__process_command(tty, b"\x01\x00\x00\x00\x00") # Ping
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
index cfe0a2bc..3a531484 100644
--- a/kvmd/apps/kvmd/server.py
+++ b/kvmd/apps/kvmd/server.py
@@ -47,14 +47,20 @@ from ...aioregion import RegionIsBusyError
from ...validators import ValidatorError
from ...validators.basic import valid_bool
+
from ...validators.auth import valid_user
from ...validators.auth import valid_passwd
from ...validators.auth import valid_auth_token
+
from ...validators.kvm import valid_atx_button
from ...validators.kvm import valid_kvm_target
from ...validators.kvm import valid_log_seek
from ...validators.kvm import valid_stream_quality
from ...validators.kvm import valid_stream_fps
+from ...validators.kvm import valid_hid_key
+from ...validators.kvm import valid_hid_mouse_move
+from ...validators.kvm import valid_hid_mouse_button
+from ...validators.kvm import valid_hid_mouse_wheel
from ... import __version__
@@ -384,28 +390,32 @@ class Server: # pylint: disable=too-many-instance-attributes
return ws
async def __handle_ws_key_event(self, event: Dict) -> None:
- key = str(event.get("key", ""))[:64].strip()
- state = event.get("state")
- if key and state in [True, False]:
- await self.__hid.send_key_event(key, state)
+ try:
+ key = valid_hid_key(event["key"])
+ state = valid_bool(event["state"])
+ except Exception:
+ return
+ await self.__hid.send_key_event(key, state)
async def __handle_ws_mouse_move_event(self, event: Dict) -> None:
try:
- to_x = int(event["to"]["x"])
- to_y = int(event["to"]["y"])
+ to_x = valid_hid_mouse_move(event["to"]["x"])
+ to_y = valid_hid_mouse_move(event["to"]["y"])
except Exception:
return
await self.__hid.send_mouse_move_event(to_x, to_y)
async def __handle_ws_mouse_button_event(self, event: Dict) -> None:
- button = str(event.get("button", ""))[:64].strip()
- state = event.get("state")
- if button and state in [True, False]:
- await self.__hid.send_mouse_button_event(button, state)
+ try:
+ button = valid_hid_mouse_button(event["button"])
+ state = valid_bool(event["state"])
+ except Exception:
+ return
+ await self.__hid.send_mouse_button_event(button, state)
async def __handle_ws_mouse_wheel_event(self, event: Dict) -> None:
try:
- delta_y = int(event["delta"]["y"])
+ delta_y = valid_hid_mouse_wheel(event["delta"]["y"])
except Exception:
return
await self.__hid.send_mouse_wheel_event(delta_y)
diff --git a/kvmd/keymap.py b/kvmd/keymap.py
new file mode 100644
index 00000000..e6dfecbb
--- /dev/null
+++ b/kvmd/keymap.py
@@ -0,0 +1,36 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import pkgutil
+
+from typing import Dict
+
+import yaml
+
+
+# =====
+def _get_keymap() -> Dict[str, int]:
+ return yaml.safe_load(pkgutil.get_data("kvmd", "data/keymap.yaml").decode()) # type: ignore
+
+
+# =====
+KEYMAP = _get_keymap()
diff --git a/kvmd/validators/__init__.py b/kvmd/validators/__init__.py
index f4b0f1fa..5556f564 100644
--- a/kvmd/validators/__init__.py
+++ b/kvmd/validators/__init__.py
@@ -23,8 +23,11 @@
import re
from typing import List
+from typing import Mapping
+from typing import Sequence
from typing import Callable
from typing import NoReturn
+from typing import Union
from typing import Any
@@ -54,13 +57,13 @@ def check_not_none_string(arg: Any, name: str, strip: bool=True) -> str:
return arg
-def check_in_list(arg: Any, name: str, variants: List) -> Any:
+def check_in_list(arg: Any, name: str, variants: Union[Sequence, Mapping]) -> Any:
if arg not in variants:
raise_error(arg, name)
return arg
-def check_string_in_list(arg: Any, name: str, variants: List[str], lower: bool=True) -> Any:
+def check_string_in_list(arg: Any, name: str, variants: Union[Sequence[str], Mapping[str, Any]], lower: bool=True) -> Any:
arg = check_not_none_string(arg, name)
if lower:
arg = arg.lower()
diff --git a/kvmd/validators/kvm.py b/kvmd/validators/kvm.py
index 034587ef..40f5ced1 100644
--- a/kvmd/validators/kvm.py
+++ b/kvmd/validators/kvm.py
@@ -22,6 +22,8 @@
from typing import Any
+from .. import keymap
+
from . import check_string_in_list
from .basic import valid_number
@@ -46,3 +48,22 @@ def valid_stream_quality(arg: Any) -> int:
def valid_stream_fps(arg: Any) -> int:
return int(valid_number(arg, min=0, max=30, name="stream FPS"))
+
+
+# =====
+def valid_hid_key(arg: Any) -> str:
+ return check_string_in_list(arg, "HID key", keymap.KEYMAP, lower=False)
+
+
+def valid_hid_mouse_move(arg: Any) -> int:
+ arg = valid_number(arg, name="HID mouse move")
+ return min(max(-32768, arg), 32767)
+
+
+def valid_hid_mouse_button(arg: Any) -> str:
+ return check_string_in_list(arg, "HID mouse button", ["left", "right"])
+
+
+def valid_hid_mouse_wheel(arg: Any) -> int:
+ arg = valid_number(arg, name="HID mouse wheel")
+ return min(max(-128, arg), 127)