summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2020-11-04 10:52:00 +0300
committerDevaev Maxim <[email protected]>2020-11-04 10:52:00 +0300
commit3386c66278cc5f2da0133249a1d12a26f05d51b5 (patch)
treede3757dbdef497c339ecf382e01bd0716a761086
parentc31115051cb17a650e6d42e3e5cbfe05de809760 (diff)
refactoring
-rw-r--r--kvmd/plugins/hid/otg/device.py5
-rw-r--r--kvmd/plugins/hid/otg/events.py118
-rw-r--r--kvmd/plugins/hid/otg/keyboard.py50
-rw-r--r--kvmd/plugins/hid/otg/mouse.py96
4 files changed, 161 insertions, 108 deletions
diff --git a/kvmd/plugins/hid/otg/device.py b/kvmd/plugins/hid/otg/device.py
index 43d324b1..7d25597b 100644
--- a/kvmd/plugins/hid/otg/device.py
+++ b/kvmd/plugins/hid/otg/device.py
@@ -36,13 +36,10 @@ from .... import aiomulti
from .... import aioproc
from .usb import UsbDeviceController
+from .events import BaseEvent
# =====
-class BaseEvent:
- pass
-
-
class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments
self,
diff --git a/kvmd/plugins/hid/otg/events.py b/kvmd/plugins/hid/otg/events.py
new file mode 100644
index 00000000..351a99fd
--- /dev/null
+++ b/kvmd/plugins/hid/otg/events.py
@@ -0,0 +1,118 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import dataclasses
+
+from typing import Union
+
+from ....keyboard.mappings import OtgKey
+from ....keyboard.mappings import KEYMAP
+
+
+# =====
+class BaseEvent:
+ pass
+
+
+class ClearEvent(BaseEvent):
+ pass
+
+
+class ResetEvent(BaseEvent):
+ pass
+
+
+# =====
[email protected](frozen=True)
+class KeyEvent(BaseEvent):
+ key: OtgKey
+ state: bool
+
+ def __post_init__(self) -> None:
+ assert (not self.key.is_modifier)
+
+
[email protected](frozen=True)
+class ModifierEvent(BaseEvent):
+ modifier: OtgKey
+ state: bool
+
+ def __post_init__(self) -> None:
+ assert self.modifier.is_modifier
+
+
+def make_keyboard_event(key: str, state: bool) -> Union[KeyEvent, ModifierEvent]:
+ otg_key = KEYMAP[key].otg
+ if otg_key.is_modifier:
+ return ModifierEvent(otg_key, state)
+ return KeyEvent(otg_key, state)
+
+
+# =====
[email protected](frozen=True)
+class MouseButtonEvent(BaseEvent):
+ button: str
+ state: bool
+ code: int = 0
+
+ def __post_init__(self) -> None:
+ object.__setattr__(self, "code", {
+ "left": 0x1,
+ "right": 0x2,
+ "middle": 0x4,
+ "up": 0x8, # Back
+ "down": 0x10, # Forward
+ }[self.button])
+
+
[email protected](frozen=True)
+class MouseMoveEvent(BaseEvent):
+ to_x: int
+ to_y: int
+ to_fixed_x: int = 0
+ to_fixed_y: int = 0
+
+ def __post_init__(self) -> None:
+ assert -32768 <= self.to_x <= 32767
+ assert -32768 <= self.to_y <= 32767
+ object.__setattr__(self, "to_fixed_x", (self.to_x + 32768) // 2)
+ object.__setattr__(self, "to_fixed_y", (self.to_y + 32768) // 2)
+
+
[email protected](frozen=True)
+class MouseRelativeEvent(BaseEvent):
+ delta_x: int
+ delta_y: int
+
+ def __post_init__(self) -> None:
+ assert -127 <= self.delta_x <= 127
+ assert -127 <= self.delta_y <= 127
+
+
[email protected](frozen=True)
+class MouseWheelEvent(BaseEvent):
+ delta_x: int
+ delta_y: int
+
+ def __post_init__(self) -> None:
+ assert -127 <= self.delta_x <= 127
+ assert -127 <= self.delta_y <= 127
diff --git a/kvmd/plugins/hid/otg/keyboard.py b/kvmd/plugins/hid/otg/keyboard.py
index 21b950f9..384f8fd2 100644
--- a/kvmd/plugins/hid/otg/keyboard.py
+++ b/kvmd/plugins/hid/otg/keyboard.py
@@ -20,8 +20,6 @@
# ========================================================================== #
-import dataclasses
-
from typing import Tuple
from typing import List
from typing import Set
@@ -32,31 +30,15 @@ from typing import Any
from ....logging import get_logger
from ....keyboard.mappings import OtgKey
-from ....keyboard.mappings import KEYMAP
-from .device import BaseEvent
from .device import BaseDeviceProcess
-
-# =====
-class _ClearEvent(BaseEvent):
- pass
-
-
-class _ResetEvent(BaseEvent):
- pass
-
-
[email protected](frozen=True)
-class _ModifierEvent(BaseEvent):
- modifier: OtgKey
- state: bool
-
-
[email protected](frozen=True)
-class _KeyEvent(BaseEvent):
- key: OtgKey
- state: bool
+from .events import BaseEvent
+from .events import ClearEvent
+from .events import ResetEvent
+from .events import KeyEvent
+from .events import ModifierEvent
+from .events import make_keyboard_event
# =====
@@ -79,17 +61,15 @@ class KeyboardProcess(BaseDeviceProcess):
def send_clear_event(self) -> None:
self._clear_queue()
- self._queue_event(_ClearEvent())
+ self._queue_event(ClearEvent())
def send_reset_event(self) -> None:
self._clear_queue()
- self._queue_event(_ResetEvent())
+ self._queue_event(ResetEvent())
def send_key_events(self, keys: Iterable[Tuple[str, bool]]) -> None:
for (key, state) in keys:
- otg_key = KEYMAP[key].otg
- cls = (_ModifierEvent if otg_key.is_modifier else _KeyEvent)
- self._queue_event(cls(otg_key, state))
+ self._queue_event(make_keyboard_event(key, state))
# =====
@@ -105,13 +85,13 @@ class KeyboardProcess(BaseDeviceProcess):
# =====
def _process_event(self, event: BaseEvent) -> bool:
- if isinstance(event, _ClearEvent):
+ if isinstance(event, ClearEvent):
return self.__process_clear_event()
- elif isinstance(event, _ResetEvent):
+ elif isinstance(event, ResetEvent):
return self.__process_clear_event(reopen=True)
- elif isinstance(event, _ModifierEvent):
+ elif isinstance(event, ModifierEvent):
return self.__process_modifier_event(event)
- elif isinstance(event, _KeyEvent):
+ elif isinstance(event, KeyEvent):
return self.__process_key_event(event)
raise RuntimeError(f"Not implemented event: {event}")
@@ -120,7 +100,7 @@ class KeyboardProcess(BaseDeviceProcess):
self.__clear_keys()
return self.__send_current_state(reopen=reopen)
- def __process_modifier_event(self, event: _ModifierEvent) -> bool:
+ def __process_modifier_event(self, event: ModifierEvent) -> bool:
if event.modifier in self.__pressed_modifiers:
# Ранее нажатый модификатор отжимаем
self.__pressed_modifiers.remove(event.modifier)
@@ -132,7 +112,7 @@ class KeyboardProcess(BaseDeviceProcess):
return self.__send_current_state()
return True
- def __process_key_event(self, event: _KeyEvent) -> bool:
+ def __process_key_event(self, event: KeyEvent) -> bool:
if event.key in self.__pressed_keys:
# Ранее нажатую клавишу отжимаем
self.__pressed_keys[self.__pressed_keys.index(event.key)] = None
diff --git a/kvmd/plugins/hid/otg/mouse.py b/kvmd/plugins/hid/otg/mouse.py
index 079a7dc3..3653cda4 100644
--- a/kvmd/plugins/hid/otg/mouse.py
+++ b/kvmd/plugins/hid/otg/mouse.py
@@ -21,48 +21,21 @@
import struct
-import dataclasses
from typing import Optional
from typing import Any
from ....logging import get_logger
-from .device import BaseEvent
from .device import BaseDeviceProcess
-
-# =====
-class _ClearEvent(BaseEvent):
- pass
-
-
-class _ResetEvent(BaseEvent):
- pass
-
-
[email protected](frozen=True)
-class _ButtonEvent(BaseEvent):
- code: int
- state: bool
-
-
[email protected](frozen=True)
-class _MoveEvent(BaseEvent):
- to_x: int
- to_y: int
-
-
[email protected](frozen=True)
-class _RelativeEvent(BaseEvent):
- delta_x: int
- delta_y: int
-
-
[email protected](frozen=True)
-class _WheelEvent(BaseEvent):
- delta_x: int
- delta_y: int
+from .events import BaseEvent
+from .events import ClearEvent
+from .events import ResetEvent
+from .events import MouseButtonEvent
+from .events import MouseMoveEvent
+from .events import MouseRelativeEvent
+from .events import MouseWheelEvent
# =====
@@ -93,55 +66,40 @@ class MouseProcess(BaseDeviceProcess):
def send_clear_event(self) -> None:
self._clear_queue()
- self._queue_event(_ClearEvent())
+ self._queue_event(ClearEvent())
def send_reset_event(self) -> None:
self._clear_queue()
- self._queue_event(_ResetEvent())
+ self._queue_event(ResetEvent())
def send_button_event(self, button: str, state: bool) -> None:
- code: int = {
- "left": 0x1,
- "right": 0x2,
- "middle": 0x4,
- "up": 0x8, # Back
- "down": 0x10, # Forward
- }[button]
- self._queue_event(_ButtonEvent(code, state))
+ self._queue_event(MouseButtonEvent(button, state))
def send_move_event(self, to_x: int, to_y: int) -> None:
if self.__absolute:
- assert -32768 <= to_x <= 32767
- assert -32768 <= to_y <= 32767
- to_x = (to_x + 32768) // 2
- to_y = (to_y + 32768) // 2
- self._queue_event(_MoveEvent(to_x, to_y))
+ self._queue_event(MouseMoveEvent(to_x, to_y))
def send_relative_event(self, delta_x: int, delta_y: int) -> None:
if not self.__absolute:
- assert -127 <= delta_x <= 127
- assert -127 <= delta_y <= 127
- self._queue_event(_RelativeEvent(delta_x, delta_y))
+ self._queue_event(MouseRelativeEvent(delta_x, delta_y))
def send_wheel_event(self, delta_x: int, delta_y: int) -> None:
- assert -127 <= delta_x <= 127
- assert -127 <= delta_y <= 127
- self._queue_event(_WheelEvent(delta_x, delta_y))
+ self._queue_event(MouseWheelEvent(delta_x, delta_y))
# =====
def _process_event(self, event: BaseEvent) -> bool:
- if isinstance(event, _ClearEvent):
+ if isinstance(event, ClearEvent):
return self.__process_clear_event()
- elif isinstance(event, _ResetEvent):
+ elif isinstance(event, ResetEvent):
return self.__process_clear_event(reopen=True)
- elif isinstance(event, _ButtonEvent):
+ elif isinstance(event, MouseButtonEvent):
return self.__process_button_event(event)
- elif isinstance(event, _MoveEvent):
+ elif isinstance(event, MouseMoveEvent):
return self.__process_move_event(event)
- elif isinstance(event, _RelativeEvent):
+ elif isinstance(event, MouseRelativeEvent):
return self.__process_relative_event(event)
- elif isinstance(event, _WheelEvent):
+ elif isinstance(event, MouseWheelEvent):
return self.__process_wheel_event(event)
raise RuntimeError(f"Not implemented event: {event}")
@@ -149,7 +107,7 @@ class MouseProcess(BaseDeviceProcess):
self.__clear_state()
return self.__send_current_state(reopen=reopen)
- def __process_button_event(self, event: _ButtonEvent) -> bool:
+ def __process_button_event(self, event: MouseButtonEvent) -> bool:
if event.code & self.__pressed_buttons:
# Ранее нажатую кнопку отжимаем
self.__pressed_buttons &= ~event.code
@@ -161,23 +119,23 @@ class MouseProcess(BaseDeviceProcess):
return self.__send_current_state()
return True
- def __process_move_event(self, event: _MoveEvent) -> bool:
- self.__x = event.to_x
- self.__y = event.to_y
+ def __process_move_event(self, event: MouseMoveEvent) -> bool:
+ self.__x = event.to_fixed_x
+ self.__y = event.to_fixed_y
return self.__send_current_state()
- def __process_relative_event(self, event: _RelativeEvent) -> bool:
+ def __process_relative_event(self, event: MouseRelativeEvent) -> bool:
return self.__send_current_state(relative_event=event)
- def __process_wheel_event(self, event: _WheelEvent) -> bool:
+ def __process_wheel_event(self, event: MouseWheelEvent) -> bool:
return self.__send_current_state(wheel_event=event)
# =====
def __send_current_state(
self,
- relative_event: Optional[_RelativeEvent]=None,
- wheel_event: Optional[_WheelEvent]=None,
+ relative_event: Optional[MouseRelativeEvent]=None,
+ wheel_event: Optional[MouseWheelEvent]=None,
reopen: bool=False,
) -> bool: