summaryrefslogtreecommitdiff
path: root/kvmd/apps
diff options
context:
space:
mode:
authorMaxim Devaev <[email protected]>2022-09-04 18:08:40 +0300
committerMaxim Devaev <[email protected]>2022-09-04 18:08:40 +0300
commitee3e224e396494cd0d69bb6167087a071a20349c (patch)
tree5becd28570e58a03c6e1e231d0db24c264a73f88 /kvmd/apps
parent4b75221e9470b4a009955d7677f16adf8e23e302 (diff)
new typing style
Diffstat (limited to 'kvmd/apps')
-rw-r--r--kvmd/apps/__init__.py28
-rw-r--r--kvmd/apps/cleanup/__init__.py5
-rw-r--r--kvmd/apps/edidconf/__init__.py4
-rw-r--r--kvmd/apps/htpasswd/__init__.py4
-rw-r--r--kvmd/apps/ipmi/__init__.py5
-rw-r--r--kvmd/apps/ipmi/auth.py7
-rw-r--r--kvmd/apps/ipmi/server.py21
-rw-r--r--kvmd/apps/janus/__init__.py5
-rw-r--r--kvmd/apps/janus/runner.py18
-rw-r--r--kvmd/apps/janus/stun.py20
-rw-r--r--kvmd/apps/kvmd/__init__.py5
-rw-r--r--kvmd/apps/kvmd/api/export.py5
-rw-r--r--kvmd/apps/kvmd/api/hid.py30
-rw-r--r--kvmd/apps/kvmd/api/info.py4
-rw-r--r--kvmd/apps/kvmd/api/msd.py13
-rw-r--r--kvmd/apps/kvmd/api/streamer.py9
-rw-r--r--kvmd/apps/kvmd/auth.py20
-rw-r--r--kvmd/apps/kvmd/info/__init__.py4
-rw-r--r--kvmd/apps/kvmd/info/auth.py4
-rw-r--r--kvmd/apps/kvmd/info/base.py6
-rw-r--r--kvmd/apps/kvmd/info/extras.py13
-rw-r--r--kvmd/apps/kvmd/info/fan.py12
-rw-r--r--kvmd/apps/kvmd/info/hw.py21
-rw-r--r--kvmd/apps/kvmd/info/meta.py5
-rw-r--r--kvmd/apps/kvmd/info/system.py11
-rw-r--r--kvmd/apps/kvmd/logreader.py5
-rw-r--r--kvmd/apps/kvmd/streamer.py54
-rw-r--r--kvmd/apps/kvmd/sysunit.py14
-rw-r--r--kvmd/apps/kvmd/tesseract.py19
-rw-r--r--kvmd/apps/kvmd/ugpio.py35
-rw-r--r--kvmd/apps/otg/__init__.py8
-rw-r--r--kvmd/apps/otg/hid/keyboard.py4
-rw-r--r--kvmd/apps/otg/hid/mouse.py8
-rw-r--r--kvmd/apps/otgconf/__init__.py7
-rw-r--r--kvmd/apps/otgmsd/__init__.py5
-rw-r--r--kvmd/apps/otgnet/__init__.py25
-rw-r--r--kvmd/apps/otgnet/netctl.py42
-rw-r--r--kvmd/apps/pst/__init__.py5
-rw-r--r--kvmd/apps/pst/server.py7
-rw-r--r--kvmd/apps/pstrun/__init__.py17
-rw-r--r--kvmd/apps/vnc/__init__.py9
-rw-r--r--kvmd/apps/vnc/rfb/__init__.py11
-rw-r--r--kvmd/apps/vnc/rfb/crypto.py4
-rw-r--r--kvmd/apps/vnc/rfb/encodings.py14
-rw-r--r--kvmd/apps/vnc/rfb/stream.py7
-rw-r--r--kvmd/apps/vnc/server.py35
-rw-r--r--kvmd/apps/vnc/vncauth.py9
-rw-r--r--kvmd/apps/watchdog/__init__.py5
48 files changed, 241 insertions, 387 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py
index b03b7893..ffd8daae 100644
--- a/kvmd/apps/__init__.py
+++ b/kvmd/apps/__init__.py
@@ -27,12 +27,6 @@ import argparse
import logging
import logging.config
-from typing import Tuple
-from typing import List
-from typing import Dict
-from typing import Type
-from typing import Optional
-
import pygments
import pygments.lexers.data
import pygments.formatters
@@ -109,14 +103,14 @@ from ..validators.hw import valid_otg_ethernet
# =====
def init(
- prog: Optional[str]=None,
- description: Optional[str]=None,
+ prog: (str | None)=None,
+ description: (str | None)=None,
add_help: bool=True,
check_run: bool=False,
cli_logging: bool=False,
- argv: Optional[List[str]]=None,
+ argv: (list[str] | None)=None,
**load: bool,
-) -> Tuple[argparse.ArgumentParser, List[str], Section]:
+) -> tuple[argparse.ArgumentParser, list[str], Section]:
argv = (argv or sys.argv)
assert len(argv) > 0
@@ -170,10 +164,10 @@ def init(
# =====
-def _init_config(config_path: str, override_options: List[str], **load_flags: bool) -> Section:
+def _init_config(config_path: str, override_options: list[str], **load_flags: bool) -> Section:
config_path = os.path.expanduser(config_path)
try:
- raw_config: Dict = load_yaml_file(config_path)
+ raw_config: dict = load_yaml_file(config_path)
except Exception as err:
raise SystemExit(f"ConfigError: Can't read config file {config_path!r}:\n{tools.efmt(err)}")
if not isinstance(raw_config, dict):
@@ -194,7 +188,7 @@ def _init_config(config_path: str, override_options: List[str], **load_flags: bo
raise SystemExit(f"ConfigError: {err}")
-def _patch_raw(raw_config: Dict) -> None: # pylint: disable=too-many-branches
+def _patch_raw(raw_config: dict) -> None: # pylint: disable=too-many-branches
if isinstance(raw_config.get("otg"), dict):
for (old, new) in [
("msd", "msd"),
@@ -250,9 +244,9 @@ def _patch_raw(raw_config: Dict) -> None: # pylint: disable=too-many-branches
def _patch_dynamic( # pylint: disable=too-many-locals
- raw_config: Dict,
+ raw_config: dict,
config: Section,
- scheme: Dict,
+ scheme: dict,
load_auth: bool=False,
load_hid: bool=False,
load_atx: bool=False,
@@ -279,7 +273,7 @@ def _patch_dynamic( # pylint: disable=too-many-locals
if load_gpio:
driver: str
- drivers: Dict[str, Type[BaseUserGpioDriver]] = {} # Name to drivers
+ drivers: dict[str, type[BaseUserGpioDriver]] = {} # Name to drivers
for (driver, params) in { # type: ignore
"__gpio__": {},
**tools.rget(raw_config, "kvmd", "gpio", "drivers"),
@@ -343,7 +337,7 @@ def _dump_config(config: Section) -> None:
print(dump)
-def _get_config_scheme() -> Dict:
+def _get_config_scheme() -> dict:
return {
"logging": Option({}),
diff --git a/kvmd/apps/cleanup/__init__.py b/kvmd/apps/cleanup/__init__.py
index 3e33d09f..5e83bd9a 100644
--- a/kvmd/apps/cleanup/__init__.py
+++ b/kvmd/apps/cleanup/__init__.py
@@ -24,9 +24,6 @@ import os
import signal
import time
-from typing import List
-from typing import Optional
-
import psutil
from ...logging import get_logger
@@ -74,7 +71,7 @@ def _remove_sockets(config: Section) -> None:
# =====
-def main(argv: Optional[List[str]]=None) -> None:
+def main(argv: (list[str] | None)=None) -> None:
config = init(
prog="kvmd-cleanup",
description="Kill KVMD and clear resources",
diff --git a/kvmd/apps/edidconf/__init__.py b/kvmd/apps/edidconf/__init__.py
index 53edf5fd..e6dcc610 100644
--- a/kvmd/apps/edidconf/__init__.py
+++ b/kvmd/apps/edidconf/__init__.py
@@ -28,11 +28,9 @@ import contextlib
import argparse
import time
-from typing import List
from typing import IO
from typing import Generator
from typing import Callable
-from typing import Optional
from ...validators.basic import valid_bool
from ...validators.basic import valid_int_f0
@@ -180,7 +178,7 @@ def _make_format_hex(size: int) -> Callable[[int], str]:
# =====
-def main(argv: Optional[List[str]]=None) -> None: # pylint: disable=too-many-branches
+def main(argv: (list[str] | None)=None) -> None: # pylint: disable=too-many-branches
# (parent_parser, argv, _) = init(
# add_help=False,
# argv=argv,
diff --git a/kvmd/apps/htpasswd/__init__.py b/kvmd/apps/htpasswd/__init__.py
index c46564dd..dd1213f7 100644
--- a/kvmd/apps/htpasswd/__init__.py
+++ b/kvmd/apps/htpasswd/__init__.py
@@ -28,9 +28,7 @@ import contextlib
import textwrap
import argparse
-from typing import List
from typing import Generator
-from typing import Optional
import passlib.apache
@@ -125,7 +123,7 @@ def _cmd_delete(config: Section, options: argparse.Namespace) -> None:
# =====
-def main(argv: Optional[List[str]]=None) -> None:
+def main(argv: (list[str] | None)=None) -> None:
(parent_parser, argv, config) = init(
add_help=False,
cli_logging=True,
diff --git a/kvmd/apps/ipmi/__init__.py b/kvmd/apps/ipmi/__init__.py
index fc450228..13aa9730 100644
--- a/kvmd/apps/ipmi/__init__.py
+++ b/kvmd/apps/ipmi/__init__.py
@@ -20,9 +20,6 @@
# ========================================================================== #
-from typing import List
-from typing import Optional
-
from ...clients.kvmd import KvmdClient
from ... import htclient
@@ -34,7 +31,7 @@ from .server import IpmiServer
# =====
-def main(argv: Optional[List[str]]=None) -> None:
+def main(argv: (list[str] | None)=None) -> None:
config = init(
prog="kvmd-ipmi",
description="IPMI to KVMD proxy",
diff --git a/kvmd/apps/ipmi/auth.py b/kvmd/apps/ipmi/auth.py
index 0ad3d208..2e495af8 100644
--- a/kvmd/apps/ipmi/auth.py
+++ b/kvmd/apps/ipmi/auth.py
@@ -22,9 +22,6 @@
import dataclasses
-from typing import List
-from typing import Dict
-
# =====
class IpmiPasswdError(Exception):
@@ -55,8 +52,8 @@ class IpmiAuthManager:
def get_credentials(self, ipmi_user: str) -> IpmiUserCredentials:
return self.__credentials[ipmi_user]
- def __parse_passwd_file(self, lines: List[str]) -> Dict[str, IpmiUserCredentials]:
- credentials: Dict[str, IpmiUserCredentials] = {}
+ def __parse_passwd_file(self, lines: list[str]) -> dict[str, IpmiUserCredentials]:
+ credentials: dict[str, IpmiUserCredentials] = {}
for (lineno, line) in enumerate(lines):
if len(line.strip()) == 0 or line.lstrip().startswith("#"):
continue
diff --git a/kvmd/apps/ipmi/server.py b/kvmd/apps/ipmi/server.py
index 597e6dce..87265607 100644
--- a/kvmd/apps/ipmi/server.py
+++ b/kvmd/apps/ipmi/server.py
@@ -28,9 +28,6 @@ import multiprocessing
import functools
import queue
-from typing import Dict
-from typing import Optional
-
import aiohttp
import serial
@@ -83,8 +80,8 @@ class IpmiServer(BaseIpmiServer): # pylint: disable=too-many-instance-attribute
self.__sol_proxy_port = (sol_proxy_port or port)
self.__sol_lock = threading.Lock()
- self.__sol_console: Optional[IpmiConsole] = None
- self.__sol_thread: Optional[threading.Thread] = None
+ self.__sol_console: (IpmiConsole | None) = None
+ self.__sol_thread: (threading.Thread | None) = None
self.__sol_stop = False
def run(self) -> None:
@@ -100,7 +97,7 @@ class IpmiServer(BaseIpmiServer): # pylint: disable=too-many-instance-attribute
# =====
- def handle_raw_request(self, request: Dict, session: IpmiServerSession) -> None:
+ def handle_raw_request(self, request: dict, session: IpmiServerSession) -> None:
handler = {
(6, 1): (lambda _, session: self.send_device_id(session)), # Get device ID
(6, 7): self.__get_power_state_handler, # Power state
@@ -125,13 +122,13 @@ class IpmiServer(BaseIpmiServer): # pylint: disable=too-many-instance-attribute
# =====
- def __get_power_state_handler(self, _: Dict, session: IpmiServerSession) -> None:
+ def __get_power_state_handler(self, _: dict, session: IpmiServerSession) -> None:
# https://github.com/arcress0/ipmiutil/blob/e2f6e95127d22e555f959f136d9bb9543c763896/util/ireset.c#L654
result = self.__make_request(session, "atx.get_state() [power]", "atx.get_state")
data = [(0 if result["leds"]["power"] else 5)]
session.send_ipmi_response(data=data)
- def __get_selftest_status_handler(self, _: Dict, session: IpmiServerSession) -> None:
+ def __get_selftest_status_handler(self, _: dict, session: IpmiServerSession) -> None:
# https://github.com/arcress0/ipmiutil/blob/e2f6e95127d22e555f959f136d9bb9543c763896/util/ihealth.c#L858
data = [0x0055]
try:
@@ -140,12 +137,12 @@ class IpmiServer(BaseIpmiServer): # pylint: disable=too-many-instance-attribute
data = [0]
session.send_ipmi_response(data=data)
- def __get_chassis_status_handler(self, _: Dict, session: IpmiServerSession) -> None:
+ def __get_chassis_status_handler(self, _: dict, session: IpmiServerSession) -> None:
result = self.__make_request(session, "atx.get_state() [chassis]", "atx.get_state")
data = [int(result["leds"]["power"]), 0, 0]
session.send_ipmi_response(data=data)
- def __chassis_control_handler(self, request: Dict, session: IpmiServerSession) -> None:
+ def __chassis_control_handler(self, request: dict, session: IpmiServerSession) -> None:
action = {
0: "off_hard",
1: "on",
@@ -179,7 +176,7 @@ class IpmiServer(BaseIpmiServer): # pylint: disable=too-many-instance-attribute
# =====
- def __activate_sol_handler(self, _: Dict, session: IpmiServerSession) -> None:
+ def __activate_sol_handler(self, _: dict, session: IpmiServerSession) -> None:
with self.__sol_lock:
if not self.__sol_device_path:
session.send_ipmi_response(code=0x81) # SOL disabled
@@ -198,7 +195,7 @@ class IpmiServer(BaseIpmiServer): # pylint: disable=too-many-instance-attribute
])
self.__start_sol_worker(session)
- def __deactivate_sol_handler(self, _: Dict, session: IpmiServerSession) -> None:
+ def __deactivate_sol_handler(self, _: dict, session: IpmiServerSession) -> None:
with self.__sol_lock:
if not self.__sol_device_path:
session.send_ipmi_response(code=0x81)
diff --git a/kvmd/apps/janus/__init__.py b/kvmd/apps/janus/__init__.py
index fc67aeb6..d8945e57 100644
--- a/kvmd/apps/janus/__init__.py
+++ b/kvmd/apps/janus/__init__.py
@@ -20,16 +20,13 @@
# ========================================================================== #
-from typing import List
-from typing import Optional
-
from .. import init
from .runner import JanusRunner
# =====
-def main(argv: Optional[List[str]]=None) -> None:
+def main(argv: (list[str] | None)=None) -> None:
config = init(
prog="kvmd-Janus",
description="Janus WebRTC Gateway Runner",
diff --git a/kvmd/apps/janus/runner.py b/kvmd/apps/janus/runner.py
index 799fd94c..c2d01dad 100644
--- a/kvmd/apps/janus/runner.py
+++ b/kvmd/apps/janus/runner.py
@@ -3,10 +3,6 @@ import asyncio.subprocess
import socket
import dataclasses
-from typing import Tuple
-from typing import List
-from typing import Optional
-
import netifaces
from ... import tools
@@ -42,9 +38,9 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes
check_retries: int,
check_retries_delay: float,
- cmd: List[str],
- cmd_remove: List[str],
- cmd_append: List[str],
+ cmd: list[str],
+ cmd_remove: list[str],
+ cmd_append: list[str],
) -> None:
self.__stun = Stun(stun_host, stun_port, stun_timeout, stun_retries, stun_retries_delay)
@@ -55,8 +51,8 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes
self.__cmd = tools.build_cmd(cmd, cmd_remove, cmd_append)
- self.__janus_task: Optional[asyncio.Task] = None
- self.__janus_proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member
+ self.__janus_task: (asyncio.Task | None) = None
+ self.__janus_proc: (asyncio.subprocess.Process | None) = None # pylint: disable=no-member
def run(self) -> None:
logger = get_logger(0)
@@ -68,7 +64,7 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes
async def __run(self) -> None:
logger = get_logger(0)
- prev_netcfg: Optional[_Netcfg] = None
+ prev_netcfg: (_Netcfg | None) = None
while True:
retry = 0
netcfg = _Netcfg()
@@ -117,7 +113,7 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes
get_logger().error("Can't get default IP: %s", tools.efmt(err))
return ""
- async def __get_stun_info(self, src_ip: str) -> Tuple[Stun, Tuple[str, str]]:
+ async def __get_stun_info(self, src_ip: str) -> tuple[Stun, tuple[str, str]]:
try:
return (self.__stun, (await self.__stun.get_info(src_ip, 0)))
except Exception as err:
diff --git a/kvmd/apps/janus/stun.py b/kvmd/apps/janus/stun.py
index dc41b961..1233d7ea 100644
--- a/kvmd/apps/janus/stun.py
+++ b/kvmd/apps/janus/stun.py
@@ -5,10 +5,6 @@ import struct
import secrets
import dataclasses
-from typing import Tuple
-from typing import Dict
-from typing import Optional
-
from ... import tools
from ... import aiotools
@@ -25,9 +21,9 @@ class StunAddress:
@dataclasses.dataclass(frozen=True)
class StunResponse:
ok: bool
- ext: Optional[StunAddress] = dataclasses.field(default=None)
- src: Optional[StunAddress] = dataclasses.field(default=None)
- changed: Optional[StunAddress] = dataclasses.field(default=None)
+ ext: (StunAddress | None) = dataclasses.field(default=None)
+ src: (StunAddress | None) = dataclasses.field(default=None)
+ changed: (StunAddress | None) = dataclasses.field(default=None)
class StunNatType:
@@ -60,9 +56,9 @@ class Stun:
self.__retries = retries
self.__retries_delay = retries_delay
- self.__sock: Optional[socket.socket] = None
+ self.__sock: (socket.socket | None) = None
- async def get_info(self, src_ip: str, src_port: int) -> Tuple[str, str]:
+ async def get_info(self, src_ip: str, src_port: int) -> tuple[str, str]:
(family, _, _, _, addr) = socket.getaddrinfo(src_ip, src_port, type=socket.SOCK_DGRAM)[0]
try:
with socket.socket(family, socket.SOCK_DGRAM) as self.__sock:
@@ -74,7 +70,7 @@ class Stun:
finally:
self.__sock = None
- async def __get_nat_type(self, src_ip: str) -> Tuple[str, StunResponse]: # pylint: disable=too-many-return-statements
+ async def __get_nat_type(self, src_ip: str) -> tuple[str, StunResponse]: # pylint: disable=too-many-return-statements
first = await self.__make_request("First probe")
if not first.ok:
return (StunNatType.BLOCKED, first)
@@ -128,7 +124,7 @@ class Stun:
ctx, self.__retries, error)
return StunResponse(ok=False)
- parsed: Dict[str, StunAddress] = {}
+ parsed: dict[str, StunAddress] = {}
offset = 0
remaining = len(response)
while remaining > 0:
@@ -146,7 +142,7 @@ class Stun:
remaining -= (4 + attr_len)
return StunResponse(ok=True, **parsed)
- async def __inner_make_request(self, trans_id: bytes, request: bytes, host: str, port: int) -> Tuple[bytes, str]:
+ async def __inner_make_request(self, trans_id: bytes, request: bytes, host: str, port: int) -> tuple[bytes, str]:
assert self.__sock is not None
request = struct.pack(">HH", 0x0001, len(request)) + trans_id + request # Bind Request
diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py
index f7536577..5249bdd7 100644
--- a/kvmd/apps/kvmd/__init__.py
+++ b/kvmd/apps/kvmd/__init__.py
@@ -20,9 +20,6 @@
# ========================================================================== #
-from typing import List
-from typing import Optional
-
from ...logging import get_logger
from ...plugins.hid import get_hid_class
@@ -42,7 +39,7 @@ from .server import KvmdServer
# =====
-def main(argv: Optional[List[str]]=None) -> None:
+def main(argv: (list[str] | None)=None) -> None:
config = init(
prog="kvmd",
description="The main PiKVM daemon",
diff --git a/kvmd/apps/kvmd/api/export.py b/kvmd/apps/kvmd/api/export.py
index ca367f30..384fe903 100644
--- a/kvmd/apps/kvmd/api/export.py
+++ b/kvmd/apps/kvmd/api/export.py
@@ -23,7 +23,6 @@
import asyncio
from typing import Any
-from typing import List
from aiohttp.web import Request
from aiohttp.web import Response
@@ -56,7 +55,7 @@ class ExportApi:
self.__info_manager.get_submanager("fan").get_state(),
self.__user_gpio.get_state(),
])
- rows: List[str] = []
+ rows: list[str] = []
self.__append_prometheus_rows(rows, atx_state["enabled"], "pikvm_atx_enabled")
self.__append_prometheus_rows(rows, atx_state["leds"]["power"], "pikvm_atx_power")
@@ -71,7 +70,7 @@ class ExportApi:
return Response(text="\n".join(rows))
- def __append_prometheus_rows(self, rows: List[str], value: Any, path: str) -> None:
+ def __append_prometheus_rows(self, rows: list[str], value: Any, path: str) -> None:
if isinstance(value, bool):
value = int(value)
if isinstance(value, (int, float)):
diff --git a/kvmd/apps/kvmd/api/hid.py b/kvmd/apps/kvmd/api/hid.py
index 93658e88..6d993d4a 100644
--- a/kvmd/apps/kvmd/api/hid.py
+++ b/kvmd/apps/kvmd/api/hid.py
@@ -24,10 +24,6 @@ import os
import stat
import functools
-from typing import Tuple
-from typing import List
-from typing import Dict
-from typing import Set
from typing import Callable
from aiohttp.web import Request
@@ -64,10 +60,10 @@ class HidApi:
hid: BaseHid,
keymap_path: str,
- ignore_keys: List[str],
+ ignore_keys: list[str],
- mouse_x_range: Tuple[int, int],
- mouse_y_range: Tuple[int, int],
+ mouse_x_range: tuple[int, int],
+ mouse_y_range: tuple[int, int],
) -> None:
self.__hid = hid
@@ -112,8 +108,8 @@ class HidApi:
# =====
- async def get_keymaps(self) -> Dict: # Ugly hack to generate hid_keymaps_state (see server.py)
- keymaps: Set[str] = set()
+ async def get_keymaps(self) -> dict: # Ugly hack to generate hid_keymaps_state (see server.py)
+ keymaps: set[str] = set()
for keymap_name in os.listdir(self.__keymaps_dir_path):
path = os.path.join(self.__keymaps_dir_path, keymap_name)
if os.access(path, os.R_OK) and stat.S_ISREG(os.stat(path).st_mode):
@@ -139,7 +135,7 @@ class HidApi:
self.__hid.send_key_events(text_to_web_keys(text, symmap))
return make_json_response()
- def __ensure_symmap(self, keymap_name: str) -> Dict[int, Dict[int, str]]:
+ def __ensure_symmap(self, keymap_name: str) -> dict[int, dict[int, str]]:
keymap_name = valid_printable_filename(keymap_name, "keymap")
path = os.path.join(self.__keymaps_dir_path, keymap_name)
try:
@@ -151,14 +147,14 @@ class HidApi:
return self.__inner_ensure_symmap(path, st.st_mtime)
@functools.lru_cache(maxsize=10)
- def __inner_ensure_symmap(self, path: str, mtime: int) -> Dict[int, Dict[int, str]]:
+ def __inner_ensure_symmap(self, path: str, mtime: int) -> dict[int, dict[int, str]]:
_ = mtime # For LRU
return build_symmap(path)
# =====
@exposed_ws("key")
- async def __ws_key_handler(self, _: WsSession, event: Dict) -> None:
+ async def __ws_key_handler(self, _: WsSession, event: dict) -> None:
try:
key = valid_hid_key(event["key"])
state = valid_bool(event["state"])
@@ -168,7 +164,7 @@ class HidApi:
self.__hid.send_key_events([(key, state)])
@exposed_ws("mouse_button")
- async def __ws_mouse_button_handler(self, _: WsSession, event: Dict) -> None:
+ async def __ws_mouse_button_handler(self, _: WsSession, event: dict) -> None:
try:
button = valid_hid_mouse_button(event["button"])
state = valid_bool(event["state"])
@@ -177,7 +173,7 @@ class HidApi:
self.__hid.send_mouse_button_event(button, state)
@exposed_ws("mouse_move")
- async def __ws_mouse_move_handler(self, _: WsSession, event: Dict) -> None:
+ async def __ws_mouse_move_handler(self, _: WsSession, event: dict) -> None:
try:
to_x = valid_hid_mouse_move(event["to"]["x"])
to_y = valid_hid_mouse_move(event["to"]["y"])
@@ -186,14 +182,14 @@ class HidApi:
self.__send_mouse_move_event_remapped(to_x, to_y)
@exposed_ws("mouse_relative")
- async def __ws_mouse_relative_handler(self, _: WsSession, event: Dict) -> None:
+ async def __ws_mouse_relative_handler(self, _: WsSession, event: dict) -> None:
self.__process_delta_ws_request(event, self.__hid.send_mouse_relative_event)
@exposed_ws("mouse_wheel")
- async def __ws_mouse_wheel_handler(self, _: WsSession, event: Dict) -> None:
+ async def __ws_mouse_wheel_handler(self, _: WsSession, event: dict) -> None:
self.__process_delta_ws_request(event, self.__hid.send_mouse_wheel_event)
- def __process_delta_ws_request(self, event: Dict, handler: Callable[[int, int], None]) -> None:
+ def __process_delta_ws_request(self, event: dict, handler: Callable[[int, int], None]) -> None:
try:
raw_delta = event["delta"]
deltas = [
diff --git a/kvmd/apps/kvmd/api/info.py b/kvmd/apps/kvmd/api/info.py
index bb7caf79..5517ced0 100644
--- a/kvmd/apps/kvmd/api/info.py
+++ b/kvmd/apps/kvmd/api/info.py
@@ -22,8 +22,6 @@
import asyncio
-from typing import List
-
from aiohttp.web import Request
from aiohttp.web import Response
@@ -51,7 +49,7 @@ class InfoApi:
])))
return make_json_response(results)
- def __valid_info_fields(self, request: Request) -> List[str]:
+ def __valid_info_fields(self, request: Request) -> list[str]:
subs = self.__info_manager.get_subs()
return sorted(valid_info_fields(
arg=request.query.get("fields", ",".join(subs)),
diff --git a/kvmd/apps/kvmd/api/msd.py b/kvmd/apps/kvmd/api/msd.py
index 18d3ba2c..16118141 100644
--- a/kvmd/apps/kvmd/api/msd.py
+++ b/kvmd/apps/kvmd/api/msd.py
@@ -23,10 +23,7 @@
import lzma
import time
-from typing import Dict
from typing import AsyncGenerator
-from typing import Optional
-from typing import Union
import aiohttp
import zstandard
@@ -153,7 +150,7 @@ class MsdApi:
return make_json_response(self.__make_write_info(name, size, written))
@exposed_http("POST", "/msd/write_remote")
- async def __write_remote_handler(self, request: Request) -> Union[Response, StreamResponse]: # pylint: disable=too-many-locals
+ async def __write_remote_handler(self, request: Request) -> (Response | StreamResponse): # pylint: disable=too-many-locals
url = valid_url(request.query.get("url"))
insecure = valid_bool(request.query.get("insecure", False))
timeout = valid_float_f01(request.query.get("timeout", 10.0))
@@ -161,7 +158,7 @@ class MsdApi:
name = ""
size = written = 0
- response: Optional[StreamResponse] = None
+ response: (StreamResponse | None) = None
async def stream_write_info() -> None:
assert response is not None
@@ -206,11 +203,11 @@ class MsdApi:
return make_json_exception(err, 400)
raise
- def __get_remove_incomplete(self, request: Request) -> Optional[bool]:
- flag: Optional[str] = request.query.get("remove_incomplete")
+ def __get_remove_incomplete(self, request: Request) -> (bool | None):
+ flag: (str | None) = request.query.get("remove_incomplete")
return (valid_bool(flag) if flag is not None else None)
- def __make_write_info(self, name: str, size: int, written: int) -> Dict:
+ def __make_write_info(self, name: str, size: int, written: int) -> dict:
return {"image": {"name": name, "size": size, "written": written}}
# =====
diff --git a/kvmd/apps/kvmd/api/streamer.py b/kvmd/apps/kvmd/api/streamer.py
index 401795ce..3030a925 100644
--- a/kvmd/apps/kvmd/api/streamer.py
+++ b/kvmd/apps/kvmd/api/streamer.py
@@ -20,9 +20,6 @@
# ========================================================================== #
-from typing import List
-from typing import Dict
-
from aiohttp.web import Request
from aiohttp.web import Response
@@ -102,10 +99,10 @@ class StreamerApi:
# =====
- async def get_ocr(self) -> Dict: # XXX: Ugly hack
+ async def get_ocr(self) -> dict: # XXX: Ugly hack
enabled = self.__ocr.is_available()
- default: List[str] = []
- available: List[str] = []
+ default: list[str] = []
+ available: list[str] = []
if enabled:
default = self.__ocr.get_default_langs()
available = self.__ocr.get_available_langs()
diff --git a/kvmd/apps/kvmd/auth.py b/kvmd/apps/kvmd/auth.py
index 0f437a6b..cfc2fb6a 100644
--- a/kvmd/apps/kvmd/auth.py
+++ b/kvmd/apps/kvmd/auth.py
@@ -22,10 +22,6 @@
import secrets
-from typing import List
-from typing import Dict
-from typing import Optional
-
from ...logging import get_logger
from ... import aiotools
@@ -40,12 +36,12 @@ class AuthManager:
self,
internal_type: str,
- internal_kwargs: Dict,
+ internal_kwargs: dict,
external_type: str,
- external_kwargs: Dict,
+ external_kwargs: dict,
- force_internal_users: List[str],
+ force_internal_users: list[str],
enabled: bool,
) -> None:
@@ -53,19 +49,19 @@ class AuthManager:
if not enabled:
get_logger().warning("AUTHORIZATION IS DISABLED")
- self.__internal_service: Optional[BaseAuthService] = None
+ self.__internal_service: (BaseAuthService | None) = None
if enabled:
self.__internal_service = get_auth_service_class(internal_type)(**internal_kwargs)
get_logger().info("Using internal auth service %r", self.__internal_service.get_plugin_name())
- self.__external_service: Optional[BaseAuthService] = None
+ self.__external_service: (BaseAuthService | None) = None
if enabled and external_type:
self.__external_service = get_auth_service_class(external_type)(**external_kwargs)
get_logger().info("Using external auth service %r", self.__external_service.get_plugin_name())
self.__force_internal_users = force_internal_users
- self.__tokens: Dict[str, str] = {} # {token: user}
+ self.__tokens: dict[str, str] = {} # {token: user}
def is_auth_enabled(self) -> bool:
return self.__enabled
@@ -88,7 +84,7 @@ class AuthManager:
get_logger().error("Got access denied for user %r from auth service %r", user, service.get_plugin_name())
return ok
- async def login(self, user: str, passwd: str) -> Optional[str]:
+ async def login(self, user: str, passwd: str) -> (str | None):
assert user == user.strip()
assert user
assert self.__enabled
@@ -109,7 +105,7 @@ class AuthManager:
if user:
get_logger().info("Logged out user %r", user)
- def check(self, token: str) -> Optional[str]:
+ def check(self, token: str) -> (str | None):
assert self.__enabled
return self.__tokens.get(token)
diff --git a/kvmd/apps/kvmd/info/__init__.py b/kvmd/apps/kvmd/info/__init__.py
index f8871d72..032dbb2d 100644
--- a/kvmd/apps/kvmd/info/__init__.py
+++ b/kvmd/apps/kvmd/info/__init__.py
@@ -20,8 +20,6 @@
# ========================================================================== #
-from typing import Set
-
from ....yamlconf import Section
from .base import BaseInfoSubmanager
@@ -45,7 +43,7 @@ class InfoManager:
"fan": FanInfoSubmanager(**config.kvmd.info.fan._unpack()),
}
- def get_subs(self) -> Set[str]:
+ def get_subs(self) -> set[str]:
return set(self.__subs)
def get_submanager(self, name: str) -> BaseInfoSubmanager:
diff --git a/kvmd/apps/kvmd/info/auth.py b/kvmd/apps/kvmd/info/auth.py
index bf1c6160..6aef3348 100644
--- a/kvmd/apps/kvmd/info/auth.py
+++ b/kvmd/apps/kvmd/info/auth.py
@@ -20,8 +20,6 @@
# ========================================================================== #
-from typing import Dict
-
from .base import BaseInfoSubmanager
@@ -30,5 +28,5 @@ class AuthInfoSubmanager(BaseInfoSubmanager):
def __init__(self, enabled: bool) -> None:
self.__enabled = enabled
- async def get_state(self) -> Dict:
+ async def get_state(self) -> dict:
return {"enabled": self.__enabled}
diff --git a/kvmd/apps/kvmd/info/base.py b/kvmd/apps/kvmd/info/base.py
index ae447cf5..51dbda18 100644
--- a/kvmd/apps/kvmd/info/base.py
+++ b/kvmd/apps/kvmd/info/base.py
@@ -20,11 +20,7 @@
# ========================================================================== #
-from typing import Dict
-from typing import Optional
-
-
# =====
class BaseInfoSubmanager:
- async def get_state(self) -> Optional[Dict]:
+ async def get_state(self) -> (dict | None):
raise NotImplementedError
diff --git a/kvmd/apps/kvmd/info/extras.py b/kvmd/apps/kvmd/info/extras.py
index 9e70d05c..07a5eabb 100644
--- a/kvmd/apps/kvmd/info/extras.py
+++ b/kvmd/apps/kvmd/info/extras.py
@@ -24,9 +24,6 @@ import os
import re
import asyncio
-from typing import Dict
-from typing import Optional
-
from ....logging import get_logger
from ....yamlconf import Section
@@ -45,7 +42,7 @@ class ExtrasInfoSubmanager(BaseInfoSubmanager):
def __init__(self, global_config: Section) -> None:
self.__global_config = global_config
- async def get_state(self) -> Optional[Dict]:
+ async def get_state(self) -> (dict | None):
try:
sui = sysunit.SystemdUnitInfo()
await sui.open()
@@ -53,7 +50,7 @@ class ExtrasInfoSubmanager(BaseInfoSubmanager):
get_logger(0).error("Can't open systemd bus to get extras state: %s", tools.efmt(err))
sui = None
try:
- extras: Dict[str, Dict] = {}
+ extras: dict[str, dict] = {}
for extra in (await asyncio.gather(*[
self.__read_extra(sui, name)
for name in os.listdir(self.__get_extras_path())
@@ -71,7 +68,7 @@ class ExtrasInfoSubmanager(BaseInfoSubmanager):
def __get_extras_path(self, *parts: str) -> str:
return os.path.join(self.__global_config.kvmd.info.extras, *parts)
- async def __read_extra(self, sui: Optional[sysunit.SystemdUnitInfo], name: str) -> Dict:
+ async def __read_extra(self, sui: (sysunit.SystemdUnitInfo | None), name: str) -> dict:
try:
extra = await aiotools.run_async(load_yaml_file, self.__get_extras_path(name, "manifest.yaml"))
await self.__rewrite_app_daemon(sui, extra)
@@ -81,7 +78,7 @@ class ExtrasInfoSubmanager(BaseInfoSubmanager):
get_logger(0).exception("Can't read extra %r", name)
return {}
- async def __rewrite_app_daemon(self, sui: Optional[sysunit.SystemdUnitInfo], extra: Dict) -> None:
+ async def __rewrite_app_daemon(self, sui: (sysunit.SystemdUnitInfo | None), extra: dict) -> None:
daemon = extra.get("daemon", "")
if isinstance(daemon, str) and daemon.strip():
extra["enabled"] = extra["started"] = False
@@ -91,7 +88,7 @@ class ExtrasInfoSubmanager(BaseInfoSubmanager):
except Exception as err:
get_logger(0).error("Can't get info about the service %r: %s", daemon, tools.efmt(err))
- def __rewrite_app_port(self, extra: Dict) -> None:
+ def __rewrite_app_port(self, extra: dict) -> None:
port_path = extra.get("port", "")
if isinstance(port_path, str) and port_path.strip():
extra["port"] = 0
diff --git a/kvmd/apps/kvmd/info/fan.py b/kvmd/apps/kvmd/info/fan.py
index afa7afce..7cff7217 100644
--- a/kvmd/apps/kvmd/info/fan.py
+++ b/kvmd/apps/kvmd/info/fan.py
@@ -23,9 +23,7 @@
import copy
import asyncio
-from typing import Dict
from typing import AsyncGenerator
-from typing import Optional
import aiohttp
@@ -55,15 +53,15 @@ class FanInfoSubmanager(BaseInfoSubmanager):
self.__timeout = timeout
self.__state_poll = state_poll
- async def get_state(self) -> Dict:
+ async def get_state(self) -> dict:
monitored = await self.__get_monitored()
return {
"monitored": monitored,
"state": ((await self.__get_fan_state() if monitored else None)),
}
- async def poll_state(self) -> AsyncGenerator[Dict, None]:
- prev_state: Dict = {}
+ async def poll_state(self) -> AsyncGenerator[dict, None]:
+ prev_state: dict = {}
while True:
if self.__unix_path:
pure = state = await self.get_state()
@@ -93,7 +91,7 @@ class FanInfoSubmanager(BaseInfoSubmanager):
get_logger(0).error("Can't get info about the service %r: %s", self.__daemon, tools.efmt(err))
return False
- async def __get_fan_state(self) -> Optional[Dict]:
+ async def __get_fan_state(self) -> (dict | None):
try:
async with self.__make_http_session() as session:
async with session.get("http://localhost/state") as response:
@@ -104,7 +102,7 @@ class FanInfoSubmanager(BaseInfoSubmanager):
return None
def __make_http_session(self) -> aiohttp.ClientSession:
- kwargs: Dict = {
+ kwargs: dict = {
"headers": {
"User-Agent": htclient.make_user_agent("KVMD"),
},
diff --git a/kvmd/apps/kvmd/info/hw.py b/kvmd/apps/kvmd/info/hw.py
index a99fb09d..86efd761 100644
--- a/kvmd/apps/kvmd/info/hw.py
+++ b/kvmd/apps/kvmd/info/hw.py
@@ -23,12 +23,9 @@
import os
import asyncio
-from typing import List
-from typing import Dict
from typing import Callable
from typing import AsyncGenerator
from typing import TypeVar
-from typing import Optional
from ....logging import get_logger
@@ -48,16 +45,16 @@ _RetvalT = TypeVar("_RetvalT")
class HwInfoSubmanager(BaseInfoSubmanager):
def __init__(
self,
- vcgencmd_cmd: List[str],
+ vcgencmd_cmd: list[str],
state_poll: float,
) -> None:
self.__vcgencmd_cmd = vcgencmd_cmd
self.__state_poll = state_poll
- self.__dt_cache: Dict[str, str] = {}
+ self.__dt_cache: dict[str, str] = {}
- async def get_state(self) -> Dict:
+ async def get_state(self) -> dict:
(model, serial, cpu_temp, throttling) = await asyncio.gather(
self.__read_dt_file("model"),
self.__read_dt_file("serial-number"),
@@ -78,8 +75,8 @@ class HwInfoSubmanager(BaseInfoSubmanager):
},
}
- async def poll_state(self) -> AsyncGenerator[Dict, None]:
- prev_state: Dict = {}
+ async def poll_state(self) -> AsyncGenerator[dict, None]:
+ prev_state: dict = {}
while True:
state = await self.get_state()
if state != prev_state:
@@ -89,7 +86,7 @@ class HwInfoSubmanager(BaseInfoSubmanager):
# =====
- async def __read_dt_file(self, name: str) -> Optional[str]:
+ async def __read_dt_file(self, name: str) -> (str | None):
if name not in self.__dt_cache:
path = os.path.join(f"{env.PROCFS_PREFIX}/proc/device-tree", name)
try:
@@ -99,7 +96,7 @@ class HwInfoSubmanager(BaseInfoSubmanager):
return None
return self.__dt_cache[name]
- async def __get_cpu_temp(self) -> Optional[float]:
+ async def __get_cpu_temp(self) -> (float | None):
temp_path = f"{env.SYSFS_PREFIX}/sys/class/thermal/thermal_zone0/temp"
try:
return int((await aiofs.read(temp_path)).strip()) / 1000
@@ -107,7 +104,7 @@ class HwInfoSubmanager(BaseInfoSubmanager):
get_logger(0).error("Can't read CPU temp from %s: %s", temp_path, err)
return None
- async def __get_throttling(self) -> Optional[Dict]:
+ async def __get_throttling(self) -> (dict | None):
# https://www.raspberrypi.org/forums/viewtopic.php?f=63&t=147781&start=50#p972790
flags = await self.__parse_vcgencmd(
arg="get_throttled",
@@ -133,7 +130,7 @@ class HwInfoSubmanager(BaseInfoSubmanager):
}
return None
- async def __parse_vcgencmd(self, arg: str, parser: Callable[[str], _RetvalT]) -> Optional[_RetvalT]:
+ async def __parse_vcgencmd(self, arg: str, parser: Callable[[str], _RetvalT]) -> (_RetvalT | None):
cmd = [*self.__vcgencmd_cmd, arg]
try:
text = (await aioproc.read_process(cmd, err_to_null=True))[1]
diff --git a/kvmd/apps/kvmd/info/meta.py b/kvmd/apps/kvmd/info/meta.py
index 86a9be26..6a92d7ea 100644
--- a/kvmd/apps/kvmd/info/meta.py
+++ b/kvmd/apps/kvmd/info/meta.py
@@ -20,9 +20,6 @@
# ========================================================================== #
-from typing import Dict
-from typing import Optional
-
from ....logging import get_logger
from ....yamlconf.loader import load_yaml_file
@@ -37,7 +34,7 @@ class MetaInfoSubmanager(BaseInfoSubmanager):
def __init__(self, meta_path: str) -> None:
self.__meta_path = meta_path
- async def get_state(self) -> Optional[Dict]:
+ async def get_state(self) -> (dict | None):
try:
return ((await aiotools.run_async(load_yaml_file, self.__meta_path)) or {})
except Exception:
diff --git a/kvmd/apps/kvmd/info/system.py b/kvmd/apps/kvmd/info/system.py
index 5c46a5c6..9037a0c2 100644
--- a/kvmd/apps/kvmd/info/system.py
+++ b/kvmd/apps/kvmd/info/system.py
@@ -24,9 +24,6 @@ import os
import asyncio
import platform
-from typing import List
-from typing import Dict
-
from ....logging import get_logger
from .... import aioproc
@@ -38,10 +35,10 @@ from .base import BaseInfoSubmanager
# =====
class SystemInfoSubmanager(BaseInfoSubmanager):
- def __init__(self, streamer_cmd: List[str]) -> None:
+ def __init__(self, streamer_cmd: list[str]) -> None:
self.__streamer_cmd = streamer_cmd
- async def get_state(self) -> Dict:
+ async def get_state(self) -> dict:
streamer_info = await self.__get_streamer_info()
uname_info = platform.uname() # Uname using the internal cache
return {
@@ -55,9 +52,9 @@ class SystemInfoSubmanager(BaseInfoSubmanager):
# =====
- async def __get_streamer_info(self) -> Dict:
+ async def __get_streamer_info(self) -> dict:
version = ""
- features: Dict[str, bool] = {}
+ features: dict[str, bool] = {}
try:
path = self.__streamer_cmd[0]
((_, version), (_, features_text)) = await asyncio.gather(
diff --git a/kvmd/apps/kvmd/logreader.py b/kvmd/apps/kvmd/logreader.py
index dfe2125a..cdd46744 100644
--- a/kvmd/apps/kvmd/logreader.py
+++ b/kvmd/apps/kvmd/logreader.py
@@ -24,7 +24,6 @@ import re
import asyncio
import time
-from typing import Dict
from typing import AsyncGenerator
import systemd.journal
@@ -32,7 +31,7 @@ import systemd.journal
# =====
class LogReader:
- async def poll_log(self, seek: int, follow: bool) -> AsyncGenerator[Dict, None]:
+ async def poll_log(self, seek: int, follow: bool) -> AsyncGenerator[dict, None]:
reader = systemd.journal.Reader()
reader.this_boot()
reader.this_machine()
@@ -59,7 +58,7 @@ class LogReader:
else:
await asyncio.sleep(1)
- def __entry_to_record(self, entry: Dict) -> Dict[str, Dict]:
+ def __entry_to_record(self, entry: dict) -> dict[str, dict]:
return {
"dt": entry["__REALTIME_TIMESTAMP"],
"service": entry["_SYSTEMD_UNIT"],
diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py
index fb209033..096737c8 100644
--- a/kvmd/apps/kvmd/streamer.py
+++ b/kvmd/apps/kvmd/streamer.py
@@ -27,11 +27,7 @@ import asyncio.subprocess
import dataclasses
import functools
-from typing import Tuple
-from typing import List
-from typing import Dict
from typing import AsyncGenerator
-from typing import Optional
from typing import Any
import aiohttp
@@ -53,7 +49,7 @@ class StreamerSnapshot:
width: int
height: int
mtime: float
- headers: Tuple[Tuple[str, str], ...]
+ headers: tuple[tuple[str, str], ...]
data: bytes
async def make_preview(self, max_width: int, max_height: int, quality: int) -> bytes:
@@ -98,7 +94,7 @@ class _StreamerParams:
quality: int,
resolution: str,
- available_resolutions: List[str],
+ available_resolutions: list[str],
desired_fps: int,
desired_fps_min: int,
@@ -117,8 +113,8 @@ class _StreamerParams:
self.__has_resolution = bool(resolution)
self.__has_h264 = bool(h264_bitrate)
- self.__params: Dict = {self.__DESIRED_FPS: min(max(desired_fps, desired_fps_min), desired_fps_max)}
- self.__limits: Dict = {self.__DESIRED_FPS: {"min": desired_fps_min, "max": desired_fps_max}}
+ self.__params: dict = {self.__DESIRED_FPS: min(max(desired_fps, desired_fps_min), desired_fps_max)}
+ self.__limits: dict = {self.__DESIRED_FPS: {"min": desired_fps_min, "max": desired_fps_max}}
if self.__has_quality:
self.__params[self.__QUALITY] = quality
@@ -133,23 +129,23 @@ class _StreamerParams:
self.__params[self.__H264_GOP] = min(max(h264_gop, h264_gop_min), h264_gop_max)
self.__limits[self.__H264_GOP] = {"min": h264_gop_min, "max": h264_gop_max}
- def get_features(self) -> Dict:
+ def get_features(self) -> dict:
return {
self.__QUALITY: self.__has_quality,
self.__RESOLUTION: self.__has_resolution,
"h264": self.__has_h264,
}
- def get_limits(self) -> Dict:
+ def get_limits(self) -> dict:
limits = dict(self.__limits)
if self.__has_resolution:
limits[self.__AVAILABLE_RESOLUTIONS] = list(limits[self.__AVAILABLE_RESOLUTIONS])
return limits
- def get_params(self) -> Dict:
+ def get_params(self) -> dict:
return dict(self.__params)
- def set_params(self, params: Dict) -> None:
+ def set_params(self, params: dict) -> None:
new_params = dict(self.__params)
if self.__QUALITY in params and self.__has_quality:
@@ -187,9 +183,9 @@ class Streamer: # pylint: disable=too-many-instance-attributes
process_name_prefix: str,
- cmd: List[str],
- cmd_remove: List[str],
- cmd_append: List[str],
+ cmd: list[str],
+ cmd_remove: list[str],
+ cmd_append: list[str],
**params_kwargs: Any,
) -> None:
@@ -207,15 +203,15 @@ class Streamer: # pylint: disable=too-many-instance-attributes
self.__params = _StreamerParams(**params_kwargs)
- self.__stop_task: Optional[asyncio.Task] = None
+ self.__stop_task: (asyncio.Task | None) = None
self.__stop_wip = False
- self.__streamer_task: Optional[asyncio.Task] = None
- self.__streamer_proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member
+ self.__streamer_task: (asyncio.Task | None) = None
+ self.__streamer_proc: (asyncio.subprocess.Process | None) = None # pylint: disable=no-member
- self.__http_session: Optional[aiohttp.ClientSession] = None
+ self.__http_session: (aiohttp.ClientSession | None) = None
- self.__snapshot: Optional[StreamerSnapshot] = None
+ self.__snapshot: (StreamerSnapshot | None) = None
self.__notifier = aiotools.AioNotifier()
@@ -280,16 +276,16 @@ class Streamer: # pylint: disable=too-many-instance-attributes
# =====
- def set_params(self, params: Dict) -> None:
+ def set_params(self, params: dict) -> None:
assert not self.__streamer_task
return self.__params.set_params(params)
- def get_params(self) -> Dict:
+ def get_params(self) -> dict:
return self.__params.get_params()
# =====
- async def get_state(self) -> Dict:
+ async def get_state(self) -> dict:
streamer_state = None
if self.__streamer_task:
session = self.__ensure_http_session()
@@ -302,7 +298,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
except Exception:
get_logger().exception("Invalid streamer response from /state")
- snapshot: Optional[Dict] = None
+ snapshot: (dict | None) = None
if self.__snapshot:
snapshot = dataclasses.asdict(self.__snapshot)
del snapshot["headers"]
@@ -316,7 +312,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
"features": self.__params.get_features(),
}
- async def poll_state(self) -> AsyncGenerator[Dict, None]:
+ async def poll_state(self) -> AsyncGenerator[dict, None]:
def signal_handler(*_: Any) -> None:
get_logger(0).info("Got SIGUSR2, checking the stream state ...")
self.__notifier.notify()
@@ -324,8 +320,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes
get_logger(0).info("Installing SIGUSR2 streamer handler ...")
asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler)
- waiter_task: Optional[asyncio.Task] = None
- prev_state: Dict = {}
+ waiter_task: (asyncio.Task | None) = None
+ prev_state: dict = {}
while True:
state = await self.get_state()
if state != prev_state:
@@ -339,7 +335,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
# =====
- async def take_snapshot(self, save: bool, load: bool, allow_offline: bool) -> Optional[StreamerSnapshot]:
+ async def take_snapshot(self, save: bool, load: bool, allow_offline: bool) -> (StreamerSnapshot | None):
if load:
return self.__snapshot
else:
@@ -395,7 +391,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
def __ensure_http_session(self) -> aiohttp.ClientSession:
if not self.__http_session:
- kwargs: Dict = {
+ kwargs: dict = {
"headers": {"User-Agent": htclient.make_user_agent("KVMD")},
"connector": aiohttp.UnixConnector(path=self.__unix_path),
"timeout": aiohttp.ClientTimeout(total=self.__timeout),
diff --git a/kvmd/apps/kvmd/sysunit.py b/kvmd/apps/kvmd/sysunit.py
index dea5e961..1358ee61 100644
--- a/kvmd/apps/kvmd/sysunit.py
+++ b/kvmd/apps/kvmd/sysunit.py
@@ -22,10 +22,6 @@
import types
-from typing import Tuple
-from typing import Type
-from typing import Optional
-
import dbus_next
import dbus_next.aio
import dbus_next.aio.proxy_object
@@ -36,11 +32,11 @@ import dbus_next.errors
# =====
class SystemdUnitInfo:
def __init__(self) -> None:
- self.__bus: Optional[dbus_next.aio.MessageBus] = None
- self.__intr: Optional[dbus_next.introspection.Node] = None
- self.__manager: Optional[dbus_next.aio.proxy_object.ProxyInterface] = None
+ self.__bus: (dbus_next.aio.MessageBus | None) = None
+ self.__intr: (dbus_next.introspection.Node | None) = None
+ self.__manager: (dbus_next.aio.proxy_object.ProxyInterface | None) = None
- async def get_status(self, name: str) -> Tuple[bool, bool]:
+ async def get_status(self, name: str) -> tuple[bool, bool]:
assert self.__bus is not None
assert self.__intr is not None
assert self.__manager is not None
@@ -89,7 +85,7 @@ class SystemdUnitInfo:
async def __aexit__(
self,
- _exc_type: Type[BaseException],
+ _exc_type: type[BaseException],
_exc: BaseException,
_tb: types.TracebackType,
) -> None:
diff --git a/kvmd/apps/kvmd/tesseract.py b/kvmd/apps/kvmd/tesseract.py
index 58cba9ae..fff4df2d 100644
--- a/kvmd/apps/kvmd/tesseract.py
+++ b/kvmd/apps/kvmd/tesseract.py
@@ -36,10 +36,7 @@ from ctypes import c_char_p
from ctypes import c_void_p
from ctypes import c_char
-from typing import List
-from typing import Set
from typing import Generator
-from typing import Optional
from PIL import ImageOps
from PIL import Image as PilImage
@@ -60,7 +57,7 @@ class _TessBaseAPI(Structure):
pass
-def _load_libtesseract() -> Optional[ctypes.CDLL]:
+def _load_libtesseract() -> (ctypes.CDLL | None):
try:
path = ctypes.util.find_library("tesseract")
if not path:
@@ -88,7 +85,7 @@ _libtess = _load_libtesseract()
@contextlib.contextmanager
-def _tess_api(data_dir_path: str, langs: List[str]) -> Generator[_TessBaseAPI, None, None]:
+def _tess_api(data_dir_path: str, langs: list[str]) -> Generator[_TessBaseAPI, None, None]:
if not _libtess:
raise OcrError("Tesseract is not available")
api = _libtess.TessBaseAPICreate()
@@ -107,19 +104,19 @@ _LANG_SUFFIX = ".traineddata"
# =====
class TesseractOcr:
- def __init__(self, data_dir_path: str, default_langs: List[str]) -> None:
+ def __init__(self, data_dir_path: str, default_langs: list[str]) -> None:
self.__data_dir_path = data_dir_path
self.__default_langs = default_langs
def is_available(self) -> bool:
return bool(_libtess)
- def get_default_langs(self) -> List[str]:
+ def get_default_langs(self) -> list[str]:
return list(self.__default_langs)
- def get_available_langs(self) -> List[str]:
+ def get_available_langs(self) -> list[str]:
# Это быстрее чем, инициализация либы и TessBaseAPIGetAvailableLanguagesAsVector()
- langs: Set[str] = set()
+ langs: set[str] = set()
for lang_name in os.listdir(self.__data_dir_path):
if lang_name.endswith(_LANG_SUFFIX):
path = os.path.join(self.__data_dir_path, lang_name)
@@ -129,12 +126,12 @@ class TesseractOcr:
langs.add(lang)
return sorted(langs)
- async def recognize(self, data: bytes, langs: List[str], left: int, top: int, right: int, bottom: int) -> str:
+ async def recognize(self, data: bytes, langs: list[str], left: int, top: int, right: int, bottom: int) -> str:
if not langs:
langs = self.__default_langs
return (await aiotools.run_async(self.__inner_recognize, data, langs, left, top, right, bottom))
- def __inner_recognize(self, data: bytes, langs: List[str], left: int, top: int, right: int, bottom: int) -> str:
+ def __inner_recognize(self, data: bytes, langs: list[str], left: int, top: int, right: int, bottom: int) -> str:
with _tess_api(self.__data_dir_path, langs) as api:
assert _libtess
with io.BytesIO(data) as bio:
diff --git a/kvmd/apps/kvmd/ugpio.py b/kvmd/apps/kvmd/ugpio.py
index b29b7ce4..8b7b31f8 100644
--- a/kvmd/apps/kvmd/ugpio.py
+++ b/kvmd/apps/kvmd/ugpio.py
@@ -22,11 +22,8 @@
import asyncio
-from typing import List
-from typing import Dict
from typing import AsyncGenerator
from typing import Callable
-from typing import Optional
from typing import Any
from ...logging import get_logger
@@ -83,7 +80,7 @@ class _GpioInput:
self.__driver = driver
self.__driver.register_input(self.__pin, config.debounce)
- def get_scheme(self) -> Dict:
+ def get_scheme(self) -> dict:
return {
"hw": {
"driver": self.__driver.get_instance_id(),
@@ -91,7 +88,7 @@ class _GpioInput:
},
}
- async def get_state(self) -> Dict:
+ async def get_state(self) -> dict:
(online, state) = (True, False)
try:
state = (await self.__driver.read(self.__pin) ^ self.__inverted)
@@ -139,7 +136,7 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
self.__region = aiotools.AioExclusiveRegion(GpioChannelIsBusyError, notifier)
- def get_scheme(self) -> Dict:
+ def get_scheme(self) -> dict:
return {
"switch": self.__switch,
"pulse": {
@@ -153,7 +150,7 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
},
}
- async def get_state(self) -> Dict:
+ async def get_state(self) -> dict:
busy = self.__region.is_busy()
(online, state) = (True, False)
if not busy:
@@ -246,8 +243,8 @@ class UserGpio:
for (driver, drv_config) in tools.sorted_kvs(config.drivers)
}
- self.__inputs: Dict[str, _GpioInput] = {}
- self.__outputs: Dict[str, _GpioOutput] = {}
+ self.__inputs: dict[str, _GpioInput] = {}
+ self.__outputs: dict[str, _GpioOutput] = {}
for (channel, ch_config) in tools.sorted_kvs(config.scheme):
driver = self.__drivers[ch_config.driver]
@@ -256,7 +253,7 @@ class UserGpio:
else: # output:
self.__outputs[channel] = _GpioOutput(channel, ch_config, driver, self.__notifier)
- async def get_model(self) -> Dict:
+ async def get_model(self) -> dict:
return {
"scheme": {
"inputs": {channel: gin.get_scheme() for (channel, gin) in self.__inputs.items()},
@@ -265,14 +262,14 @@ class UserGpio:
"view": self.__make_view(),
}
- async def get_state(self) -> Dict:
+ async def get_state(self) -> dict:
return {
"inputs": {channel: await gin.get_state() for (channel, gin) in self.__inputs.items()},
"outputs": {channel: await gout.get_state() for (channel, gout) in self.__outputs.items()},
}
- async def poll_state(self) -> AsyncGenerator[Dict, None]:
- prev_state: Dict = {}
+ async def poll_state(self) -> AsyncGenerator[dict, None]:
+ prev_state: dict = {}
while True:
state = await self.get_state()
if state != prev_state:
@@ -313,14 +310,14 @@ class UserGpio:
# =====
- def __make_view(self) -> Dict:
- table: List[Optional[List[Dict]]] = []
+ def __make_view(self) -> dict:
+ table: list[list[dict] | None] = []
for row in self.__view["table"]:
if len(row) == 0:
table.append(None)
continue
- items: List[Dict] = []
+ items: list[dict] = []
for item in map(str.strip, row):
if item.startswith("#") or len(item) == 0:
items.append(self.__make_view_label(item))
@@ -338,14 +335,14 @@ class UserGpio:
"table": table,
}
- def __make_view_label(self, item: str) -> Dict:
+ def __make_view_label(self, item: str) -> dict:
assert item.startswith("#")
return {
"type": "label",
"text": item[1:].strip(),
}
- def __make_view_input(self, parts: List[str]) -> Dict:
+ def __make_view_input(self, parts: list[str]) -> dict:
assert len(parts) >= 1
color = (parts[1] if len(parts) > 1 else None)
if color not in ["green", "yellow", "red"]:
@@ -356,7 +353,7 @@ class UserGpio:
"color": color,
}
- def __make_view_output(self, parts: List[str]) -> Dict:
+ def __make_view_output(self, parts: list[str]) -> dict:
assert len(parts) >= 1
confirm = False
text = "Click"
diff --git a/kvmd/apps/otg/__init__.py b/kvmd/apps/otg/__init__.py
index 9b6f5e69..4c811d2b 100644
--- a/kvmd/apps/otg/__init__.py
+++ b/kvmd/apps/otg/__init__.py
@@ -29,10 +29,6 @@ import argparse
from os.path import join # pylint: disable=ungrouped-imports
-from typing import List
-from typing import Optional
-from typing import Union
-
from ...logging import get_logger
from ...yamlconf import Section
@@ -78,7 +74,7 @@ def _unlink(path: str, optional: bool=False) -> None:
os.unlink(path)
-def _write(path: str, value: Union[str, int], optional: bool=False) -> None:
+def _write(path: str, value: (str | int), optional: bool=False) -> None:
logger = get_logger()
if optional and not os.access(path, os.F_OK):
logger.info("WRITE --- [SKIPPED] %s", path)
@@ -320,7 +316,7 @@ def _cmd_stop(config: Section) -> None:
# =====
-def main(argv: Optional[List[str]]=None) -> None:
+def main(argv: (list[str] | None)=None) -> None:
(parent_parser, argv, config) = init(
add_help=False,
argv=argv,
diff --git a/kvmd/apps/otg/hid/keyboard.py b/kvmd/apps/otg/hid/keyboard.py
index d10a4b73..6a4d0184 100644
--- a/kvmd/apps/otg/hid/keyboard.py
+++ b/kvmd/apps/otg/hid/keyboard.py
@@ -20,13 +20,11 @@
# ========================================================================== #
-from typing import Optional
-
from . import Hid
# =====
-def make_keyboard_hid(report_id: Optional[int]=None) -> Hid:
+def make_keyboard_hid(report_id: (int | None)=None) -> Hid:
return Hid(
protocol=1, # Keyboard protocol
subclass=1, # Boot interface subclass
diff --git a/kvmd/apps/otg/hid/mouse.py b/kvmd/apps/otg/hid/mouse.py
index 71249569..e0f7350b 100644
--- a/kvmd/apps/otg/hid/mouse.py
+++ b/kvmd/apps/otg/hid/mouse.py
@@ -20,13 +20,11 @@
# ========================================================================== #
-from typing import Optional
-
from . import Hid
# =====
-def make_mouse_hid(absolute: bool, horizontal_wheel: bool, report_id: Optional[int]=None) -> Hid:
+def make_mouse_hid(absolute: bool, horizontal_wheel: bool, report_id: (int | None)=None) -> Hid:
maker = (_make_absolute_hid if absolute else _make_relative_hid)
return maker(horizontal_wheel, report_id)
@@ -42,7 +40,7 @@ _HORIZONTAL_WHEEL = [
]
-def _make_absolute_hid(horizontal_wheel: bool, report_id: Optional[int]) -> Hid:
+def _make_absolute_hid(horizontal_wheel: bool, report_id: (int | None)) -> Hid:
return Hid(
protocol=0, # None protocol
subclass=0, # No subclass
@@ -106,7 +104,7 @@ def _make_absolute_hid(horizontal_wheel: bool, report_id: Optional[int]) -> Hid:
)
-def _make_relative_hid(horizontal_wheel: bool, report_id: Optional[int]) -> Hid:
+def _make_relative_hid(horizontal_wheel: bool, report_id: (int | None)) -> Hid:
return Hid(
protocol=2, # Mouse protocol
subclass=1, # Boot interface subclass
diff --git a/kvmd/apps/otgconf/__init__.py b/kvmd/apps/otgconf/__init__.py
index 389e1721..1a1e0f99 100644
--- a/kvmd/apps/otgconf/__init__.py
+++ b/kvmd/apps/otgconf/__init__.py
@@ -26,10 +26,7 @@ import contextlib
import argparse
import time
-from typing import List
-from typing import Dict
from typing import Generator
-from typing import Optional
import yaml
@@ -64,7 +61,7 @@ class _GadgetControl:
with open(udc_path, "w") as udc_file:
udc_file.write(udc)
- def __read_metas(self) -> Generator[Dict, None, None]:
+ def __read_metas(self) -> Generator[dict, None, None]:
for meta_name in sorted(os.listdir(self.__meta_path)):
with open(os.path.join(self.__meta_path, meta_name)) as meta_file:
yield json.loads(meta_file.read())
@@ -111,7 +108,7 @@ class _GadgetControl:
# =====
-def main(argv: Optional[List[str]]=None) -> None:
+def main(argv: (list[str] | None)=None) -> None:
(parent_parser, argv, config) = init(
add_help=False,
cli_logging=True,
diff --git a/kvmd/apps/otgmsd/__init__.py b/kvmd/apps/otgmsd/__init__.py
index 0d32331b..2540eb0b 100644
--- a/kvmd/apps/otgmsd/__init__.py
+++ b/kvmd/apps/otgmsd/__init__.py
@@ -24,9 +24,6 @@ import os
import errno
import argparse
-from typing import List
-from typing import Optional
-
from ...validators.basic import valid_bool
from ...validators.basic import valid_int_f0
from ...validators.os import valid_abs_file
@@ -57,7 +54,7 @@ def _set_param(gadget: str, instance: int, param: str, value: str) -> None:
# =====
-def main(argv: Optional[List[str]]=None) -> None:
+def main(argv: (list[str] | None)=None) -> None:
(parent_parser, argv, config) = init(
add_help=False,
cli_logging=True,
diff --git a/kvmd/apps/otgnet/__init__.py b/kvmd/apps/otgnet/__init__.py
index 7a5e4cc7..98213346 100644
--- a/kvmd/apps/otgnet/__init__.py
+++ b/kvmd/apps/otgnet/__init__.py
@@ -26,9 +26,6 @@ import dataclasses
import itertools
import argparse
-from typing import List
-from typing import Optional
-
from ...logging import get_logger
from ...yamlconf import Section
@@ -66,25 +63,25 @@ class _Netcfg: # pylint: disable=too-many-instance-attributes
class _Service: # pylint: disable=too-many-instance-attributes
def __init__(self, config: Section) -> None:
self.__iface_net: str = config.otgnet.iface.net
- self.__ip_cmd: List[str] = config.otgnet.iface.ip_cmd
+ self.__ip_cmd: list[str] = config.otgnet.iface.ip_cmd
self.__allow_icmp: bool = config.otgnet.firewall.allow_icmp
- self.__allow_tcp: List[int] = sorted(set(config.otgnet.firewall.allow_tcp))
- self.__allow_udp: List[int] = sorted(set(config.otgnet.firewall.allow_udp))
+ self.__allow_tcp: list[int] = sorted(set(config.otgnet.firewall.allow_tcp))
+ self.__allow_udp: list[int] = sorted(set(config.otgnet.firewall.allow_udp))
self.__forward_iface: str = config.otgnet.firewall.forward_iface
- self.__iptables_cmd: List[str] = config.otgnet.firewall.iptables_cmd
+ self.__iptables_cmd: list[str] = config.otgnet.firewall.iptables_cmd
- def build_cmd(key: str) -> List[str]:
+ def build_cmd(key: str) -> list[str]:
return tools.build_cmd(
getattr(config.otgnet.commands, key),
getattr(config.otgnet.commands, f"{key}_remove"),
getattr(config.otgnet.commands, f"{key}_append"),
)
- self.__pre_start_cmd: List[str] = build_cmd("pre_start_cmd")
- self.__post_start_cmd: List[str] = build_cmd("post_start_cmd")
- self.__pre_stop_cmd: List[str] = build_cmd("pre_stop_cmd")
- self.__post_stop_cmd: List[str] = build_cmd("post_stop_cmd")
+ self.__pre_start_cmd: list[str] = build_cmd("pre_start_cmd")
+ self.__post_start_cmd: list[str] = build_cmd("post_start_cmd")
+ self.__pre_stop_cmd: list[str] = build_cmd("pre_stop_cmd")
+ self.__post_stop_cmd: list[str] = build_cmd("post_stop_cmd")
self.__gadget: str = config.otg.gadget
self.__driver: str = config.otg.devices.ethernet.driver
@@ -101,7 +98,7 @@ class _Service: # pylint: disable=too-many-instance-attributes
key: str(value)
for (key, value) in dataclasses.asdict(netcfg).items()
}
- ctls: List[BaseCtl] = [
+ ctls: list[BaseCtl] = [
CustomCtl(self.__pre_start_cmd, self.__post_stop_cmd, placeholders),
IfaceUpCtl(self.__ip_cmd, netcfg.iface),
*([IptablesAllowIcmpCtl(self.__iptables_cmd, netcfg.iface)] if self.__allow_icmp else []),
@@ -185,7 +182,7 @@ class _Service: # pylint: disable=too-many-instance-attributes
# =====
-def main(argv: Optional[List[str]]=None) -> None:
+def main(argv: (list[str] | None)=None) -> None:
(parent_parser, argv, config) = init(
add_help=False,
argv=argv,
diff --git a/kvmd/apps/otgnet/netctl.py b/kvmd/apps/otgnet/netctl.py
index f186ea34..15b5ea8c 100644
--- a/kvmd/apps/otgnet/netctl.py
+++ b/kvmd/apps/otgnet/netctl.py
@@ -20,50 +20,46 @@
# ========================================================================== #
-from typing import List
-from typing import Dict
-
-
# =====
class BaseCtl:
- def get_command(self, direct: bool) -> List[str]:
+ def get_command(self, direct: bool) -> list[str]:
raise NotImplementedError
class IfaceUpCtl(BaseCtl):
- def __init__(self, base_cmd: List[str], iface: str) -> None:
+ def __init__(self, base_cmd: list[str], iface: str) -> None:
self.__base_cmd = base_cmd
self.__iface = iface
- def get_command(self, direct: bool) -> List[str]:
+ def get_command(self, direct: bool) -> list[str]:
return [*self.__base_cmd, "link", "set", self.__iface, ("up" if direct else "down")]
class IfaceAddIpCtl(BaseCtl):
- def __init__(self, base_cmd: List[str], iface: str, cidr: str) -> None:
+ def __init__(self, base_cmd: list[str], iface: str, cidr: str) -> None:
self.__base_cmd = base_cmd
self.__iface = iface
self.__cidr = cidr
- def get_command(self, direct: bool) -> List[str]:
+ def get_command(self, direct: bool) -> list[str]:
return [*self.__base_cmd, "address", ("add" if direct else "del"), self.__cidr, "dev", self.__iface]
class IptablesDropAllCtl(BaseCtl):
- def __init__(self, base_cmd: List[str], iface: str) -> None:
+ def __init__(self, base_cmd: list[str], iface: str) -> None:
self.__base_cmd = base_cmd
self.__iface = iface
- def get_command(self, direct: bool) -> List[str]:
+ def get_command(self, direct: bool) -> list[str]:
return [*self.__base_cmd, ("-A" if direct else "-D"), "INPUT", "-i", self.__iface, "-j", "DROP"]
class IptablesAllowIcmpCtl(BaseCtl):
- def __init__(self, base_cmd: List[str], iface: str) -> None:
+ def __init__(self, base_cmd: list[str], iface: str) -> None:
self.__base_cmd = base_cmd
self.__iface = iface
- def get_command(self, direct: bool) -> List[str]:
+ def get_command(self, direct: bool) -> list[str]:
return [
*self.__base_cmd,
("-A" if direct else "-D"), "INPUT", "-i", self.__iface, "-p", "icmp", "-j", "ACCEPT",
@@ -71,13 +67,13 @@ class IptablesAllowIcmpCtl(BaseCtl):
class IptablesAllowPortCtl(BaseCtl):
- def __init__(self, base_cmd: List[str], iface: str, port: int, tcp: bool) -> None:
+ def __init__(self, base_cmd: list[str], iface: str, port: int, tcp: bool) -> None:
self.__base_cmd = base_cmd
self.__iface = iface
self.__port = port
self.__proto = ("tcp" if tcp else "udp")
- def get_command(self, direct: bool) -> List[str]:
+ def get_command(self, direct: bool) -> list[str]:
return [
*self.__base_cmd,
("-A" if direct else "-D"), "INPUT", "-i", self.__iface, "-p", self.__proto,
@@ -86,11 +82,11 @@ class IptablesAllowPortCtl(BaseCtl):
class IptablesForwardOut(BaseCtl):
- def __init__(self, base_cmd: List[str], iface: str) -> None:
+ def __init__(self, base_cmd: list[str], iface: str) -> None:
self.__base_cmd = base_cmd
self.__iface = iface
- def get_command(self, direct: bool) -> List[str]:
+ def get_command(self, direct: bool) -> list[str]:
return [
*self.__base_cmd,
"--table", "nat",
@@ -100,11 +96,11 @@ class IptablesForwardOut(BaseCtl):
class IptablesForwardIn(BaseCtl):
- def __init__(self, base_cmd: List[str], iface: str) -> None:
+ def __init__(self, base_cmd: list[str], iface: str) -> None:
self.__base_cmd = base_cmd
self.__iface = iface
- def get_command(self, direct: bool) -> List[str]:
+ def get_command(self, direct: bool) -> list[str]:
return [
*self.__base_cmd,
("-A" if direct else "-D"), "FORWARD",
@@ -115,16 +111,16 @@ class IptablesForwardIn(BaseCtl):
class CustomCtl(BaseCtl):
def __init__(
self,
- direct_cmd: List[str],
- reverse_cmd: List[str],
- placeholders: Dict[str, str],
+ direct_cmd: list[str],
+ reverse_cmd: list[str],
+ placeholders: dict[str, str],
) -> None:
self.__direct_cmd = direct_cmd
self.__reverse_cmd = reverse_cmd
self.__placeholders = placeholders
- def get_command(self, direct: bool) -> List[str]:
+ def get_command(self, direct: bool) -> list[str]:
return [
part.format(**self.__placeholders)
for part in (self.__direct_cmd if direct else self.__reverse_cmd)
diff --git a/kvmd/apps/pst/__init__.py b/kvmd/apps/pst/__init__.py
index 2bb29a08..8f92e39d 100644
--- a/kvmd/apps/pst/__init__.py
+++ b/kvmd/apps/pst/__init__.py
@@ -20,9 +20,6 @@
# ========================================================================== #
-from typing import List
-from typing import Optional
-
from ...logging import get_logger
from .. import init
@@ -31,7 +28,7 @@ from .server import PstServer
# =====
-def main(argv: Optional[List[str]]=None) -> None:
+def main(argv: (list[str] | None)=None) -> None:
config = init(
prog="kvmd-pst",
description="The KVMD persistent storage manager",
diff --git a/kvmd/apps/pst/server.py b/kvmd/apps/pst/server.py
index a492a29b..1df26f61 100644
--- a/kvmd/apps/pst/server.py
+++ b/kvmd/apps/pst/server.py
@@ -23,9 +23,6 @@
import os
import asyncio
-from typing import List
-from typing import Dict
-
from aiohttp.web import Request
from aiohttp.web import WebSocketResponse
@@ -48,7 +45,7 @@ class PstServer(HttpServer): # pylint: disable=too-many-arguments,too-many-inst
storage_path: str,
ro_retries_delay: float,
ro_cleanup_delay: float,
- remount_cmd: List[str],
+ remount_cmd: list[str],
) -> None:
super().__init__()
@@ -69,7 +66,7 @@ class PstServer(HttpServer): # pylint: disable=too-many-arguments,too-many-inst
return (await self._ws_loop(ws))
@exposed_ws("ping")
- async def __ws_ping_handler(self, ws: WsSession, _: Dict) -> None:
+ async def __ws_ping_handler(self, ws: WsSession, _: dict) -> None:
await ws.send_event("pong", {})
# ===== SYSTEM STUFF
diff --git a/kvmd/apps/pstrun/__init__.py b/kvmd/apps/pstrun/__init__.py
index 2b1b895a..c164e533 100644
--- a/kvmd/apps/pstrun/__init__.py
+++ b/kvmd/apps/pstrun/__init__.py
@@ -27,9 +27,6 @@ import asyncio
import asyncio.subprocess
import argparse
-from typing import List
-from typing import Optional
-
import aiohttp
from ...logging import get_logger
@@ -53,7 +50,7 @@ def _preexec() -> None:
get_logger(0).info("Can't perform tcsetpgrp(0): %s", tools.efmt(err))
-async def _run_process(cmd: List[str], data_path: str) -> asyncio.subprocess.Process: # pylint: disable=no-member
+async def _run_process(cmd: list[str], data_path: str) -> asyncio.subprocess.Process: # pylint: disable=no-member
# https://stackoverflow.com/questions/58918188/why-is-stdin-not-propagated-to-child-process-of-different-process-group
if os.isatty(0):
signal.signal(signal.SIGTTOU, signal.SIG_IGN)
@@ -67,11 +64,11 @@ async def _run_process(cmd: List[str], data_path: str) -> asyncio.subprocess.Pro
))
-async def _run_cmd_ws(cmd: List[str], ws: aiohttp.ClientWebSocketResponse) -> int: # pylint: disable=too-many-branches
+async def _run_cmd_ws(cmd: list[str], ws: aiohttp.ClientWebSocketResponse) -> int: # pylint: disable=too-many-branches
logger = get_logger(0)
- receive_task: Optional[asyncio.Task] = None
- proc_task: Optional[asyncio.Task] = None
- proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member
+ receive_task: (asyncio.Task | None) = None
+ proc_task: (asyncio.Task | None) = None
+ proc: (asyncio.subprocess.Process | None) = None # pylint: disable=no-member
try: # pylint: disable=too-many-nested-blocks
while True:
@@ -120,7 +117,7 @@ async def _run_cmd_ws(cmd: List[str], ws: aiohttp.ClientWebSocketResponse) -> in
return 1
-async def _run_cmd(cmd: List[str], unix_path: str) -> None:
+async def _run_cmd(cmd: list[str], unix_path: str) -> None:
get_logger(0).info("Opening PST session ...")
async with aiohttp.ClientSession(
headers={"User-Agent": htclient.make_user_agent("KVMD-PSTRun")},
@@ -132,7 +129,7 @@ async def _run_cmd(cmd: List[str], unix_path: str) -> None:
# =====
-def main(argv: Optional[List[str]]=None) -> None:
+def main(argv: (list[str] | None)=None) -> None:
(parent_parser, argv, config) = init(
add_help=False,
cli_logging=True,
diff --git a/kvmd/apps/vnc/__init__.py b/kvmd/apps/vnc/__init__.py
index d7e2c68a..3653f416 100644
--- a/kvmd/apps/vnc/__init__.py
+++ b/kvmd/apps/vnc/__init__.py
@@ -20,9 +20,6 @@
# ========================================================================== #
-from typing import List
-from typing import Optional
-
from ...clients.kvmd import KvmdClient
from ...clients.streamer import StreamFormats
from ...clients.streamer import BaseStreamerClient
@@ -38,7 +35,7 @@ from .server import VncServer
# =====
-def main(argv: Optional[List[str]]=None) -> None:
+def main(argv: (list[str] | None)=None) -> None:
config = init(
prog="kvmd-vnc",
description="VNC to KVMD proxy",
@@ -48,12 +45,12 @@ def main(argv: Optional[List[str]]=None) -> None:
user_agent = htclient.make_user_agent("KVMD-VNC")
- def make_memsink_streamer(name: str, fmt: int) -> Optional[MemsinkStreamerClient]:
+ def make_memsink_streamer(name: str, fmt: int) -> (MemsinkStreamerClient | None):
if getattr(config.memsink, name).sink:
return MemsinkStreamerClient(name.upper(), fmt, **getattr(config.memsink, name)._unpack())
return None
- streamers: List[BaseStreamerClient] = list(filter(None, [
+ streamers: list[BaseStreamerClient] = list(filter(None, [
make_memsink_streamer("h264", StreamFormats.H264),
make_memsink_streamer("jpeg", StreamFormats.JPEG),
HttpStreamerClient(name="JPEG", user_agent=user_agent, **config.streamer._unpack()),
diff --git a/kvmd/apps/vnc/rfb/__init__.py b/kvmd/apps/vnc/rfb/__init__.py
index 059d4d1d..f715f9b7 100644
--- a/kvmd/apps/vnc/rfb/__init__.py
+++ b/kvmd/apps/vnc/rfb/__init__.py
@@ -23,9 +23,6 @@
import asyncio
import ssl
-from typing import Tuple
-from typing import List
-from typing import Dict
from typing import Callable
from typing import Coroutine
@@ -66,7 +63,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
width: int,
height: int,
name: str,
- vnc_passwds: List[str],
+ vnc_passwds: list[str],
vencrypt: bool,
none_auth_only: bool,
) -> None:
@@ -103,7 +100,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
finally:
await aiotools.shield_fg(self.__cleanup(tasks))
- async def __cleanup(self, tasks: List[asyncio.Task]) -> None:
+ async def __cleanup(self, tasks: list[asyncio.Task]) -> None:
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
@@ -150,7 +147,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
async def _on_ext_key_event(self, code: int, state: bool) -> None:
raise NotImplementedError
- async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None:
+ async def _on_pointer_event(self, buttons: dict[str, bool], wheel: dict[str, int], move: dict[str, int]) -> None:
raise NotImplementedError
async def _on_cut_event(self, text: str) -> None:
@@ -232,7 +229,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
# =====
async def __handshake_security(self) -> None:
- sec_types: Dict[int, Tuple[str, Callable]] = {}
+ sec_types: dict[int, tuple[str, Callable]] = {}
if self.__vencrypt and self.__rfb_version > 3:
sec_types[19] = ("VeNCrypt", self.__handshake_security_vencrypt)
if self.__none_auth_only:
diff --git a/kvmd/apps/vnc/rfb/crypto.py b/kvmd/apps/vnc/rfb/crypto.py
index 854dceb3..8953bef5 100644
--- a/kvmd/apps/vnc/rfb/crypto.py
+++ b/kvmd/apps/vnc/rfb/crypto.py
@@ -22,8 +22,6 @@
import os
-from typing import List
-
import passlib.crypto.des
@@ -43,7 +41,7 @@ def rfb_encrypt_challenge(challenge: bytes, passwd: bytes) -> bytes:
def _make_key(passwd: bytes) -> bytes:
passwd = (passwd + b"\0" * 8)[:8]
- key: List[int] = []
+ key: list[int] = []
for ch in passwd:
btgt = 0
for index in range(8):
diff --git a/kvmd/apps/vnc/rfb/encodings.py b/kvmd/apps/vnc/rfb/encodings.py
index de51dde5..e153b5e8 100644
--- a/kvmd/apps/vnc/rfb/encodings.py
+++ b/kvmd/apps/vnc/rfb/encodings.py
@@ -22,10 +22,6 @@
import dataclasses
-from typing import List
-from typing import Dict
-from typing import FrozenSet
-from typing import Union
from typing import Any
@@ -45,13 +41,13 @@ class RfbEncodings:
H264 = 50 # Open H.264 Encoding
-def _make_meta(variants: Union[int, FrozenSet[int]]) -> Dict:
+def _make_meta(variants: (int | frozenset[int])) -> dict:
return {"variants": (frozenset([variants]) if isinstance(variants, int) else variants)}
@dataclasses.dataclass(frozen=True)
class RfbClientEncodings: # pylint: disable=too-many-instance-attributes
- encodings: FrozenSet[int]
+ encodings: frozenset[int]
has_resize: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.RESIZE)) # noqa: E224
has_rename: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.RENAME)) # noqa: E224
@@ -63,8 +59,8 @@ class RfbClientEncodings: # pylint: disable=too-many-instance-attributes
has_h264: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.H264)) # noqa: E224
- def get_summary(self) -> List[str]:
- summary: List[str] = [f"encodings -- {sorted(self.encodings)}"]
+ def get_summary(self) -> list[str]:
+ summary: list[str] = [f"encodings -- {sorted(self.encodings)}"]
for field in dataclasses.fields(self):
if field.name != "encodings":
found = ", ".join(map(str, sorted(map(int, self.__get_found(field)))))
@@ -80,7 +76,7 @@ class RfbClientEncodings: # pylint: disable=too-many-instance-attributes
def __set_value(self, key: str, value: Any) -> None:
object.__setattr__(self, key, value)
- def __get_found(self, field: dataclasses.Field) -> FrozenSet[int]:
+ def __get_found(self, field: dataclasses.Field) -> frozenset[int]:
return self.encodings.intersection(field.metadata["variants"])
def __get_tight_jpeg_quality(self) -> int:
diff --git a/kvmd/apps/vnc/rfb/stream.py b/kvmd/apps/vnc/rfb/stream.py
index ee42436e..dfa6cb13 100644
--- a/kvmd/apps/vnc/rfb/stream.py
+++ b/kvmd/apps/vnc/rfb/stream.py
@@ -24,9 +24,6 @@ import asyncio
import ssl
import struct
-from typing import Tuple
-from typing import Union
-
from .... import aiotools
from .errors import RfbConnectionError
@@ -57,7 +54,7 @@ class RfbClientStream:
except (ConnectionError, asyncio.IncompleteReadError) as err:
raise RfbConnectionError(f"Can't read {msg}", err)
- async def _read_struct(self, msg: str, fmt: str) -> Tuple[int, ...]:
+ async def _read_struct(self, msg: str, fmt: str) -> tuple[int, ...]:
assert len(fmt) > 1
try:
fmt = f">{fmt}"
@@ -73,7 +70,7 @@ class RfbClientStream:
# =====
- async def _write_struct(self, msg: str, fmt: str, *values: Union[int, bytes], drain: bool=True) -> None:
+ async def _write_struct(self, msg: str, fmt: str, *values: (int | bytes), drain: bool=True) -> None:
try:
if not fmt:
for value in values:
diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py
index 7fac9aa4..c6ea076a 100644
--- a/kvmd/apps/vnc/server.py
+++ b/kvmd/apps/vnc/server.py
@@ -26,11 +26,6 @@ import socket
import dataclasses
import contextlib
-from typing import List
-from typing import Dict
-from typing import Union
-from typing import Optional
-
import aiohttp
from ...logging import get_logger
@@ -83,12 +78,12 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
desired_fps: int,
keymap_name: str,
- symmap: Dict[int, Dict[int, str]],
+ symmap: dict[int, dict[int, str]],
kvmd: KvmdClient,
- streamers: List[BaseStreamerClient],
+ streamers: list[BaseStreamerClient],
- vnc_credentials: Dict[str, VncAuthKvmdCredentials],
+ vnc_credentials: dict[str, VncAuthKvmdCredentials],
vencrypt: bool,
none_auth_only: bool,
shared_params: _SharedParams,
@@ -122,15 +117,15 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
self.__stage2_encodings_accepted = aiotools.AioStage()
self.__stage3_ws_connected = aiotools.AioStage()
- self.__kvmd_session: Optional[KvmdClientSession] = None
- self.__kvmd_ws: Optional[KvmdClientWs] = None
+ self.__kvmd_session: (KvmdClientSession | None) = None
+ self.__kvmd_ws: (KvmdClientWs | None) = None
self.__fb_notifier = aiotools.AioNotifier()
- self.__fb_queue: "asyncio.Queue[Dict]" = asyncio.Queue()
+ self.__fb_queue: "asyncio.Queue[dict]" = asyncio.Queue()
# Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события.
# Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD
- self.__mouse_buttons: Dict[str, Optional[bool]] = dict.fromkeys(["left", "right", "middle"], None)
+ self.__mouse_buttons: dict[str, (bool | None)] = dict.fromkeys(["left", "right", "middle"], None)
self.__mouse_move = {"x": -1, "y": -1}
self.__lock = asyncio.Lock()
@@ -175,7 +170,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
finally:
self.__kvmd_ws = None
- async def __process_ws_event(self, event_type: str, event: Dict) -> None:
+ async def __process_ws_event(self, event_type: str, event: dict) -> None:
if event_type == "info_meta_state":
try:
host = event["server"]["host"]
@@ -225,7 +220,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
StreamFormats.JPEG: "has_tight",
StreamFormats.H264: "has_h264",
}
- streamer: Optional[BaseStreamerClient] = None
+ streamer: (BaseStreamerClient | None) = None
for streamer in self.__streamers:
if getattr(self._encodings, formats[streamer.get_format()]):
get_logger(0).info("%s [streamer]: Using preferred %s", self._remote, streamer)
@@ -237,12 +232,12 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
get_logger(0).info("%s [streamer]: Using default %s", self._remote, streamer)
return streamer
- async def __queue_frame(self, frame: Union[Dict, str]) -> None:
+ async def __queue_frame(self, frame: (dict | str)) -> None:
if isinstance(frame, str):
frame = await self.__make_text_frame(frame)
self.__fb_queue.put_nowait(frame)
- async def __make_text_frame(self, text: str) -> Dict:
+ async def __make_text_frame(self, text: str) -> dict:
return {
"data": (await make_text_jpeg(self._width, self._height, self._encodings.tight_jpeg_quality, text)),
"width": self._width,
@@ -252,7 +247,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
async def __fb_sender_task_loop(self) -> None: # pylint: disable=too-many-branches
has_h264_key = False
- last: Optional[Dict] = None
+ last: (dict | None) = None
while True:
await self.__fb_notifier.wait()
@@ -351,7 +346,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
if self.__kvmd_ws:
await self.__kvmd_ws.send_key_event(web_key, state)
- def __switch_modifiers(self, key: Union[int, str], state: bool) -> bool:
+ def __switch_modifiers(self, key: (int | str), state: bool) -> bool:
mod = 0
if key in X11Modifiers.SHIFTS or key in WebModifiers.SHIFTS:
mod = SymmapModifiers.SHIFT
@@ -367,7 +362,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
self.__modifiers &= ~mod
return True
- async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None:
+ async def _on_pointer_event(self, buttons: dict[str, bool], wheel: dict[str, int], move: dict[str, int]) -> None:
if self.__kvmd_ws:
for (button, state) in buttons.items():
if self.__mouse_buttons[button] != state:
@@ -434,7 +429,7 @@ class VncServer: # pylint: disable=too-many-instance-attributes
keymap_path: str,
kvmd: KvmdClient,
- streamers: List[BaseStreamerClient],
+ streamers: list[BaseStreamerClient],
vnc_auth_manager: VncAuthManager,
) -> None:
diff --git a/kvmd/apps/vnc/vncauth.py b/kvmd/apps/vnc/vncauth.py
index b0f702d1..c7c04864 100644
--- a/kvmd/apps/vnc/vncauth.py
+++ b/kvmd/apps/vnc/vncauth.py
@@ -22,9 +22,6 @@
import dataclasses
-from typing import Tuple
-from typing import Dict
-
from ...logging import get_logger
from ... import aiofs
@@ -53,7 +50,7 @@ class VncAuthManager:
self.__path = path
self.__enabled = enabled
- async def read_credentials(self) -> Tuple[Dict[str, VncAuthKvmdCredentials], bool]:
+ async def read_credentials(self) -> tuple[dict[str, VncAuthKvmdCredentials], bool]:
if self.__enabled:
try:
return (await self.__inner_read_credentials(), True)
@@ -63,10 +60,10 @@ class VncAuthManager:
get_logger(0).exception("Unhandled exception while reading VNCAuth passwd file")
return ({}, (not self.__enabled))
- async def __inner_read_credentials(self) -> Dict[str, VncAuthKvmdCredentials]:
+ async def __inner_read_credentials(self) -> dict[str, VncAuthKvmdCredentials]:
lines = (await aiofs.read(self.__path)).split("\n")
- credentials: Dict[str, VncAuthKvmdCredentials] = {}
+ credentials: dict[str, VncAuthKvmdCredentials] = {}
for (lineno, line) in enumerate(lines):
if len(line.strip()) == 0 or line.lstrip().startswith("#"):
continue
diff --git a/kvmd/apps/watchdog/__init__.py b/kvmd/apps/watchdog/__init__.py
index e7271ac1..c8a50729 100644
--- a/kvmd/apps/watchdog/__init__.py
+++ b/kvmd/apps/watchdog/__init__.py
@@ -24,9 +24,6 @@ import argparse
import errno
import time
-from typing import List
-from typing import Optional
-
from ...logging import get_logger
from ...yamlconf import Section
@@ -104,7 +101,7 @@ def _cmd_cancel(config: Section) -> None:
# =====
-def main(argv: Optional[List[str]]=None) -> None:
+def main(argv: (list[str] | None)=None) -> None:
(parent_parser, argv, config) = init(add_help=False, argv=argv)
parser = argparse.ArgumentParser(
prog="kvmd-watchdog",