diff options
author | Devaev Maxim <[email protected]> | 2019-04-08 04:58:32 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2019-04-08 04:58:32 +0300 |
commit | 9243d2a00c86aa9b5df2b1df20d2ba57be0bed47 (patch) | |
tree | 4d86bf39d432af121fff66a863864719c885d84d /kvmd/apps | |
parent | 7eca51f17b7c2c95b54d4c2b77caa6c8554ef4f4 (diff) |
refactoring
Diffstat (limited to 'kvmd/apps')
-rw-r--r-- | kvmd/apps/kvmd/hid.py | 104 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 32 |
2 files changed, 75 insertions, 61 deletions
diff --git a/kvmd/apps/kvmd/hid.py b/kvmd/apps/kvmd/hid.py index c308c71b..3cdc48fc 100644 --- a/kvmd/apps/kvmd/hid.py +++ b/kvmd/apps/kvmd/hid.py @@ -23,87 +23,91 @@ import os import signal import asyncio +import dataclasses import multiprocessing import multiprocessing.queues import queue import struct -import pkgutil import errno import time from typing import Dict from typing import Set -from typing import NamedTuple from typing import AsyncGenerator -from typing import Any -import yaml import serial import setproctitle from ...logging import get_logger from ... import gpio +from ... import keymap # ===== -def _get_keymap() -> Dict[str, int]: - return yaml.safe_load(pkgutil.get_data("kvmd", "data/keymap.yaml").decode()) # type: ignore +class _BaseEvent: + def make_command(self) -> bytes: + raise NotImplementedError -_KEYMAP = _get_keymap() [email protected] # pylint: disable=abstract-method +class _BoolEvent(_BaseEvent): + name: str + state: bool -class _KeyEvent(NamedTuple): - key: str - state: bool [email protected] # pylint: disable=abstract-method +class _IntEvent(_BaseEvent): + x: int + y: int - @staticmethod - def is_valid(key: str) -> bool: - return (key in _KEYMAP) + +class _KeyEvent(_BoolEvent): + def __post_init__(self) -> None: + assert self.name in keymap.KEYMAP def make_command(self) -> bytes: - code = _KEYMAP[self.key] + code = keymap.KEYMAP[self.name] key_bytes = bytes([code]) assert len(key_bytes) == 1, (self, key_bytes, code) state_bytes = (b"\x01" if self.state else b"\x00") return b"\x11" + key_bytes + state_bytes + b"\x00\x00" -class _MouseMoveEvent(NamedTuple): - to_x: int - to_y: int +class _MouseMoveEvent(_IntEvent): + def __post_init__(self) -> None: + assert -32768 <= self.x <= 32767 + assert -32768 <= self.y <= 32767 def make_command(self) -> bytes: - to_x = min(max(-32768, self.to_x), 32767) - to_y = min(max(-32768, self.to_y), 32767) - return b"\x12" + struct.pack(">hh", to_x, to_y) - + return b"\x12" + struct.pack(">hh", self.x, self.y) -class _MouseButtonEvent(NamedTuple): - button: str - state: bool - @staticmethod - def is_valid(button: str) -> bool: - return (button in ["left", "right"]) +class _MouseButtonEvent(_BoolEvent): + def __post_init__(self) -> None: + assert self.name in ["left", "right"] def make_command(self) -> bytes: code = 0 - if self.button == "left": + if self.name == "left": code = (0b10000000 | (0b00001000 if self.state else 0)) - elif self.button == "right": + elif self.name == "right": code = (0b01000000 | (0b00000100 if self.state else 0)) assert code, self return b"\x13" + bytes([code]) + b"\x00\x00\x00" -class _MouseWheelEvent(NamedTuple): - delta_y: int +class _MouseWheelEvent(_IntEvent): + def __post_init__(self) -> None: + assert self.x == 0 # Горизонтальная прокрутка пока не поддерживается + assert -128 <= self.y <= 127 def make_command(self) -> bytes: - delta_y = min(max(-128, self.delta_y), 127) - return b"\x14\x00" + struct.pack(">b", delta_y) + b"\x00\x00" + return b"\x14\x00" + struct.pack(">b", self.y) + b"\x00\x00" # ===== @@ -139,13 +143,13 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu self.__state_poll = state_poll + self.__lock = asyncio.Lock() + self.__pressed_keys: Set[str] = set() self.__pressed_mouse_buttons: Set[str] = set() - self.__lock = asyncio.Lock() self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue() self.__online_shared = multiprocessing.Value("i", 1) - self.__stop_event = multiprocessing.Event() def start(self) -> None: @@ -167,16 +171,16 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu gpio.write(self.__reset_pin, False) async def send_key_event(self, key: str, state: bool) -> None: - await self.__send_bool_event(_KeyEvent, self.__pressed_keys, key, state) + await self.__send_bool_event(_KeyEvent(key, state), self.__pressed_keys) async def send_mouse_move_event(self, to_x: int, to_y: int) -> None: - await self.__send_int_event(_MouseMoveEvent, to_x, to_y) + await self.__send_int_event(_MouseMoveEvent(to_x, to_y)) async def send_mouse_button_event(self, button: str, state: bool) -> None: - await self.__send_bool_event(_MouseButtonEvent, self.__pressed_mouse_buttons, button, state) + await self.__send_bool_event(_MouseButtonEvent(button, state), self.__pressed_mouse_buttons) async def send_mouse_wheel_event(self, delta_y: int) -> None: - await self.__send_int_event(_MouseWheelEvent, delta_y) + await self.__send_int_event(_MouseWheelEvent(0, delta_y)) async def clear_events(self) -> None: if not self.__stop_event.is_set(): @@ -196,23 +200,23 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu self.join() gpio.write(self.__reset_pin, False) - async def __send_bool_event(self, cls: Any, pressed: Set[str], name: str, state: bool) -> None: + async def __send_bool_event(self, event: _BoolEvent, pressed: Set[str]) -> None: if not self.__stop_event.is_set(): async with self.__lock: - if cls.is_valid(name) and ( - (state and (name not in pressed)) # Если еще не нажато - or (not state and (name in pressed)) # ... Или еще не отжато + if ( + (event.state and (event.name not in pressed)) # Если еще не нажато + or (not event.state and (event.name in pressed)) # ... Или еще не отжато ): - if state: - pressed.add(name) + if event.state: + pressed.add(event.name) else: - pressed.remove(name) - self.__events_queue.put(cls(name, state)) + pressed.remove(event.name) + self.__events_queue.put(event) - async def __send_int_event(self, cls: Any, *args: int) -> None: + async def __send_int_event(self, event: _IntEvent) -> None: if not self.__stop_event.is_set(): async with self.__lock: - self.__events_queue.put(cls(*args)) + self.__events_queue.put(event) def __unsafe_clear_events(self) -> None: for (cls, pressed) in [ @@ -244,7 +248,7 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu passed = 0 while not (self.__stop_event.is_set() and self.__events_queue.qsize() == 0): try: - event = self.__events_queue.get(timeout=0.05) + event: _BaseEvent = self.__events_queue.get(timeout=0.05) except queue.Empty: if passed >= 20: # 20 * 0.05 = 1 sec self.__process_command(tty, b"\x01\x00\x00\x00\x00") # Ping diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index cfe0a2bc..3a531484 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -47,14 +47,20 @@ from ...aioregion import RegionIsBusyError from ...validators import ValidatorError from ...validators.basic import valid_bool + from ...validators.auth import valid_user from ...validators.auth import valid_passwd from ...validators.auth import valid_auth_token + from ...validators.kvm import valid_atx_button from ...validators.kvm import valid_kvm_target from ...validators.kvm import valid_log_seek from ...validators.kvm import valid_stream_quality from ...validators.kvm import valid_stream_fps +from ...validators.kvm import valid_hid_key +from ...validators.kvm import valid_hid_mouse_move +from ...validators.kvm import valid_hid_mouse_button +from ...validators.kvm import valid_hid_mouse_wheel from ... import __version__ @@ -384,28 +390,32 @@ class Server: # pylint: disable=too-many-instance-attributes return ws async def __handle_ws_key_event(self, event: Dict) -> None: - key = str(event.get("key", ""))[:64].strip() - state = event.get("state") - if key and state in [True, False]: - await self.__hid.send_key_event(key, state) + try: + key = valid_hid_key(event["key"]) + state = valid_bool(event["state"]) + except Exception: + return + await self.__hid.send_key_event(key, state) async def __handle_ws_mouse_move_event(self, event: Dict) -> None: try: - to_x = int(event["to"]["x"]) - to_y = int(event["to"]["y"]) + to_x = valid_hid_mouse_move(event["to"]["x"]) + to_y = valid_hid_mouse_move(event["to"]["y"]) except Exception: return await self.__hid.send_mouse_move_event(to_x, to_y) async def __handle_ws_mouse_button_event(self, event: Dict) -> None: - button = str(event.get("button", ""))[:64].strip() - state = event.get("state") - if button and state in [True, False]: - await self.__hid.send_mouse_button_event(button, state) + try: + button = valid_hid_mouse_button(event["button"]) + state = valid_bool(event["state"]) + except Exception: + return + await self.__hid.send_mouse_button_event(button, state) async def __handle_ws_mouse_wheel_event(self, event: Dict) -> None: try: - delta_y = int(event["delta"]["y"]) + delta_y = valid_hid_mouse_wheel(event["delta"]["y"]) except Exception: return await self.__hid.send_mouse_wheel_event(delta_y) |