summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/aiofs.py11
-rw-r--r--kvmd/aiogp.py3
-rw-r--r--kvmd/aiomulti.py6
-rw-r--r--kvmd/aiotools.py3
-rw-r--r--kvmd/apps/kvmd/info/hw.py9
-rw-r--r--kvmd/apps/vnc/vncauth.py7
-rw-r--r--kvmd/clients/kvmd.py3
-rw-r--r--kvmd/inotify.py3
-rw-r--r--kvmd/plugins/hid/otg/device.py20
-rw-r--r--kvmd/plugins/hid/otg/keyboard.py31
-rw-r--r--kvmd/plugins/hid/otg/mouse.py36
-rw-r--r--kvmd/plugins/hid/serial.py70
-rw-r--r--kvmd/plugins/msd/otg/__init__.py4
-rw-r--r--kvmd/plugins/msd/relay.py8
-rw-r--r--kvmd/tools.py11
15 files changed, 116 insertions, 109 deletions
diff --git a/kvmd/aiofs.py b/kvmd/aiofs.py
index c7bbd10d..53188286 100644
--- a/kvmd/aiofs.py
+++ b/kvmd/aiofs.py
@@ -29,7 +29,12 @@ from . import aiotools
# =====
+async def read(path: str) -> str:
+ async with aiofiles.open(path) as afile: # type: ignore
+ return (await afile.read())
+
+
async def afile_write_now(afile: aiofiles.base.AiofilesContextManager, data: bytes) -> None:
- await afile.write(data)
- await afile.flush()
- await aiotools.run_async(os.fsync, afile.fileno())
+ await afile.write(data) # type: ignore
+ await afile.flush() # type: ignore
+ await aiotools.run_async(os.fsync, afile.fileno()) # type: ignore
diff --git a/kvmd/aiogp.py b/kvmd/aiogp.py
index d4652d53..ba406e8d 100644
--- a/kvmd/aiogp.py
+++ b/kvmd/aiogp.py
@@ -21,7 +21,6 @@
import asyncio
-import asyncio.queues
import threading
import dataclasses
@@ -147,7 +146,7 @@ class _DebouncedValue:
self.__notifier = notifier
self.__loop = loop
- self.__queue: asyncio.queues.Queue = asyncio.Queue(loop=loop)
+ self.__queue: "asyncio.Queue[bool]" = asyncio.Queue(loop=loop)
self.__task = loop.create_task(self.__consumer_task_loop())
def set(self, value: bool) -> None:
diff --git a/kvmd/aiomulti.py b/kvmd/aiomulti.py
index 9ff60d4c..edfc5c34 100644
--- a/kvmd/aiomulti.py
+++ b/kvmd/aiomulti.py
@@ -21,8 +21,6 @@
import multiprocessing
-import multiprocessing.queues
-import multiprocessing.sharedctypes
import queue
from typing import Dict
@@ -33,7 +31,7 @@ from . import aiotools
# =====
class AioProcessNotifier:
def __init__(self) -> None:
- self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()
+ self.__queue: "multiprocessing.Queue[None]" = multiprocessing.Queue()
def notify(self) -> None:
self.__queue.put_nowait(None)
@@ -62,7 +60,7 @@ class AioSharedFlags:
self.__notifier = notifier
- self.__flags: Dict[str, multiprocessing.sharedctypes.RawValue] = {
+ self.__flags = {
key: multiprocessing.RawValue("i", int(value)) # type: ignore
for (key, value) in initial.items()
}
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py
index dfd67f44..b5d9e2fd 100644
--- a/kvmd/aiotools.py
+++ b/kvmd/aiotools.py
@@ -21,7 +21,6 @@
import asyncio
-import asyncio.queues
import functools
import types
@@ -92,7 +91,7 @@ async def wait_first(*aws: Awaitable) -> Tuple[Set[asyncio.Future], Set[asyncio.
# =====
class AioNotifier:
def __init__(self) -> None:
- self.__queue: asyncio.queues.Queue = asyncio.Queue()
+ self.__queue: "asyncio.Queue[None]" = asyncio.Queue()
async def notify(self) -> None:
await self.__queue.put(None)
diff --git a/kvmd/apps/kvmd/info/hw.py b/kvmd/apps/kvmd/info/hw.py
index e34ace5b..57a8521d 100644
--- a/kvmd/apps/kvmd/info/hw.py
+++ b/kvmd/apps/kvmd/info/hw.py
@@ -29,11 +29,10 @@ from typing import AsyncGenerator
from typing import TypeVar
from typing import Optional
-import aiofiles
-
from ....logging import get_logger
from .... import env
+from .... import aiofs
from .... import aioproc
from .base import BaseInfoSubmanager
@@ -89,8 +88,7 @@ class HwInfoSubmanager(BaseInfoSubmanager):
async def __get_dt_model(self) -> Optional[str]:
model_path = f"{env.PROCFS_PREFIX}/proc/device-tree/model"
try:
- async with aiofiles.open(model_path) as model_file:
- return (await model_file.read()).strip(" \t\r\n\0")
+ return (await aiofs.read(model_path)).strip(" \t\r\n\0")
except Exception as err:
get_logger(0).error("Can't read DT model from %s: %s", model_path, err)
return None
@@ -98,8 +96,7 @@ class HwInfoSubmanager(BaseInfoSubmanager):
async def __get_cpu_temp(self) -> Optional[float]:
temp_path = f"{env.SYSFS_PREFIX}/sys/class/thermal/thermal_zone0/temp"
try:
- async with aiofiles.open(temp_path) as temp_file:
- return int((await temp_file.read()).strip()) / 1000
+ return int((await aiofs.read(temp_path)).strip()) / 1000
except Exception as err:
get_logger(0).error("Can't read CPU temp from %s: %s", temp_path, err)
return None
diff --git a/kvmd/apps/vnc/vncauth.py b/kvmd/apps/vnc/vncauth.py
index 06420ee0..b8283cca 100644
--- a/kvmd/apps/vnc/vncauth.py
+++ b/kvmd/apps/vnc/vncauth.py
@@ -25,10 +25,10 @@ import dataclasses
from typing import Tuple
from typing import Dict
-import aiofiles
-
from ...logging import get_logger
+from ... import aiofs
+
# =====
class VncAuthError(Exception):
@@ -64,8 +64,7 @@ class VncAuthManager:
return ({}, (not self.__enabled))
async def __inner_read_credentials(self) -> Dict[str, VncAuthKvmdCredentials]:
- async with aiofiles.open(self.__path) as vc_file:
- lines = (await vc_file.read()).split("\n")
+ lines = (await aiofs.read(self.__path)).split("\n")
credentials: Dict[str, VncAuthKvmdCredentials] = {}
for (lineno, line) in enumerate(lines):
diff --git a/kvmd/clients/kvmd.py b/kvmd/clients/kvmd.py
index 39813af2..d1934cd4 100644
--- a/kvmd/clients/kvmd.py
+++ b/kvmd/clients/kvmd.py
@@ -21,7 +21,6 @@
import asyncio
-import asyncio.queues
import contextlib
import json
import types
@@ -133,7 +132,7 @@ class KvmdClientWs:
def __init__(self, ws: aiohttp.ClientWebSocketResponse) -> None:
self.__ws = ws
- self.__writer_queue: asyncio.queues.Queue = asyncio.Queue()
+ self.__writer_queue: "asyncio.Queue[Dict]" = asyncio.Queue()
self.__communicated = False
async def communicate(self) -> AsyncGenerator[Dict, None]:
diff --git a/kvmd/inotify.py b/kvmd/inotify.py
index 422f3a0a..53578aa3 100644
--- a/kvmd/inotify.py
+++ b/kvmd/inotify.py
@@ -25,7 +25,6 @@
import sys
import os
import asyncio
-import asyncio.queues
import ctypes
import ctypes.util
import struct
@@ -230,7 +229,7 @@ class Inotify:
self.__moved: Dict[int, str] = {}
- self.__events_queue: asyncio.queues.Queue = asyncio.Queue()
+ self.__events_queue: "asyncio.Queue[InotifyEvent]" = asyncio.Queue()
def watch(self, path: str, mask: int) -> None:
path = os.path.normpath(path)
diff --git a/kvmd/plugins/hid/otg/device.py b/kvmd/plugins/hid/otg/device.py
index 625aeef5..43d324b1 100644
--- a/kvmd/plugins/hid/otg/device.py
+++ b/kvmd/plugins/hid/otg/device.py
@@ -23,7 +23,6 @@
import os
import select
import multiprocessing
-import multiprocessing.queues
import queue
import errno
import time
@@ -32,6 +31,7 @@ from typing import Dict
from ....logging import get_logger
+from .... import tools
from .... import aiomulti
from .... import aioproc
@@ -76,7 +76,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
self.__noop = noop
self.__fd = -1
- self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
+ self.__events_queue: "multiprocessing.Queue[BaseEvent]" = multiprocessing.Queue()
self.__state_flags = aiomulti.AioSharedFlags({"online": True, **initial_state}, notifier)
self.__stop_event = multiprocessing.Event()
@@ -93,16 +93,18 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
if self.__ensure_device(): # Check device and process reports if needed
self.__read_all_reports()
try:
- event: BaseEvent = self.__events_queue.get(timeout=0.1)
+ event = self.__events_queue.get(timeout=0.1)
except queue.Empty:
if not self.__udc.can_operate():
+ self._clear_queue()
self.__close_device()
else:
- self._process_event(event)
+ if not self._process_event(event):
+ self._clear_queue()
except Exception:
logger.exception("Unexpected HID-%s error", self.__name)
+ self._clear_queue()
self.__close_device()
- finally:
time.sleep(1)
self.__close_device()
@@ -112,7 +114,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
# =====
- def _process_event(self, event: BaseEvent) -> None:
+ def _process_event(self, event: BaseEvent) -> bool:
raise NotImplementedError
def _process_read_report(self, report: bytes) -> None:
@@ -135,11 +137,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
self.__events_queue.put_nowait(event)
def _clear_queue(self) -> None:
- while not self.__events_queue.empty():
- try:
- self.__events_queue.get_nowait()
- except queue.Empty:
- break
+ tools.clear_queue(self.__events_queue)
def _ensure_write(self, report: bytes, reopen: bool=False, close: bool=False) -> bool:
if reopen:
diff --git a/kvmd/plugins/hid/otg/keyboard.py b/kvmd/plugins/hid/otg/keyboard.py
index e0243798..c497f7c8 100644
--- a/kvmd/plugins/hid/otg/keyboard.py
+++ b/kvmd/plugins/hid/otg/keyboard.py
@@ -112,47 +112,50 @@ class KeyboardProcess(BaseDeviceProcess):
# =====
- def _process_event(self, event: BaseEvent) -> None:
+ def _process_event(self, event: BaseEvent) -> bool:
if isinstance(event, _ClearEvent):
- self.__process_clear_event()
+ return self.__process_clear_event()
elif isinstance(event, _ResetEvent):
- self.__process_clear_event(reopen=True)
+ return self.__process_clear_event(reopen=True)
elif isinstance(event, _ModifierEvent):
- self.__process_modifier_event(event)
+ return self.__process_modifier_event(event)
elif isinstance(event, _KeyEvent):
- self.__process_key_event(event)
+ return self.__process_key_event(event)
+ raise RuntimeError(f"Not implemented event: {event}")
- def __process_clear_event(self, reopen: bool=False) -> None:
+ def __process_clear_event(self, reopen: bool=False) -> bool:
self.__clear_modifiers()
self.__clear_keys()
- self.__send_current_state(reopen=reopen)
+ return self.__send_current_state(reopen=reopen)
- def __process_modifier_event(self, event: _ModifierEvent) -> None:
+ def __process_modifier_event(self, event: _ModifierEvent) -> bool:
if event.modifier in self.__pressed_modifiers:
# Ранее нажатый модификатор отжимаем
self.__pressed_modifiers.remove(event.modifier)
if not self.__send_current_state():
- return
+ return False
if event.state:
# Нажимаем если нужно
self.__pressed_modifiers.add(event.modifier)
- self.__send_current_state()
+ return self.__send_current_state()
+ return True
- def __process_key_event(self, event: _KeyEvent) -> None:
+ def __process_key_event(self, event: _KeyEvent) -> bool:
if event.key in self.__pressed_keys:
# Ранее нажатую клавишу отжимаем
self.__pressed_keys[self.__pressed_keys.index(event.key)] = None
if not self.__send_current_state():
- return
+ return False
elif event.state and None not in self.__pressed_keys:
# Если нужно нажать что-то новое, но свободных слотов нет - отжимаем всё
self.__clear_keys()
if not self.__send_current_state():
- return
+ return False
if event.state:
# Нажимаем если нужно
self.__pressed_keys[self.__pressed_keys.index(None)] = event.key
- self.__send_current_state()
+ return self.__send_current_state()
+ return True
# =====
diff --git a/kvmd/plugins/hid/otg/mouse.py b/kvmd/plugins/hid/otg/mouse.py
index adb2469f..3e0cf0e8 100644
--- a/kvmd/plugins/hid/otg/mouse.py
+++ b/kvmd/plugins/hid/otg/mouse.py
@@ -110,44 +110,46 @@ class MouseProcess(BaseDeviceProcess):
# =====
- def _process_event(self, event: BaseEvent) -> None:
+ def _process_event(self, event: BaseEvent) -> bool:
if isinstance(event, _ClearEvent):
- self.__process_clear_event()
+ return self.__process_clear_event()
elif isinstance(event, _ResetEvent):
- self.__process_clear_event(reopen=True)
+ return self.__process_clear_event(reopen=True)
elif isinstance(event, _ButtonEvent):
- self.__process_button_event(event)
+ return self.__process_button_event(event)
elif isinstance(event, _MoveEvent):
- self.__process_move_event(event)
+ return self.__process_move_event(event)
elif isinstance(event, _WheelEvent):
- self.__process_wheel_event(event)
+ return self.__process_wheel_event(event)
+ raise RuntimeError(f"Not implemented event: {event}")
- def __process_clear_event(self, reopen: bool=False) -> None:
+ def __process_clear_event(self, reopen: bool=False) -> bool:
self.__clear_state()
- self.__send_current_state(0, 0, reopen=reopen)
+ return self.__send_current_state(reopen=reopen)
- def __process_button_event(self, event: _ButtonEvent) -> None:
+ def __process_button_event(self, event: _ButtonEvent) -> bool:
if event.code & self.__pressed_buttons:
# Ранее нажатую кнопку отжимаем
self.__pressed_buttons &= ~event.code
- if not self.__send_current_state(0, 0):
- return
+ if not self.__send_current_state():
+ return False
if event.state:
# Нажимаем если нужно
self.__pressed_buttons |= event.code
- self.__send_current_state(0, 0)
+ return self.__send_current_state()
+ return True
- def __process_move_event(self, event: _MoveEvent) -> None:
+ def __process_move_event(self, event: _MoveEvent) -> bool:
self.__x = event.to_x
self.__y = event.to_y
- self.__send_current_state(0, 0)
+ return self.__send_current_state()
- def __process_wheel_event(self, event: _WheelEvent) -> None:
- self.__send_current_state(event.delta_x, event.delta_y)
+ def __process_wheel_event(self, event: _WheelEvent) -> bool:
+ return self.__send_current_state(event.delta_x, event.delta_y)
# =====
- def __send_current_state(self, delta_x: int, delta_y: int, reopen: bool=False) -> bool:
+ def __send_current_state(self, delta_x: int=0, delta_y: int=0, reopen: bool=False) -> bool:
report = self.__make_report(
buttons=self.__pressed_buttons,
to_x=self.__x,
diff --git a/kvmd/plugins/hid/serial.py b/kvmd/plugins/hid/serial.py
index 13a2db6d..a45cd89a 100644
--- a/kvmd/plugins/hid/serial.py
+++ b/kvmd/plugins/hid/serial.py
@@ -22,7 +22,6 @@
import os
import multiprocessing
-import multiprocessing.queues
import dataclasses
import queue
import struct
@@ -44,6 +43,7 @@ from ...logging import get_logger
from ...keyboard.mappings import KEYMAP
from ... import env
+from ... import tools
from ... import aiotools
from ... import aiomulti
from ... import aioproc
@@ -70,7 +70,7 @@ class _RequestError(Exception):
self.online = online
-class _FatalRequestError(_RequestError):
+class _PermRequestError(_RequestError):
pass
@@ -225,7 +225,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
self.__gpio = _Gpio(reset_pin, reset_delay)
- self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
+ self.__events_queue: "multiprocessing.Queue[_BaseEvent]" = multiprocessing.Queue()
self.__notifier = aiomulti.AioProcessNotifier()
self.__state_flags = aiomulti.AioSharedFlags({
@@ -321,11 +321,10 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
self.__queue_event(_MouseWheelEvent(delta_x, delta_y))
def clear_events(self) -> None:
- while not self.__events_queue.empty():
- try:
- self.__events_queue.get_nowait()
- except queue.Empty:
- break
+ # FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между
+ # очисткой и добавлением события _ClearEvent. Неприятно, но не смертельно.
+ # Починить блокировкой после перехода на асинхронные очереди.
+ tools.clear_queue(self.__events_queue)
self.__queue_event(_ClearEvent())
def __queue_event(self, event: _BaseEvent) -> None:
@@ -344,37 +343,35 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
with self.__get_serial() as tty:
while not (self.__stop_event.is_set() and self.__events_queue.qsize() == 0):
try:
- event: _BaseEvent = self.__events_queue.get(timeout=0.1)
+ event = self.__events_queue.get(timeout=0.1)
except queue.Empty:
self.__process_command(tty, b"\x01\x00\x00\x00\x00") # Ping
else:
- self.__process_command(tty, event.make_command())
+ if not self.__process_command(tty, event.make_command()):
+ self.clear_events()
- except serial.SerialException as err:
- if err.errno == errno.ENOENT:
+ except Exception as err:
+ self.clear_events()
+ if isinstance(err, serial.SerialException) and err.errno == errno.ENOENT: # pylint: disable=no-member
logger.error("Missing HID serial device: %s", self.__device_path)
else:
logger.exception("Unexpected HID error")
-
- except Exception:
- logger.exception("Unexpected HID error")
-
- finally:
time.sleep(1)
def __get_serial(self) -> serial.Serial:
return serial.Serial(self.__device_path, self.__speed, timeout=self.__read_timeout)
- def __process_command(self, tty: serial.Serial, command: bytes) -> None:
- self.__process_request(tty, self.__make_request(command))
+ def __process_command(self, tty: serial.Serial, command: bytes) -> bool:
+ return self.__process_request(tty, self.__make_request(command))
- def __process_request(self, tty: serial.Serial, request: bytes) -> None: # pylint: disable=too-many-branches
+ def __process_request(self, tty: serial.Serial, request: bytes) -> bool: # pylint: disable=too-many-branches
logger = get_logger()
- errors: List[str] = []
- runtime_errors = False
+ error_messages: List[str] = []
+ live_log_errors = False
common_retries = self.__common_retries
read_retries = self.__read_retries
+ error_retval = False
while common_retries and read_retries:
response = self.__send_request(tty, request)
@@ -394,12 +391,12 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
elif code == 0x40: # CRC Error
raise _TempRequestError(f"Got CRC error of request from HID: request={request!r}")
elif code == 0x45: # Unknown command
- raise _FatalRequestError(f"HID did not recognize the request={request!r}", online=True)
+ raise _PermRequestError(f"HID did not recognize the request={request!r}", online=True)
elif code == 0x24: # Rebooted?
- raise _FatalRequestError("No previous command state inside HID, seems it was rebooted", online=True)
+ raise _PermRequestError("No previous command state inside HID, seems it was rebooted", online=True)
elif code == 0x20: # Done
self.__state_flags.update(online=True)
- return
+ return True
elif code & 0x80: # Pong with leds
self.__state_flags.update(
online=True,
@@ -407,33 +404,34 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
scroll=bool(code & 0x00000010),
num=bool(code & 0x00000100),
)
- return
- else:
- raise _TempRequestError(f"Invalid response from HID: request={request!r}; code=0x{code:02X}")
+ return True
+ raise _TempRequestError(f"Invalid response from HID: request={request!r}; code=0x{code:02X}")
except _RequestError as err:
common_retries -= 1
self.__state_flags.update(online=err.online)
+ error_retval = err.online
- if runtime_errors:
+ if live_log_errors:
logger.error(err.msg)
else:
- errors.append(err.msg)
- if len(errors) > self.__errors_threshold:
- for msg in errors:
+ error_messages.append(err.msg)
+ if len(error_messages) > self.__errors_threshold:
+ for msg in error_messages:
logger.error(msg)
- errors = []
- runtime_errors = True
+ error_messages = []
+ live_log_errors = True
- if isinstance(err, _FatalRequestError):
+ if isinstance(err, _PermRequestError):
break
if common_retries and read_retries:
time.sleep(self.__retries_delay)
- for msg in errors:
+ for msg in error_messages:
logger.error(msg)
if not (common_retries and read_retries):
logger.error("Can't process HID request due many errors: %r", request)
+ return error_retval
def __send_request(self, tty: serial.Serial, request: bytes) -> bytes:
if not self.__noop:
diff --git a/kvmd/plugins/msd/otg/__init__.py b/kvmd/plugins/msd/otg/__init__.py
index ef925065..f55dcb42 100644
--- a/kvmd/plugins/msd/otg/__init__.py
+++ b/kvmd/plugins/msd/otg/__init__.py
@@ -306,7 +306,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
await self.__remount_storage(rw=True)
self.__set_image_complete(name, False)
self.__new_file_written = 0
- self.__new_file = await aiofiles.open(path, mode="w+b", buffering=0)
+ self.__new_file = await aiofiles.open(path, mode="w+b", buffering=0) # type: ignore
await self.__notifier.notify()
yield
@@ -364,7 +364,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
try:
if self.__new_file:
get_logger().info("Closing new image file ...")
- await self.__new_file.close()
+ await self.__new_file.close() # type: ignore
except Exception:
get_logger().exception("Can't close device file")
finally:
diff --git a/kvmd/plugins/msd/relay.py b/kvmd/plugins/msd/relay.py
index fe1c7784..e1e21ea9 100644
--- a/kvmd/plugins/msd/relay.py
+++ b/kvmd/plugins/msd/relay.py
@@ -355,7 +355,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
if self.__connected:
raise MsdConnectedError()
- self.__device_file = await aiofiles.open(self.__device_info.path, mode="w+b", buffering=0)
+ self.__device_file = await aiofiles.open(self.__device_info.path, mode="w+b", buffering=0) # type: ignore
self.__written = 0
await self.__write_image_info(name, complete=False)
@@ -391,9 +391,9 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
assert self.__device_file
assert self.__device_info
if self.__device_info.size - self.__written > _IMAGE_INFO_SIZE:
- await self.__device_file.seek(self.__device_info.size - _IMAGE_INFO_SIZE)
+ await self.__device_file.seek(self.__device_info.size - _IMAGE_INFO_SIZE) # type: ignore
await aiofs.afile_write_now(self.__device_file, _make_image_info_bytes(name, self.__written, complete))
- await self.__device_file.seek(0)
+ await self.__device_file.seek(0) # type: ignore
else:
get_logger().error("Can't write image info because device is full")
@@ -401,7 +401,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
try:
if self.__device_file:
get_logger().info("Closing device file ...")
- await self.__device_file.close()
+ await self.__device_file.close() # type: ignore
except Exception:
get_logger().exception("Can't close device file")
finally:
diff --git a/kvmd/tools.py b/kvmd/tools.py
index 6d10e227..555ddc3e 100644
--- a/kvmd/tools.py
+++ b/kvmd/tools.py
@@ -22,6 +22,8 @@
import operator
import functools
+import multiprocessing.queues
+import queue
from typing import Tuple
from typing import List
@@ -53,3 +55,12 @@ _DictValueT = TypeVar("_DictValueT")
def sorted_kvs(dct: Dict[_DictKeyT, _DictValueT]) -> List[Tuple[_DictKeyT, _DictValueT]]:
return sorted(dct.items(), key=operator.itemgetter(0))
+
+
+# =====
+def clear_queue(q: multiprocessing.queues.Queue) -> None: # pylint: disable=invalid-name
+ for _ in range(q.qsize()):
+ try:
+ q.get_nowait()
+ except queue.Empty:
+ break