From ee3e224e396494cd0d69bb6167087a071a20349c Mon Sep 17 00:00:00 2001 From: Maxim Devaev Date: Sun, 4 Sep 2022 18:08:40 +0300 Subject: new typing style --- kvmd/htserver.py | 37 ++++++++++++++++--------------------- 1 file changed, 16 insertions(+), 21 deletions(-) (limited to 'kvmd/htserver.py') diff --git a/kvmd/htserver.py b/kvmd/htserver.py index a83fc17c..f47c1fc9 100644 --- a/kvmd/htserver.py +++ b/kvmd/htserver.py @@ -29,13 +29,8 @@ import inspect import urllib.parse import json -from typing import Tuple -from typing import List -from typing import Dict from typing import Callable from typing import AsyncGenerator -from typing import Optional -from typing import Union from typing import Any from aiohttp import ClientWebSocketResponse @@ -111,7 +106,7 @@ def exposed_http(http_method: str, path: str, auth_required: bool=True) -> Calla return set_attrs -def _get_exposed_http(obj: object) -> List[HttpExposed]: +def _get_exposed_http(obj: object) -> list[HttpExposed]: return [ HttpExposed( method=getattr(handler, _HTTP_METHOD), @@ -143,7 +138,7 @@ def exposed_ws(event_type: str) -> Callable: return set_attrs -def _get_exposed_ws(obj: object) -> List[WsExposed]: +def _get_exposed_ws(obj: object) -> list[WsExposed]: return [ WsExposed( event_type=getattr(handler, _WS_EVENT_TYPE), @@ -156,9 +151,9 @@ def _get_exposed_ws(obj: object) -> List[WsExposed]: # ===== def make_json_response( - result: Optional[Dict]=None, + result: (dict | None)=None, status: int=200, - set_cookies: Optional[Dict[str, str]]=None, + set_cookies: (dict[str, str] | None)=None, wrap_result: bool=True, ) -> Response: @@ -176,7 +171,7 @@ def make_json_response( return response -def make_json_exception(err: Exception, status: Optional[int]=None) -> Response: +def make_json_exception(err: Exception, status: (int | None)=None) -> Response: name = type(err).__name__ msg = str(err) if isinstance(err, HttpError): @@ -208,7 +203,7 @@ async def start_streaming( return response -async def stream_json(response: StreamResponse, result: Dict, ok: bool=True) -> None: +async def stream_json(response: StreamResponse, result: dict, ok: bool=True) -> None: await response.write(json.dumps({ "ok": ok, "result": result, @@ -226,9 +221,9 @@ async def stream_json_exception(response: StreamResponse, err: Exception) -> Non async def send_ws_event( - wsr: Union[ClientWebSocketResponse, WebSocketResponse], + wsr: (ClientWebSocketResponse | WebSocketResponse), event_type: str, - event: Optional[Dict], + event: (dict | None), ) -> None: await wsr.send_str(json.dumps({ @@ -237,7 +232,7 @@ async def send_ws_event( })) -def parse_ws_event(msg: str) -> Tuple[str, Dict]: +def parse_ws_event(msg: str) -> tuple[str, dict]: data = json.loads(msg) if not isinstance(data, dict): raise RuntimeError("Top-level event structure is not a dict") @@ -269,20 +264,20 @@ def set_request_auth_info(request: BaseRequest, info: str) -> None: @dataclasses.dataclass(frozen=True) class WsSession: wsr: WebSocketResponse - kwargs: Dict[str, Any] + kwargs: dict[str, Any] def __str__(self) -> str: return f"WsSession(id={id(self)}, {self.kwargs})" - async def send_event(self, event_type: str, event: Optional[Dict]) -> None: + async def send_event(self, event_type: str, event: (dict | None)) -> None: await send_ws_event(self.wsr, event_type, event) class HttpServer: def __init__(self) -> None: - self.__ws_heartbeat: Optional[float] = None - self.__ws_handlers: Dict[str, Callable] = {} - self.__ws_sessions: List[WsSession] = [] + self.__ws_heartbeat: (float | None) = None + self.__ws_handlers: dict[str, Callable] = {} + self.__ws_sessions: list[WsSession] = [] self.__ws_sessions_lock = asyncio.Lock() def run( @@ -373,7 +368,7 @@ class HttpServer: logger.error("Unknown websocket event: %r", msg.data) return ws.wsr - async def _broadcast_ws_event(self, event_type: str, event: Optional[Dict]) -> None: + async def _broadcast_ws_event(self, event_type: str, event: (dict | None)) -> None: if self.__ws_sessions: await asyncio.gather(*[ ws.send_event(event_type, event) @@ -391,7 +386,7 @@ class HttpServer: await self.__close_ws(ws) return bool(wss) - def _get_wss(self) -> List[WsSession]: + def _get_wss(self) -> list[WsSession]: return list(self.__ws_sessions) async def __close_ws(self, ws: WsSession) -> None: -- cgit v1.2.3