summaryrefslogtreecommitdiff
path: root/kvmd/apps
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2019-04-08 04:58:32 +0300
committerDevaev Maxim <[email protected]>2019-04-08 04:58:32 +0300
commit9243d2a00c86aa9b5df2b1df20d2ba57be0bed47 (patch)
tree4d86bf39d432af121fff66a863864719c885d84d /kvmd/apps
parent7eca51f17b7c2c95b54d4c2b77caa6c8554ef4f4 (diff)
refactoring
Diffstat (limited to 'kvmd/apps')
-rw-r--r--kvmd/apps/kvmd/hid.py104
-rw-r--r--kvmd/apps/kvmd/server.py32
2 files changed, 75 insertions, 61 deletions
diff --git a/kvmd/apps/kvmd/hid.py b/kvmd/apps/kvmd/hid.py
index c308c71b..3cdc48fc 100644
--- a/kvmd/apps/kvmd/hid.py
+++ b/kvmd/apps/kvmd/hid.py
@@ -23,87 +23,91 @@
import os
import signal
import asyncio
+import dataclasses
import multiprocessing
import multiprocessing.queues
import queue
import struct
-import pkgutil
import errno
import time
from typing import Dict
from typing import Set
-from typing import NamedTuple
from typing import AsyncGenerator
-from typing import Any
-import yaml
import serial
import setproctitle
from ...logging import get_logger
from ... import gpio
+from ... import keymap
# =====
-def _get_keymap() -> Dict[str, int]:
- return yaml.safe_load(pkgutil.get_data("kvmd", "data/keymap.yaml").decode()) # type: ignore
+class _BaseEvent:
+ def make_command(self) -> bytes:
+ raise NotImplementedError
-_KEYMAP = _get_keymap()
[email protected] # pylint: disable=abstract-method
+class _BoolEvent(_BaseEvent):
+ name: str
+ state: bool
-class _KeyEvent(NamedTuple):
- key: str
- state: bool
[email protected] # pylint: disable=abstract-method
+class _IntEvent(_BaseEvent):
+ x: int
+ y: int
- @staticmethod
- def is_valid(key: str) -> bool:
- return (key in _KEYMAP)
+
+class _KeyEvent(_BoolEvent):
+ def __post_init__(self) -> None:
+ assert self.name in keymap.KEYMAP
def make_command(self) -> bytes:
- code = _KEYMAP[self.key]
+ code = keymap.KEYMAP[self.name]
key_bytes = bytes([code])
assert len(key_bytes) == 1, (self, key_bytes, code)
state_bytes = (b"\x01" if self.state else b"\x00")
return b"\x11" + key_bytes + state_bytes + b"\x00\x00"
-class _MouseMoveEvent(NamedTuple):
- to_x: int
- to_y: int
+class _MouseMoveEvent(_IntEvent):
+ def __post_init__(self) -> None:
+ assert -32768 <= self.x <= 32767
+ assert -32768 <= self.y <= 32767
def make_command(self) -> bytes:
- to_x = min(max(-32768, self.to_x), 32767)
- to_y = min(max(-32768, self.to_y), 32767)
- return b"\x12" + struct.pack(">hh", to_x, to_y)
-
+ return b"\x12" + struct.pack(">hh", self.x, self.y)
-class _MouseButtonEvent(NamedTuple):
- button: str
- state: bool
- @staticmethod
- def is_valid(button: str) -> bool:
- return (button in ["left", "right"])
+class _MouseButtonEvent(_BoolEvent):
+ def __post_init__(self) -> None:
+ assert self.name in ["left", "right"]
def make_command(self) -> bytes:
code = 0
- if self.button == "left":
+ if self.name == "left":
code = (0b10000000 | (0b00001000 if self.state else 0))
- elif self.button == "right":
+ elif self.name == "right":
code = (0b01000000 | (0b00000100 if self.state else 0))
assert code, self
return b"\x13" + bytes([code]) + b"\x00\x00\x00"
-class _MouseWheelEvent(NamedTuple):
- delta_y: int
+class _MouseWheelEvent(_IntEvent):
+ def __post_init__(self) -> None:
+ assert self.x == 0 # Горизонтальная прокрутка пока не поддерживается
+ assert -128 <= self.y <= 127
def make_command(self) -> bytes:
- delta_y = min(max(-128, self.delta_y), 127)
- return b"\x14\x00" + struct.pack(">b", delta_y) + b"\x00\x00"
+ return b"\x14\x00" + struct.pack(">b", self.y) + b"\x00\x00"
# =====
@@ -139,13 +143,13 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
self.__state_poll = state_poll
+ self.__lock = asyncio.Lock()
+
self.__pressed_keys: Set[str] = set()
self.__pressed_mouse_buttons: Set[str] = set()
- self.__lock = asyncio.Lock()
self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
self.__online_shared = multiprocessing.Value("i", 1)
-
self.__stop_event = multiprocessing.Event()
def start(self) -> None:
@@ -167,16 +171,16 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
gpio.write(self.__reset_pin, False)
async def send_key_event(self, key: str, state: bool) -> None:
- await self.__send_bool_event(_KeyEvent, self.__pressed_keys, key, state)
+ await self.__send_bool_event(_KeyEvent(key, state), self.__pressed_keys)
async def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
- await self.__send_int_event(_MouseMoveEvent, to_x, to_y)
+ await self.__send_int_event(_MouseMoveEvent(to_x, to_y))
async def send_mouse_button_event(self, button: str, state: bool) -> None:
- await self.__send_bool_event(_MouseButtonEvent, self.__pressed_mouse_buttons, button, state)
+ await self.__send_bool_event(_MouseButtonEvent(button, state), self.__pressed_mouse_buttons)
async def send_mouse_wheel_event(self, delta_y: int) -> None:
- await self.__send_int_event(_MouseWheelEvent, delta_y)
+ await self.__send_int_event(_MouseWheelEvent(0, delta_y))
async def clear_events(self) -> None:
if not self.__stop_event.is_set():
@@ -196,23 +200,23 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
self.join()
gpio.write(self.__reset_pin, False)
- async def __send_bool_event(self, cls: Any, pressed: Set[str], name: str, state: bool) -> None:
+ async def __send_bool_event(self, event: _BoolEvent, pressed: Set[str]) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
- if cls.is_valid(name) and (
- (state and (name not in pressed)) # Если еще не нажато
- or (not state and (name in pressed)) # ... Или еще не отжато
+ if (
+ (event.state and (event.name not in pressed)) # Если еще не нажато
+ or (not event.state and (event.name in pressed)) # ... Или еще не отжато
):
- if state:
- pressed.add(name)
+ if event.state:
+ pressed.add(event.name)
else:
- pressed.remove(name)
- self.__events_queue.put(cls(name, state))
+ pressed.remove(event.name)
+ self.__events_queue.put(event)
- async def __send_int_event(self, cls: Any, *args: int) -> None:
+ async def __send_int_event(self, event: _IntEvent) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
- self.__events_queue.put(cls(*args))
+ self.__events_queue.put(event)
def __unsafe_clear_events(self) -> None:
for (cls, pressed) in [
@@ -244,7 +248,7 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
passed = 0
while not (self.__stop_event.is_set() and self.__events_queue.qsize() == 0):
try:
- event = self.__events_queue.get(timeout=0.05)
+ event: _BaseEvent = self.__events_queue.get(timeout=0.05)
except queue.Empty:
if passed >= 20: # 20 * 0.05 = 1 sec
self.__process_command(tty, b"\x01\x00\x00\x00\x00") # Ping
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
index cfe0a2bc..3a531484 100644
--- a/kvmd/apps/kvmd/server.py
+++ b/kvmd/apps/kvmd/server.py
@@ -47,14 +47,20 @@ from ...aioregion import RegionIsBusyError
from ...validators import ValidatorError
from ...validators.basic import valid_bool
+
from ...validators.auth import valid_user
from ...validators.auth import valid_passwd
from ...validators.auth import valid_auth_token
+
from ...validators.kvm import valid_atx_button
from ...validators.kvm import valid_kvm_target
from ...validators.kvm import valid_log_seek
from ...validators.kvm import valid_stream_quality
from ...validators.kvm import valid_stream_fps
+from ...validators.kvm import valid_hid_key
+from ...validators.kvm import valid_hid_mouse_move
+from ...validators.kvm import valid_hid_mouse_button
+from ...validators.kvm import valid_hid_mouse_wheel
from ... import __version__
@@ -384,28 +390,32 @@ class Server: # pylint: disable=too-many-instance-attributes
return ws
async def __handle_ws_key_event(self, event: Dict) -> None:
- key = str(event.get("key", ""))[:64].strip()
- state = event.get("state")
- if key and state in [True, False]:
- await self.__hid.send_key_event(key, state)
+ try:
+ key = valid_hid_key(event["key"])
+ state = valid_bool(event["state"])
+ except Exception:
+ return
+ await self.__hid.send_key_event(key, state)
async def __handle_ws_mouse_move_event(self, event: Dict) -> None:
try:
- to_x = int(event["to"]["x"])
- to_y = int(event["to"]["y"])
+ to_x = valid_hid_mouse_move(event["to"]["x"])
+ to_y = valid_hid_mouse_move(event["to"]["y"])
except Exception:
return
await self.__hid.send_mouse_move_event(to_x, to_y)
async def __handle_ws_mouse_button_event(self, event: Dict) -> None:
- button = str(event.get("button", ""))[:64].strip()
- state = event.get("state")
- if button and state in [True, False]:
- await self.__hid.send_mouse_button_event(button, state)
+ try:
+ button = valid_hid_mouse_button(event["button"])
+ state = valid_bool(event["state"])
+ except Exception:
+ return
+ await self.__hid.send_mouse_button_event(button, state)
async def __handle_ws_mouse_wheel_event(self, event: Dict) -> None:
try:
- delta_y = int(event["delta"]["y"])
+ delta_y = valid_hid_mouse_wheel(event["delta"]["y"])
except Exception:
return
await self.__hid.send_mouse_wheel_event(delta_y)