diff options
author | Maxim Devaev <[email protected]> | 2024-02-11 00:39:57 +0200 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2024-02-11 00:39:57 +0200 |
commit | 2a48b7e28786febb1f11be447824f7fbdfe77e2d (patch) | |
tree | c597e37e99b2c77177b6fa198045859bec81b688 | |
parent | 0b382c3d590e4da1818f0696eba6589f8b9ac7bd (diff) |
reworked server components
-rw-r--r-- | kvmd/apps/kvmd/server.py | 166 |
1 files changed, 89 insertions, 77 deletions
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 8f696fb7..c81b91ab 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -24,13 +24,9 @@ import asyncio import operator import dataclasses -from typing import Tuple -from typing import List -from typing import Dict from typing import Callable from typing import Coroutine from typing import AsyncGenerator -from typing import Optional from typing import Any from aiohttp.web import Request @@ -103,24 +99,50 @@ class StreamerH264NotSupported(OperationError): # ===== @dataclasses.dataclass(frozen=True) -class _Component: # pylint: disable=too-many-instance-attributes - name: str - event_type: str - obj: object - sysprep: Optional[Callable[[], None]] = None - systask: Optional[Callable[[], Coroutine[Any, Any, None]]] = None - get_state: Optional[Callable[[], Coroutine[Any, Any, Dict]]] = None - poll_state: Optional[Callable[[], AsyncGenerator[Dict, None]]] = None - cleanup: Optional[Callable[[], Coroutine[Any, Any, Dict]]] = None - - def __post_init__(self) -> None: - if isinstance(self.obj, BasePlugin): - object.__setattr__(self, "name", f"{self.name} ({self.obj.get_plugin_name()})") - - for field in ["sysprep", "systask", "get_state", "poll_state", "cleanup"]: - object.__setattr__(self, field, getattr(self.obj, field, None)) - if self.get_state or self.poll_state: - assert self.event_type, self +class _SubsystemEventSource: + get_state: (Callable[[], Coroutine[Any, Any, dict]] | None) = None + poll_state: (Callable[[], AsyncGenerator[dict, None]] | None) = None + + +class _Subsystem: + name: str + sysprep: (Callable[[], None] | None) + systask: (Callable[[], Coroutine[Any, Any, None]] | None) + cleanup: (Callable[[], Coroutine[Any, Any, dict]] | None) + sources: dict[str, _SubsystemEventSource] + + @classmethod + def make(cls, obj: object, name: str, event_type: str="") -> "_Subsystem": + if isinstance(obj, BasePlugin): + name = f"{name} ({obj.get_plugin_name()})" + sub = _Subsystem( + name=name, + sysprep=getattr(obj, "sysprep", None), + systask=getattr(obj, "systask", None), + cleanup=getattr(obj, "cleanup", None), + sources={}, + ) + if event_type: + sub.add_source( + event_type=event_type, + get_state=getattr(obj, "get_state", None), + poll_state=getattr(obj, "poll_state", None), + ) + return sub + + def add_source( + self, + event_type: str, + get_state: (Callable[[], Coroutine[Any, Any, dict]] | None), + poll_state: (Callable[[], AsyncGenerator[dict, None]] | None), + ) -> "_Subsystem": + + assert event_type + assert event_type not in self.sources, (self, event_type) + assert get_state or poll_state, (self, event_type) + self.sources[event_type] = _SubsystemEventSource(get_state, poll_state) + return self class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-instance-attributes @@ -139,9 +161,9 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins snapshoter: Snapshoter, keymap_path: str, - ignore_keys: List[str], - mouse_x_range: Tuple[int, int], - mouse_y_range: Tuple[int, int], + ignore_keys: list[str], + mouse_x_range: tuple[int, int], + mouse_y_range: tuple[int, int], stream_forever: bool, ) -> None: @@ -152,30 +174,12 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins self.__hid = hid self.__streamer = streamer self.__snapshoter = snapshoter # Not a component: No state or cleanup - self.__user_gpio = user_gpio # Has extra state "gpio_scheme_state" self.__stream_forever = stream_forever - self.__components = [ - *[ - _Component("Auth manager", "", auth_manager), - ], - *[ - _Component(f"Info manager ({sub})", f"info_{sub}_state", info_manager.get_submanager(sub)) - for sub in sorted(info_manager.get_subs()) - ], - *[ - _Component("User-GPIO", "gpio_state", user_gpio), - _Component("HID", "hid_state", hid), - _Component("ATX", "atx_state", atx), - _Component("MSD", "msd_state", msd), - _Component("Streamer", "streamer_state", streamer), - ], - ] - self.__hid_api = HidApi(hid, keymap_path, ignore_keys, mouse_x_range, mouse_y_range) # Ugly hack to get keymaps state self.__streamer_api = StreamerApi(streamer, ocr) # Same hack to get ocr langs state - self.__apis: List[object] = [ + self.__apis: list[object] = [ self, AuthApi(auth_manager), InfoApi(info_manager), @@ -189,9 +193,22 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins RedfishApi(info_manager, atx), ] + self.__subsystems = [ + _Subsystem.make(auth_manager, "Auth manager"), + _Subsystem.make(user_gpio, "User-GPIO", "gpio_state").add_source("gpio_model_state", user_gpio.get_model, None), + _Subsystem.make(hid, "HID", "hid_state").add_source("hid_keymaps_state", self.__hid_api.get_keymaps, None), + _Subsystem.make(atx, "ATX", "atx_state"), + _Subsystem.make(msd, "MSD", "msd_state"), + _Subsystem.make(streamer, "Streamer", "streamer_state").add_source("streamer_ocr_state", self.__streamer_api.get_ocr, None), + *[ + _Subsystem.make(info_manager.get_submanager(sub), f"Info manager ({sub})", f"info_{sub}_state",) + for sub in sorted(info_manager.get_subs()) + ], + ] + self.__streamer_notifier = aiotools.AioNotifier() self.__reset_streamer = False - self.__new_streamer_params: Dict = {} + self.__new_streamer_params: dict = {} # ===== STREAMER CONTROLLER @@ -228,39 +245,33 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins async def __ws_handler(self, request: Request) -> WebSocketResponse: stream = valid_bool(request.query.get("stream", True)) async with self._ws_session(request, stream=stream) as ws: - stage1 = [ - ("gpio_model_state", self.__user_gpio.get_model()), - ("hid_keymaps_state", self.__hid_api.get_keymaps()), - ("streamer_ocr_state", self.__streamer_api.get_ocr()), - ] - stage2 = [ - (comp.event_type, comp.get_state()) - for comp in self.__components - if comp.get_state + states = [ + (event_type, src.get_state()) + for sub in self.__subsystems + for (event_type, src) in sub.sources.items() + if src.get_state ] - stages = stage1 + stage2 events = dict(zip( - map(operator.itemgetter(0), stages), - await asyncio.gather(*map(operator.itemgetter(1), stages)), + map(operator.itemgetter(0), states), + await asyncio.gather(*map(operator.itemgetter(1), states)), )) - for stage in [stage1, stage2]: - await asyncio.gather(*[ - ws.send_event(event_type, events.pop(event_type)) - for (event_type, _) in stage - ]) + await asyncio.gather(*[ + ws.send_event(event_type, events.pop(event_type)) + for (event_type, _) in states + ]) await ws.send_event("loop", {}) return (await self._ws_loop(ws)) @exposed_ws("ping") - async def __ws_ping_handler(self, ws: WsSession, _: Dict) -> None: + async def __ws_ping_handler(self, ws: WsSession, _: dict) -> None: await ws.send_event("pong", {}) # ===== SYSTEM STUFF def run(self, **kwargs: Any) -> None: # type: ignore # pylint: disable=arguments-differ - for comp in self.__components: - if comp.sysprep: - comp.sysprep() + for sub in self.__subsystems: + if sub.sysprep: + sub.sysprep() aioproc.rename_process("main") super().run(**kwargs) @@ -269,11 +280,12 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins async def _init_app(self) -> None: aiotools.create_deadly_task("Stream controller", self.__stream_controller()) - for comp in self.__components: - if comp.systask: - aiotools.create_deadly_task(comp.name, comp.systask()) - if comp.poll_state: - aiotools.create_deadly_task(f"{comp.name} [poller]", self.__poll_state(comp.event_type, comp.poll_state())) + for sub in self.__subsystems: + if sub.systask: + aiotools.create_deadly_task(sub.name, sub.systask()) + for (event_type, src) in sub.sources.items(): + if src.poll_state: + aiotools.create_deadly_task(f"{sub.name} [poller]", self.__poll_state(event_type, src.poll_state())) aiotools.create_deadly_task("Stream snapshoter", self.__stream_snapshoter()) self._add_exposed(*self.__apis) @@ -289,13 +301,13 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins async def _on_cleanup(self) -> None: logger = get_logger(0) - for comp in self.__components: - if comp.cleanup: - logger.info("Cleaning up %s ...", comp.name) + for sub in self.__subsystems: + if sub.cleanup: + logger.info("Cleaning up %s ...", sub.name) try: - await comp.cleanup() # type: ignore + await sub.cleanup() # type: ignore except Exception: - logger.exception("Cleanup error on %s", comp.name) + logger.exception("Cleanup error on %s", sub.name) logger.info("On-Cleanup complete") async def _on_ws_opened(self) -> None: @@ -335,7 +347,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins prev = cur await self.__streamer_notifier.wait() - async def __poll_state(self, event_type: str, poller: AsyncGenerator[Dict, None]) -> None: + async def __poll_state(self, event_type: str, poller: AsyncGenerator[dict, None]) -> None: async for state in poller: await self._broadcast_ws_event(event_type, state) |