summaryrefslogtreecommitdiff
path: root/kvmd/apps
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2019-04-07 06:10:55 +0300
committerDevaev Maxim <[email protected]>2019-04-07 06:10:55 +0300
commitf426e139077cfc54251c05adb25fe54f477fce48 (patch)
treedfdeed848310f6350156c3685e6d88b65f4e2ae5 /kvmd/apps
parentdcd774971a513932320ddc84909ea37e4ef5e155 (diff)
refactoring
Diffstat (limited to 'kvmd/apps')
-rw-r--r--kvmd/apps/kvmd/hid.py61
1 files changed, 31 insertions, 30 deletions
diff --git a/kvmd/apps/kvmd/hid.py b/kvmd/apps/kvmd/hid.py
index 2b41eda0..4b55ff4d 100644
--- a/kvmd/apps/kvmd/hid.py
+++ b/kvmd/apps/kvmd/hid.py
@@ -34,6 +34,7 @@ from typing import Dict
from typing import Set
from typing import NamedTuple
from typing import AsyncGenerator
+from typing import Any
import yaml
import serial
@@ -104,6 +105,7 @@ class _MouseWheelEvent(NamedTuple):
return b"\x14\x00" + struct.pack(">b", delta_y) + b"\x00\x00"
+# =====
class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments
self,
@@ -164,36 +166,16 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
gpio.write(self.__reset_pin, False)
async def send_key_event(self, key: str, state: bool) -> None:
- if not self.__stop_event.is_set():
- async with self.__lock:
- if _KeyEvent.is_valid(key):
- if state and key not in self.__pressed_keys:
- self.__pressed_keys.add(key)
- self.__events_queue.put(_KeyEvent(key, state))
- elif not state and key in self.__pressed_keys:
- self.__pressed_keys.remove(key)
- self.__events_queue.put(_KeyEvent(key, state))
+ await self.__send_bool_event(_KeyEvent, self.__pressed_keys, key, state)
async def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
- if not self.__stop_event.is_set():
- async with self.__lock:
- self.__events_queue.put(_MouseMoveEvent(to_x, to_y))
+ await self.__send_int_event(_MouseMoveEvent, to_x, to_y)
async def send_mouse_button_event(self, button: str, state: bool) -> None:
- if not self.__stop_event.is_set():
- async with self.__lock:
- if _MouseButtonEvent.is_valid(button):
- if state and button not in self.__pressed_mouse_buttons:
- self.__pressed_mouse_buttons.add(button)
- self.__events_queue.put(_MouseButtonEvent(button, state))
- elif not state and button in self.__pressed_mouse_buttons:
- self.__pressed_mouse_buttons.remove(button)
- self.__events_queue.put(_MouseButtonEvent(button, state))
+ await self.__send_bool_event(_MouseButtonEvent, self.__pressed_mouse_buttons, button, state)
async def send_mouse_wheel_event(self, delta_y: int) -> None:
- if not self.__stop_event.is_set():
- async with self.__lock:
- self.__events_queue.put(_MouseWheelEvent(delta_y))
+ await self.__send_int_event(_MouseWheelEvent, delta_y)
async def clear_events(self) -> None:
if not self.__stop_event.is_set():
@@ -212,13 +194,32 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
self.__emergency_clear_events()
gpio.write(self.__reset_pin, False)
+ async def __send_bool_event(self, cls: Any, pressed: Set[str], name: str, state: bool) -> None:
+ if not self.__stop_event.is_set():
+ async with self.__lock:
+ if cls.is_valid(name) and (
+ (state and (name not in pressed)) # Если еще не нажато
+ or (not state and (name in pressed)) # ... Или еще не отжато
+ ):
+ if state:
+ pressed.add(name)
+ else:
+ pressed.remove(name)
+ self.__events_queue.put(cls(name, state))
+
+ async def __send_int_event(self, cls: Any, *args: int) -> None:
+ if not self.__stop_event.is_set():
+ async with self.__lock:
+ self.__events_queue.put(cls(*args))
+
def __unsafe_clear_events(self) -> None:
- for button in self.__pressed_mouse_buttons:
- self.__events_queue.put(_MouseButtonEvent(button, False))
- self.__pressed_mouse_buttons.clear()
- for key in self.__pressed_keys:
- self.__events_queue.put(_KeyEvent(key, False))
- self.__pressed_keys.clear()
+ for (cls, pressed) in [
+ (_MouseButtonEvent, self.__pressed_mouse_buttons),
+ (_KeyEvent, self.__pressed_keys),
+ ]:
+ for name in pressed:
+ self.__events_queue.put(cls(name, False))
+ pressed.clear()
def __emergency_clear_events(self) -> None:
if os.path.exists(self.__device_path):