diff options
Diffstat (limited to 'kvmd/plugins/hid/_mcu')
-rw-r--r-- | kvmd/plugins/hid/_mcu/__init__.py | 24 | ||||
-rw-r--r-- | kvmd/plugins/hid/_mcu/gpio.py | 9 |
2 files changed, 13 insertions, 20 deletions
diff --git a/kvmd/plugins/hid/_mcu/__init__.py b/kvmd/plugins/hid/_mcu/__init__.py index 0de017df..ff92136c 100644 --- a/kvmd/plugins/hid/_mcu/__init__.py +++ b/kvmd/plugins/hid/_mcu/__init__.py @@ -25,13 +25,9 @@ import contextlib import queue import time -from typing import Tuple -from typing import List -from typing import Dict from typing import Iterable from typing import Generator from typing import AsyncGenerator -from typing import Optional from ....logging import get_logger @@ -144,7 +140,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- self.__stop_event = multiprocessing.Event() @classmethod - def get_plugin_options(cls) -> Dict: + def get_plugin_options(cls) -> dict: return { "gpio_device": Option("/dev/gpiochip0", type=valid_abs_path, unpack_as="gpio_device_path"), "reset_pin": Option(4, type=valid_gpio_pin_optional), @@ -162,7 +158,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- get_logger(0).info("Starting HID daemon ...") self.start() - async def get_state(self) -> Dict: + async def get_state(self) -> dict: state = await self.__state_flags.get() online = bool(state["online"]) pong = (state["status"] >> 16) & 0xFF @@ -174,8 +170,8 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- if online and active_mouse in ["usb_rel", "ps2"]: absolute = False - keyboard_outputs: Dict = {"available": [], "active": ""} - mouse_outputs: Dict = {"available": [], "active": ""} + keyboard_outputs: dict = {"available": [], "active": ""} + mouse_outputs: dict = {"available": [], "active": ""} if outputs1 & 0b10000000: # Dynamic if outputs2 & 0b00000001: # USB @@ -222,8 +218,8 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- }, } - async def poll_state(self) -> AsyncGenerator[Dict, None]: - prev_state: Dict = {} + async def poll_state(self) -> AsyncGenerator[dict, None]: + prev_state: dict = {} while True: state = await self.get_state() if state != prev_state: @@ -244,7 +240,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- # ===== - def send_key_events(self, keys: Iterable[Tuple[str, bool]]) -> None: + def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None: for (key, state) in keys: self.__queue_event(KeyEvent(key, state)) @@ -260,8 +256,8 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None: self.__queue_event(MouseWheelEvent(delta_x, delta_y)) - def set_params(self, keyboard_output: Optional[str]=None, mouse_output: Optional[str]=None) -> None: - events: List[BaseEvent] = [] + def set_params(self, keyboard_output: (str | None)=None, mouse_output: (str | None)=None) -> None: + events: list[BaseEvent] = [] if keyboard_output is not None: events.append(SetKeyboardOutputEvent(keyboard_output)) if mouse_output is not None: @@ -346,7 +342,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- def __process_request(self, conn: BasePhyConnection, request: bytes) -> bool: # pylint: disable=too-many-branches logger = get_logger() - error_messages: List[str] = [] + error_messages: list[str] = [] live_log_errors = False common_retries = self.__common_retries diff --git a/kvmd/plugins/hid/_mcu/gpio.py b/kvmd/plugins/hid/_mcu/gpio.py index e3c7f7b7..f4985061 100644 --- a/kvmd/plugins/hid/_mcu/gpio.py +++ b/kvmd/plugins/hid/_mcu/gpio.py @@ -23,9 +23,6 @@ import types import time -from typing import Type -from typing import Optional - import gpiod from ....logging import get_logger @@ -46,8 +43,8 @@ class Gpio: self.__reset_inverted = reset_inverted self.__reset_delay = reset_delay - self.__chip: Optional[gpiod.Chip] = None - self.__reset_line: Optional[gpiod.Line] = None + self.__chip: (gpiod.Chip | None) = None + self.__reset_line: (gpiod.Line | None) = None def __enter__(self) -> None: if self.__reset_pin >= 0: @@ -59,7 +56,7 @@ class Gpio: def __exit__( self, - _exc_type: Type[BaseException], + _exc_type: type[BaseException], _exc: BaseException, _tb: types.TracebackType, ) -> None: |