From a26aee3543883faf9a5b83832b274604f4f69263 Mon Sep 17 00:00:00 2001 From: Maxim Devaev Date: Wed, 23 Oct 2024 19:27:24 +0300 Subject: partial streamer events --- kvmd/apps/kvmd/server.py | 26 +++++++++++---- kvmd/apps/kvmd/streamer.py | 81 +++++++++++++++++++++++++++++++--------------- 2 files changed, 74 insertions(+), 33 deletions(-) (limited to 'kvmd') diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 50642725..4b6b57a0 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -151,6 +151,7 @@ class _Subsystem: class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-instance-attributes __EV_GPIO_STATE = "gpio_state" __EV_INFO_STATE = "info_state" + __EV_STREAMER_STATE = "streamer_state" def __init__( # pylint: disable=too-many-arguments,too-many-locals self, @@ -362,13 +363,16 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins ) async def __poll_state(self, event_type: str, poller: AsyncGenerator[dict, None]) -> None: - if event_type == self.__EV_GPIO_STATE: - await self.__poll_gpio_state(poller) - elif event_type == self.__EV_INFO_STATE: - await self.__poll_info_state(poller) - else: - async for state in poller: - await self._broadcast_ws_event(event_type, state) + match event_type: + case self.__EV_GPIO_STATE: + await self.__poll_gpio_state(poller) + case self.__EV_INFO_STATE: + await self.__poll_info_state(poller) + case self.__EV_STREAMER_STATE: + await self.__poll_streamer_state(poller) + case _: + async for state in poller: + await self._broadcast_ws_event(event_type, state) async def __poll_gpio_state(self, poller: AsyncGenerator[dict, None]) -> None: prev: dict = {"state": {"inputs": {}, "outputs": {}}} @@ -387,3 +391,11 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins await self._broadcast_ws_event(self.__EV_INFO_STATE, state, legacy=False) for (key, value) in state.items(): await self._broadcast_ws_event(f"info_{key}_state", value, legacy=True) + + async def __poll_streamer_state(self, poller: AsyncGenerator[dict, None]) -> None: + prev: dict = {} + async for state in poller: + await self._broadcast_ws_event(self.__EV_STREAMER_STATE, state, legacy=False) + prev.update(state) + if "features" in prev: # Complete/Full + await self._broadcast_ws_event(self.__EV_STREAMER_STATE, prev, legacy=True) diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py index 5ddb7298..d02bf50d 100644 --- a/kvmd/apps/kvmd/streamer.py +++ b/kvmd/apps/kvmd/streamer.py @@ -137,6 +137,11 @@ class _StreamerParams: class Streamer: # pylint: disable=too-many-instance-attributes + __ST_FULL = 0xFF + __ST_PARAMS = 0x01 + __ST_STREAMER = 0x02 + __ST_SNAPSHOT = 0x04 + def __init__( # pylint: disable=too-many-arguments,too-many-locals self, @@ -261,6 +266,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes def set_params(self, params: dict) -> None: assert not self.__streamer_task + self.__notifier.notify(self.__ST_PARAMS) return self.__params.set_params(params) def get_params(self) -> dict: @@ -269,49 +275,72 @@ class Streamer: # pylint: disable=too-many-instance-attributes # ===== async def get_state(self) -> dict: - streamer_state = None - if self.__streamer_task: - session = self.__ensure_client_session() - try: - streamer_state = await session.get_state() - except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError): - pass - except Exception: - get_logger().exception("Invalid streamer response from /state") - - snapshot: (dict | None) = None - if self.__snapshot: - snapshot = dataclasses.asdict(self.__snapshot) - del snapshot["headers"] - del snapshot["data"] - return { + "features": self.__params.get_features(), "limits": self.__params.get_limits(), "params": self.__params.get_params(), - "snapshot": {"saved": snapshot}, - "streamer": streamer_state, - "features": self.__params.get_features(), + "streamer": (await self.__get_streamer_state()), + "snapshot": self.__get_snapshot_state(), } async def trigger_state(self) -> None: - self.__notifier.notify(1) + self.__notifier.notify(self.__ST_FULL) async def poll_state(self) -> AsyncGenerator[dict, None]: def signal_handler(*_: Any) -> None: get_logger(0).info("Got SIGUSR2, checking the stream state ...") - self.__notifier.notify() + self.__notifier.notify(self.__ST_STREAMER) get_logger(0).info("Installing SIGUSR2 streamer handler ...") asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler) prev: dict = {} while True: - if (await self.__notifier.wait(timeout=self.__state_poll)) > 0: - prev = {} - new = await self.get_state() - if new != prev: + new: dict = {} + + mask = await self.__notifier.wait(timeout=self.__state_poll) + if mask == self.__ST_FULL: + new = await self.get_state() prev = copy.deepcopy(new) yield new + continue + + if mask < 0: + mask = self.__ST_STREAMER + + def check_update(key: str, value: (dict | None)) -> None: + if prev.get(key) != value: + new[key] = value + + if mask & self.__ST_PARAMS: + check_update("params", self.__params.get_params()) + if mask & self.__ST_STREAMER: + check_update("streamer", await self.__get_streamer_state()) + if mask & self.__ST_SNAPSHOT: + check_update("snapshot", self.__get_snapshot_state()) + + if new and prev != new: + prev.update(copy.deepcopy(new)) + yield new + + async def __get_streamer_state(self) -> (dict | None): + if self.__streamer_task: + session = self.__ensure_client_session() + try: + return (await session.get_state()) + except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError): + pass + except Exception: + get_logger().exception("Invalid streamer response from /state") + return None + + def __get_snapshot_state(self) -> dict: + if self.__snapshot: + snapshot = dataclasses.asdict(self.__snapshot) + del snapshot["headers"] + del snapshot["data"] + return {"saved": snapshot} + return {"saved": None} # ===== @@ -325,7 +354,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes if snapshot.online or allow_offline: if save: self.__snapshot = snapshot - self.__notifier.notify() + self.__notifier.notify(self.__ST_SNAPSHOT) return snapshot logger.error("Stream is offline, no signal or so") except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError) as ex: -- cgit v1.2.3