summaryrefslogtreecommitdiff
path: root/kvmd/htserver.py
diff options
context:
space:
mode:
authorMaxim Devaev <[email protected]>2022-09-04 18:08:40 +0300
committerMaxim Devaev <[email protected]>2022-09-04 18:08:40 +0300
commitee3e224e396494cd0d69bb6167087a071a20349c (patch)
tree5becd28570e58a03c6e1e231d0db24c264a73f88 /kvmd/htserver.py
parent4b75221e9470b4a009955d7677f16adf8e23e302 (diff)
new typing style
Diffstat (limited to 'kvmd/htserver.py')
-rw-r--r--kvmd/htserver.py37
1 files changed, 16 insertions, 21 deletions
diff --git a/kvmd/htserver.py b/kvmd/htserver.py
index a83fc17c..f47c1fc9 100644
--- a/kvmd/htserver.py
+++ b/kvmd/htserver.py
@@ -29,13 +29,8 @@ import inspect
import urllib.parse
import json
-from typing import Tuple
-from typing import List
-from typing import Dict
from typing import Callable
from typing import AsyncGenerator
-from typing import Optional
-from typing import Union
from typing import Any
from aiohttp import ClientWebSocketResponse
@@ -111,7 +106,7 @@ def exposed_http(http_method: str, path: str, auth_required: bool=True) -> Calla
return set_attrs
-def _get_exposed_http(obj: object) -> List[HttpExposed]:
+def _get_exposed_http(obj: object) -> list[HttpExposed]:
return [
HttpExposed(
method=getattr(handler, _HTTP_METHOD),
@@ -143,7 +138,7 @@ def exposed_ws(event_type: str) -> Callable:
return set_attrs
-def _get_exposed_ws(obj: object) -> List[WsExposed]:
+def _get_exposed_ws(obj: object) -> list[WsExposed]:
return [
WsExposed(
event_type=getattr(handler, _WS_EVENT_TYPE),
@@ -156,9 +151,9 @@ def _get_exposed_ws(obj: object) -> List[WsExposed]:
# =====
def make_json_response(
- result: Optional[Dict]=None,
+ result: (dict | None)=None,
status: int=200,
- set_cookies: Optional[Dict[str, str]]=None,
+ set_cookies: (dict[str, str] | None)=None,
wrap_result: bool=True,
) -> Response:
@@ -176,7 +171,7 @@ def make_json_response(
return response
-def make_json_exception(err: Exception, status: Optional[int]=None) -> Response:
+def make_json_exception(err: Exception, status: (int | None)=None) -> Response:
name = type(err).__name__
msg = str(err)
if isinstance(err, HttpError):
@@ -208,7 +203,7 @@ async def start_streaming(
return response
-async def stream_json(response: StreamResponse, result: Dict, ok: bool=True) -> None:
+async def stream_json(response: StreamResponse, result: dict, ok: bool=True) -> None:
await response.write(json.dumps({
"ok": ok,
"result": result,
@@ -226,9 +221,9 @@ async def stream_json_exception(response: StreamResponse, err: Exception) -> Non
async def send_ws_event(
- wsr: Union[ClientWebSocketResponse, WebSocketResponse],
+ wsr: (ClientWebSocketResponse | WebSocketResponse),
event_type: str,
- event: Optional[Dict],
+ event: (dict | None),
) -> None:
await wsr.send_str(json.dumps({
@@ -237,7 +232,7 @@ async def send_ws_event(
}))
-def parse_ws_event(msg: str) -> Tuple[str, Dict]:
+def parse_ws_event(msg: str) -> tuple[str, dict]:
data = json.loads(msg)
if not isinstance(data, dict):
raise RuntimeError("Top-level event structure is not a dict")
@@ -269,20 +264,20 @@ def set_request_auth_info(request: BaseRequest, info: str) -> None:
@dataclasses.dataclass(frozen=True)
class WsSession:
wsr: WebSocketResponse
- kwargs: Dict[str, Any]
+ kwargs: dict[str, Any]
def __str__(self) -> str:
return f"WsSession(id={id(self)}, {self.kwargs})"
- async def send_event(self, event_type: str, event: Optional[Dict]) -> None:
+ async def send_event(self, event_type: str, event: (dict | None)) -> None:
await send_ws_event(self.wsr, event_type, event)
class HttpServer:
def __init__(self) -> None:
- self.__ws_heartbeat: Optional[float] = None
- self.__ws_handlers: Dict[str, Callable] = {}
- self.__ws_sessions: List[WsSession] = []
+ self.__ws_heartbeat: (float | None) = None
+ self.__ws_handlers: dict[str, Callable] = {}
+ self.__ws_sessions: list[WsSession] = []
self.__ws_sessions_lock = asyncio.Lock()
def run(
@@ -373,7 +368,7 @@ class HttpServer:
logger.error("Unknown websocket event: %r", msg.data)
return ws.wsr
- async def _broadcast_ws_event(self, event_type: str, event: Optional[Dict]) -> None:
+ async def _broadcast_ws_event(self, event_type: str, event: (dict | None)) -> None:
if self.__ws_sessions:
await asyncio.gather(*[
ws.send_event(event_type, event)
@@ -391,7 +386,7 @@ class HttpServer:
await self.__close_ws(ws)
return bool(wss)
- def _get_wss(self) -> List[WsSession]:
+ def _get_wss(self) -> list[WsSession]:
return list(self.__ws_sessions)
async def __close_ws(self, ws: WsSession) -> None: