diff options
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/aiofs.py | 11 | ||||
-rw-r--r-- | kvmd/aiogp.py | 3 | ||||
-rw-r--r-- | kvmd/aiomulti.py | 6 | ||||
-rw-r--r-- | kvmd/aiotools.py | 3 | ||||
-rw-r--r-- | kvmd/apps/kvmd/info/hw.py | 9 | ||||
-rw-r--r-- | kvmd/apps/vnc/vncauth.py | 7 | ||||
-rw-r--r-- | kvmd/clients/kvmd.py | 3 | ||||
-rw-r--r-- | kvmd/inotify.py | 3 | ||||
-rw-r--r-- | kvmd/plugins/hid/otg/device.py | 20 | ||||
-rw-r--r-- | kvmd/plugins/hid/otg/keyboard.py | 31 | ||||
-rw-r--r-- | kvmd/plugins/hid/otg/mouse.py | 36 | ||||
-rw-r--r-- | kvmd/plugins/hid/serial.py | 70 | ||||
-rw-r--r-- | kvmd/plugins/msd/otg/__init__.py | 4 | ||||
-rw-r--r-- | kvmd/plugins/msd/relay.py | 8 | ||||
-rw-r--r-- | kvmd/tools.py | 11 |
15 files changed, 116 insertions, 109 deletions
diff --git a/kvmd/aiofs.py b/kvmd/aiofs.py index c7bbd10d..53188286 100644 --- a/kvmd/aiofs.py +++ b/kvmd/aiofs.py @@ -29,7 +29,12 @@ from . import aiotools # ===== +async def read(path: str) -> str: + async with aiofiles.open(path) as afile: # type: ignore + return (await afile.read()) + + async def afile_write_now(afile: aiofiles.base.AiofilesContextManager, data: bytes) -> None: - await afile.write(data) - await afile.flush() - await aiotools.run_async(os.fsync, afile.fileno()) + await afile.write(data) # type: ignore + await afile.flush() # type: ignore + await aiotools.run_async(os.fsync, afile.fileno()) # type: ignore diff --git a/kvmd/aiogp.py b/kvmd/aiogp.py index d4652d53..ba406e8d 100644 --- a/kvmd/aiogp.py +++ b/kvmd/aiogp.py @@ -21,7 +21,6 @@ import asyncio -import asyncio.queues import threading import dataclasses @@ -147,7 +146,7 @@ class _DebouncedValue: self.__notifier = notifier self.__loop = loop - self.__queue: asyncio.queues.Queue = asyncio.Queue(loop=loop) + self.__queue: "asyncio.Queue[bool]" = asyncio.Queue(loop=loop) self.__task = loop.create_task(self.__consumer_task_loop()) def set(self, value: bool) -> None: diff --git a/kvmd/aiomulti.py b/kvmd/aiomulti.py index 9ff60d4c..edfc5c34 100644 --- a/kvmd/aiomulti.py +++ b/kvmd/aiomulti.py @@ -21,8 +21,6 @@ import multiprocessing -import multiprocessing.queues -import multiprocessing.sharedctypes import queue from typing import Dict @@ -33,7 +31,7 @@ from . import aiotools # ===== class AioProcessNotifier: def __init__(self) -> None: - self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue() + self.__queue: "multiprocessing.Queue[None]" = multiprocessing.Queue() def notify(self) -> None: self.__queue.put_nowait(None) @@ -62,7 +60,7 @@ class AioSharedFlags: self.__notifier = notifier - self.__flags: Dict[str, multiprocessing.sharedctypes.RawValue] = { + self.__flags = { key: multiprocessing.RawValue("i", int(value)) # type: ignore for (key, value) in initial.items() } diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py index dfd67f44..b5d9e2fd 100644 --- a/kvmd/aiotools.py +++ b/kvmd/aiotools.py @@ -21,7 +21,6 @@ import asyncio -import asyncio.queues import functools import types @@ -92,7 +91,7 @@ async def wait_first(*aws: Awaitable) -> Tuple[Set[asyncio.Future], Set[asyncio. # ===== class AioNotifier: def __init__(self) -> None: - self.__queue: asyncio.queues.Queue = asyncio.Queue() + self.__queue: "asyncio.Queue[None]" = asyncio.Queue() async def notify(self) -> None: await self.__queue.put(None) diff --git a/kvmd/apps/kvmd/info/hw.py b/kvmd/apps/kvmd/info/hw.py index e34ace5b..57a8521d 100644 --- a/kvmd/apps/kvmd/info/hw.py +++ b/kvmd/apps/kvmd/info/hw.py @@ -29,11 +29,10 @@ from typing import AsyncGenerator from typing import TypeVar from typing import Optional -import aiofiles - from ....logging import get_logger from .... import env +from .... import aiofs from .... import aioproc from .base import BaseInfoSubmanager @@ -89,8 +88,7 @@ class HwInfoSubmanager(BaseInfoSubmanager): async def __get_dt_model(self) -> Optional[str]: model_path = f"{env.PROCFS_PREFIX}/proc/device-tree/model" try: - async with aiofiles.open(model_path) as model_file: - return (await model_file.read()).strip(" \t\r\n\0") + return (await aiofs.read(model_path)).strip(" \t\r\n\0") except Exception as err: get_logger(0).error("Can't read DT model from %s: %s", model_path, err) return None @@ -98,8 +96,7 @@ class HwInfoSubmanager(BaseInfoSubmanager): async def __get_cpu_temp(self) -> Optional[float]: temp_path = f"{env.SYSFS_PREFIX}/sys/class/thermal/thermal_zone0/temp" try: - async with aiofiles.open(temp_path) as temp_file: - return int((await temp_file.read()).strip()) / 1000 + return int((await aiofs.read(temp_path)).strip()) / 1000 except Exception as err: get_logger(0).error("Can't read CPU temp from %s: %s", temp_path, err) return None diff --git a/kvmd/apps/vnc/vncauth.py b/kvmd/apps/vnc/vncauth.py index 06420ee0..b8283cca 100644 --- a/kvmd/apps/vnc/vncauth.py +++ b/kvmd/apps/vnc/vncauth.py @@ -25,10 +25,10 @@ import dataclasses from typing import Tuple from typing import Dict -import aiofiles - from ...logging import get_logger +from ... import aiofs + # ===== class VncAuthError(Exception): @@ -64,8 +64,7 @@ class VncAuthManager: return ({}, (not self.__enabled)) async def __inner_read_credentials(self) -> Dict[str, VncAuthKvmdCredentials]: - async with aiofiles.open(self.__path) as vc_file: - lines = (await vc_file.read()).split("\n") + lines = (await aiofs.read(self.__path)).split("\n") credentials: Dict[str, VncAuthKvmdCredentials] = {} for (lineno, line) in enumerate(lines): diff --git a/kvmd/clients/kvmd.py b/kvmd/clients/kvmd.py index 39813af2..d1934cd4 100644 --- a/kvmd/clients/kvmd.py +++ b/kvmd/clients/kvmd.py @@ -21,7 +21,6 @@ import asyncio -import asyncio.queues import contextlib import json import types @@ -133,7 +132,7 @@ class KvmdClientWs: def __init__(self, ws: aiohttp.ClientWebSocketResponse) -> None: self.__ws = ws - self.__writer_queue: asyncio.queues.Queue = asyncio.Queue() + self.__writer_queue: "asyncio.Queue[Dict]" = asyncio.Queue() self.__communicated = False async def communicate(self) -> AsyncGenerator[Dict, None]: diff --git a/kvmd/inotify.py b/kvmd/inotify.py index 422f3a0a..53578aa3 100644 --- a/kvmd/inotify.py +++ b/kvmd/inotify.py @@ -25,7 +25,6 @@ import sys import os import asyncio -import asyncio.queues import ctypes import ctypes.util import struct @@ -230,7 +229,7 @@ class Inotify: self.__moved: Dict[int, str] = {} - self.__events_queue: asyncio.queues.Queue = asyncio.Queue() + self.__events_queue: "asyncio.Queue[InotifyEvent]" = asyncio.Queue() def watch(self, path: str, mask: int) -> None: path = os.path.normpath(path) diff --git a/kvmd/plugins/hid/otg/device.py b/kvmd/plugins/hid/otg/device.py index 625aeef5..43d324b1 100644 --- a/kvmd/plugins/hid/otg/device.py +++ b/kvmd/plugins/hid/otg/device.py @@ -23,7 +23,6 @@ import os import select import multiprocessing -import multiprocessing.queues import queue import errno import time @@ -32,6 +31,7 @@ from typing import Dict from ....logging import get_logger +from .... import tools from .... import aiomulti from .... import aioproc @@ -76,7 +76,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in self.__noop = noop self.__fd = -1 - self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue() + self.__events_queue: "multiprocessing.Queue[BaseEvent]" = multiprocessing.Queue() self.__state_flags = aiomulti.AioSharedFlags({"online": True, **initial_state}, notifier) self.__stop_event = multiprocessing.Event() @@ -93,16 +93,18 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in if self.__ensure_device(): # Check device and process reports if needed self.__read_all_reports() try: - event: BaseEvent = self.__events_queue.get(timeout=0.1) + event = self.__events_queue.get(timeout=0.1) except queue.Empty: if not self.__udc.can_operate(): + self._clear_queue() self.__close_device() else: - self._process_event(event) + if not self._process_event(event): + self._clear_queue() except Exception: logger.exception("Unexpected HID-%s error", self.__name) + self._clear_queue() self.__close_device() - finally: time.sleep(1) self.__close_device() @@ -112,7 +114,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in # ===== - def _process_event(self, event: BaseEvent) -> None: + def _process_event(self, event: BaseEvent) -> bool: raise NotImplementedError def _process_read_report(self, report: bytes) -> None: @@ -135,11 +137,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in self.__events_queue.put_nowait(event) def _clear_queue(self) -> None: - while not self.__events_queue.empty(): - try: - self.__events_queue.get_nowait() - except queue.Empty: - break + tools.clear_queue(self.__events_queue) def _ensure_write(self, report: bytes, reopen: bool=False, close: bool=False) -> bool: if reopen: diff --git a/kvmd/plugins/hid/otg/keyboard.py b/kvmd/plugins/hid/otg/keyboard.py index e0243798..c497f7c8 100644 --- a/kvmd/plugins/hid/otg/keyboard.py +++ b/kvmd/plugins/hid/otg/keyboard.py @@ -112,47 +112,50 @@ class KeyboardProcess(BaseDeviceProcess): # ===== - def _process_event(self, event: BaseEvent) -> None: + def _process_event(self, event: BaseEvent) -> bool: if isinstance(event, _ClearEvent): - self.__process_clear_event() + return self.__process_clear_event() elif isinstance(event, _ResetEvent): - self.__process_clear_event(reopen=True) + return self.__process_clear_event(reopen=True) elif isinstance(event, _ModifierEvent): - self.__process_modifier_event(event) + return self.__process_modifier_event(event) elif isinstance(event, _KeyEvent): - self.__process_key_event(event) + return self.__process_key_event(event) + raise RuntimeError(f"Not implemented event: {event}") - def __process_clear_event(self, reopen: bool=False) -> None: + def __process_clear_event(self, reopen: bool=False) -> bool: self.__clear_modifiers() self.__clear_keys() - self.__send_current_state(reopen=reopen) + return self.__send_current_state(reopen=reopen) - def __process_modifier_event(self, event: _ModifierEvent) -> None: + def __process_modifier_event(self, event: _ModifierEvent) -> bool: if event.modifier in self.__pressed_modifiers: # Ранее нажатый модификатор отжимаем self.__pressed_modifiers.remove(event.modifier) if not self.__send_current_state(): - return + return False if event.state: # Нажимаем если нужно self.__pressed_modifiers.add(event.modifier) - self.__send_current_state() + return self.__send_current_state() + return True - def __process_key_event(self, event: _KeyEvent) -> None: + def __process_key_event(self, event: _KeyEvent) -> bool: if event.key in self.__pressed_keys: # Ранее нажатую клавишу отжимаем self.__pressed_keys[self.__pressed_keys.index(event.key)] = None if not self.__send_current_state(): - return + return False elif event.state and None not in self.__pressed_keys: # Если нужно нажать что-то новое, но свободных слотов нет - отжимаем всё self.__clear_keys() if not self.__send_current_state(): - return + return False if event.state: # Нажимаем если нужно self.__pressed_keys[self.__pressed_keys.index(None)] = event.key - self.__send_current_state() + return self.__send_current_state() + return True # ===== diff --git a/kvmd/plugins/hid/otg/mouse.py b/kvmd/plugins/hid/otg/mouse.py index adb2469f..3e0cf0e8 100644 --- a/kvmd/plugins/hid/otg/mouse.py +++ b/kvmd/plugins/hid/otg/mouse.py @@ -110,44 +110,46 @@ class MouseProcess(BaseDeviceProcess): # ===== - def _process_event(self, event: BaseEvent) -> None: + def _process_event(self, event: BaseEvent) -> bool: if isinstance(event, _ClearEvent): - self.__process_clear_event() + return self.__process_clear_event() elif isinstance(event, _ResetEvent): - self.__process_clear_event(reopen=True) + return self.__process_clear_event(reopen=True) elif isinstance(event, _ButtonEvent): - self.__process_button_event(event) + return self.__process_button_event(event) elif isinstance(event, _MoveEvent): - self.__process_move_event(event) + return self.__process_move_event(event) elif isinstance(event, _WheelEvent): - self.__process_wheel_event(event) + return self.__process_wheel_event(event) + raise RuntimeError(f"Not implemented event: {event}") - def __process_clear_event(self, reopen: bool=False) -> None: + def __process_clear_event(self, reopen: bool=False) -> bool: self.__clear_state() - self.__send_current_state(0, 0, reopen=reopen) + return self.__send_current_state(reopen=reopen) - def __process_button_event(self, event: _ButtonEvent) -> None: + def __process_button_event(self, event: _ButtonEvent) -> bool: if event.code & self.__pressed_buttons: # Ранее нажатую кнопку отжимаем self.__pressed_buttons &= ~event.code - if not self.__send_current_state(0, 0): - return + if not self.__send_current_state(): + return False if event.state: # Нажимаем если нужно self.__pressed_buttons |= event.code - self.__send_current_state(0, 0) + return self.__send_current_state() + return True - def __process_move_event(self, event: _MoveEvent) -> None: + def __process_move_event(self, event: _MoveEvent) -> bool: self.__x = event.to_x self.__y = event.to_y - self.__send_current_state(0, 0) + return self.__send_current_state() - def __process_wheel_event(self, event: _WheelEvent) -> None: - self.__send_current_state(event.delta_x, event.delta_y) + def __process_wheel_event(self, event: _WheelEvent) -> bool: + return self.__send_current_state(event.delta_x, event.delta_y) # ===== - def __send_current_state(self, delta_x: int, delta_y: int, reopen: bool=False) -> bool: + def __send_current_state(self, delta_x: int=0, delta_y: int=0, reopen: bool=False) -> bool: report = self.__make_report( buttons=self.__pressed_buttons, to_x=self.__x, diff --git a/kvmd/plugins/hid/serial.py b/kvmd/plugins/hid/serial.py index 13a2db6d..a45cd89a 100644 --- a/kvmd/plugins/hid/serial.py +++ b/kvmd/plugins/hid/serial.py @@ -22,7 +22,6 @@ import os import multiprocessing -import multiprocessing.queues import dataclasses import queue import struct @@ -44,6 +43,7 @@ from ...logging import get_logger from ...keyboard.mappings import KEYMAP from ... import env +from ... import tools from ... import aiotools from ... import aiomulti from ... import aioproc @@ -70,7 +70,7 @@ class _RequestError(Exception): self.online = online -class _FatalRequestError(_RequestError): +class _PermRequestError(_RequestError): pass @@ -225,7 +225,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst self.__gpio = _Gpio(reset_pin, reset_delay) - self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue() + self.__events_queue: "multiprocessing.Queue[_BaseEvent]" = multiprocessing.Queue() self.__notifier = aiomulti.AioProcessNotifier() self.__state_flags = aiomulti.AioSharedFlags({ @@ -321,11 +321,10 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst self.__queue_event(_MouseWheelEvent(delta_x, delta_y)) def clear_events(self) -> None: - while not self.__events_queue.empty(): - try: - self.__events_queue.get_nowait() - except queue.Empty: - break + # FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между + # очисткой и добавлением события _ClearEvent. Неприятно, но не смертельно. + # Починить блокировкой после перехода на асинхронные очереди. + tools.clear_queue(self.__events_queue) self.__queue_event(_ClearEvent()) def __queue_event(self, event: _BaseEvent) -> None: @@ -344,37 +343,35 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst with self.__get_serial() as tty: while not (self.__stop_event.is_set() and self.__events_queue.qsize() == 0): try: - event: _BaseEvent = self.__events_queue.get(timeout=0.1) + event = self.__events_queue.get(timeout=0.1) except queue.Empty: self.__process_command(tty, b"\x01\x00\x00\x00\x00") # Ping else: - self.__process_command(tty, event.make_command()) + if not self.__process_command(tty, event.make_command()): + self.clear_events() - except serial.SerialException as err: - if err.errno == errno.ENOENT: + except Exception as err: + self.clear_events() + if isinstance(err, serial.SerialException) and err.errno == errno.ENOENT: # pylint: disable=no-member logger.error("Missing HID serial device: %s", self.__device_path) else: logger.exception("Unexpected HID error") - - except Exception: - logger.exception("Unexpected HID error") - - finally: time.sleep(1) def __get_serial(self) -> serial.Serial: return serial.Serial(self.__device_path, self.__speed, timeout=self.__read_timeout) - def __process_command(self, tty: serial.Serial, command: bytes) -> None: - self.__process_request(tty, self.__make_request(command)) + def __process_command(self, tty: serial.Serial, command: bytes) -> bool: + return self.__process_request(tty, self.__make_request(command)) - def __process_request(self, tty: serial.Serial, request: bytes) -> None: # pylint: disable=too-many-branches + def __process_request(self, tty: serial.Serial, request: bytes) -> bool: # pylint: disable=too-many-branches logger = get_logger() - errors: List[str] = [] - runtime_errors = False + error_messages: List[str] = [] + live_log_errors = False common_retries = self.__common_retries read_retries = self.__read_retries + error_retval = False while common_retries and read_retries: response = self.__send_request(tty, request) @@ -394,12 +391,12 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst elif code == 0x40: # CRC Error raise _TempRequestError(f"Got CRC error of request from HID: request={request!r}") elif code == 0x45: # Unknown command - raise _FatalRequestError(f"HID did not recognize the request={request!r}", online=True) + raise _PermRequestError(f"HID did not recognize the request={request!r}", online=True) elif code == 0x24: # Rebooted? - raise _FatalRequestError("No previous command state inside HID, seems it was rebooted", online=True) + raise _PermRequestError("No previous command state inside HID, seems it was rebooted", online=True) elif code == 0x20: # Done self.__state_flags.update(online=True) - return + return True elif code & 0x80: # Pong with leds self.__state_flags.update( online=True, @@ -407,33 +404,34 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst scroll=bool(code & 0x00000010), num=bool(code & 0x00000100), ) - return - else: - raise _TempRequestError(f"Invalid response from HID: request={request!r}; code=0x{code:02X}") + return True + raise _TempRequestError(f"Invalid response from HID: request={request!r}; code=0x{code:02X}") except _RequestError as err: common_retries -= 1 self.__state_flags.update(online=err.online) + error_retval = err.online - if runtime_errors: + if live_log_errors: logger.error(err.msg) else: - errors.append(err.msg) - if len(errors) > self.__errors_threshold: - for msg in errors: + error_messages.append(err.msg) + if len(error_messages) > self.__errors_threshold: + for msg in error_messages: logger.error(msg) - errors = [] - runtime_errors = True + error_messages = [] + live_log_errors = True - if isinstance(err, _FatalRequestError): + if isinstance(err, _PermRequestError): break if common_retries and read_retries: time.sleep(self.__retries_delay) - for msg in errors: + for msg in error_messages: logger.error(msg) if not (common_retries and read_retries): logger.error("Can't process HID request due many errors: %r", request) + return error_retval def __send_request(self, tty: serial.Serial, request: bytes) -> bytes: if not self.__noop: diff --git a/kvmd/plugins/msd/otg/__init__.py b/kvmd/plugins/msd/otg/__init__.py index ef925065..f55dcb42 100644 --- a/kvmd/plugins/msd/otg/__init__.py +++ b/kvmd/plugins/msd/otg/__init__.py @@ -306,7 +306,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes await self.__remount_storage(rw=True) self.__set_image_complete(name, False) self.__new_file_written = 0 - self.__new_file = await aiofiles.open(path, mode="w+b", buffering=0) + self.__new_file = await aiofiles.open(path, mode="w+b", buffering=0) # type: ignore await self.__notifier.notify() yield @@ -364,7 +364,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes try: if self.__new_file: get_logger().info("Closing new image file ...") - await self.__new_file.close() + await self.__new_file.close() # type: ignore except Exception: get_logger().exception("Can't close device file") finally: diff --git a/kvmd/plugins/msd/relay.py b/kvmd/plugins/msd/relay.py index fe1c7784..e1e21ea9 100644 --- a/kvmd/plugins/msd/relay.py +++ b/kvmd/plugins/msd/relay.py @@ -355,7 +355,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes if self.__connected: raise MsdConnectedError() - self.__device_file = await aiofiles.open(self.__device_info.path, mode="w+b", buffering=0) + self.__device_file = await aiofiles.open(self.__device_info.path, mode="w+b", buffering=0) # type: ignore self.__written = 0 await self.__write_image_info(name, complete=False) @@ -391,9 +391,9 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes assert self.__device_file assert self.__device_info if self.__device_info.size - self.__written > _IMAGE_INFO_SIZE: - await self.__device_file.seek(self.__device_info.size - _IMAGE_INFO_SIZE) + await self.__device_file.seek(self.__device_info.size - _IMAGE_INFO_SIZE) # type: ignore await aiofs.afile_write_now(self.__device_file, _make_image_info_bytes(name, self.__written, complete)) - await self.__device_file.seek(0) + await self.__device_file.seek(0) # type: ignore else: get_logger().error("Can't write image info because device is full") @@ -401,7 +401,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes try: if self.__device_file: get_logger().info("Closing device file ...") - await self.__device_file.close() + await self.__device_file.close() # type: ignore except Exception: get_logger().exception("Can't close device file") finally: diff --git a/kvmd/tools.py b/kvmd/tools.py index 6d10e227..555ddc3e 100644 --- a/kvmd/tools.py +++ b/kvmd/tools.py @@ -22,6 +22,8 @@ import operator import functools +import multiprocessing.queues +import queue from typing import Tuple from typing import List @@ -53,3 +55,12 @@ _DictValueT = TypeVar("_DictValueT") def sorted_kvs(dct: Dict[_DictKeyT, _DictValueT]) -> List[Tuple[_DictKeyT, _DictValueT]]: return sorted(dct.items(), key=operator.itemgetter(0)) + + +# ===== +def clear_queue(q: multiprocessing.queues.Queue) -> None: # pylint: disable=invalid-name + for _ in range(q.qsize()): + try: + q.get_nowait() + except queue.Empty: + break |