diff options
author | Devaev Maxim <[email protected]> | 2020-11-04 10:52:00 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2020-11-04 10:52:00 +0300 |
commit | 3386c66278cc5f2da0133249a1d12a26f05d51b5 (patch) | |
tree | de3757dbdef497c339ecf382e01bd0716a761086 | |
parent | c31115051cb17a650e6d42e3e5cbfe05de809760 (diff) |
refactoring
-rw-r--r-- | kvmd/plugins/hid/otg/device.py | 5 | ||||
-rw-r--r-- | kvmd/plugins/hid/otg/events.py | 118 | ||||
-rw-r--r-- | kvmd/plugins/hid/otg/keyboard.py | 50 | ||||
-rw-r--r-- | kvmd/plugins/hid/otg/mouse.py | 96 |
4 files changed, 161 insertions, 108 deletions
diff --git a/kvmd/plugins/hid/otg/device.py b/kvmd/plugins/hid/otg/device.py index 43d324b1..7d25597b 100644 --- a/kvmd/plugins/hid/otg/device.py +++ b/kvmd/plugins/hid/otg/device.py @@ -36,13 +36,10 @@ from .... import aiomulti from .... import aioproc from .usb import UsbDeviceController +from .events import BaseEvent # ===== -class BaseEvent: - pass - - class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments self, diff --git a/kvmd/plugins/hid/otg/events.py b/kvmd/plugins/hid/otg/events.py new file mode 100644 index 00000000..351a99fd --- /dev/null +++ b/kvmd/plugins/hid/otg/events.py @@ -0,0 +1,118 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import dataclasses + +from typing import Union + +from ....keyboard.mappings import OtgKey +from ....keyboard.mappings import KEYMAP + + +# ===== +class BaseEvent: + pass + + +class ClearEvent(BaseEvent): + pass + + +class ResetEvent(BaseEvent): + pass + + +# ===== [email protected](frozen=True) +class KeyEvent(BaseEvent): + key: OtgKey + state: bool + + def __post_init__(self) -> None: + assert (not self.key.is_modifier) + + [email protected](frozen=True) +class ModifierEvent(BaseEvent): + modifier: OtgKey + state: bool + + def __post_init__(self) -> None: + assert self.modifier.is_modifier + + +def make_keyboard_event(key: str, state: bool) -> Union[KeyEvent, ModifierEvent]: + otg_key = KEYMAP[key].otg + if otg_key.is_modifier: + return ModifierEvent(otg_key, state) + return KeyEvent(otg_key, state) + + +# ===== [email protected](frozen=True) +class MouseButtonEvent(BaseEvent): + button: str + state: bool + code: int = 0 + + def __post_init__(self) -> None: + object.__setattr__(self, "code", { + "left": 0x1, + "right": 0x2, + "middle": 0x4, + "up": 0x8, # Back + "down": 0x10, # Forward + }[self.button]) + + [email protected](frozen=True) +class MouseMoveEvent(BaseEvent): + to_x: int + to_y: int + to_fixed_x: int = 0 + to_fixed_y: int = 0 + + def __post_init__(self) -> None: + assert -32768 <= self.to_x <= 32767 + assert -32768 <= self.to_y <= 32767 + object.__setattr__(self, "to_fixed_x", (self.to_x + 32768) // 2) + object.__setattr__(self, "to_fixed_y", (self.to_y + 32768) // 2) + + [email protected](frozen=True) +class MouseRelativeEvent(BaseEvent): + delta_x: int + delta_y: int + + def __post_init__(self) -> None: + assert -127 <= self.delta_x <= 127 + assert -127 <= self.delta_y <= 127 + + [email protected](frozen=True) +class MouseWheelEvent(BaseEvent): + delta_x: int + delta_y: int + + def __post_init__(self) -> None: + assert -127 <= self.delta_x <= 127 + assert -127 <= self.delta_y <= 127 diff --git a/kvmd/plugins/hid/otg/keyboard.py b/kvmd/plugins/hid/otg/keyboard.py index 21b950f9..384f8fd2 100644 --- a/kvmd/plugins/hid/otg/keyboard.py +++ b/kvmd/plugins/hid/otg/keyboard.py @@ -20,8 +20,6 @@ # ========================================================================== # -import dataclasses - from typing import Tuple from typing import List from typing import Set @@ -32,31 +30,15 @@ from typing import Any from ....logging import get_logger from ....keyboard.mappings import OtgKey -from ....keyboard.mappings import KEYMAP -from .device import BaseEvent from .device import BaseDeviceProcess - -# ===== -class _ClearEvent(BaseEvent): - pass - - -class _ResetEvent(BaseEvent): - pass - - [email protected](frozen=True) -class _ModifierEvent(BaseEvent): - modifier: OtgKey - state: bool - - [email protected](frozen=True) -class _KeyEvent(BaseEvent): - key: OtgKey - state: bool +from .events import BaseEvent +from .events import ClearEvent +from .events import ResetEvent +from .events import KeyEvent +from .events import ModifierEvent +from .events import make_keyboard_event # ===== @@ -79,17 +61,15 @@ class KeyboardProcess(BaseDeviceProcess): def send_clear_event(self) -> None: self._clear_queue() - self._queue_event(_ClearEvent()) + self._queue_event(ClearEvent()) def send_reset_event(self) -> None: self._clear_queue() - self._queue_event(_ResetEvent()) + self._queue_event(ResetEvent()) def send_key_events(self, keys: Iterable[Tuple[str, bool]]) -> None: for (key, state) in keys: - otg_key = KEYMAP[key].otg - cls = (_ModifierEvent if otg_key.is_modifier else _KeyEvent) - self._queue_event(cls(otg_key, state)) + self._queue_event(make_keyboard_event(key, state)) # ===== @@ -105,13 +85,13 @@ class KeyboardProcess(BaseDeviceProcess): # ===== def _process_event(self, event: BaseEvent) -> bool: - if isinstance(event, _ClearEvent): + if isinstance(event, ClearEvent): return self.__process_clear_event() - elif isinstance(event, _ResetEvent): + elif isinstance(event, ResetEvent): return self.__process_clear_event(reopen=True) - elif isinstance(event, _ModifierEvent): + elif isinstance(event, ModifierEvent): return self.__process_modifier_event(event) - elif isinstance(event, _KeyEvent): + elif isinstance(event, KeyEvent): return self.__process_key_event(event) raise RuntimeError(f"Not implemented event: {event}") @@ -120,7 +100,7 @@ class KeyboardProcess(BaseDeviceProcess): self.__clear_keys() return self.__send_current_state(reopen=reopen) - def __process_modifier_event(self, event: _ModifierEvent) -> bool: + def __process_modifier_event(self, event: ModifierEvent) -> bool: if event.modifier in self.__pressed_modifiers: # Ранее нажатый модификатор отжимаем self.__pressed_modifiers.remove(event.modifier) @@ -132,7 +112,7 @@ class KeyboardProcess(BaseDeviceProcess): return self.__send_current_state() return True - def __process_key_event(self, event: _KeyEvent) -> bool: + def __process_key_event(self, event: KeyEvent) -> bool: if event.key in self.__pressed_keys: # Ранее нажатую клавишу отжимаем self.__pressed_keys[self.__pressed_keys.index(event.key)] = None diff --git a/kvmd/plugins/hid/otg/mouse.py b/kvmd/plugins/hid/otg/mouse.py index 079a7dc3..3653cda4 100644 --- a/kvmd/plugins/hid/otg/mouse.py +++ b/kvmd/plugins/hid/otg/mouse.py @@ -21,48 +21,21 @@ import struct -import dataclasses from typing import Optional from typing import Any from ....logging import get_logger -from .device import BaseEvent from .device import BaseDeviceProcess - -# ===== -class _ClearEvent(BaseEvent): - pass - - -class _ResetEvent(BaseEvent): - pass - - [email protected](frozen=True) -class _ButtonEvent(BaseEvent): - code: int - state: bool - - [email protected](frozen=True) -class _MoveEvent(BaseEvent): - to_x: int - to_y: int - - [email protected](frozen=True) -class _RelativeEvent(BaseEvent): - delta_x: int - delta_y: int - - [email protected](frozen=True) -class _WheelEvent(BaseEvent): - delta_x: int - delta_y: int +from .events import BaseEvent +from .events import ClearEvent +from .events import ResetEvent +from .events import MouseButtonEvent +from .events import MouseMoveEvent +from .events import MouseRelativeEvent +from .events import MouseWheelEvent # ===== @@ -93,55 +66,40 @@ class MouseProcess(BaseDeviceProcess): def send_clear_event(self) -> None: self._clear_queue() - self._queue_event(_ClearEvent()) + self._queue_event(ClearEvent()) def send_reset_event(self) -> None: self._clear_queue() - self._queue_event(_ResetEvent()) + self._queue_event(ResetEvent()) def send_button_event(self, button: str, state: bool) -> None: - code: int = { - "left": 0x1, - "right": 0x2, - "middle": 0x4, - "up": 0x8, # Back - "down": 0x10, # Forward - }[button] - self._queue_event(_ButtonEvent(code, state)) + self._queue_event(MouseButtonEvent(button, state)) def send_move_event(self, to_x: int, to_y: int) -> None: if self.__absolute: - assert -32768 <= to_x <= 32767 - assert -32768 <= to_y <= 32767 - to_x = (to_x + 32768) // 2 - to_y = (to_y + 32768) // 2 - self._queue_event(_MoveEvent(to_x, to_y)) + self._queue_event(MouseMoveEvent(to_x, to_y)) def send_relative_event(self, delta_x: int, delta_y: int) -> None: if not self.__absolute: - assert -127 <= delta_x <= 127 - assert -127 <= delta_y <= 127 - self._queue_event(_RelativeEvent(delta_x, delta_y)) + self._queue_event(MouseRelativeEvent(delta_x, delta_y)) def send_wheel_event(self, delta_x: int, delta_y: int) -> None: - assert -127 <= delta_x <= 127 - assert -127 <= delta_y <= 127 - self._queue_event(_WheelEvent(delta_x, delta_y)) + self._queue_event(MouseWheelEvent(delta_x, delta_y)) # ===== def _process_event(self, event: BaseEvent) -> bool: - if isinstance(event, _ClearEvent): + if isinstance(event, ClearEvent): return self.__process_clear_event() - elif isinstance(event, _ResetEvent): + elif isinstance(event, ResetEvent): return self.__process_clear_event(reopen=True) - elif isinstance(event, _ButtonEvent): + elif isinstance(event, MouseButtonEvent): return self.__process_button_event(event) - elif isinstance(event, _MoveEvent): + elif isinstance(event, MouseMoveEvent): return self.__process_move_event(event) - elif isinstance(event, _RelativeEvent): + elif isinstance(event, MouseRelativeEvent): return self.__process_relative_event(event) - elif isinstance(event, _WheelEvent): + elif isinstance(event, MouseWheelEvent): return self.__process_wheel_event(event) raise RuntimeError(f"Not implemented event: {event}") @@ -149,7 +107,7 @@ class MouseProcess(BaseDeviceProcess): self.__clear_state() return self.__send_current_state(reopen=reopen) - def __process_button_event(self, event: _ButtonEvent) -> bool: + def __process_button_event(self, event: MouseButtonEvent) -> bool: if event.code & self.__pressed_buttons: # Ранее нажатую кнопку отжимаем self.__pressed_buttons &= ~event.code @@ -161,23 +119,23 @@ class MouseProcess(BaseDeviceProcess): return self.__send_current_state() return True - def __process_move_event(self, event: _MoveEvent) -> bool: - self.__x = event.to_x - self.__y = event.to_y + def __process_move_event(self, event: MouseMoveEvent) -> bool: + self.__x = event.to_fixed_x + self.__y = event.to_fixed_y return self.__send_current_state() - def __process_relative_event(self, event: _RelativeEvent) -> bool: + def __process_relative_event(self, event: MouseRelativeEvent) -> bool: return self.__send_current_state(relative_event=event) - def __process_wheel_event(self, event: _WheelEvent) -> bool: + def __process_wheel_event(self, event: MouseWheelEvent) -> bool: return self.__send_current_state(wheel_event=event) # ===== def __send_current_state( self, - relative_event: Optional[_RelativeEvent]=None, - wheel_event: Optional[_WheelEvent]=None, + relative_event: Optional[MouseRelativeEvent]=None, + wheel_event: Optional[MouseWheelEvent]=None, reopen: bool=False, ) -> bool: |