diff options
author | Maxim Devaev <[email protected]> | 2022-09-04 18:08:40 +0300 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2022-09-04 18:08:40 +0300 |
commit | ee3e224e396494cd0d69bb6167087a071a20349c (patch) | |
tree | 5becd28570e58a03c6e1e231d0db24c264a73f88 /kvmd/apps | |
parent | 4b75221e9470b4a009955d7677f16adf8e23e302 (diff) |
new typing style
Diffstat (limited to 'kvmd/apps')
48 files changed, 241 insertions, 387 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index b03b7893..ffd8daae 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -27,12 +27,6 @@ import argparse import logging import logging.config -from typing import Tuple -from typing import List -from typing import Dict -from typing import Type -from typing import Optional - import pygments import pygments.lexers.data import pygments.formatters @@ -109,14 +103,14 @@ from ..validators.hw import valid_otg_ethernet # ===== def init( - prog: Optional[str]=None, - description: Optional[str]=None, + prog: (str | None)=None, + description: (str | None)=None, add_help: bool=True, check_run: bool=False, cli_logging: bool=False, - argv: Optional[List[str]]=None, + argv: (list[str] | None)=None, **load: bool, -) -> Tuple[argparse.ArgumentParser, List[str], Section]: +) -> tuple[argparse.ArgumentParser, list[str], Section]: argv = (argv or sys.argv) assert len(argv) > 0 @@ -170,10 +164,10 @@ def init( # ===== -def _init_config(config_path: str, override_options: List[str], **load_flags: bool) -> Section: +def _init_config(config_path: str, override_options: list[str], **load_flags: bool) -> Section: config_path = os.path.expanduser(config_path) try: - raw_config: Dict = load_yaml_file(config_path) + raw_config: dict = load_yaml_file(config_path) except Exception as err: raise SystemExit(f"ConfigError: Can't read config file {config_path!r}:\n{tools.efmt(err)}") if not isinstance(raw_config, dict): @@ -194,7 +188,7 @@ def _init_config(config_path: str, override_options: List[str], **load_flags: bo raise SystemExit(f"ConfigError: {err}") -def _patch_raw(raw_config: Dict) -> None: # pylint: disable=too-many-branches +def _patch_raw(raw_config: dict) -> None: # pylint: disable=too-many-branches if isinstance(raw_config.get("otg"), dict): for (old, new) in [ ("msd", "msd"), @@ -250,9 +244,9 @@ def _patch_raw(raw_config: Dict) -> None: # pylint: disable=too-many-branches def _patch_dynamic( # pylint: disable=too-many-locals - raw_config: Dict, + raw_config: dict, config: Section, - scheme: Dict, + scheme: dict, load_auth: bool=False, load_hid: bool=False, load_atx: bool=False, @@ -279,7 +273,7 @@ def _patch_dynamic( # pylint: disable=too-many-locals if load_gpio: driver: str - drivers: Dict[str, Type[BaseUserGpioDriver]] = {} # Name to drivers + drivers: dict[str, type[BaseUserGpioDriver]] = {} # Name to drivers for (driver, params) in { # type: ignore "__gpio__": {}, **tools.rget(raw_config, "kvmd", "gpio", "drivers"), @@ -343,7 +337,7 @@ def _dump_config(config: Section) -> None: print(dump) -def _get_config_scheme() -> Dict: +def _get_config_scheme() -> dict: return { "logging": Option({}), diff --git a/kvmd/apps/cleanup/__init__.py b/kvmd/apps/cleanup/__init__.py index 3e33d09f..5e83bd9a 100644 --- a/kvmd/apps/cleanup/__init__.py +++ b/kvmd/apps/cleanup/__init__.py @@ -24,9 +24,6 @@ import os import signal import time -from typing import List -from typing import Optional - import psutil from ...logging import get_logger @@ -74,7 +71,7 @@ def _remove_sockets(config: Section) -> None: # ===== -def main(argv: Optional[List[str]]=None) -> None: +def main(argv: (list[str] | None)=None) -> None: config = init( prog="kvmd-cleanup", description="Kill KVMD and clear resources", diff --git a/kvmd/apps/edidconf/__init__.py b/kvmd/apps/edidconf/__init__.py index 53edf5fd..e6dcc610 100644 --- a/kvmd/apps/edidconf/__init__.py +++ b/kvmd/apps/edidconf/__init__.py @@ -28,11 +28,9 @@ import contextlib import argparse import time -from typing import List from typing import IO from typing import Generator from typing import Callable -from typing import Optional from ...validators.basic import valid_bool from ...validators.basic import valid_int_f0 @@ -180,7 +178,7 @@ def _make_format_hex(size: int) -> Callable[[int], str]: # ===== -def main(argv: Optional[List[str]]=None) -> None: # pylint: disable=too-many-branches +def main(argv: (list[str] | None)=None) -> None: # pylint: disable=too-many-branches # (parent_parser, argv, _) = init( # add_help=False, # argv=argv, diff --git a/kvmd/apps/htpasswd/__init__.py b/kvmd/apps/htpasswd/__init__.py index c46564dd..dd1213f7 100644 --- a/kvmd/apps/htpasswd/__init__.py +++ b/kvmd/apps/htpasswd/__init__.py @@ -28,9 +28,7 @@ import contextlib import textwrap import argparse -from typing import List from typing import Generator -from typing import Optional import passlib.apache @@ -125,7 +123,7 @@ def _cmd_delete(config: Section, options: argparse.Namespace) -> None: # ===== -def main(argv: Optional[List[str]]=None) -> None: +def main(argv: (list[str] | None)=None) -> None: (parent_parser, argv, config) = init( add_help=False, cli_logging=True, diff --git a/kvmd/apps/ipmi/__init__.py b/kvmd/apps/ipmi/__init__.py index fc450228..13aa9730 100644 --- a/kvmd/apps/ipmi/__init__.py +++ b/kvmd/apps/ipmi/__init__.py @@ -20,9 +20,6 @@ # ========================================================================== # -from typing import List -from typing import Optional - from ...clients.kvmd import KvmdClient from ... import htclient @@ -34,7 +31,7 @@ from .server import IpmiServer # ===== -def main(argv: Optional[List[str]]=None) -> None: +def main(argv: (list[str] | None)=None) -> None: config = init( prog="kvmd-ipmi", description="IPMI to KVMD proxy", diff --git a/kvmd/apps/ipmi/auth.py b/kvmd/apps/ipmi/auth.py index 0ad3d208..2e495af8 100644 --- a/kvmd/apps/ipmi/auth.py +++ b/kvmd/apps/ipmi/auth.py @@ -22,9 +22,6 @@ import dataclasses -from typing import List -from typing import Dict - # ===== class IpmiPasswdError(Exception): @@ -55,8 +52,8 @@ class IpmiAuthManager: def get_credentials(self, ipmi_user: str) -> IpmiUserCredentials: return self.__credentials[ipmi_user] - def __parse_passwd_file(self, lines: List[str]) -> Dict[str, IpmiUserCredentials]: - credentials: Dict[str, IpmiUserCredentials] = {} + def __parse_passwd_file(self, lines: list[str]) -> dict[str, IpmiUserCredentials]: + credentials: dict[str, IpmiUserCredentials] = {} for (lineno, line) in enumerate(lines): if len(line.strip()) == 0 or line.lstrip().startswith("#"): continue diff --git a/kvmd/apps/ipmi/server.py b/kvmd/apps/ipmi/server.py index 597e6dce..87265607 100644 --- a/kvmd/apps/ipmi/server.py +++ b/kvmd/apps/ipmi/server.py @@ -28,9 +28,6 @@ import multiprocessing import functools import queue -from typing import Dict -from typing import Optional - import aiohttp import serial @@ -83,8 +80,8 @@ class IpmiServer(BaseIpmiServer): # pylint: disable=too-many-instance-attribute self.__sol_proxy_port = (sol_proxy_port or port) self.__sol_lock = threading.Lock() - self.__sol_console: Optional[IpmiConsole] = None - self.__sol_thread: Optional[threading.Thread] = None + self.__sol_console: (IpmiConsole | None) = None + self.__sol_thread: (threading.Thread | None) = None self.__sol_stop = False def run(self) -> None: @@ -100,7 +97,7 @@ class IpmiServer(BaseIpmiServer): # pylint: disable=too-many-instance-attribute # ===== - def handle_raw_request(self, request: Dict, session: IpmiServerSession) -> None: + def handle_raw_request(self, request: dict, session: IpmiServerSession) -> None: handler = { (6, 1): (lambda _, session: self.send_device_id(session)), # Get device ID (6, 7): self.__get_power_state_handler, # Power state @@ -125,13 +122,13 @@ class IpmiServer(BaseIpmiServer): # pylint: disable=too-many-instance-attribute # ===== - def __get_power_state_handler(self, _: Dict, session: IpmiServerSession) -> None: + def __get_power_state_handler(self, _: dict, session: IpmiServerSession) -> None: # https://github.com/arcress0/ipmiutil/blob/e2f6e95127d22e555f959f136d9bb9543c763896/util/ireset.c#L654 result = self.__make_request(session, "atx.get_state() [power]", "atx.get_state") data = [(0 if result["leds"]["power"] else 5)] session.send_ipmi_response(data=data) - def __get_selftest_status_handler(self, _: Dict, session: IpmiServerSession) -> None: + def __get_selftest_status_handler(self, _: dict, session: IpmiServerSession) -> None: # https://github.com/arcress0/ipmiutil/blob/e2f6e95127d22e555f959f136d9bb9543c763896/util/ihealth.c#L858 data = [0x0055] try: @@ -140,12 +137,12 @@ class IpmiServer(BaseIpmiServer): # pylint: disable=too-many-instance-attribute data = [0] session.send_ipmi_response(data=data) - def __get_chassis_status_handler(self, _: Dict, session: IpmiServerSession) -> None: + def __get_chassis_status_handler(self, _: dict, session: IpmiServerSession) -> None: result = self.__make_request(session, "atx.get_state() [chassis]", "atx.get_state") data = [int(result["leds"]["power"]), 0, 0] session.send_ipmi_response(data=data) - def __chassis_control_handler(self, request: Dict, session: IpmiServerSession) -> None: + def __chassis_control_handler(self, request: dict, session: IpmiServerSession) -> None: action = { 0: "off_hard", 1: "on", @@ -179,7 +176,7 @@ class IpmiServer(BaseIpmiServer): # pylint: disable=too-many-instance-attribute # ===== - def __activate_sol_handler(self, _: Dict, session: IpmiServerSession) -> None: + def __activate_sol_handler(self, _: dict, session: IpmiServerSession) -> None: with self.__sol_lock: if not self.__sol_device_path: session.send_ipmi_response(code=0x81) # SOL disabled @@ -198,7 +195,7 @@ class IpmiServer(BaseIpmiServer): # pylint: disable=too-many-instance-attribute ]) self.__start_sol_worker(session) - def __deactivate_sol_handler(self, _: Dict, session: IpmiServerSession) -> None: + def __deactivate_sol_handler(self, _: dict, session: IpmiServerSession) -> None: with self.__sol_lock: if not self.__sol_device_path: session.send_ipmi_response(code=0x81) diff --git a/kvmd/apps/janus/__init__.py b/kvmd/apps/janus/__init__.py index fc67aeb6..d8945e57 100644 --- a/kvmd/apps/janus/__init__.py +++ b/kvmd/apps/janus/__init__.py @@ -20,16 +20,13 @@ # ========================================================================== # -from typing import List -from typing import Optional - from .. import init from .runner import JanusRunner # ===== -def main(argv: Optional[List[str]]=None) -> None: +def main(argv: (list[str] | None)=None) -> None: config = init( prog="kvmd-Janus", description="Janus WebRTC Gateway Runner", diff --git a/kvmd/apps/janus/runner.py b/kvmd/apps/janus/runner.py index 799fd94c..c2d01dad 100644 --- a/kvmd/apps/janus/runner.py +++ b/kvmd/apps/janus/runner.py @@ -3,10 +3,6 @@ import asyncio.subprocess import socket import dataclasses -from typing import Tuple -from typing import List -from typing import Optional - import netifaces from ... import tools @@ -42,9 +38,9 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes check_retries: int, check_retries_delay: float, - cmd: List[str], - cmd_remove: List[str], - cmd_append: List[str], + cmd: list[str], + cmd_remove: list[str], + cmd_append: list[str], ) -> None: self.__stun = Stun(stun_host, stun_port, stun_timeout, stun_retries, stun_retries_delay) @@ -55,8 +51,8 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes self.__cmd = tools.build_cmd(cmd, cmd_remove, cmd_append) - self.__janus_task: Optional[asyncio.Task] = None - self.__janus_proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member + self.__janus_task: (asyncio.Task | None) = None + self.__janus_proc: (asyncio.subprocess.Process | None) = None # pylint: disable=no-member def run(self) -> None: logger = get_logger(0) @@ -68,7 +64,7 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes async def __run(self) -> None: logger = get_logger(0) - prev_netcfg: Optional[_Netcfg] = None + prev_netcfg: (_Netcfg | None) = None while True: retry = 0 netcfg = _Netcfg() @@ -117,7 +113,7 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes get_logger().error("Can't get default IP: %s", tools.efmt(err)) return "" - async def __get_stun_info(self, src_ip: str) -> Tuple[Stun, Tuple[str, str]]: + async def __get_stun_info(self, src_ip: str) -> tuple[Stun, tuple[str, str]]: try: return (self.__stun, (await self.__stun.get_info(src_ip, 0))) except Exception as err: diff --git a/kvmd/apps/janus/stun.py b/kvmd/apps/janus/stun.py index dc41b961..1233d7ea 100644 --- a/kvmd/apps/janus/stun.py +++ b/kvmd/apps/janus/stun.py @@ -5,10 +5,6 @@ import struct import secrets import dataclasses -from typing import Tuple -from typing import Dict -from typing import Optional - from ... import tools from ... import aiotools @@ -25,9 +21,9 @@ class StunAddress: @dataclasses.dataclass(frozen=True) class StunResponse: ok: bool - ext: Optional[StunAddress] = dataclasses.field(default=None) - src: Optional[StunAddress] = dataclasses.field(default=None) - changed: Optional[StunAddress] = dataclasses.field(default=None) + ext: (StunAddress | None) = dataclasses.field(default=None) + src: (StunAddress | None) = dataclasses.field(default=None) + changed: (StunAddress | None) = dataclasses.field(default=None) class StunNatType: @@ -60,9 +56,9 @@ class Stun: self.__retries = retries self.__retries_delay = retries_delay - self.__sock: Optional[socket.socket] = None + self.__sock: (socket.socket | None) = None - async def get_info(self, src_ip: str, src_port: int) -> Tuple[str, str]: + async def get_info(self, src_ip: str, src_port: int) -> tuple[str, str]: (family, _, _, _, addr) = socket.getaddrinfo(src_ip, src_port, type=socket.SOCK_DGRAM)[0] try: with socket.socket(family, socket.SOCK_DGRAM) as self.__sock: @@ -74,7 +70,7 @@ class Stun: finally: self.__sock = None - async def __get_nat_type(self, src_ip: str) -> Tuple[str, StunResponse]: # pylint: disable=too-many-return-statements + async def __get_nat_type(self, src_ip: str) -> tuple[str, StunResponse]: # pylint: disable=too-many-return-statements first = await self.__make_request("First probe") if not first.ok: return (StunNatType.BLOCKED, first) @@ -128,7 +124,7 @@ class Stun: ctx, self.__retries, error) return StunResponse(ok=False) - parsed: Dict[str, StunAddress] = {} + parsed: dict[str, StunAddress] = {} offset = 0 remaining = len(response) while remaining > 0: @@ -146,7 +142,7 @@ class Stun: remaining -= (4 + attr_len) return StunResponse(ok=True, **parsed) - async def __inner_make_request(self, trans_id: bytes, request: bytes, host: str, port: int) -> Tuple[bytes, str]: + async def __inner_make_request(self, trans_id: bytes, request: bytes, host: str, port: int) -> tuple[bytes, str]: assert self.__sock is not None request = struct.pack(">HH", 0x0001, len(request)) + trans_id + request # Bind Request diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py index f7536577..5249bdd7 100644 --- a/kvmd/apps/kvmd/__init__.py +++ b/kvmd/apps/kvmd/__init__.py @@ -20,9 +20,6 @@ # ========================================================================== # -from typing import List -from typing import Optional - from ...logging import get_logger from ...plugins.hid import get_hid_class @@ -42,7 +39,7 @@ from .server import KvmdServer # ===== -def main(argv: Optional[List[str]]=None) -> None: +def main(argv: (list[str] | None)=None) -> None: config = init( prog="kvmd", description="The main PiKVM daemon", diff --git a/kvmd/apps/kvmd/api/export.py b/kvmd/apps/kvmd/api/export.py index ca367f30..384fe903 100644 --- a/kvmd/apps/kvmd/api/export.py +++ b/kvmd/apps/kvmd/api/export.py @@ -23,7 +23,6 @@ import asyncio from typing import Any -from typing import List from aiohttp.web import Request from aiohttp.web import Response @@ -56,7 +55,7 @@ class ExportApi: self.__info_manager.get_submanager("fan").get_state(), self.__user_gpio.get_state(), ]) - rows: List[str] = [] + rows: list[str] = [] self.__append_prometheus_rows(rows, atx_state["enabled"], "pikvm_atx_enabled") self.__append_prometheus_rows(rows, atx_state["leds"]["power"], "pikvm_atx_power") @@ -71,7 +70,7 @@ class ExportApi: return Response(text="\n".join(rows)) - def __append_prometheus_rows(self, rows: List[str], value: Any, path: str) -> None: + def __append_prometheus_rows(self, rows: list[str], value: Any, path: str) -> None: if isinstance(value, bool): value = int(value) if isinstance(value, (int, float)): diff --git a/kvmd/apps/kvmd/api/hid.py b/kvmd/apps/kvmd/api/hid.py index 93658e88..6d993d4a 100644 --- a/kvmd/apps/kvmd/api/hid.py +++ b/kvmd/apps/kvmd/api/hid.py @@ -24,10 +24,6 @@ import os import stat import functools -from typing import Tuple -from typing import List -from typing import Dict -from typing import Set from typing import Callable from aiohttp.web import Request @@ -64,10 +60,10 @@ class HidApi: hid: BaseHid, keymap_path: str, - ignore_keys: List[str], + ignore_keys: list[str], - mouse_x_range: Tuple[int, int], - mouse_y_range: Tuple[int, int], + mouse_x_range: tuple[int, int], + mouse_y_range: tuple[int, int], ) -> None: self.__hid = hid @@ -112,8 +108,8 @@ class HidApi: # ===== - async def get_keymaps(self) -> Dict: # Ugly hack to generate hid_keymaps_state (see server.py) - keymaps: Set[str] = set() + async def get_keymaps(self) -> dict: # Ugly hack to generate hid_keymaps_state (see server.py) + keymaps: set[str] = set() for keymap_name in os.listdir(self.__keymaps_dir_path): path = os.path.join(self.__keymaps_dir_path, keymap_name) if os.access(path, os.R_OK) and stat.S_ISREG(os.stat(path).st_mode): @@ -139,7 +135,7 @@ class HidApi: self.__hid.send_key_events(text_to_web_keys(text, symmap)) return make_json_response() - def __ensure_symmap(self, keymap_name: str) -> Dict[int, Dict[int, str]]: + def __ensure_symmap(self, keymap_name: str) -> dict[int, dict[int, str]]: keymap_name = valid_printable_filename(keymap_name, "keymap") path = os.path.join(self.__keymaps_dir_path, keymap_name) try: @@ -151,14 +147,14 @@ class HidApi: return self.__inner_ensure_symmap(path, st.st_mtime) @functools.lru_cache(maxsize=10) - def __inner_ensure_symmap(self, path: str, mtime: int) -> Dict[int, Dict[int, str]]: + def __inner_ensure_symmap(self, path: str, mtime: int) -> dict[int, dict[int, str]]: _ = mtime # For LRU return build_symmap(path) # ===== @exposed_ws("key") - async def __ws_key_handler(self, _: WsSession, event: Dict) -> None: + async def __ws_key_handler(self, _: WsSession, event: dict) -> None: try: key = valid_hid_key(event["key"]) state = valid_bool(event["state"]) @@ -168,7 +164,7 @@ class HidApi: self.__hid.send_key_events([(key, state)]) @exposed_ws("mouse_button") - async def __ws_mouse_button_handler(self, _: WsSession, event: Dict) -> None: + async def __ws_mouse_button_handler(self, _: WsSession, event: dict) -> None: try: button = valid_hid_mouse_button(event["button"]) state = valid_bool(event["state"]) @@ -177,7 +173,7 @@ class HidApi: self.__hid.send_mouse_button_event(button, state) @exposed_ws("mouse_move") - async def __ws_mouse_move_handler(self, _: WsSession, event: Dict) -> None: + async def __ws_mouse_move_handler(self, _: WsSession, event: dict) -> None: try: to_x = valid_hid_mouse_move(event["to"]["x"]) to_y = valid_hid_mouse_move(event["to"]["y"]) @@ -186,14 +182,14 @@ class HidApi: self.__send_mouse_move_event_remapped(to_x, to_y) @exposed_ws("mouse_relative") - async def __ws_mouse_relative_handler(self, _: WsSession, event: Dict) -> None: + async def __ws_mouse_relative_handler(self, _: WsSession, event: dict) -> None: self.__process_delta_ws_request(event, self.__hid.send_mouse_relative_event) @exposed_ws("mouse_wheel") - async def __ws_mouse_wheel_handler(self, _: WsSession, event: Dict) -> None: + async def __ws_mouse_wheel_handler(self, _: WsSession, event: dict) -> None: self.__process_delta_ws_request(event, self.__hid.send_mouse_wheel_event) - def __process_delta_ws_request(self, event: Dict, handler: Callable[[int, int], None]) -> None: + def __process_delta_ws_request(self, event: dict, handler: Callable[[int, int], None]) -> None: try: raw_delta = event["delta"] deltas = [ diff --git a/kvmd/apps/kvmd/api/info.py b/kvmd/apps/kvmd/api/info.py index bb7caf79..5517ced0 100644 --- a/kvmd/apps/kvmd/api/info.py +++ b/kvmd/apps/kvmd/api/info.py @@ -22,8 +22,6 @@ import asyncio -from typing import List - from aiohttp.web import Request from aiohttp.web import Response @@ -51,7 +49,7 @@ class InfoApi: ]))) return make_json_response(results) - def __valid_info_fields(self, request: Request) -> List[str]: + def __valid_info_fields(self, request: Request) -> list[str]: subs = self.__info_manager.get_subs() return sorted(valid_info_fields( arg=request.query.get("fields", ",".join(subs)), diff --git a/kvmd/apps/kvmd/api/msd.py b/kvmd/apps/kvmd/api/msd.py index 18d3ba2c..16118141 100644 --- a/kvmd/apps/kvmd/api/msd.py +++ b/kvmd/apps/kvmd/api/msd.py @@ -23,10 +23,7 @@ import lzma import time -from typing import Dict from typing import AsyncGenerator -from typing import Optional -from typing import Union import aiohttp import zstandard @@ -153,7 +150,7 @@ class MsdApi: return make_json_response(self.__make_write_info(name, size, written)) @exposed_http("POST", "/msd/write_remote") - async def __write_remote_handler(self, request: Request) -> Union[Response, StreamResponse]: # pylint: disable=too-many-locals + async def __write_remote_handler(self, request: Request) -> (Response | StreamResponse): # pylint: disable=too-many-locals url = valid_url(request.query.get("url")) insecure = valid_bool(request.query.get("insecure", False)) timeout = valid_float_f01(request.query.get("timeout", 10.0)) @@ -161,7 +158,7 @@ class MsdApi: name = "" size = written = 0 - response: Optional[StreamResponse] = None + response: (StreamResponse | None) = None async def stream_write_info() -> None: assert response is not None @@ -206,11 +203,11 @@ class MsdApi: return make_json_exception(err, 400) raise - def __get_remove_incomplete(self, request: Request) -> Optional[bool]: - flag: Optional[str] = request.query.get("remove_incomplete") + def __get_remove_incomplete(self, request: Request) -> (bool | None): + flag: (str | None) = request.query.get("remove_incomplete") return (valid_bool(flag) if flag is not None else None) - def __make_write_info(self, name: str, size: int, written: int) -> Dict: + def __make_write_info(self, name: str, size: int, written: int) -> dict: return {"image": {"name": name, "size": size, "written": written}} # ===== diff --git a/kvmd/apps/kvmd/api/streamer.py b/kvmd/apps/kvmd/api/streamer.py index 401795ce..3030a925 100644 --- a/kvmd/apps/kvmd/api/streamer.py +++ b/kvmd/apps/kvmd/api/streamer.py @@ -20,9 +20,6 @@ # ========================================================================== # -from typing import List -from typing import Dict - from aiohttp.web import Request from aiohttp.web import Response @@ -102,10 +99,10 @@ class StreamerApi: # ===== - async def get_ocr(self) -> Dict: # XXX: Ugly hack + async def get_ocr(self) -> dict: # XXX: Ugly hack enabled = self.__ocr.is_available() - default: List[str] = [] - available: List[str] = [] + default: list[str] = [] + available: list[str] = [] if enabled: default = self.__ocr.get_default_langs() available = self.__ocr.get_available_langs() diff --git a/kvmd/apps/kvmd/auth.py b/kvmd/apps/kvmd/auth.py index 0f437a6b..cfc2fb6a 100644 --- a/kvmd/apps/kvmd/auth.py +++ b/kvmd/apps/kvmd/auth.py @@ -22,10 +22,6 @@ import secrets -from typing import List -from typing import Dict -from typing import Optional - from ...logging import get_logger from ... import aiotools @@ -40,12 +36,12 @@ class AuthManager: self, internal_type: str, - internal_kwargs: Dict, + internal_kwargs: dict, external_type: str, - external_kwargs: Dict, + external_kwargs: dict, - force_internal_users: List[str], + force_internal_users: list[str], enabled: bool, ) -> None: @@ -53,19 +49,19 @@ class AuthManager: if not enabled: get_logger().warning("AUTHORIZATION IS DISABLED") - self.__internal_service: Optional[BaseAuthService] = None + self.__internal_service: (BaseAuthService | None) = None if enabled: self.__internal_service = get_auth_service_class(internal_type)(**internal_kwargs) get_logger().info("Using internal auth service %r", self.__internal_service.get_plugin_name()) - self.__external_service: Optional[BaseAuthService] = None + self.__external_service: (BaseAuthService | None) = None if enabled and external_type: self.__external_service = get_auth_service_class(external_type)(**external_kwargs) get_logger().info("Using external auth service %r", self.__external_service.get_plugin_name()) self.__force_internal_users = force_internal_users - self.__tokens: Dict[str, str] = {} # {token: user} + self.__tokens: dict[str, str] = {} # {token: user} def is_auth_enabled(self) -> bool: return self.__enabled @@ -88,7 +84,7 @@ class AuthManager: get_logger().error("Got access denied for user %r from auth service %r", user, service.get_plugin_name()) return ok - async def login(self, user: str, passwd: str) -> Optional[str]: + async def login(self, user: str, passwd: str) -> (str | None): assert user == user.strip() assert user assert self.__enabled @@ -109,7 +105,7 @@ class AuthManager: if user: get_logger().info("Logged out user %r", user) - def check(self, token: str) -> Optional[str]: + def check(self, token: str) -> (str | None): assert self.__enabled return self.__tokens.get(token) diff --git a/kvmd/apps/kvmd/info/__init__.py b/kvmd/apps/kvmd/info/__init__.py index f8871d72..032dbb2d 100644 --- a/kvmd/apps/kvmd/info/__init__.py +++ b/kvmd/apps/kvmd/info/__init__.py @@ -20,8 +20,6 @@ # ========================================================================== # -from typing import Set - from ....yamlconf import Section from .base import BaseInfoSubmanager @@ -45,7 +43,7 @@ class InfoManager: "fan": FanInfoSubmanager(**config.kvmd.info.fan._unpack()), } - def get_subs(self) -> Set[str]: + def get_subs(self) -> set[str]: return set(self.__subs) def get_submanager(self, name: str) -> BaseInfoSubmanager: diff --git a/kvmd/apps/kvmd/info/auth.py b/kvmd/apps/kvmd/info/auth.py index bf1c6160..6aef3348 100644 --- a/kvmd/apps/kvmd/info/auth.py +++ b/kvmd/apps/kvmd/info/auth.py @@ -20,8 +20,6 @@ # ========================================================================== # -from typing import Dict - from .base import BaseInfoSubmanager @@ -30,5 +28,5 @@ class AuthInfoSubmanager(BaseInfoSubmanager): def __init__(self, enabled: bool) -> None: self.__enabled = enabled - async def get_state(self) -> Dict: + async def get_state(self) -> dict: return {"enabled": self.__enabled} diff --git a/kvmd/apps/kvmd/info/base.py b/kvmd/apps/kvmd/info/base.py index ae447cf5..51dbda18 100644 --- a/kvmd/apps/kvmd/info/base.py +++ b/kvmd/apps/kvmd/info/base.py @@ -20,11 +20,7 @@ # ========================================================================== # -from typing import Dict -from typing import Optional - - # ===== class BaseInfoSubmanager: - async def get_state(self) -> Optional[Dict]: + async def get_state(self) -> (dict | None): raise NotImplementedError diff --git a/kvmd/apps/kvmd/info/extras.py b/kvmd/apps/kvmd/info/extras.py index 9e70d05c..07a5eabb 100644 --- a/kvmd/apps/kvmd/info/extras.py +++ b/kvmd/apps/kvmd/info/extras.py @@ -24,9 +24,6 @@ import os import re import asyncio -from typing import Dict -from typing import Optional - from ....logging import get_logger from ....yamlconf import Section @@ -45,7 +42,7 @@ class ExtrasInfoSubmanager(BaseInfoSubmanager): def __init__(self, global_config: Section) -> None: self.__global_config = global_config - async def get_state(self) -> Optional[Dict]: + async def get_state(self) -> (dict | None): try: sui = sysunit.SystemdUnitInfo() await sui.open() @@ -53,7 +50,7 @@ class ExtrasInfoSubmanager(BaseInfoSubmanager): get_logger(0).error("Can't open systemd bus to get extras state: %s", tools.efmt(err)) sui = None try: - extras: Dict[str, Dict] = {} + extras: dict[str, dict] = {} for extra in (await asyncio.gather(*[ self.__read_extra(sui, name) for name in os.listdir(self.__get_extras_path()) @@ -71,7 +68,7 @@ class ExtrasInfoSubmanager(BaseInfoSubmanager): def __get_extras_path(self, *parts: str) -> str: return os.path.join(self.__global_config.kvmd.info.extras, *parts) - async def __read_extra(self, sui: Optional[sysunit.SystemdUnitInfo], name: str) -> Dict: + async def __read_extra(self, sui: (sysunit.SystemdUnitInfo | None), name: str) -> dict: try: extra = await aiotools.run_async(load_yaml_file, self.__get_extras_path(name, "manifest.yaml")) await self.__rewrite_app_daemon(sui, extra) @@ -81,7 +78,7 @@ class ExtrasInfoSubmanager(BaseInfoSubmanager): get_logger(0).exception("Can't read extra %r", name) return {} - async def __rewrite_app_daemon(self, sui: Optional[sysunit.SystemdUnitInfo], extra: Dict) -> None: + async def __rewrite_app_daemon(self, sui: (sysunit.SystemdUnitInfo | None), extra: dict) -> None: daemon = extra.get("daemon", "") if isinstance(daemon, str) and daemon.strip(): extra["enabled"] = extra["started"] = False @@ -91,7 +88,7 @@ class ExtrasInfoSubmanager(BaseInfoSubmanager): except Exception as err: get_logger(0).error("Can't get info about the service %r: %s", daemon, tools.efmt(err)) - def __rewrite_app_port(self, extra: Dict) -> None: + def __rewrite_app_port(self, extra: dict) -> None: port_path = extra.get("port", "") if isinstance(port_path, str) and port_path.strip(): extra["port"] = 0 diff --git a/kvmd/apps/kvmd/info/fan.py b/kvmd/apps/kvmd/info/fan.py index afa7afce..7cff7217 100644 --- a/kvmd/apps/kvmd/info/fan.py +++ b/kvmd/apps/kvmd/info/fan.py @@ -23,9 +23,7 @@ import copy import asyncio -from typing import Dict from typing import AsyncGenerator -from typing import Optional import aiohttp @@ -55,15 +53,15 @@ class FanInfoSubmanager(BaseInfoSubmanager): self.__timeout = timeout self.__state_poll = state_poll - async def get_state(self) -> Dict: + async def get_state(self) -> dict: monitored = await self.__get_monitored() return { "monitored": monitored, "state": ((await self.__get_fan_state() if monitored else None)), } - async def poll_state(self) -> AsyncGenerator[Dict, None]: - prev_state: Dict = {} + async def poll_state(self) -> AsyncGenerator[dict, None]: + prev_state: dict = {} while True: if self.__unix_path: pure = state = await self.get_state() @@ -93,7 +91,7 @@ class FanInfoSubmanager(BaseInfoSubmanager): get_logger(0).error("Can't get info about the service %r: %s", self.__daemon, tools.efmt(err)) return False - async def __get_fan_state(self) -> Optional[Dict]: + async def __get_fan_state(self) -> (dict | None): try: async with self.__make_http_session() as session: async with session.get("http://localhost/state") as response: @@ -104,7 +102,7 @@ class FanInfoSubmanager(BaseInfoSubmanager): return None def __make_http_session(self) -> aiohttp.ClientSession: - kwargs: Dict = { + kwargs: dict = { "headers": { "User-Agent": htclient.make_user_agent("KVMD"), }, diff --git a/kvmd/apps/kvmd/info/hw.py b/kvmd/apps/kvmd/info/hw.py index a99fb09d..86efd761 100644 --- a/kvmd/apps/kvmd/info/hw.py +++ b/kvmd/apps/kvmd/info/hw.py @@ -23,12 +23,9 @@ import os import asyncio -from typing import List -from typing import Dict from typing import Callable from typing import AsyncGenerator from typing import TypeVar -from typing import Optional from ....logging import get_logger @@ -48,16 +45,16 @@ _RetvalT = TypeVar("_RetvalT") class HwInfoSubmanager(BaseInfoSubmanager): def __init__( self, - vcgencmd_cmd: List[str], + vcgencmd_cmd: list[str], state_poll: float, ) -> None: self.__vcgencmd_cmd = vcgencmd_cmd self.__state_poll = state_poll - self.__dt_cache: Dict[str, str] = {} + self.__dt_cache: dict[str, str] = {} - async def get_state(self) -> Dict: + async def get_state(self) -> dict: (model, serial, cpu_temp, throttling) = await asyncio.gather( self.__read_dt_file("model"), self.__read_dt_file("serial-number"), @@ -78,8 +75,8 @@ class HwInfoSubmanager(BaseInfoSubmanager): }, } - async def poll_state(self) -> AsyncGenerator[Dict, None]: - prev_state: Dict = {} + async def poll_state(self) -> AsyncGenerator[dict, None]: + prev_state: dict = {} while True: state = await self.get_state() if state != prev_state: @@ -89,7 +86,7 @@ class HwInfoSubmanager(BaseInfoSubmanager): # ===== - async def __read_dt_file(self, name: str) -> Optional[str]: + async def __read_dt_file(self, name: str) -> (str | None): if name not in self.__dt_cache: path = os.path.join(f"{env.PROCFS_PREFIX}/proc/device-tree", name) try: @@ -99,7 +96,7 @@ class HwInfoSubmanager(BaseInfoSubmanager): return None return self.__dt_cache[name] - async def __get_cpu_temp(self) -> Optional[float]: + async def __get_cpu_temp(self) -> (float | None): temp_path = f"{env.SYSFS_PREFIX}/sys/class/thermal/thermal_zone0/temp" try: return int((await aiofs.read(temp_path)).strip()) / 1000 @@ -107,7 +104,7 @@ class HwInfoSubmanager(BaseInfoSubmanager): get_logger(0).error("Can't read CPU temp from %s: %s", temp_path, err) return None - async def __get_throttling(self) -> Optional[Dict]: + async def __get_throttling(self) -> (dict | None): # https://www.raspberrypi.org/forums/viewtopic.php?f=63&t=147781&start=50#p972790 flags = await self.__parse_vcgencmd( arg="get_throttled", @@ -133,7 +130,7 @@ class HwInfoSubmanager(BaseInfoSubmanager): } return None - async def __parse_vcgencmd(self, arg: str, parser: Callable[[str], _RetvalT]) -> Optional[_RetvalT]: + async def __parse_vcgencmd(self, arg: str, parser: Callable[[str], _RetvalT]) -> (_RetvalT | None): cmd = [*self.__vcgencmd_cmd, arg] try: text = (await aioproc.read_process(cmd, err_to_null=True))[1] diff --git a/kvmd/apps/kvmd/info/meta.py b/kvmd/apps/kvmd/info/meta.py index 86a9be26..6a92d7ea 100644 --- a/kvmd/apps/kvmd/info/meta.py +++ b/kvmd/apps/kvmd/info/meta.py @@ -20,9 +20,6 @@ # ========================================================================== # -from typing import Dict -from typing import Optional - from ....logging import get_logger from ....yamlconf.loader import load_yaml_file @@ -37,7 +34,7 @@ class MetaInfoSubmanager(BaseInfoSubmanager): def __init__(self, meta_path: str) -> None: self.__meta_path = meta_path - async def get_state(self) -> Optional[Dict]: + async def get_state(self) -> (dict | None): try: return ((await aiotools.run_async(load_yaml_file, self.__meta_path)) or {}) except Exception: diff --git a/kvmd/apps/kvmd/info/system.py b/kvmd/apps/kvmd/info/system.py index 5c46a5c6..9037a0c2 100644 --- a/kvmd/apps/kvmd/info/system.py +++ b/kvmd/apps/kvmd/info/system.py @@ -24,9 +24,6 @@ import os import asyncio import platform -from typing import List -from typing import Dict - from ....logging import get_logger from .... import aioproc @@ -38,10 +35,10 @@ from .base import BaseInfoSubmanager # ===== class SystemInfoSubmanager(BaseInfoSubmanager): - def __init__(self, streamer_cmd: List[str]) -> None: + def __init__(self, streamer_cmd: list[str]) -> None: self.__streamer_cmd = streamer_cmd - async def get_state(self) -> Dict: + async def get_state(self) -> dict: streamer_info = await self.__get_streamer_info() uname_info = platform.uname() # Uname using the internal cache return { @@ -55,9 +52,9 @@ class SystemInfoSubmanager(BaseInfoSubmanager): # ===== - async def __get_streamer_info(self) -> Dict: + async def __get_streamer_info(self) -> dict: version = "" - features: Dict[str, bool] = {} + features: dict[str, bool] = {} try: path = self.__streamer_cmd[0] ((_, version), (_, features_text)) = await asyncio.gather( diff --git a/kvmd/apps/kvmd/logreader.py b/kvmd/apps/kvmd/logreader.py index dfe2125a..cdd46744 100644 --- a/kvmd/apps/kvmd/logreader.py +++ b/kvmd/apps/kvmd/logreader.py @@ -24,7 +24,6 @@ import re import asyncio import time -from typing import Dict from typing import AsyncGenerator import systemd.journal @@ -32,7 +31,7 @@ import systemd.journal # ===== class LogReader: - async def poll_log(self, seek: int, follow: bool) -> AsyncGenerator[Dict, None]: + async def poll_log(self, seek: int, follow: bool) -> AsyncGenerator[dict, None]: reader = systemd.journal.Reader() reader.this_boot() reader.this_machine() @@ -59,7 +58,7 @@ class LogReader: else: await asyncio.sleep(1) - def __entry_to_record(self, entry: Dict) -> Dict[str, Dict]: + def __entry_to_record(self, entry: dict) -> dict[str, dict]: return { "dt": entry["__REALTIME_TIMESTAMP"], "service": entry["_SYSTEMD_UNIT"], diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py index fb209033..096737c8 100644 --- a/kvmd/apps/kvmd/streamer.py +++ b/kvmd/apps/kvmd/streamer.py @@ -27,11 +27,7 @@ import asyncio.subprocess import dataclasses import functools -from typing import Tuple -from typing import List -from typing import Dict from typing import AsyncGenerator -from typing import Optional from typing import Any import aiohttp @@ -53,7 +49,7 @@ class StreamerSnapshot: width: int height: int mtime: float - headers: Tuple[Tuple[str, str], ...] + headers: tuple[tuple[str, str], ...] data: bytes async def make_preview(self, max_width: int, max_height: int, quality: int) -> bytes: @@ -98,7 +94,7 @@ class _StreamerParams: quality: int, resolution: str, - available_resolutions: List[str], + available_resolutions: list[str], desired_fps: int, desired_fps_min: int, @@ -117,8 +113,8 @@ class _StreamerParams: self.__has_resolution = bool(resolution) self.__has_h264 = bool(h264_bitrate) - self.__params: Dict = {self.__DESIRED_FPS: min(max(desired_fps, desired_fps_min), desired_fps_max)} - self.__limits: Dict = {self.__DESIRED_FPS: {"min": desired_fps_min, "max": desired_fps_max}} + self.__params: dict = {self.__DESIRED_FPS: min(max(desired_fps, desired_fps_min), desired_fps_max)} + self.__limits: dict = {self.__DESIRED_FPS: {"min": desired_fps_min, "max": desired_fps_max}} if self.__has_quality: self.__params[self.__QUALITY] = quality @@ -133,23 +129,23 @@ class _StreamerParams: self.__params[self.__H264_GOP] = min(max(h264_gop, h264_gop_min), h264_gop_max) self.__limits[self.__H264_GOP] = {"min": h264_gop_min, "max": h264_gop_max} - def get_features(self) -> Dict: + def get_features(self) -> dict: return { self.__QUALITY: self.__has_quality, self.__RESOLUTION: self.__has_resolution, "h264": self.__has_h264, } - def get_limits(self) -> Dict: + def get_limits(self) -> dict: limits = dict(self.__limits) if self.__has_resolution: limits[self.__AVAILABLE_RESOLUTIONS] = list(limits[self.__AVAILABLE_RESOLUTIONS]) return limits - def get_params(self) -> Dict: + def get_params(self) -> dict: return dict(self.__params) - def set_params(self, params: Dict) -> None: + def set_params(self, params: dict) -> None: new_params = dict(self.__params) if self.__QUALITY in params and self.__has_quality: @@ -187,9 +183,9 @@ class Streamer: # pylint: disable=too-many-instance-attributes process_name_prefix: str, - cmd: List[str], - cmd_remove: List[str], - cmd_append: List[str], + cmd: list[str], + cmd_remove: list[str], + cmd_append: list[str], **params_kwargs: Any, ) -> None: @@ -207,15 +203,15 @@ class Streamer: # pylint: disable=too-many-instance-attributes self.__params = _StreamerParams(**params_kwargs) - self.__stop_task: Optional[asyncio.Task] = None + self.__stop_task: (asyncio.Task | None) = None self.__stop_wip = False - self.__streamer_task: Optional[asyncio.Task] = None - self.__streamer_proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member + self.__streamer_task: (asyncio.Task | None) = None + self.__streamer_proc: (asyncio.subprocess.Process | None) = None # pylint: disable=no-member - self.__http_session: Optional[aiohttp.ClientSession] = None + self.__http_session: (aiohttp.ClientSession | None) = None - self.__snapshot: Optional[StreamerSnapshot] = None + self.__snapshot: (StreamerSnapshot | None) = None self.__notifier = aiotools.AioNotifier() @@ -280,16 +276,16 @@ class Streamer: # pylint: disable=too-many-instance-attributes # ===== - def set_params(self, params: Dict) -> None: + def set_params(self, params: dict) -> None: assert not self.__streamer_task return self.__params.set_params(params) - def get_params(self) -> Dict: + def get_params(self) -> dict: return self.__params.get_params() # ===== - async def get_state(self) -> Dict: + async def get_state(self) -> dict: streamer_state = None if self.__streamer_task: session = self.__ensure_http_session() @@ -302,7 +298,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes except Exception: get_logger().exception("Invalid streamer response from /state") - snapshot: Optional[Dict] = None + snapshot: (dict | None) = None if self.__snapshot: snapshot = dataclasses.asdict(self.__snapshot) del snapshot["headers"] @@ -316,7 +312,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes "features": self.__params.get_features(), } - async def poll_state(self) -> AsyncGenerator[Dict, None]: + async def poll_state(self) -> AsyncGenerator[dict, None]: def signal_handler(*_: Any) -> None: get_logger(0).info("Got SIGUSR2, checking the stream state ...") self.__notifier.notify() @@ -324,8 +320,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes get_logger(0).info("Installing SIGUSR2 streamer handler ...") asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler) - waiter_task: Optional[asyncio.Task] = None - prev_state: Dict = {} + waiter_task: (asyncio.Task | None) = None + prev_state: dict = {} while True: state = await self.get_state() if state != prev_state: @@ -339,7 +335,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes # ===== - async def take_snapshot(self, save: bool, load: bool, allow_offline: bool) -> Optional[StreamerSnapshot]: + async def take_snapshot(self, save: bool, load: bool, allow_offline: bool) -> (StreamerSnapshot | None): if load: return self.__snapshot else: @@ -395,7 +391,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes def __ensure_http_session(self) -> aiohttp.ClientSession: if not self.__http_session: - kwargs: Dict = { + kwargs: dict = { "headers": {"User-Agent": htclient.make_user_agent("KVMD")}, "connector": aiohttp.UnixConnector(path=self.__unix_path), "timeout": aiohttp.ClientTimeout(total=self.__timeout), diff --git a/kvmd/apps/kvmd/sysunit.py b/kvmd/apps/kvmd/sysunit.py index dea5e961..1358ee61 100644 --- a/kvmd/apps/kvmd/sysunit.py +++ b/kvmd/apps/kvmd/sysunit.py @@ -22,10 +22,6 @@ import types -from typing import Tuple -from typing import Type -from typing import Optional - import dbus_next import dbus_next.aio import dbus_next.aio.proxy_object @@ -36,11 +32,11 @@ import dbus_next.errors # ===== class SystemdUnitInfo: def __init__(self) -> None: - self.__bus: Optional[dbus_next.aio.MessageBus] = None - self.__intr: Optional[dbus_next.introspection.Node] = None - self.__manager: Optional[dbus_next.aio.proxy_object.ProxyInterface] = None + self.__bus: (dbus_next.aio.MessageBus | None) = None + self.__intr: (dbus_next.introspection.Node | None) = None + self.__manager: (dbus_next.aio.proxy_object.ProxyInterface | None) = None - async def get_status(self, name: str) -> Tuple[bool, bool]: + async def get_status(self, name: str) -> tuple[bool, bool]: assert self.__bus is not None assert self.__intr is not None assert self.__manager is not None @@ -89,7 +85,7 @@ class SystemdUnitInfo: async def __aexit__( self, - _exc_type: Type[BaseException], + _exc_type: type[BaseException], _exc: BaseException, _tb: types.TracebackType, ) -> None: diff --git a/kvmd/apps/kvmd/tesseract.py b/kvmd/apps/kvmd/tesseract.py index 58cba9ae..fff4df2d 100644 --- a/kvmd/apps/kvmd/tesseract.py +++ b/kvmd/apps/kvmd/tesseract.py @@ -36,10 +36,7 @@ from ctypes import c_char_p from ctypes import c_void_p from ctypes import c_char -from typing import List -from typing import Set from typing import Generator -from typing import Optional from PIL import ImageOps from PIL import Image as PilImage @@ -60,7 +57,7 @@ class _TessBaseAPI(Structure): pass -def _load_libtesseract() -> Optional[ctypes.CDLL]: +def _load_libtesseract() -> (ctypes.CDLL | None): try: path = ctypes.util.find_library("tesseract") if not path: @@ -88,7 +85,7 @@ _libtess = _load_libtesseract() @contextlib.contextmanager -def _tess_api(data_dir_path: str, langs: List[str]) -> Generator[_TessBaseAPI, None, None]: +def _tess_api(data_dir_path: str, langs: list[str]) -> Generator[_TessBaseAPI, None, None]: if not _libtess: raise OcrError("Tesseract is not available") api = _libtess.TessBaseAPICreate() @@ -107,19 +104,19 @@ _LANG_SUFFIX = ".traineddata" # ===== class TesseractOcr: - def __init__(self, data_dir_path: str, default_langs: List[str]) -> None: + def __init__(self, data_dir_path: str, default_langs: list[str]) -> None: self.__data_dir_path = data_dir_path self.__default_langs = default_langs def is_available(self) -> bool: return bool(_libtess) - def get_default_langs(self) -> List[str]: + def get_default_langs(self) -> list[str]: return list(self.__default_langs) - def get_available_langs(self) -> List[str]: + def get_available_langs(self) -> list[str]: # Это быстрее чем, инициализация либы и TessBaseAPIGetAvailableLanguagesAsVector() - langs: Set[str] = set() + langs: set[str] = set() for lang_name in os.listdir(self.__data_dir_path): if lang_name.endswith(_LANG_SUFFIX): path = os.path.join(self.__data_dir_path, lang_name) @@ -129,12 +126,12 @@ class TesseractOcr: langs.add(lang) return sorted(langs) - async def recognize(self, data: bytes, langs: List[str], left: int, top: int, right: int, bottom: int) -> str: + async def recognize(self, data: bytes, langs: list[str], left: int, top: int, right: int, bottom: int) -> str: if not langs: langs = self.__default_langs return (await aiotools.run_async(self.__inner_recognize, data, langs, left, top, right, bottom)) - def __inner_recognize(self, data: bytes, langs: List[str], left: int, top: int, right: int, bottom: int) -> str: + def __inner_recognize(self, data: bytes, langs: list[str], left: int, top: int, right: int, bottom: int) -> str: with _tess_api(self.__data_dir_path, langs) as api: assert _libtess with io.BytesIO(data) as bio: diff --git a/kvmd/apps/kvmd/ugpio.py b/kvmd/apps/kvmd/ugpio.py index b29b7ce4..8b7b31f8 100644 --- a/kvmd/apps/kvmd/ugpio.py +++ b/kvmd/apps/kvmd/ugpio.py @@ -22,11 +22,8 @@ import asyncio -from typing import List -from typing import Dict from typing import AsyncGenerator from typing import Callable -from typing import Optional from typing import Any from ...logging import get_logger @@ -83,7 +80,7 @@ class _GpioInput: self.__driver = driver self.__driver.register_input(self.__pin, config.debounce) - def get_scheme(self) -> Dict: + def get_scheme(self) -> dict: return { "hw": { "driver": self.__driver.get_instance_id(), @@ -91,7 +88,7 @@ class _GpioInput: }, } - async def get_state(self) -> Dict: + async def get_state(self) -> dict: (online, state) = (True, False) try: state = (await self.__driver.read(self.__pin) ^ self.__inverted) @@ -139,7 +136,7 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes self.__region = aiotools.AioExclusiveRegion(GpioChannelIsBusyError, notifier) - def get_scheme(self) -> Dict: + def get_scheme(self) -> dict: return { "switch": self.__switch, "pulse": { @@ -153,7 +150,7 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes }, } - async def get_state(self) -> Dict: + async def get_state(self) -> dict: busy = self.__region.is_busy() (online, state) = (True, False) if not busy: @@ -246,8 +243,8 @@ class UserGpio: for (driver, drv_config) in tools.sorted_kvs(config.drivers) } - self.__inputs: Dict[str, _GpioInput] = {} - self.__outputs: Dict[str, _GpioOutput] = {} + self.__inputs: dict[str, _GpioInput] = {} + self.__outputs: dict[str, _GpioOutput] = {} for (channel, ch_config) in tools.sorted_kvs(config.scheme): driver = self.__drivers[ch_config.driver] @@ -256,7 +253,7 @@ class UserGpio: else: # output: self.__outputs[channel] = _GpioOutput(channel, ch_config, driver, self.__notifier) - async def get_model(self) -> Dict: + async def get_model(self) -> dict: return { "scheme": { "inputs": {channel: gin.get_scheme() for (channel, gin) in self.__inputs.items()}, @@ -265,14 +262,14 @@ class UserGpio: "view": self.__make_view(), } - async def get_state(self) -> Dict: + async def get_state(self) -> dict: return { "inputs": {channel: await gin.get_state() for (channel, gin) in self.__inputs.items()}, "outputs": {channel: await gout.get_state() for (channel, gout) in self.__outputs.items()}, } - async def poll_state(self) -> AsyncGenerator[Dict, None]: - prev_state: Dict = {} + async def poll_state(self) -> AsyncGenerator[dict, None]: + prev_state: dict = {} while True: state = await self.get_state() if state != prev_state: @@ -313,14 +310,14 @@ class UserGpio: # ===== - def __make_view(self) -> Dict: - table: List[Optional[List[Dict]]] = [] + def __make_view(self) -> dict: + table: list[list[dict] | None] = [] for row in self.__view["table"]: if len(row) == 0: table.append(None) continue - items: List[Dict] = [] + items: list[dict] = [] for item in map(str.strip, row): if item.startswith("#") or len(item) == 0: items.append(self.__make_view_label(item)) @@ -338,14 +335,14 @@ class UserGpio: "table": table, } - def __make_view_label(self, item: str) -> Dict: + def __make_view_label(self, item: str) -> dict: assert item.startswith("#") return { "type": "label", "text": item[1:].strip(), } - def __make_view_input(self, parts: List[str]) -> Dict: + def __make_view_input(self, parts: list[str]) -> dict: assert len(parts) >= 1 color = (parts[1] if len(parts) > 1 else None) if color not in ["green", "yellow", "red"]: @@ -356,7 +353,7 @@ class UserGpio: "color": color, } - def __make_view_output(self, parts: List[str]) -> Dict: + def __make_view_output(self, parts: list[str]) -> dict: assert len(parts) >= 1 confirm = False text = "Click" diff --git a/kvmd/apps/otg/__init__.py b/kvmd/apps/otg/__init__.py index 9b6f5e69..4c811d2b 100644 --- a/kvmd/apps/otg/__init__.py +++ b/kvmd/apps/otg/__init__.py @@ -29,10 +29,6 @@ import argparse from os.path import join # pylint: disable=ungrouped-imports -from typing import List -from typing import Optional -from typing import Union - from ...logging import get_logger from ...yamlconf import Section @@ -78,7 +74,7 @@ def _unlink(path: str, optional: bool=False) -> None: os.unlink(path) -def _write(path: str, value: Union[str, int], optional: bool=False) -> None: +def _write(path: str, value: (str | int), optional: bool=False) -> None: logger = get_logger() if optional and not os.access(path, os.F_OK): logger.info("WRITE --- [SKIPPED] %s", path) @@ -320,7 +316,7 @@ def _cmd_stop(config: Section) -> None: # ===== -def main(argv: Optional[List[str]]=None) -> None: +def main(argv: (list[str] | None)=None) -> None: (parent_parser, argv, config) = init( add_help=False, argv=argv, diff --git a/kvmd/apps/otg/hid/keyboard.py b/kvmd/apps/otg/hid/keyboard.py index d10a4b73..6a4d0184 100644 --- a/kvmd/apps/otg/hid/keyboard.py +++ b/kvmd/apps/otg/hid/keyboard.py @@ -20,13 +20,11 @@ # ========================================================================== # -from typing import Optional - from . import Hid # ===== -def make_keyboard_hid(report_id: Optional[int]=None) -> Hid: +def make_keyboard_hid(report_id: (int | None)=None) -> Hid: return Hid( protocol=1, # Keyboard protocol subclass=1, # Boot interface subclass diff --git a/kvmd/apps/otg/hid/mouse.py b/kvmd/apps/otg/hid/mouse.py index 71249569..e0f7350b 100644 --- a/kvmd/apps/otg/hid/mouse.py +++ b/kvmd/apps/otg/hid/mouse.py @@ -20,13 +20,11 @@ # ========================================================================== # -from typing import Optional - from . import Hid # ===== -def make_mouse_hid(absolute: bool, horizontal_wheel: bool, report_id: Optional[int]=None) -> Hid: +def make_mouse_hid(absolute: bool, horizontal_wheel: bool, report_id: (int | None)=None) -> Hid: maker = (_make_absolute_hid if absolute else _make_relative_hid) return maker(horizontal_wheel, report_id) @@ -42,7 +40,7 @@ _HORIZONTAL_WHEEL = [ ] -def _make_absolute_hid(horizontal_wheel: bool, report_id: Optional[int]) -> Hid: +def _make_absolute_hid(horizontal_wheel: bool, report_id: (int | None)) -> Hid: return Hid( protocol=0, # None protocol subclass=0, # No subclass @@ -106,7 +104,7 @@ def _make_absolute_hid(horizontal_wheel: bool, report_id: Optional[int]) -> Hid: ) -def _make_relative_hid(horizontal_wheel: bool, report_id: Optional[int]) -> Hid: +def _make_relative_hid(horizontal_wheel: bool, report_id: (int | None)) -> Hid: return Hid( protocol=2, # Mouse protocol subclass=1, # Boot interface subclass diff --git a/kvmd/apps/otgconf/__init__.py b/kvmd/apps/otgconf/__init__.py index 389e1721..1a1e0f99 100644 --- a/kvmd/apps/otgconf/__init__.py +++ b/kvmd/apps/otgconf/__init__.py @@ -26,10 +26,7 @@ import contextlib import argparse import time -from typing import List -from typing import Dict from typing import Generator -from typing import Optional import yaml @@ -64,7 +61,7 @@ class _GadgetControl: with open(udc_path, "w") as udc_file: udc_file.write(udc) - def __read_metas(self) -> Generator[Dict, None, None]: + def __read_metas(self) -> Generator[dict, None, None]: for meta_name in sorted(os.listdir(self.__meta_path)): with open(os.path.join(self.__meta_path, meta_name)) as meta_file: yield json.loads(meta_file.read()) @@ -111,7 +108,7 @@ class _GadgetControl: # ===== -def main(argv: Optional[List[str]]=None) -> None: +def main(argv: (list[str] | None)=None) -> None: (parent_parser, argv, config) = init( add_help=False, cli_logging=True, diff --git a/kvmd/apps/otgmsd/__init__.py b/kvmd/apps/otgmsd/__init__.py index 0d32331b..2540eb0b 100644 --- a/kvmd/apps/otgmsd/__init__.py +++ b/kvmd/apps/otgmsd/__init__.py @@ -24,9 +24,6 @@ import os import errno import argparse -from typing import List -from typing import Optional - from ...validators.basic import valid_bool from ...validators.basic import valid_int_f0 from ...validators.os import valid_abs_file @@ -57,7 +54,7 @@ def _set_param(gadget: str, instance: int, param: str, value: str) -> None: # ===== -def main(argv: Optional[List[str]]=None) -> None: +def main(argv: (list[str] | None)=None) -> None: (parent_parser, argv, config) = init( add_help=False, cli_logging=True, diff --git a/kvmd/apps/otgnet/__init__.py b/kvmd/apps/otgnet/__init__.py index 7a5e4cc7..98213346 100644 --- a/kvmd/apps/otgnet/__init__.py +++ b/kvmd/apps/otgnet/__init__.py @@ -26,9 +26,6 @@ import dataclasses import itertools import argparse -from typing import List -from typing import Optional - from ...logging import get_logger from ...yamlconf import Section @@ -66,25 +63,25 @@ class _Netcfg: # pylint: disable=too-many-instance-attributes class _Service: # pylint: disable=too-many-instance-attributes def __init__(self, config: Section) -> None: self.__iface_net: str = config.otgnet.iface.net - self.__ip_cmd: List[str] = config.otgnet.iface.ip_cmd + self.__ip_cmd: list[str] = config.otgnet.iface.ip_cmd self.__allow_icmp: bool = config.otgnet.firewall.allow_icmp - self.__allow_tcp: List[int] = sorted(set(config.otgnet.firewall.allow_tcp)) - self.__allow_udp: List[int] = sorted(set(config.otgnet.firewall.allow_udp)) + self.__allow_tcp: list[int] = sorted(set(config.otgnet.firewall.allow_tcp)) + self.__allow_udp: list[int] = sorted(set(config.otgnet.firewall.allow_udp)) self.__forward_iface: str = config.otgnet.firewall.forward_iface - self.__iptables_cmd: List[str] = config.otgnet.firewall.iptables_cmd + self.__iptables_cmd: list[str] = config.otgnet.firewall.iptables_cmd - def build_cmd(key: str) -> List[str]: + def build_cmd(key: str) -> list[str]: return tools.build_cmd( getattr(config.otgnet.commands, key), getattr(config.otgnet.commands, f"{key}_remove"), getattr(config.otgnet.commands, f"{key}_append"), ) - self.__pre_start_cmd: List[str] = build_cmd("pre_start_cmd") - self.__post_start_cmd: List[str] = build_cmd("post_start_cmd") - self.__pre_stop_cmd: List[str] = build_cmd("pre_stop_cmd") - self.__post_stop_cmd: List[str] = build_cmd("post_stop_cmd") + self.__pre_start_cmd: list[str] = build_cmd("pre_start_cmd") + self.__post_start_cmd: list[str] = build_cmd("post_start_cmd") + self.__pre_stop_cmd: list[str] = build_cmd("pre_stop_cmd") + self.__post_stop_cmd: list[str] = build_cmd("post_stop_cmd") self.__gadget: str = config.otg.gadget self.__driver: str = config.otg.devices.ethernet.driver @@ -101,7 +98,7 @@ class _Service: # pylint: disable=too-many-instance-attributes key: str(value) for (key, value) in dataclasses.asdict(netcfg).items() } - ctls: List[BaseCtl] = [ + ctls: list[BaseCtl] = [ CustomCtl(self.__pre_start_cmd, self.__post_stop_cmd, placeholders), IfaceUpCtl(self.__ip_cmd, netcfg.iface), *([IptablesAllowIcmpCtl(self.__iptables_cmd, netcfg.iface)] if self.__allow_icmp else []), @@ -185,7 +182,7 @@ class _Service: # pylint: disable=too-many-instance-attributes # ===== -def main(argv: Optional[List[str]]=None) -> None: +def main(argv: (list[str] | None)=None) -> None: (parent_parser, argv, config) = init( add_help=False, argv=argv, diff --git a/kvmd/apps/otgnet/netctl.py b/kvmd/apps/otgnet/netctl.py index f186ea34..15b5ea8c 100644 --- a/kvmd/apps/otgnet/netctl.py +++ b/kvmd/apps/otgnet/netctl.py @@ -20,50 +20,46 @@ # ========================================================================== # -from typing import List -from typing import Dict - - # ===== class BaseCtl: - def get_command(self, direct: bool) -> List[str]: + def get_command(self, direct: bool) -> list[str]: raise NotImplementedError class IfaceUpCtl(BaseCtl): - def __init__(self, base_cmd: List[str], iface: str) -> None: + def __init__(self, base_cmd: list[str], iface: str) -> None: self.__base_cmd = base_cmd self.__iface = iface - def get_command(self, direct: bool) -> List[str]: + def get_command(self, direct: bool) -> list[str]: return [*self.__base_cmd, "link", "set", self.__iface, ("up" if direct else "down")] class IfaceAddIpCtl(BaseCtl): - def __init__(self, base_cmd: List[str], iface: str, cidr: str) -> None: + def __init__(self, base_cmd: list[str], iface: str, cidr: str) -> None: self.__base_cmd = base_cmd self.__iface = iface self.__cidr = cidr - def get_command(self, direct: bool) -> List[str]: + def get_command(self, direct: bool) -> list[str]: return [*self.__base_cmd, "address", ("add" if direct else "del"), self.__cidr, "dev", self.__iface] class IptablesDropAllCtl(BaseCtl): - def __init__(self, base_cmd: List[str], iface: str) -> None: + def __init__(self, base_cmd: list[str], iface: str) -> None: self.__base_cmd = base_cmd self.__iface = iface - def get_command(self, direct: bool) -> List[str]: + def get_command(self, direct: bool) -> list[str]: return [*self.__base_cmd, ("-A" if direct else "-D"), "INPUT", "-i", self.__iface, "-j", "DROP"] class IptablesAllowIcmpCtl(BaseCtl): - def __init__(self, base_cmd: List[str], iface: str) -> None: + def __init__(self, base_cmd: list[str], iface: str) -> None: self.__base_cmd = base_cmd self.__iface = iface - def get_command(self, direct: bool) -> List[str]: + def get_command(self, direct: bool) -> list[str]: return [ *self.__base_cmd, ("-A" if direct else "-D"), "INPUT", "-i", self.__iface, "-p", "icmp", "-j", "ACCEPT", @@ -71,13 +67,13 @@ class IptablesAllowIcmpCtl(BaseCtl): class IptablesAllowPortCtl(BaseCtl): - def __init__(self, base_cmd: List[str], iface: str, port: int, tcp: bool) -> None: + def __init__(self, base_cmd: list[str], iface: str, port: int, tcp: bool) -> None: self.__base_cmd = base_cmd self.__iface = iface self.__port = port self.__proto = ("tcp" if tcp else "udp") - def get_command(self, direct: bool) -> List[str]: + def get_command(self, direct: bool) -> list[str]: return [ *self.__base_cmd, ("-A" if direct else "-D"), "INPUT", "-i", self.__iface, "-p", self.__proto, @@ -86,11 +82,11 @@ class IptablesAllowPortCtl(BaseCtl): class IptablesForwardOut(BaseCtl): - def __init__(self, base_cmd: List[str], iface: str) -> None: + def __init__(self, base_cmd: list[str], iface: str) -> None: self.__base_cmd = base_cmd self.__iface = iface - def get_command(self, direct: bool) -> List[str]: + def get_command(self, direct: bool) -> list[str]: return [ *self.__base_cmd, "--table", "nat", @@ -100,11 +96,11 @@ class IptablesForwardOut(BaseCtl): class IptablesForwardIn(BaseCtl): - def __init__(self, base_cmd: List[str], iface: str) -> None: + def __init__(self, base_cmd: list[str], iface: str) -> None: self.__base_cmd = base_cmd self.__iface = iface - def get_command(self, direct: bool) -> List[str]: + def get_command(self, direct: bool) -> list[str]: return [ *self.__base_cmd, ("-A" if direct else "-D"), "FORWARD", @@ -115,16 +111,16 @@ class IptablesForwardIn(BaseCtl): class CustomCtl(BaseCtl): def __init__( self, - direct_cmd: List[str], - reverse_cmd: List[str], - placeholders: Dict[str, str], + direct_cmd: list[str], + reverse_cmd: list[str], + placeholders: dict[str, str], ) -> None: self.__direct_cmd = direct_cmd self.__reverse_cmd = reverse_cmd self.__placeholders = placeholders - def get_command(self, direct: bool) -> List[str]: + def get_command(self, direct: bool) -> list[str]: return [ part.format(**self.__placeholders) for part in (self.__direct_cmd if direct else self.__reverse_cmd) diff --git a/kvmd/apps/pst/__init__.py b/kvmd/apps/pst/__init__.py index 2bb29a08..8f92e39d 100644 --- a/kvmd/apps/pst/__init__.py +++ b/kvmd/apps/pst/__init__.py @@ -20,9 +20,6 @@ # ========================================================================== # -from typing import List -from typing import Optional - from ...logging import get_logger from .. import init @@ -31,7 +28,7 @@ from .server import PstServer # ===== -def main(argv: Optional[List[str]]=None) -> None: +def main(argv: (list[str] | None)=None) -> None: config = init( prog="kvmd-pst", description="The KVMD persistent storage manager", diff --git a/kvmd/apps/pst/server.py b/kvmd/apps/pst/server.py index a492a29b..1df26f61 100644 --- a/kvmd/apps/pst/server.py +++ b/kvmd/apps/pst/server.py @@ -23,9 +23,6 @@ import os import asyncio -from typing import List -from typing import Dict - from aiohttp.web import Request from aiohttp.web import WebSocketResponse @@ -48,7 +45,7 @@ class PstServer(HttpServer): # pylint: disable=too-many-arguments,too-many-inst storage_path: str, ro_retries_delay: float, ro_cleanup_delay: float, - remount_cmd: List[str], + remount_cmd: list[str], ) -> None: super().__init__() @@ -69,7 +66,7 @@ class PstServer(HttpServer): # pylint: disable=too-many-arguments,too-many-inst return (await self._ws_loop(ws)) @exposed_ws("ping") - async def __ws_ping_handler(self, ws: WsSession, _: Dict) -> None: + async def __ws_ping_handler(self, ws: WsSession, _: dict) -> None: await ws.send_event("pong", {}) # ===== SYSTEM STUFF diff --git a/kvmd/apps/pstrun/__init__.py b/kvmd/apps/pstrun/__init__.py index 2b1b895a..c164e533 100644 --- a/kvmd/apps/pstrun/__init__.py +++ b/kvmd/apps/pstrun/__init__.py @@ -27,9 +27,6 @@ import asyncio import asyncio.subprocess import argparse -from typing import List -from typing import Optional - import aiohttp from ...logging import get_logger @@ -53,7 +50,7 @@ def _preexec() -> None: get_logger(0).info("Can't perform tcsetpgrp(0): %s", tools.efmt(err)) -async def _run_process(cmd: List[str], data_path: str) -> asyncio.subprocess.Process: # pylint: disable=no-member +async def _run_process(cmd: list[str], data_path: str) -> asyncio.subprocess.Process: # pylint: disable=no-member # https://stackoverflow.com/questions/58918188/why-is-stdin-not-propagated-to-child-process-of-different-process-group if os.isatty(0): signal.signal(signal.SIGTTOU, signal.SIG_IGN) @@ -67,11 +64,11 @@ async def _run_process(cmd: List[str], data_path: str) -> asyncio.subprocess.Pro )) -async def _run_cmd_ws(cmd: List[str], ws: aiohttp.ClientWebSocketResponse) -> int: # pylint: disable=too-many-branches +async def _run_cmd_ws(cmd: list[str], ws: aiohttp.ClientWebSocketResponse) -> int: # pylint: disable=too-many-branches logger = get_logger(0) - receive_task: Optional[asyncio.Task] = None - proc_task: Optional[asyncio.Task] = None - proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member + receive_task: (asyncio.Task | None) = None + proc_task: (asyncio.Task | None) = None + proc: (asyncio.subprocess.Process | None) = None # pylint: disable=no-member try: # pylint: disable=too-many-nested-blocks while True: @@ -120,7 +117,7 @@ async def _run_cmd_ws(cmd: List[str], ws: aiohttp.ClientWebSocketResponse) -> in return 1 -async def _run_cmd(cmd: List[str], unix_path: str) -> None: +async def _run_cmd(cmd: list[str], unix_path: str) -> None: get_logger(0).info("Opening PST session ...") async with aiohttp.ClientSession( headers={"User-Agent": htclient.make_user_agent("KVMD-PSTRun")}, @@ -132,7 +129,7 @@ async def _run_cmd(cmd: List[str], unix_path: str) -> None: # ===== -def main(argv: Optional[List[str]]=None) -> None: +def main(argv: (list[str] | None)=None) -> None: (parent_parser, argv, config) = init( add_help=False, cli_logging=True, diff --git a/kvmd/apps/vnc/__init__.py b/kvmd/apps/vnc/__init__.py index d7e2c68a..3653f416 100644 --- a/kvmd/apps/vnc/__init__.py +++ b/kvmd/apps/vnc/__init__.py @@ -20,9 +20,6 @@ # ========================================================================== # -from typing import List -from typing import Optional - from ...clients.kvmd import KvmdClient from ...clients.streamer import StreamFormats from ...clients.streamer import BaseStreamerClient @@ -38,7 +35,7 @@ from .server import VncServer # ===== -def main(argv: Optional[List[str]]=None) -> None: +def main(argv: (list[str] | None)=None) -> None: config = init( prog="kvmd-vnc", description="VNC to KVMD proxy", @@ -48,12 +45,12 @@ def main(argv: Optional[List[str]]=None) -> None: user_agent = htclient.make_user_agent("KVMD-VNC") - def make_memsink_streamer(name: str, fmt: int) -> Optional[MemsinkStreamerClient]: + def make_memsink_streamer(name: str, fmt: int) -> (MemsinkStreamerClient | None): if getattr(config.memsink, name).sink: return MemsinkStreamerClient(name.upper(), fmt, **getattr(config.memsink, name)._unpack()) return None - streamers: List[BaseStreamerClient] = list(filter(None, [ + streamers: list[BaseStreamerClient] = list(filter(None, [ make_memsink_streamer("h264", StreamFormats.H264), make_memsink_streamer("jpeg", StreamFormats.JPEG), HttpStreamerClient(name="JPEG", user_agent=user_agent, **config.streamer._unpack()), diff --git a/kvmd/apps/vnc/rfb/__init__.py b/kvmd/apps/vnc/rfb/__init__.py index 059d4d1d..f715f9b7 100644 --- a/kvmd/apps/vnc/rfb/__init__.py +++ b/kvmd/apps/vnc/rfb/__init__.py @@ -23,9 +23,6 @@ import asyncio import ssl -from typing import Tuple -from typing import List -from typing import Dict from typing import Callable from typing import Coroutine @@ -66,7 +63,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute width: int, height: int, name: str, - vnc_passwds: List[str], + vnc_passwds: list[str], vencrypt: bool, none_auth_only: bool, ) -> None: @@ -103,7 +100,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute finally: await aiotools.shield_fg(self.__cleanup(tasks)) - async def __cleanup(self, tasks: List[asyncio.Task]) -> None: + async def __cleanup(self, tasks: list[asyncio.Task]) -> None: for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True) @@ -150,7 +147,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute async def _on_ext_key_event(self, code: int, state: bool) -> None: raise NotImplementedError - async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None: + async def _on_pointer_event(self, buttons: dict[str, bool], wheel: dict[str, int], move: dict[str, int]) -> None: raise NotImplementedError async def _on_cut_event(self, text: str) -> None: @@ -232,7 +229,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute # ===== async def __handshake_security(self) -> None: - sec_types: Dict[int, Tuple[str, Callable]] = {} + sec_types: dict[int, tuple[str, Callable]] = {} if self.__vencrypt and self.__rfb_version > 3: sec_types[19] = ("VeNCrypt", self.__handshake_security_vencrypt) if self.__none_auth_only: diff --git a/kvmd/apps/vnc/rfb/crypto.py b/kvmd/apps/vnc/rfb/crypto.py index 854dceb3..8953bef5 100644 --- a/kvmd/apps/vnc/rfb/crypto.py +++ b/kvmd/apps/vnc/rfb/crypto.py @@ -22,8 +22,6 @@ import os -from typing import List - import passlib.crypto.des @@ -43,7 +41,7 @@ def rfb_encrypt_challenge(challenge: bytes, passwd: bytes) -> bytes: def _make_key(passwd: bytes) -> bytes: passwd = (passwd + b"\0" * 8)[:8] - key: List[int] = [] + key: list[int] = [] for ch in passwd: btgt = 0 for index in range(8): diff --git a/kvmd/apps/vnc/rfb/encodings.py b/kvmd/apps/vnc/rfb/encodings.py index de51dde5..e153b5e8 100644 --- a/kvmd/apps/vnc/rfb/encodings.py +++ b/kvmd/apps/vnc/rfb/encodings.py @@ -22,10 +22,6 @@ import dataclasses -from typing import List -from typing import Dict -from typing import FrozenSet -from typing import Union from typing import Any @@ -45,13 +41,13 @@ class RfbEncodings: H264 = 50 # Open H.264 Encoding -def _make_meta(variants: Union[int, FrozenSet[int]]) -> Dict: +def _make_meta(variants: (int | frozenset[int])) -> dict: return {"variants": (frozenset([variants]) if isinstance(variants, int) else variants)} @dataclasses.dataclass(frozen=True) class RfbClientEncodings: # pylint: disable=too-many-instance-attributes - encodings: FrozenSet[int] + encodings: frozenset[int] has_resize: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.RESIZE)) # noqa: E224 has_rename: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.RENAME)) # noqa: E224 @@ -63,8 +59,8 @@ class RfbClientEncodings: # pylint: disable=too-many-instance-attributes has_h264: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.H264)) # noqa: E224 - def get_summary(self) -> List[str]: - summary: List[str] = [f"encodings -- {sorted(self.encodings)}"] + def get_summary(self) -> list[str]: + summary: list[str] = [f"encodings -- {sorted(self.encodings)}"] for field in dataclasses.fields(self): if field.name != "encodings": found = ", ".join(map(str, sorted(map(int, self.__get_found(field))))) @@ -80,7 +76,7 @@ class RfbClientEncodings: # pylint: disable=too-many-instance-attributes def __set_value(self, key: str, value: Any) -> None: object.__setattr__(self, key, value) - def __get_found(self, field: dataclasses.Field) -> FrozenSet[int]: + def __get_found(self, field: dataclasses.Field) -> frozenset[int]: return self.encodings.intersection(field.metadata["variants"]) def __get_tight_jpeg_quality(self) -> int: diff --git a/kvmd/apps/vnc/rfb/stream.py b/kvmd/apps/vnc/rfb/stream.py index ee42436e..dfa6cb13 100644 --- a/kvmd/apps/vnc/rfb/stream.py +++ b/kvmd/apps/vnc/rfb/stream.py @@ -24,9 +24,6 @@ import asyncio import ssl import struct -from typing import Tuple -from typing import Union - from .... import aiotools from .errors import RfbConnectionError @@ -57,7 +54,7 @@ class RfbClientStream: except (ConnectionError, asyncio.IncompleteReadError) as err: raise RfbConnectionError(f"Can't read {msg}", err) - async def _read_struct(self, msg: str, fmt: str) -> Tuple[int, ...]: + async def _read_struct(self, msg: str, fmt: str) -> tuple[int, ...]: assert len(fmt) > 1 try: fmt = f">{fmt}" @@ -73,7 +70,7 @@ class RfbClientStream: # ===== - async def _write_struct(self, msg: str, fmt: str, *values: Union[int, bytes], drain: bool=True) -> None: + async def _write_struct(self, msg: str, fmt: str, *values: (int | bytes), drain: bool=True) -> None: try: if not fmt: for value in values: diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py index 7fac9aa4..c6ea076a 100644 --- a/kvmd/apps/vnc/server.py +++ b/kvmd/apps/vnc/server.py @@ -26,11 +26,6 @@ import socket import dataclasses import contextlib -from typing import List -from typing import Dict -from typing import Union -from typing import Optional - import aiohttp from ...logging import get_logger @@ -83,12 +78,12 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes desired_fps: int, keymap_name: str, - symmap: Dict[int, Dict[int, str]], + symmap: dict[int, dict[int, str]], kvmd: KvmdClient, - streamers: List[BaseStreamerClient], + streamers: list[BaseStreamerClient], - vnc_credentials: Dict[str, VncAuthKvmdCredentials], + vnc_credentials: dict[str, VncAuthKvmdCredentials], vencrypt: bool, none_auth_only: bool, shared_params: _SharedParams, @@ -122,15 +117,15 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes self.__stage2_encodings_accepted = aiotools.AioStage() self.__stage3_ws_connected = aiotools.AioStage() - self.__kvmd_session: Optional[KvmdClientSession] = None - self.__kvmd_ws: Optional[KvmdClientWs] = None + self.__kvmd_session: (KvmdClientSession | None) = None + self.__kvmd_ws: (KvmdClientWs | None) = None self.__fb_notifier = aiotools.AioNotifier() - self.__fb_queue: "asyncio.Queue[Dict]" = asyncio.Queue() + self.__fb_queue: "asyncio.Queue[dict]" = asyncio.Queue() # Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события. # Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD - self.__mouse_buttons: Dict[str, Optional[bool]] = dict.fromkeys(["left", "right", "middle"], None) + self.__mouse_buttons: dict[str, (bool | None)] = dict.fromkeys(["left", "right", "middle"], None) self.__mouse_move = {"x": -1, "y": -1} self.__lock = asyncio.Lock() @@ -175,7 +170,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes finally: self.__kvmd_ws = None - async def __process_ws_event(self, event_type: str, event: Dict) -> None: + async def __process_ws_event(self, event_type: str, event: dict) -> None: if event_type == "info_meta_state": try: host = event["server"]["host"] @@ -225,7 +220,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes StreamFormats.JPEG: "has_tight", StreamFormats.H264: "has_h264", } - streamer: Optional[BaseStreamerClient] = None + streamer: (BaseStreamerClient | None) = None for streamer in self.__streamers: if getattr(self._encodings, formats[streamer.get_format()]): get_logger(0).info("%s [streamer]: Using preferred %s", self._remote, streamer) @@ -237,12 +232,12 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes get_logger(0).info("%s [streamer]: Using default %s", self._remote, streamer) return streamer - async def __queue_frame(self, frame: Union[Dict, str]) -> None: + async def __queue_frame(self, frame: (dict | str)) -> None: if isinstance(frame, str): frame = await self.__make_text_frame(frame) self.__fb_queue.put_nowait(frame) - async def __make_text_frame(self, text: str) -> Dict: + async def __make_text_frame(self, text: str) -> dict: return { "data": (await make_text_jpeg(self._width, self._height, self._encodings.tight_jpeg_quality, text)), "width": self._width, @@ -252,7 +247,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes async def __fb_sender_task_loop(self) -> None: # pylint: disable=too-many-branches has_h264_key = False - last: Optional[Dict] = None + last: (dict | None) = None while True: await self.__fb_notifier.wait() @@ -351,7 +346,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes if self.__kvmd_ws: await self.__kvmd_ws.send_key_event(web_key, state) - def __switch_modifiers(self, key: Union[int, str], state: bool) -> bool: + def __switch_modifiers(self, key: (int | str), state: bool) -> bool: mod = 0 if key in X11Modifiers.SHIFTS or key in WebModifiers.SHIFTS: mod = SymmapModifiers.SHIFT @@ -367,7 +362,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes self.__modifiers &= ~mod return True - async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None: + async def _on_pointer_event(self, buttons: dict[str, bool], wheel: dict[str, int], move: dict[str, int]) -> None: if self.__kvmd_ws: for (button, state) in buttons.items(): if self.__mouse_buttons[button] != state: @@ -434,7 +429,7 @@ class VncServer: # pylint: disable=too-many-instance-attributes keymap_path: str, kvmd: KvmdClient, - streamers: List[BaseStreamerClient], + streamers: list[BaseStreamerClient], vnc_auth_manager: VncAuthManager, ) -> None: diff --git a/kvmd/apps/vnc/vncauth.py b/kvmd/apps/vnc/vncauth.py index b0f702d1..c7c04864 100644 --- a/kvmd/apps/vnc/vncauth.py +++ b/kvmd/apps/vnc/vncauth.py @@ -22,9 +22,6 @@ import dataclasses -from typing import Tuple -from typing import Dict - from ...logging import get_logger from ... import aiofs @@ -53,7 +50,7 @@ class VncAuthManager: self.__path = path self.__enabled = enabled - async def read_credentials(self) -> Tuple[Dict[str, VncAuthKvmdCredentials], bool]: + async def read_credentials(self) -> tuple[dict[str, VncAuthKvmdCredentials], bool]: if self.__enabled: try: return (await self.__inner_read_credentials(), True) @@ -63,10 +60,10 @@ class VncAuthManager: get_logger(0).exception("Unhandled exception while reading VNCAuth passwd file") return ({}, (not self.__enabled)) - async def __inner_read_credentials(self) -> Dict[str, VncAuthKvmdCredentials]: + async def __inner_read_credentials(self) -> dict[str, VncAuthKvmdCredentials]: lines = (await aiofs.read(self.__path)).split("\n") - credentials: Dict[str, VncAuthKvmdCredentials] = {} + credentials: dict[str, VncAuthKvmdCredentials] = {} for (lineno, line) in enumerate(lines): if len(line.strip()) == 0 or line.lstrip().startswith("#"): continue diff --git a/kvmd/apps/watchdog/__init__.py b/kvmd/apps/watchdog/__init__.py index e7271ac1..c8a50729 100644 --- a/kvmd/apps/watchdog/__init__.py +++ b/kvmd/apps/watchdog/__init__.py @@ -24,9 +24,6 @@ import argparse import errno import time -from typing import List -from typing import Optional - from ...logging import get_logger from ...yamlconf import Section @@ -104,7 +101,7 @@ def _cmd_cancel(config: Section) -> None: # ===== -def main(argv: Optional[List[str]]=None) -> None: +def main(argv: (list[str] | None)=None) -> None: (parent_parser, argv, config) = init(add_help=False, argv=argv) parser = argparse.ArgumentParser( prog="kvmd-watchdog", |