diff options
author | Maxim Devaev <[email protected]> | 2022-06-14 18:18:21 +0300 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2022-06-14 18:18:21 +0300 |
commit | 88c7796551010faf30e7f7f843432af919ea7ce0 (patch) | |
tree | 9ca033133bad8e10878cd5b45510499588e40262 /kvmd/apps | |
parent | 37e5118fff1e63e5af0183e7900bb6a7bc708a34 (diff) |
common websocket code
Diffstat (limited to 'kvmd/apps')
-rw-r--r-- | kvmd/apps/kvmd/api/hid.py | 12 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 83 |
2 files changed, 25 insertions, 70 deletions
diff --git a/kvmd/apps/kvmd/api/hid.py b/kvmd/apps/kvmd/api/hid.py index ddbaadd8..a683921c 100644 --- a/kvmd/apps/kvmd/api/hid.py +++ b/kvmd/apps/kvmd/api/hid.py @@ -32,7 +32,6 @@ from typing import Callable from aiohttp.web import Request from aiohttp.web import Response -from aiohttp.web import WebSocketResponse from ....mouse import MouseRange @@ -42,6 +41,7 @@ from ....keyboard.printer import text_to_web_keys from ....htserver import exposed_http from ....htserver import exposed_ws from ....htserver import make_json_response +from ....htserver import WsSession from ....plugins.hid import BaseHid @@ -158,7 +158,7 @@ class HidApi: # ===== @exposed_ws("key") - async def __ws_key_handler(self, _: WebSocketResponse, event: Dict) -> None: + async def __ws_key_handler(self, _: WsSession, event: Dict) -> None: try: key = valid_hid_key(event["key"]) state = valid_bool(event["state"]) @@ -168,7 +168,7 @@ class HidApi: self.__hid.send_key_events([(key, state)]) @exposed_ws("mouse_button") - async def __ws_mouse_button_handler(self, _: WebSocketResponse, event: Dict) -> None: + async def __ws_mouse_button_handler(self, _: WsSession, event: Dict) -> None: try: button = valid_hid_mouse_button(event["button"]) state = valid_bool(event["state"]) @@ -177,7 +177,7 @@ class HidApi: self.__hid.send_mouse_button_event(button, state) @exposed_ws("mouse_move") - async def __ws_mouse_move_handler(self, _: WebSocketResponse, event: Dict) -> None: + async def __ws_mouse_move_handler(self, _: WsSession, event: Dict) -> None: try: to_x = valid_hid_mouse_move(event["to"]["x"]) to_y = valid_hid_mouse_move(event["to"]["y"]) @@ -186,11 +186,11 @@ class HidApi: self.__send_mouse_move_event_remapped(to_x, to_y) @exposed_ws("mouse_relative") - async def __ws_mouse_relative_handler(self, _: WebSocketResponse, event: Dict) -> None: + async def __ws_mouse_relative_handler(self, _: WsSession, event: Dict) -> None: self.__process_delta_ws_request(event, self.__hid.send_mouse_relative_event) @exposed_ws("mouse_wheel") - async def __ws_mouse_wheel_handler(self, _: WebSocketResponse, event: Dict) -> None: + async def __ws_mouse_wheel_handler(self, _: WsSession, event: Dict) -> None: self.__process_delta_ws_request(event, self.__hid.send_mouse_wheel_event) def __process_delta_ws_request(self, event: Dict, handler: Callable[[int, int], None]) -> None: diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 20b2cb7e..5adc8d8d 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -27,7 +27,6 @@ import dataclasses from typing import Tuple from typing import List from typing import Dict -from typing import Set from typing import Callable from typing import Coroutine from typing import AsyncGenerator @@ -48,12 +47,8 @@ from ... import aioproc from ...htserver import HttpExposed from ...htserver import exposed_http from ...htserver import exposed_ws -from ...htserver import get_exposed_http -from ...htserver import get_exposed_ws from ...htserver import make_json_response -from ...htserver import send_ws_event -from ...htserver import broadcast_ws_event -from ...htserver import process_ws_messages +from ...htserver import WsSession from ...htserver import HttpServer from ...plugins import BasePlugin @@ -128,15 +123,6 @@ class _Component: # pylint: disable=too-many-instance-attributes assert self.event_type, self [email protected](frozen=True) -class _WsClient: - ws: WebSocketResponse - stream: bool - - def __str__(self) -> str: - return f"WsClient(id={id(self)}, stream={self.stream})" - - class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments,too-many-locals self, @@ -160,6 +146,8 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins stream_forever: bool, ) -> None: + super().__init__() + self.__auth_manager = auth_manager self.__hid = hid self.__streamer = streamer @@ -201,11 +189,6 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins RedfishApi(info_manager, atx), ] - self.__ws_handlers: Dict[str, Callable] = {} - - self.__ws_clients: Set[_WsClient] = set() - self.__ws_clients_lock = asyncio.Lock() - self.__streamer_notifier = aiotools.AioNotifier() self.__reset_streamer = False self.__new_streamer_params: Dict = {} @@ -244,11 +227,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins @exposed_http("GET", "/ws") async def __ws_handler(self, request: Request) -> WebSocketResponse: stream = valid_bool(request.query.get("stream", "true")) - ws = await self._make_ws_response(request) - client = _WsClient(ws, stream) - await self.__register_ws_client(client) - - try: + async with self._ws_session(request, stream=stream) as ws: stage1 = [ ("gpio_model_state", self.__user_gpio.get_model()), ("hid_keymaps_state", self.__hid_api.get_keymaps()), @@ -266,19 +245,15 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins )) for stage in [stage1, stage2]: await asyncio.gather(*[ - send_ws_event(ws, event_type, events.pop(event_type)) + ws.send_event(event_type, events.pop(event_type)) for (event_type, _) in stage ]) - - await send_ws_event(ws, "loop", {}) - await process_ws_messages(ws, self.__ws_handlers) - return ws - finally: - await self.__remove_ws_client(client) + await ws.send_event("loop", {}) + return (await self._ws_loop(ws)) @exposed_ws("ping") - async def __ws_ping_handler(self, ws: WebSocketResponse, _: Dict) -> None: - await send_ws_event(ws, "pong", {}) + async def __ws_ping_handler(self, ws: WsSession, _: Dict) -> None: + await ws.send_event("pong", {}) # ===== SYSTEM STUFF @@ -300,26 +275,16 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins if comp.poll_state: aiotools.create_deadly_task(f"{comp.name} [poller]", self.__poll_state(comp.event_type, comp.poll_state())) aiotools.create_deadly_task("Stream snapshoter", self.__stream_snapshoter()) - - for api in self.__apis: - for http_exposed in get_exposed_http(api): - self._add_exposed(http_exposed) - for ws_exposed in get_exposed_ws(api): - self.__ws_handlers[ws_exposed.event_type] = ws_exposed.handler + self._add_exposed(*self.__apis) async def _on_shutdown(self) -> None: logger = get_logger(0) - logger.info("Waiting short tasks ...") await aiotools.wait_all_short_tasks() - logger.info("Stopping system tasks ...") await aiotools.stop_all_deadly_tasks() - logger.info("Disconnecting clients ...") - for client in list(self.__ws_clients): - await self.__remove_ws_client(client) - + await self._close_all_wss() logger.info("On-Shutdown complete") async def _on_cleanup(self) -> None: @@ -333,25 +298,18 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins logger.exception("Cleanup error on %s", comp.name) logger.info("On-Cleanup complete") - async def __register_ws_client(self, client: _WsClient) -> None: - async with self.__ws_clients_lock: - self.__ws_clients.add(client) - get_logger().info("Registered new client socket: %s; clients now: %d", client, len(self.__ws_clients)) + async def _on_ws_opened(self) -> None: await self.__streamer_notifier.notify() - async def __remove_ws_client(self, client: _WsClient) -> None: - async with self.__ws_clients_lock: - self.__hid.clear_events() - try: - self.__ws_clients.remove(client) - get_logger().info("Removed client socket: %s; clients now: %d", client, len(self.__ws_clients)) - await client.ws.close() - except Exception: - pass + async def _on_ws_closed(self) -> None: + self.__hid.clear_events() await self.__streamer_notifier.notify() def __has_stream_clients(self) -> bool: - return bool(sum(map(operator.attrgetter("stream"), self.__ws_clients))) + return bool(sum(map( + (lambda ws: ws.kwargs["stream"]), + self._get_wss(), + ))) # ===== SYSTEM TASKS @@ -379,10 +337,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins async def __poll_state(self, event_type: str, poller: AsyncGenerator[Dict, None]) -> None: async for state in poller: - await broadcast_ws_event([ - client.ws - for client in list(self.__ws_clients) - ], event_type, state) + await self._broadcast_ws_event(event_type, state) async def __stream_snapshoter(self) -> None: await self.__snapshoter.run( |