summaryrefslogtreecommitdiff
path: root/kvmd/apps
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd/apps')
-rw-r--r--kvmd/apps/kvmd/server.py26
-rw-r--r--kvmd/apps/kvmd/streamer.py81
2 files changed, 74 insertions, 33 deletions
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
index 50642725..4b6b57a0 100644
--- a/kvmd/apps/kvmd/server.py
+++ b/kvmd/apps/kvmd/server.py
@@ -151,6 +151,7 @@ class _Subsystem:
class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-instance-attributes
__EV_GPIO_STATE = "gpio_state"
__EV_INFO_STATE = "info_state"
+ __EV_STREAMER_STATE = "streamer_state"
def __init__( # pylint: disable=too-many-arguments,too-many-locals
self,
@@ -362,13 +363,16 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
)
async def __poll_state(self, event_type: str, poller: AsyncGenerator[dict, None]) -> None:
- if event_type == self.__EV_GPIO_STATE:
- await self.__poll_gpio_state(poller)
- elif event_type == self.__EV_INFO_STATE:
- await self.__poll_info_state(poller)
- else:
- async for state in poller:
- await self._broadcast_ws_event(event_type, state)
+ match event_type:
+ case self.__EV_GPIO_STATE:
+ await self.__poll_gpio_state(poller)
+ case self.__EV_INFO_STATE:
+ await self.__poll_info_state(poller)
+ case self.__EV_STREAMER_STATE:
+ await self.__poll_streamer_state(poller)
+ case _:
+ async for state in poller:
+ await self._broadcast_ws_event(event_type, state)
async def __poll_gpio_state(self, poller: AsyncGenerator[dict, None]) -> None:
prev: dict = {"state": {"inputs": {}, "outputs": {}}}
@@ -387,3 +391,11 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
await self._broadcast_ws_event(self.__EV_INFO_STATE, state, legacy=False)
for (key, value) in state.items():
await self._broadcast_ws_event(f"info_{key}_state", value, legacy=True)
+
+ async def __poll_streamer_state(self, poller: AsyncGenerator[dict, None]) -> None:
+ prev: dict = {}
+ async for state in poller:
+ await self._broadcast_ws_event(self.__EV_STREAMER_STATE, state, legacy=False)
+ prev.update(state)
+ if "features" in prev: # Complete/Full
+ await self._broadcast_ws_event(self.__EV_STREAMER_STATE, prev, legacy=True)
diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py
index 5ddb7298..d02bf50d 100644
--- a/kvmd/apps/kvmd/streamer.py
+++ b/kvmd/apps/kvmd/streamer.py
@@ -137,6 +137,11 @@ class _StreamerParams:
class Streamer: # pylint: disable=too-many-instance-attributes
+ __ST_FULL = 0xFF
+ __ST_PARAMS = 0x01
+ __ST_STREAMER = 0x02
+ __ST_SNAPSHOT = 0x04
+
def __init__( # pylint: disable=too-many-arguments,too-many-locals
self,
@@ -261,6 +266,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
def set_params(self, params: dict) -> None:
assert not self.__streamer_task
+ self.__notifier.notify(self.__ST_PARAMS)
return self.__params.set_params(params)
def get_params(self) -> dict:
@@ -269,49 +275,72 @@ class Streamer: # pylint: disable=too-many-instance-attributes
# =====
async def get_state(self) -> dict:
- streamer_state = None
- if self.__streamer_task:
- session = self.__ensure_client_session()
- try:
- streamer_state = await session.get_state()
- except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError):
- pass
- except Exception:
- get_logger().exception("Invalid streamer response from /state")
-
- snapshot: (dict | None) = None
- if self.__snapshot:
- snapshot = dataclasses.asdict(self.__snapshot)
- del snapshot["headers"]
- del snapshot["data"]
-
return {
+ "features": self.__params.get_features(),
"limits": self.__params.get_limits(),
"params": self.__params.get_params(),
- "snapshot": {"saved": snapshot},
- "streamer": streamer_state,
- "features": self.__params.get_features(),
+ "streamer": (await self.__get_streamer_state()),
+ "snapshot": self.__get_snapshot_state(),
}
async def trigger_state(self) -> None:
- self.__notifier.notify(1)
+ self.__notifier.notify(self.__ST_FULL)
async def poll_state(self) -> AsyncGenerator[dict, None]:
def signal_handler(*_: Any) -> None:
get_logger(0).info("Got SIGUSR2, checking the stream state ...")
- self.__notifier.notify()
+ self.__notifier.notify(self.__ST_STREAMER)
get_logger(0).info("Installing SIGUSR2 streamer handler ...")
asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler)
prev: dict = {}
while True:
- if (await self.__notifier.wait(timeout=self.__state_poll)) > 0:
- prev = {}
- new = await self.get_state()
- if new != prev:
+ new: dict = {}
+
+ mask = await self.__notifier.wait(timeout=self.__state_poll)
+ if mask == self.__ST_FULL:
+ new = await self.get_state()
prev = copy.deepcopy(new)
yield new
+ continue
+
+ if mask < 0:
+ mask = self.__ST_STREAMER
+
+ def check_update(key: str, value: (dict | None)) -> None:
+ if prev.get(key) != value:
+ new[key] = value
+
+ if mask & self.__ST_PARAMS:
+ check_update("params", self.__params.get_params())
+ if mask & self.__ST_STREAMER:
+ check_update("streamer", await self.__get_streamer_state())
+ if mask & self.__ST_SNAPSHOT:
+ check_update("snapshot", self.__get_snapshot_state())
+
+ if new and prev != new:
+ prev.update(copy.deepcopy(new))
+ yield new
+
+ async def __get_streamer_state(self) -> (dict | None):
+ if self.__streamer_task:
+ session = self.__ensure_client_session()
+ try:
+ return (await session.get_state())
+ except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError):
+ pass
+ except Exception:
+ get_logger().exception("Invalid streamer response from /state")
+ return None
+
+ def __get_snapshot_state(self) -> dict:
+ if self.__snapshot:
+ snapshot = dataclasses.asdict(self.__snapshot)
+ del snapshot["headers"]
+ del snapshot["data"]
+ return {"saved": snapshot}
+ return {"saved": None}
# =====
@@ -325,7 +354,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
if snapshot.online or allow_offline:
if save:
self.__snapshot = snapshot
- self.__notifier.notify()
+ self.__notifier.notify(self.__ST_SNAPSHOT)
return snapshot
logger.error("Stream is offline, no signal or so")
except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError) as ex: