diff options
author | Devaev Maxim <[email protected]> | 2019-04-07 06:10:55 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2019-04-07 06:10:55 +0300 |
commit | f426e139077cfc54251c05adb25fe54f477fce48 (patch) | |
tree | dfdeed848310f6350156c3685e6d88b65f4e2ae5 /kvmd | |
parent | dcd774971a513932320ddc84909ea37e4ef5e155 (diff) |
refactoring
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/apps/kvmd/hid.py | 61 |
1 files changed, 31 insertions, 30 deletions
diff --git a/kvmd/apps/kvmd/hid.py b/kvmd/apps/kvmd/hid.py index 2b41eda0..4b55ff4d 100644 --- a/kvmd/apps/kvmd/hid.py +++ b/kvmd/apps/kvmd/hid.py @@ -34,6 +34,7 @@ from typing import Dict from typing import Set from typing import NamedTuple from typing import AsyncGenerator +from typing import Any import yaml import serial @@ -104,6 +105,7 @@ class _MouseWheelEvent(NamedTuple): return b"\x14\x00" + struct.pack(">b", delta_y) + b"\x00\x00" +# ===== class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments self, @@ -164,36 +166,16 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu gpio.write(self.__reset_pin, False) async def send_key_event(self, key: str, state: bool) -> None: - if not self.__stop_event.is_set(): - async with self.__lock: - if _KeyEvent.is_valid(key): - if state and key not in self.__pressed_keys: - self.__pressed_keys.add(key) - self.__events_queue.put(_KeyEvent(key, state)) - elif not state and key in self.__pressed_keys: - self.__pressed_keys.remove(key) - self.__events_queue.put(_KeyEvent(key, state)) + await self.__send_bool_event(_KeyEvent, self.__pressed_keys, key, state) async def send_mouse_move_event(self, to_x: int, to_y: int) -> None: - if not self.__stop_event.is_set(): - async with self.__lock: - self.__events_queue.put(_MouseMoveEvent(to_x, to_y)) + await self.__send_int_event(_MouseMoveEvent, to_x, to_y) async def send_mouse_button_event(self, button: str, state: bool) -> None: - if not self.__stop_event.is_set(): - async with self.__lock: - if _MouseButtonEvent.is_valid(button): - if state and button not in self.__pressed_mouse_buttons: - self.__pressed_mouse_buttons.add(button) - self.__events_queue.put(_MouseButtonEvent(button, state)) - elif not state and button in self.__pressed_mouse_buttons: - self.__pressed_mouse_buttons.remove(button) - self.__events_queue.put(_MouseButtonEvent(button, state)) + await self.__send_bool_event(_MouseButtonEvent, self.__pressed_mouse_buttons, button, state) async def send_mouse_wheel_event(self, delta_y: int) -> None: - if not self.__stop_event.is_set(): - async with self.__lock: - self.__events_queue.put(_MouseWheelEvent(delta_y)) + await self.__send_int_event(_MouseWheelEvent, delta_y) async def clear_events(self) -> None: if not self.__stop_event.is_set(): @@ -212,13 +194,32 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu self.__emergency_clear_events() gpio.write(self.__reset_pin, False) + async def __send_bool_event(self, cls: Any, pressed: Set[str], name: str, state: bool) -> None: + if not self.__stop_event.is_set(): + async with self.__lock: + if cls.is_valid(name) and ( + (state and (name not in pressed)) # Если еще не нажато + or (not state and (name in pressed)) # ... Или еще не отжато + ): + if state: + pressed.add(name) + else: + pressed.remove(name) + self.__events_queue.put(cls(name, state)) + + async def __send_int_event(self, cls: Any, *args: int) -> None: + if not self.__stop_event.is_set(): + async with self.__lock: + self.__events_queue.put(cls(*args)) + def __unsafe_clear_events(self) -> None: - for button in self.__pressed_mouse_buttons: - self.__events_queue.put(_MouseButtonEvent(button, False)) - self.__pressed_mouse_buttons.clear() - for key in self.__pressed_keys: - self.__events_queue.put(_KeyEvent(key, False)) - self.__pressed_keys.clear() + for (cls, pressed) in [ + (_MouseButtonEvent, self.__pressed_mouse_buttons), + (_KeyEvent, self.__pressed_keys), + ]: + for name in pressed: + self.__events_queue.put(cls(name, False)) + pressed.clear() def __emergency_clear_events(self) -> None: if os.path.exists(self.__device_path): |