summaryrefslogtreecommitdiff
path: root/kvmd/hid.py
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2018-11-06 01:55:13 +0300
committerDevaev Maxim <[email protected]>2018-11-06 01:55:13 +0300
commitf0ae427d8e4bbd82653abdb8e2aef8ffe32fc732 (patch)
tree44192d6bce4cc80db95532df2888e0764f63d58d /kvmd/hid.py
parent1ac968e92411409413ed4555eed6c2944b38de0d (diff)
refactoring
Diffstat (limited to 'kvmd/hid.py')
-rw-r--r--kvmd/hid.py213
1 files changed, 0 insertions, 213 deletions
diff --git a/kvmd/hid.py b/kvmd/hid.py
deleted file mode 100644
index 776e6f0e..00000000
--- a/kvmd/hid.py
+++ /dev/null
@@ -1,213 +0,0 @@
-import asyncio
-import multiprocessing
-import multiprocessing.queues
-import queue
-import struct
-import pkgutil
-import time
-
-from typing import Dict
-from typing import Set
-from typing import NamedTuple
-
-import yaml
-import serial
-import setproctitle
-
-from . import gpio
-
-from .logging import get_logger
-
-
-# =====
-def _get_keymap() -> Dict[str, int]:
- return yaml.load(pkgutil.get_data(__name__, "data/keymap.yaml").decode()) # type: ignore
-
-
-_KEYMAP = _get_keymap()
-
-
-class _KeyEvent(NamedTuple):
- key: str
- state: bool
-
-
-class _MouseMoveEvent(NamedTuple):
- to_x: int
- to_y: int
-
-
-class _MouseButtonEvent(NamedTuple):
- button: str
- state: bool
-
-
-class _MouseWheelEvent(NamedTuple):
- delta_y: int
-
-
-class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attributes
- def __init__(
- self,
- reset: int,
- device_path: str,
- speed: int,
- reset_delay: float,
- ) -> None:
-
- super().__init__(daemon=True)
-
- self.__reset = gpio.set_output(reset)
- self.__device_path = device_path
- self.__speed = speed
- self.__reset_delay = reset_delay
-
- self.__pressed_keys: Set[str] = set()
- self.__pressed_mouse_buttons: Set[str] = set()
- self.__lock = asyncio.Lock()
- self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()
-
- self.__stop_event = multiprocessing.Event()
-
- def start(self) -> None:
- get_logger().info("Starting HID daemon ...")
- super().start()
-
- async def reset(self) -> None:
- async with self.__lock:
- gpio.write(self.__reset, True)
- await asyncio.sleep(self.__reset_delay)
- gpio.write(self.__reset, False)
-
- async def send_key_event(self, key: str, state: bool) -> None:
- if not self.__stop_event.is_set():
- async with self.__lock:
- if state and key not in self.__pressed_keys:
- self.__pressed_keys.add(key)
- self.__queue.put(_KeyEvent(key, state))
- elif not state and key in self.__pressed_keys:
- self.__pressed_keys.remove(key)
- self.__queue.put(_KeyEvent(key, state))
-
- async def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
- if not self.__stop_event.is_set():
- async with self.__lock:
- self.__queue.put(_MouseMoveEvent(to_x, to_y))
-
- async def send_mouse_button_event(self, button: str, state: bool) -> None:
- if not self.__stop_event.is_set():
- async with self.__lock:
- if state and button not in self.__pressed_mouse_buttons:
- self.__pressed_mouse_buttons.add(button)
- self.__queue.put(_MouseButtonEvent(button, state))
- elif not state and button in self.__pressed_mouse_buttons:
- self.__pressed_mouse_buttons.remove(button)
- self.__queue.put(_MouseButtonEvent(button, state))
-
- async def send_mouse_wheel_event(self, delta_y: int) -> None:
- if not self.__stop_event.is_set():
- async with self.__lock:
- self.__queue.put(_MouseWheelEvent(delta_y))
-
- async def clear_events(self) -> None:
- if not self.__stop_event.is_set():
- async with self.__lock:
- self.__unsafe_clear_events()
-
- async def cleanup(self) -> None:
- async with self.__lock:
- if self.is_alive():
- self.__unsafe_clear_events()
- get_logger().info("Stopping keyboard daemon ...")
- self.__stop_event.set()
- self.join()
- else:
- get_logger().warning("Emergency cleaning up HID events ...")
- self.__emergency_clear_events()
- gpio.write(self.__reset, False)
-
- def __unsafe_clear_events(self) -> None:
- for button in self.__pressed_mouse_buttons:
- self.__queue.put(_MouseButtonEvent(button, False))
- self.__pressed_mouse_buttons.clear()
- for key in self.__pressed_keys:
- self.__queue.put(_KeyEvent(key, False))
- self.__pressed_keys.clear()
-
- def __emergency_clear_events(self) -> None:
- try:
- with serial.Serial(self.__device_path, self.__speed) as tty:
- self.__send_clear_hid(tty)
- except Exception:
- get_logger().exception("Can't execute emergency clear HID events")
-
- def run(self) -> None: # pylint: disable=too-many-branches
- setproctitle.setproctitle("[hid] " + setproctitle.getproctitle())
- try:
- with serial.Serial(self.__device_path, self.__speed) as tty:
- hid_ready = False
- while True:
- if hid_ready:
- try:
- event = self.__queue.get(timeout=0.05)
- except queue.Empty:
- pass
- else:
- if isinstance(event, _KeyEvent):
- self.__send_key_event(tty, event)
- elif isinstance(event, _MouseMoveEvent):
- self.__send_mouse_move_event(tty, event)
- elif isinstance(event, _MouseButtonEvent):
- self.__send_mouse_button_event(tty, event)
- elif isinstance(event, _MouseWheelEvent):
- self.__send_mouse_wheel_event(tty, event)
- else:
- raise RuntimeError("Unknown HID event")
- hid_ready = False
-
- if tty.in_waiting:
- while tty.in_waiting:
- tty.read(tty.in_waiting)
- hid_ready = True
- else:
- time.sleep(0.05)
-
- if self.__stop_event.is_set() and self.__queue.qsize() == 0:
- break
- except Exception:
- get_logger().exception("Unhandled exception")
- raise
-
- def __send_key_event(self, tty: serial.Serial, event: _KeyEvent) -> None:
- code = _KEYMAP.get(event.key)
- if code:
- key_bytes = bytes([code])
- assert len(key_bytes) == 1, (event, key_bytes)
- tty.write(
- b"\01"
- + key_bytes
- + (b"\01" if event.state else b"\00")
- + b"\00\00"
- )
-
- def __send_mouse_move_event(self, tty: serial.Serial, event: _MouseMoveEvent) -> None:
- to_x = min(max(-32768, event.to_x), 32767)
- to_y = min(max(-32768, event.to_y), 32767)
- tty.write(b"\02" + struct.pack(">hh", to_x, to_y))
-
- def __send_mouse_button_event(self, tty: serial.Serial, event: _MouseButtonEvent) -> None:
- if event.button == "left":
- code = (0b10000000 | (0b00001000 if event.state else 0))
- elif event.button == "right":
- code = (0b01000000 | (0b00000100 if event.state else 0))
- else:
- code = 0
- if code:
- tty.write(b"\03" + bytes([code]) + b"\00\00\00")
-
- def __send_mouse_wheel_event(self, tty: serial.Serial, event: _MouseWheelEvent) -> None:
- delta_y = min(max(-128, event.delta_y), 127)
- tty.write(b"\04\00" + struct.pack(">b", delta_y) + b"\00\00")
-
- def __send_clear_hid(self, tty: serial.Serial) -> None:
- tty.write(b"\00\00\00\00\00")