diff options
author | Maxim Devaev <[email protected]> | 2024-10-21 17:46:59 +0300 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2024-10-21 17:46:59 +0300 |
commit | cda32a083faf3e7326cfe317336e473c905c6dfd (patch) | |
tree | 19445e4098d4603f3b2cd296504a648110712af1 /kvmd/apps | |
parent | b67a2325842a6f407d3935f8445d50cb8bf307f2 (diff) |
new events model
Diffstat (limited to 'kvmd/apps')
-rw-r--r-- | kvmd/apps/kvmd/api/export.py | 9 | ||||
-rw-r--r-- | kvmd/apps/kvmd/api/info.py | 16 | ||||
-rw-r--r-- | kvmd/apps/kvmd/api/redfish.py | 6 | ||||
-rw-r--r-- | kvmd/apps/kvmd/info/__init__.py | 51 | ||||
-rw-r--r-- | kvmd/apps/kvmd/info/auth.py | 13 | ||||
-rw-r--r-- | kvmd/apps/kvmd/info/base.py | 10 | ||||
-rw-r--r-- | kvmd/apps/kvmd/info/extras.py | 11 | ||||
-rw-r--r-- | kvmd/apps/kvmd/info/fan.py | 25 | ||||
-rw-r--r-- | kvmd/apps/kvmd/info/hw.py | 19 | ||||
-rw-r--r-- | kvmd/apps/kvmd/info/meta.py | 11 | ||||
-rw-r--r-- | kvmd/apps/kvmd/info/system.py | 12 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 26 | ||||
-rw-r--r-- | kvmd/apps/kvmd/streamer.py | 27 | ||||
-rw-r--r-- | kvmd/apps/kvmd/ugpio.py | 10 | ||||
-rw-r--r-- | kvmd/apps/vnc/server.py | 23 |
15 files changed, 184 insertions, 85 deletions
diff --git a/kvmd/apps/kvmd/api/export.py b/kvmd/apps/kvmd/api/export.py index 998c5d81..bb048f53 100644 --- a/kvmd/apps/kvmd/api/export.py +++ b/kvmd/apps/kvmd/api/export.py @@ -55,10 +55,9 @@ class ExportApi: @async_lru.alru_cache(maxsize=1, ttl=5) async def __get_prometheus_metrics(self) -> str: - (atx_state, hw_state, fan_state, gpio_state) = await asyncio.gather(*[ + (atx_state, info_state, gpio_state) = await asyncio.gather(*[ self.__atx.get_state(), - self.__info_manager.get_submanager("hw").get_state(), - self.__info_manager.get_submanager("fan").get_state(), + self.__info_manager.get_state(["hw", "fan"]), self.__user_gpio.get_state(), ]) rows: list[str] = [] @@ -72,8 +71,8 @@ class ExportApi: for key in ["online", "state"]: self.__append_prometheus_rows(rows, ch_state["state"], f"pikvm_gpio_{mode}_{key}_{channel}") - self.__append_prometheus_rows(rows, hw_state["health"], "pikvm_hw") # type: ignore - self.__append_prometheus_rows(rows, fan_state, "pikvm_fan") + self.__append_prometheus_rows(rows, info_state["hw"]["health"], "pikvm_hw") # type: ignore + self.__append_prometheus_rows(rows, info_state["fan"], "pikvm_fan") return "\n".join(rows) diff --git a/kvmd/apps/kvmd/api/info.py b/kvmd/apps/kvmd/api/info.py index a0be01a5..89d45a84 100644 --- a/kvmd/apps/kvmd/api/info.py +++ b/kvmd/apps/kvmd/api/info.py @@ -20,8 +20,6 @@ # ========================================================================== # -import asyncio - from aiohttp.web import Request from aiohttp.web import Response @@ -43,15 +41,11 @@ class InfoApi: @exposed_http("GET", "/info") async def __common_state_handler(self, req: Request) -> Response: fields = self.__valid_info_fields(req) - results = dict(zip(fields, await asyncio.gather(*[ - self.__info_manager.get_submanager(field).get_state() - for field in fields - ]))) - return make_json_response(results) + return make_json_response(await self.__info_manager.get_state(fields)) def __valid_info_fields(self, req: Request) -> list[str]: - subs = self.__info_manager.get_subs() + available = self.__info_manager.get_subs() return sorted(valid_info_fields( - arg=req.query.get("fields", ",".join(subs)), - variants=subs, - ) or subs) + arg=req.query.get("fields", ",".join(available)), + variants=available, + ) or available) diff --git a/kvmd/apps/kvmd/api/redfish.py b/kvmd/apps/kvmd/api/redfish.py index e1822496..3b248685 100644 --- a/kvmd/apps/kvmd/api/redfish.py +++ b/kvmd/apps/kvmd/api/redfish.py @@ -88,12 +88,12 @@ class RedfishApi: @exposed_http("GET", "/redfish/v1/Systems/0") async def __server_handler(self, _: Request) -> Response: - (atx_state, meta_state) = await asyncio.gather(*[ + (atx_state, info_state) = await asyncio.gather(*[ self.__atx.get_state(), - self.__info_manager.get_submanager("meta").get_state(), + self.__info_manager.get_state(["meta"]), ]) try: - host = str(meta_state.get("server", {})["host"]) # type: ignore + host = str(info_state["meta"].get("server", {})["host"]) # type: ignore except Exception: host = "" return make_json_response({ diff --git a/kvmd/apps/kvmd/info/__init__.py b/kvmd/apps/kvmd/info/__init__.py index 983ef6d6..b346c10c 100644 --- a/kvmd/apps/kvmd/info/__init__.py +++ b/kvmd/apps/kvmd/info/__init__.py @@ -20,6 +20,10 @@ # ========================================================================== # +import asyncio + +from typing import AsyncGenerator + from ....yamlconf import Section from .base import BaseInfoSubmanager @@ -34,17 +38,50 @@ from .fan import FanInfoSubmanager # ===== class InfoManager: def __init__(self, config: Section) -> None: - self.__subs = { + self.__subs: dict[str, BaseInfoSubmanager] = { "system": SystemInfoSubmanager(config.kvmd.streamer.cmd), - "auth": AuthInfoSubmanager(config.kvmd.auth.enabled), - "meta": MetaInfoSubmanager(config.kvmd.info.meta), + "auth": AuthInfoSubmanager(config.kvmd.auth.enabled), + "meta": MetaInfoSubmanager(config.kvmd.info.meta), "extras": ExtrasInfoSubmanager(config), - "hw": HwInfoSubmanager(**config.kvmd.info.hw._unpack()), - "fan": FanInfoSubmanager(**config.kvmd.info.fan._unpack()), + "hw": HwInfoSubmanager(**config.kvmd.info.hw._unpack()), + "fan": FanInfoSubmanager(**config.kvmd.info.fan._unpack()), } + self.__queue: "asyncio.Queue[tuple[str, (dict | None)]]" = asyncio.Queue() def get_subs(self) -> set[str]: return set(self.__subs) - def get_submanager(self, name: str) -> BaseInfoSubmanager: - return self.__subs[name] + async def get_state(self, fields: (list[str] | None)=None) -> dict: + fields = (fields or list(self.__subs)) + return dict(zip(fields, await asyncio.gather(*[ + self.__subs[field].get_state() + for field in fields + ]))) + + async def trigger_state(self) -> None: + await asyncio.gather(*[ + sub.trigger_state() + for sub in self.__subs.values() + ]) + + async def poll_state(self) -> AsyncGenerator[dict, None]: + while True: + (field, value) = await self.__queue.get() + yield {field: value} + + async def systask(self) -> None: + tasks = [ + asyncio.create_task(self.__poller(field)) + for field in self.__subs + ] + try: + await asyncio.gather(*tasks) + except Exception: + for task in tasks: + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + raise + + async def __poller(self, field: str) -> None: + async for state in self.__subs[field].poll_state(): + self.__queue.put_nowait((field, state)) diff --git a/kvmd/apps/kvmd/info/auth.py b/kvmd/apps/kvmd/info/auth.py index 2077afff..301cffe3 100644 --- a/kvmd/apps/kvmd/info/auth.py +++ b/kvmd/apps/kvmd/info/auth.py @@ -20,6 +20,10 @@ # ========================================================================== # +from typing import AsyncGenerator + +from .... import aiotools + from .base import BaseInfoSubmanager @@ -27,6 +31,15 @@ from .base import BaseInfoSubmanager class AuthInfoSubmanager(BaseInfoSubmanager): def __init__(self, enabled: bool) -> None: self.__enabled = enabled + self.__notifier = aiotools.AioNotifier() async def get_state(self) -> dict: return {"enabled": self.__enabled} + + async def trigger_state(self) -> None: + self.__notifier.notify() + + async def poll_state(self) -> AsyncGenerator[(dict | None), None]: + while True: + await self.__notifier.wait() + yield (await self.get_state()) diff --git a/kvmd/apps/kvmd/info/base.py b/kvmd/apps/kvmd/info/base.py index 87494e70..d090ed34 100644 --- a/kvmd/apps/kvmd/info/base.py +++ b/kvmd/apps/kvmd/info/base.py @@ -20,7 +20,17 @@ # ========================================================================== # +from typing import AsyncGenerator + + # ===== class BaseInfoSubmanager: async def get_state(self) -> (dict | None): raise NotImplementedError + + async def trigger_state(self) -> None: + raise NotImplementedError + + async def poll_state(self) -> AsyncGenerator[(dict | None), None]: + yield None + raise NotImplementedError diff --git a/kvmd/apps/kvmd/info/extras.py b/kvmd/apps/kvmd/info/extras.py index 07225013..a855ad15 100644 --- a/kvmd/apps/kvmd/info/extras.py +++ b/kvmd/apps/kvmd/info/extras.py @@ -24,6 +24,8 @@ import os import re import asyncio +from typing import AsyncGenerator + from ....logging import get_logger from ....yamlconf import Section @@ -41,6 +43,7 @@ from .base import BaseInfoSubmanager class ExtrasInfoSubmanager(BaseInfoSubmanager): def __init__(self, global_config: Section) -> None: self.__global_config = global_config + self.__notifier = aiotools.AioNotifier() async def get_state(self) -> (dict | None): try: @@ -65,6 +68,14 @@ class ExtrasInfoSubmanager(BaseInfoSubmanager): if sui is not None: await aiotools.shield_fg(sui.close()) + async def trigger_state(self) -> None: + self.__notifier.notify() + + async def poll_state(self) -> AsyncGenerator[(dict | None), None]: + while True: + await self.__notifier.wait() + yield (await self.get_state()) + def __get_extras_path(self, *parts: str) -> str: return os.path.join(self.__global_config.kvmd.info.extras, *parts) diff --git a/kvmd/apps/kvmd/info/fan.py b/kvmd/apps/kvmd/info/fan.py index 247aff7d..8f3f69c8 100644 --- a/kvmd/apps/kvmd/info/fan.py +++ b/kvmd/apps/kvmd/info/fan.py @@ -21,7 +21,6 @@ import copy -import asyncio from typing import AsyncGenerator @@ -53,6 +52,8 @@ class FanInfoSubmanager(BaseInfoSubmanager): self.__timeout = timeout self.__state_poll = state_poll + self.__notifier = aiotools.AioNotifier() + async def get_state(self) -> dict: monitored = await self.__get_monitored() return { @@ -60,24 +61,28 @@ class FanInfoSubmanager(BaseInfoSubmanager): "state": ((await self.__get_fan_state() if monitored else None)), } - async def poll_state(self) -> AsyncGenerator[dict, None]: - prev_state: dict = {} + async def trigger_state(self) -> None: + self.__notifier.notify(1) + + async def poll_state(self) -> AsyncGenerator[(dict | None), None]: + prev: dict = {} while True: if self.__unix_path: - pure = state = await self.get_state() + if (await self.__notifier.wait(timeout=self.__state_poll)) > 0: + prev = {} + new = await self.get_state() + pure = copy.deepcopy(new) if pure["state"] is not None: try: - pure = copy.deepcopy(state) pure["state"]["service"]["now_ts"] = 0 except Exception: pass - if pure != prev_state: - yield state - prev_state = pure - await asyncio.sleep(self.__state_poll) + if pure != prev: + prev = pure + yield new else: + await self.__notifier.wait() yield (await self.get_state()) - await aiotools.wait_infinite() # ===== diff --git a/kvmd/apps/kvmd/info/hw.py b/kvmd/apps/kvmd/info/hw.py index 458bc1ec..81cd1af6 100644 --- a/kvmd/apps/kvmd/info/hw.py +++ b/kvmd/apps/kvmd/info/hw.py @@ -22,6 +22,7 @@ import os import asyncio +import copy from typing import Callable from typing import AsyncGenerator @@ -60,6 +61,8 @@ class HwInfoSubmanager(BaseInfoSubmanager): self.__dt_cache: dict[str, str] = {} + self.__notifier = aiotools.AioNotifier() + async def get_state(self) -> dict: ( base, @@ -97,14 +100,18 @@ class HwInfoSubmanager(BaseInfoSubmanager): }, } + async def trigger_state(self) -> None: + self.__notifier.notify(1) + async def poll_state(self) -> AsyncGenerator[dict, None]: - prev_state: dict = {} + prev: dict = {} while True: - state = await self.get_state() - if state != prev_state: - yield state - prev_state = state - await asyncio.sleep(self.__state_poll) + if (await self.__notifier.wait(timeout=self.__state_poll)) > 0: + prev = {} + new = await self.get_state() + if new != prev: + prev = copy.deepcopy(new) + yield new # ===== diff --git a/kvmd/apps/kvmd/info/meta.py b/kvmd/apps/kvmd/info/meta.py index b66b76b1..996e648a 100644 --- a/kvmd/apps/kvmd/info/meta.py +++ b/kvmd/apps/kvmd/info/meta.py @@ -20,6 +20,8 @@ # ========================================================================== # +from typing import AsyncGenerator + from ....logging import get_logger from ....yamlconf.loader import load_yaml_file @@ -33,6 +35,7 @@ from .base import BaseInfoSubmanager class MetaInfoSubmanager(BaseInfoSubmanager): def __init__(self, meta_path: str) -> None: self.__meta_path = meta_path + self.__notifier = aiotools.AioNotifier() async def get_state(self) -> (dict | None): try: @@ -40,3 +43,11 @@ class MetaInfoSubmanager(BaseInfoSubmanager): except Exception: get_logger(0).exception("Can't parse meta") return None + + async def trigger_state(self) -> None: + self.__notifier.notify() + + async def poll_state(self) -> AsyncGenerator[(dict | None), None]: + while True: + await self.__notifier.wait() + yield (await self.get_state()) diff --git a/kvmd/apps/kvmd/info/system.py b/kvmd/apps/kvmd/info/system.py index 462b52c9..d4a450de 100644 --- a/kvmd/apps/kvmd/info/system.py +++ b/kvmd/apps/kvmd/info/system.py @@ -24,8 +24,11 @@ import os import asyncio import platform +from typing import AsyncGenerator + from ....logging import get_logger +from .... import aiotools from .... import aioproc from .... import __version__ @@ -37,6 +40,7 @@ from .base import BaseInfoSubmanager class SystemInfoSubmanager(BaseInfoSubmanager): def __init__(self, streamer_cmd: list[str]) -> None: self.__streamer_cmd = streamer_cmd + self.__notifier = aiotools.AioNotifier() async def get_state(self) -> dict: streamer_info = await self.__get_streamer_info() @@ -50,6 +54,14 @@ class SystemInfoSubmanager(BaseInfoSubmanager): }, } + async def trigger_state(self) -> None: + self.__notifier.notify() + + async def poll_state(self) -> AsyncGenerator[(dict | None), None]: + while True: + await self.__notifier.wait() + yield (await self.get_state()) + # ===== async def __get_streamer_info(self) -> dict: diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 7e3c7d48..50642725 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -150,6 +150,7 @@ class _Subsystem: class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-instance-attributes __EV_GPIO_STATE = "gpio_state" + __EV_INFO_STATE = "info_state" def __init__( # pylint: disable=too-many-arguments,too-many-locals self, @@ -200,15 +201,12 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins self.__subsystems = [ _Subsystem.make(auth_manager, "Auth manager"), - _Subsystem.make(user_gpio, "User-GPIO", self.__EV_GPIO_STATE), - _Subsystem.make(hid, "HID", "hid_state").add_source("hid_keymaps_state", self.__hid_api.get_keymaps, None, None), - _Subsystem.make(atx, "ATX", "atx_state"), - _Subsystem.make(msd, "MSD", "msd_state"), - _Subsystem.make(streamer, "Streamer", "streamer_state").add_source("streamer_ocr_state", self.__streamer_api.get_ocr, None, None), - *[ - _Subsystem.make(info_manager.get_submanager(sub), f"Info manager ({sub})", f"info_{sub}_state",) - for sub in sorted(info_manager.get_subs()) - ], + _Subsystem.make(user_gpio, "User-GPIO", self.__EV_GPIO_STATE), + _Subsystem.make(hid, "HID", "hid_state").add_source("hid_keymaps_state", self.__hid_api.get_keymaps, None, None), + _Subsystem.make(atx, "ATX", "atx_state"), + _Subsystem.make(msd, "MSD", "msd_state"), + _Subsystem.make(streamer, "Streamer", "streamer_state").add_source("streamer_ocr_state", self.__streamer_api.get_ocr, None, None), + _Subsystem.make(info_manager, "Info manager", self.__EV_INFO_STATE), ] self.__streamer_notifier = aiotools.AioNotifier() @@ -251,6 +249,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins stream = valid_bool(req.query.get("stream", True)) legacy = valid_bool(req.query.get("legacy", True)) async with self._ws_session(req, stream=stream, legacy=legacy) as ws: + await ws.send_event("loop", {}) states = [ (event_type, src.get_state()) for sub in self.__subsystems @@ -269,7 +268,6 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins for src in sub.sources.values(): if src.trigger_state: await src.trigger_state() - await ws.send_event("loop", {}) return (await self._ws_loop(ws)) @exposed_ws("ping") @@ -366,6 +364,8 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins async def __poll_state(self, event_type: str, poller: AsyncGenerator[dict, None]) -> None: if event_type == self.__EV_GPIO_STATE: await self.__poll_gpio_state(poller) + elif event_type == self.__EV_INFO_STATE: + await self.__poll_info_state(poller) else: async for state in poller: await self._broadcast_ws_event(event_type, state) @@ -381,3 +381,9 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins prev["state"]["inputs"].update(state["state"].get("inputs", {})) prev["state"]["outputs"].update(state["state"].get("outputs", {})) await self._broadcast_ws_event(self.__EV_GPIO_STATE, prev["state"], legacy=True) + + async def __poll_info_state(self, poller: AsyncGenerator[dict, None]) -> None: + async for state in poller: + await self._broadcast_ws_event(self.__EV_INFO_STATE, state, legacy=False) + for (key, value) in state.items(): + await self._broadcast_ws_event(f"info_{key}_state", value, legacy=True) diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py index c365e19d..c0a56ad1 100644 --- a/kvmd/apps/kvmd/streamer.py +++ b/kvmd/apps/kvmd/streamer.py @@ -26,6 +26,7 @@ import asyncio import asyncio.subprocess import dataclasses import functools +import copy from typing import AsyncGenerator from typing import Any @@ -136,7 +137,7 @@ class _StreamerParams: } def get_limits(self) -> dict: - limits = dict(self.__limits) + limits = copy.deepcopy(self.__limits) if self.__has_resolution: limits[self.__AVAILABLE_RESOLUTIONS] = list(limits[self.__AVAILABLE_RESOLUTIONS]) return limits @@ -323,6 +324,9 @@ class Streamer: # pylint: disable=too-many-instance-attributes "features": self.__params.get_features(), } + async def trigger_state(self) -> None: + self.__notifier.notify(1) + async def poll_state(self) -> AsyncGenerator[dict, None]: def signal_handler(*_: Any) -> None: get_logger(0).info("Got SIGUSR2, checking the stream state ...") @@ -331,21 +335,14 @@ class Streamer: # pylint: disable=too-many-instance-attributes get_logger(0).info("Installing SIGUSR2 streamer handler ...") asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler) - waiter_task: (asyncio.Task | None) = None - prev_state: dict = {} + prev: dict = {} while True: - state = await self.get_state() - if state != prev_state: - yield state - prev_state = state - - if waiter_task is None: - waiter_task = asyncio.create_task(self.__notifier.wait()) - if waiter_task in (await aiotools.wait_first( - asyncio.ensure_future(asyncio.sleep(self.__state_poll)), - waiter_task, - ))[0]: - waiter_task = None + if (await self.__notifier.wait(timeout=self.__state_poll)) > 0: + prev = {} + new = await self.get_state() + if new != prev: + prev = copy.deepcopy(new) + yield new # ===== diff --git a/kvmd/apps/kvmd/ugpio.py b/kvmd/apps/kvmd/ugpio.py index d893d2a0..11c60777 100644 --- a/kvmd/apps/kvmd/ugpio.py +++ b/kvmd/apps/kvmd/ugpio.py @@ -234,7 +234,6 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes class UserGpio: def __init__(self, config: Section, otg_config: Section) -> None: self.__notifier = aiotools.AioNotifier() - self.__full_state_requested = True self.__drivers = { driver: get_ugpio_driver_class(drv_config.type)( @@ -269,14 +268,12 @@ class UserGpio: } async def trigger_state(self) -> None: - self.__full_state_requested = True - self.__notifier.notify() + self.__notifier.notify(1) async def poll_state(self) -> AsyncGenerator[dict, None]: prev: dict = {"inputs": {}, "outputs": {}} while True: # pylint: disable=too-many-nested-blocks - if self.__full_state_requested: - self.__full_state_requested = False + if (await self.__notifier.wait()) > 0: full = await self.get_state() prev = copy.deepcopy(full["state"]) yield full @@ -285,14 +282,13 @@ class UserGpio: diff: dict = {} for sub in ["inputs", "outputs"]: for ch in new[sub]: - if new[sub][ch] != prev[sub][ch]: + if new[sub][ch] != prev[sub].get(ch): if sub not in diff: diff[sub] = {} diff[sub][ch] = new[sub][ch] if diff: prev = copy.deepcopy(new) yield {"state": diff} - await self.__notifier.wait() async def __get_io_state(self) -> dict: return { diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py index f8e97050..a6e89311 100644 --- a/kvmd/apps/vnc/server.py +++ b/kvmd/apps/vnc/server.py @@ -175,17 +175,18 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes self.__kvmd_ws = None async def __process_ws_event(self, event_type: str, event: dict) -> None: - if event_type == "info_meta_state": - try: - host = event["server"]["host"] - except Exception: - host = None - else: - if isinstance(host, str): - name = f"PiKVM: {host}" - if self._encodings.has_rename: - await self._send_rename(name) - self.__shared_params.name = name + if event_type == "info_state": + if "meta" in event: + try: + host = event["meta"]["server"]["host"] + except Exception: + host = None + else: + if isinstance(host, str): + name = f"PiKVM: {host}" + if self._encodings.has_rename: + await self._send_rename(name) + self.__shared_params.name = name elif event_type == "hid_state": if self._encodings.has_leds_state: |