summaryrefslogtreecommitdiff
path: root/kvmd/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd/plugins')
-rw-r--r--kvmd/plugins/hid/serial.py127
1 files changed, 48 insertions, 79 deletions
diff --git a/kvmd/plugins/hid/serial.py b/kvmd/plugins/hid/serial.py
index 6f8132e4..4a4346dc 100644
--- a/kvmd/plugins/hid/serial.py
+++ b/kvmd/plugins/hid/serial.py
@@ -32,7 +32,6 @@ import errno
import time
from typing import Dict
-from typing import Set
from typing import AsyncGenerator
import serial
@@ -64,67 +63,68 @@ class _BaseEvent:
raise NotImplementedError
[email protected](frozen=True) # pylint: disable=abstract-method
-class _BoolEvent(_BaseEvent):
- name: str
- state: bool
-
-
[email protected](frozen=True) # pylint: disable=abstract-method
-class _IntEvent(_BaseEvent):
- x: int
- y: int
+class _ClearEvent(_BaseEvent):
+ def make_command(self) -> bytes:
+ return b"\x10\x00\x00\x00\x00"
@dataclasses.dataclass(frozen=True)
-class _KeyEvent(_BoolEvent):
+class _KeyEvent(_BaseEvent):
+ name: str
+ state: bool
+
def __post_init__(self) -> None:
assert self.name in keymap.KEYMAP
def make_command(self) -> bytes:
code = keymap.KEYMAP[self.name].serial.code
- key_bytes = bytes([code])
- assert len(key_bytes) == 1, (self, key_bytes, code)
- state_bytes = (b"\x01" if self.state else b"\x00")
- return b"\x11" + key_bytes + state_bytes + b"\x00\x00"
+ return struct.pack(">BBBxx", 0x11, code, int(self.state))
@dataclasses.dataclass(frozen=True)
-class _MouseMoveEvent(_IntEvent):
+class _MouseMoveEvent(_BaseEvent):
+ to_x: int
+ to_y: int
+
def __post_init__(self) -> None:
- assert -32768 <= self.x <= 32767
- assert -32768 <= self.y <= 32767
+ assert -32768 <= self.to_x <= 32767
+ assert -32768 <= self.to_y <= 32767
def make_command(self) -> bytes:
- return b"\x12" + struct.pack(">hh", self.x, self.y)
+ return struct.pack(">Bhh", 0x12, self.to_x, self.to_y)
@dataclasses.dataclass(frozen=True)
-class _MouseButtonEvent(_BoolEvent):
+class _MouseButtonEvent(_BaseEvent):
+ name: str
+ state: bool
+
def __post_init__(self) -> None:
assert self.name in ["left", "right", "middle"]
def make_command(self) -> bytes:
- code = 0
- if self.name == "left":
- code = (0b10000000 | (0b00001000 if self.state else 0))
- elif self.name == "right":
- code = (0b01000000 | (0b00000100 if self.state else 0))
- elif self.name == "middle":
- code = (0b00100000 | (0b00000010 if self.state else 0))
- assert code, self
- return b"\x13" + bytes([code]) + b"\x00\x00\x00"
+ (code, state_pressed) = {
+ "left": (0b10000000, 0b00001000),
+ "right": (0b01000000, 0b00000100),
+ "middle": (0b00100000, 0b00000010),
+ }[self.name]
+ if self.state:
+ code |= state_pressed
+ return struct.pack(">BBxxx", 0x13, code)
@dataclasses.dataclass(frozen=True)
-class _MouseWheelEvent(_IntEvent):
+class _MouseWheelEvent(_BaseEvent):
+ delta_x: int
+ delta_y: int
+
def __post_init__(self) -> None:
- assert -127 <= self.x <= 127
- assert -127 <= self.y <= 127
+ assert -127 <= self.delta_x <= 127
+ assert -127 <= self.delta_y <= 127
def make_command(self) -> bytes:
# Горизонтальная прокрутка пока не поддерживается
- return b"\x14\x00" + struct.pack(">b", self.y) + b"\x00\x00"
+ return struct.pack(">Bxbxx", 0x14, self.delta_y)
# =====
@@ -162,10 +162,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
self.__lock = asyncio.Lock()
- self.__pressed_keys: Set[str] = set()
- self.__pressed_mouse_buttons: Set[str] = set()
self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
-
self.__online_shared = multiprocessing.Value("i", 1)
self.__stop_event = multiprocessing.Event()
@@ -232,71 +229,42 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
async with self.__lock:
try:
if self.is_alive():
- self.__unsafe_clear_events()
logger.info("Stopping HID daemon ...")
self.__stop_event.set()
- else:
- logger.warning("Emergency cleaning up HID events ...")
- self.__emergency_clear_events()
if self.exitcode is not None:
self.join()
+ if os.path.exists(self.__device_path):
+ get_logger().info("Clearing HID events ...")
+ try:
+ with self.__get_serial() as tty:
+ self.__process_command(tty, b"\x10\x00\x00\x00\x00")
+ except Exception:
+ logger.exception("Can't clear HID events")
finally:
gpio.write(self.__reset_pin, False)
# =====
async def send_key_event(self, key: str, state: bool) -> None:
- await self.__send_bool_event(_KeyEvent(key, state), self.__pressed_keys)
+ await self.__queue_event(_KeyEvent(key, state))
async def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
- await self.__send_int_event(_MouseMoveEvent(to_x, to_y))
+ await self.__queue_event(_MouseMoveEvent(to_x, to_y))
async def send_mouse_button_event(self, button: str, state: bool) -> None:
- await self.__send_bool_event(_MouseButtonEvent(button, state), self.__pressed_mouse_buttons)
+ await self.__queue_event(_MouseButtonEvent(button, state))
async def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
- await self.__send_int_event(_MouseWheelEvent(delta_x, delta_y))
+ await self.__queue_event(_MouseWheelEvent(delta_x, delta_y))
async def clear_events(self) -> None:
- if not self.__stop_event.is_set():
- async with self.__lock:
- self.__unsafe_clear_events()
+ await self.__queue_event(_ClearEvent())
- async def __send_bool_event(self, event: _BoolEvent, pressed: Set[str]) -> None:
- if not self.__stop_event.is_set():
- async with self.__lock:
- if (
- (event.state and (event.name not in pressed)) # Если еще не нажато
- or (not event.state and (event.name in pressed)) # ... Или еще не отжато
- ):
- if event.state:
- pressed.add(event.name)
- else:
- pressed.remove(event.name)
- self.__events_queue.put(event)
-
- async def __send_int_event(self, event: _IntEvent) -> None:
+ async def __queue_event(self, event: _BaseEvent) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
self.__events_queue.put(event)
- def __unsafe_clear_events(self) -> None:
- for (cls, pressed) in [
- (_MouseButtonEvent, self.__pressed_mouse_buttons),
- (_KeyEvent, self.__pressed_keys),
- ]:
- for name in pressed:
- self.__events_queue.put(cls(name, False))
- pressed.clear()
-
- def __emergency_clear_events(self) -> None:
- if os.path.exists(self.__device_path):
- try:
- with self.__get_serial() as tty:
- self.__process_command(tty, b"\x10\x00\x00\x00\x00")
- except Exception:
- get_logger().exception("Can't execute emergency clear HID events")
-
def run(self) -> None: # pylint: disable=too-many-branches
logger = get_logger(0)
@@ -388,6 +356,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
logger.error("Invalid response from HID: request=%r; code=0x%x", request, code)
common_retries -= 1
+
error_occured = True
self.__online_shared.value = 0