summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/aiomulti.py27
-rw-r--r--kvmd/aiotools.py19
-rw-r--r--kvmd/apps/kvmd/api/export.py9
-rw-r--r--kvmd/apps/kvmd/api/info.py16
-rw-r--r--kvmd/apps/kvmd/api/redfish.py6
-rw-r--r--kvmd/apps/kvmd/info/__init__.py51
-rw-r--r--kvmd/apps/kvmd/info/auth.py13
-rw-r--r--kvmd/apps/kvmd/info/base.py10
-rw-r--r--kvmd/apps/kvmd/info/extras.py11
-rw-r--r--kvmd/apps/kvmd/info/fan.py25
-rw-r--r--kvmd/apps/kvmd/info/hw.py19
-rw-r--r--kvmd/apps/kvmd/info/meta.py11
-rw-r--r--kvmd/apps/kvmd/info/system.py12
-rw-r--r--kvmd/apps/kvmd/server.py26
-rw-r--r--kvmd/apps/kvmd/streamer.py27
-rw-r--r--kvmd/apps/kvmd/ugpio.py10
-rw-r--r--kvmd/apps/vnc/server.py23
-rw-r--r--kvmd/clients/kvmd.py13
-rw-r--r--kvmd/plugins/atx/__init__.py3
-rw-r--r--kvmd/plugins/atx/disabled.py8
-rw-r--r--kvmd/plugins/atx/gpio.py17
-rw-r--r--kvmd/plugins/hid/__init__.py3
-rw-r--r--kvmd/plugins/hid/_mcu/__init__.py17
-rw-r--r--kvmd/plugins/hid/bt/__init__.py17
-rw-r--r--kvmd/plugins/hid/ch9329/__init__.py17
-rw-r--r--kvmd/plugins/hid/otg/__init__.py18
-rw-r--r--kvmd/plugins/msd/__init__.py3
-rw-r--r--kvmd/plugins/msd/disabled.py8
-rw-r--r--kvmd/plugins/msd/otg/__init__.py17
29 files changed, 309 insertions, 147 deletions
diff --git a/kvmd/aiomulti.py b/kvmd/aiomulti.py
index 653651cb..a4537204 100644
--- a/kvmd/aiomulti.py
+++ b/kvmd/aiomulti.py
@@ -59,14 +59,25 @@ def queue_get_last_sync( # pylint: disable=invalid-name
# =====
class AioProcessNotifier:
def __init__(self) -> None:
- self.__queue: "multiprocessing.Queue[None]" = multiprocessing.Queue()
-
- def notify(self) -> None:
- self.__queue.put_nowait(None)
-
- async def wait(self) -> None:
- while not (await queue_get_last(self.__queue, 0.1))[0]:
- pass
+ self.__queue: "multiprocessing.Queue[int]" = multiprocessing.Queue()
+
+ def notify(self, mask: int=0) -> None:
+ self.__queue.put_nowait(mask)
+
+ async def wait(self) -> int:
+ while True:
+ mask = await aiotools.run_async(self.__get)
+ if mask >= 0:
+ return mask
+
+ def __get(self) -> int:
+ try:
+ mask = self.__queue.get(timeout=0.1)
+ while not self.__queue.empty():
+ mask |= self.__queue.get()
+ return mask
+ except queue.Empty:
+ return -1
# =====
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py
index 5d284a94..b1747f16 100644
--- a/kvmd/aiotools.py
+++ b/kvmd/aiotools.py
@@ -232,25 +232,26 @@ async def close_writer(writer: asyncio.StreamWriter) -> bool:
# =====
class AioNotifier:
def __init__(self) -> None:
- self.__queue: "asyncio.Queue[None]" = asyncio.Queue()
+ self.__queue: "asyncio.Queue[int]" = asyncio.Queue()
- def notify(self) -> None:
- self.__queue.put_nowait(None)
+ def notify(self, mask: int=0) -> None:
+ self.__queue.put_nowait(mask)
- async def wait(self, timeout: (float | None)=None) -> None:
+ async def wait(self, timeout: (float | None)=None) -> int:
+ mask = 0
if timeout is None:
- await self.__queue.get()
+ mask = await self.__queue.get()
else:
try:
- await asyncio.wait_for(
+ mask = await asyncio.wait_for(
asyncio.ensure_future(self.__queue.get()),
timeout=timeout,
)
except asyncio.TimeoutError:
- return # False
+ return -1
while not self.__queue.empty():
- await self.__queue.get()
- # return True
+ mask |= await self.__queue.get()
+ return mask
# =====
diff --git a/kvmd/apps/kvmd/api/export.py b/kvmd/apps/kvmd/api/export.py
index 998c5d81..bb048f53 100644
--- a/kvmd/apps/kvmd/api/export.py
+++ b/kvmd/apps/kvmd/api/export.py
@@ -55,10 +55,9 @@ class ExportApi:
@async_lru.alru_cache(maxsize=1, ttl=5)
async def __get_prometheus_metrics(self) -> str:
- (atx_state, hw_state, fan_state, gpio_state) = await asyncio.gather(*[
+ (atx_state, info_state, gpio_state) = await asyncio.gather(*[
self.__atx.get_state(),
- self.__info_manager.get_submanager("hw").get_state(),
- self.__info_manager.get_submanager("fan").get_state(),
+ self.__info_manager.get_state(["hw", "fan"]),
self.__user_gpio.get_state(),
])
rows: list[str] = []
@@ -72,8 +71,8 @@ class ExportApi:
for key in ["online", "state"]:
self.__append_prometheus_rows(rows, ch_state["state"], f"pikvm_gpio_{mode}_{key}_{channel}")
- self.__append_prometheus_rows(rows, hw_state["health"], "pikvm_hw") # type: ignore
- self.__append_prometheus_rows(rows, fan_state, "pikvm_fan")
+ self.__append_prometheus_rows(rows, info_state["hw"]["health"], "pikvm_hw") # type: ignore
+ self.__append_prometheus_rows(rows, info_state["fan"], "pikvm_fan")
return "\n".join(rows)
diff --git a/kvmd/apps/kvmd/api/info.py b/kvmd/apps/kvmd/api/info.py
index a0be01a5..89d45a84 100644
--- a/kvmd/apps/kvmd/api/info.py
+++ b/kvmd/apps/kvmd/api/info.py
@@ -20,8 +20,6 @@
# ========================================================================== #
-import asyncio
-
from aiohttp.web import Request
from aiohttp.web import Response
@@ -43,15 +41,11 @@ class InfoApi:
@exposed_http("GET", "/info")
async def __common_state_handler(self, req: Request) -> Response:
fields = self.__valid_info_fields(req)
- results = dict(zip(fields, await asyncio.gather(*[
- self.__info_manager.get_submanager(field).get_state()
- for field in fields
- ])))
- return make_json_response(results)
+ return make_json_response(await self.__info_manager.get_state(fields))
def __valid_info_fields(self, req: Request) -> list[str]:
- subs = self.__info_manager.get_subs()
+ available = self.__info_manager.get_subs()
return sorted(valid_info_fields(
- arg=req.query.get("fields", ",".join(subs)),
- variants=subs,
- ) or subs)
+ arg=req.query.get("fields", ",".join(available)),
+ variants=available,
+ ) or available)
diff --git a/kvmd/apps/kvmd/api/redfish.py b/kvmd/apps/kvmd/api/redfish.py
index e1822496..3b248685 100644
--- a/kvmd/apps/kvmd/api/redfish.py
+++ b/kvmd/apps/kvmd/api/redfish.py
@@ -88,12 +88,12 @@ class RedfishApi:
@exposed_http("GET", "/redfish/v1/Systems/0")
async def __server_handler(self, _: Request) -> Response:
- (atx_state, meta_state) = await asyncio.gather(*[
+ (atx_state, info_state) = await asyncio.gather(*[
self.__atx.get_state(),
- self.__info_manager.get_submanager("meta").get_state(),
+ self.__info_manager.get_state(["meta"]),
])
try:
- host = str(meta_state.get("server", {})["host"]) # type: ignore
+ host = str(info_state["meta"].get("server", {})["host"]) # type: ignore
except Exception:
host = ""
return make_json_response({
diff --git a/kvmd/apps/kvmd/info/__init__.py b/kvmd/apps/kvmd/info/__init__.py
index 983ef6d6..b346c10c 100644
--- a/kvmd/apps/kvmd/info/__init__.py
+++ b/kvmd/apps/kvmd/info/__init__.py
@@ -20,6 +20,10 @@
# ========================================================================== #
+import asyncio
+
+from typing import AsyncGenerator
+
from ....yamlconf import Section
from .base import BaseInfoSubmanager
@@ -34,17 +38,50 @@ from .fan import FanInfoSubmanager
# =====
class InfoManager:
def __init__(self, config: Section) -> None:
- self.__subs = {
+ self.__subs: dict[str, BaseInfoSubmanager] = {
"system": SystemInfoSubmanager(config.kvmd.streamer.cmd),
- "auth": AuthInfoSubmanager(config.kvmd.auth.enabled),
- "meta": MetaInfoSubmanager(config.kvmd.info.meta),
+ "auth": AuthInfoSubmanager(config.kvmd.auth.enabled),
+ "meta": MetaInfoSubmanager(config.kvmd.info.meta),
"extras": ExtrasInfoSubmanager(config),
- "hw": HwInfoSubmanager(**config.kvmd.info.hw._unpack()),
- "fan": FanInfoSubmanager(**config.kvmd.info.fan._unpack()),
+ "hw": HwInfoSubmanager(**config.kvmd.info.hw._unpack()),
+ "fan": FanInfoSubmanager(**config.kvmd.info.fan._unpack()),
}
+ self.__queue: "asyncio.Queue[tuple[str, (dict | None)]]" = asyncio.Queue()
def get_subs(self) -> set[str]:
return set(self.__subs)
- def get_submanager(self, name: str) -> BaseInfoSubmanager:
- return self.__subs[name]
+ async def get_state(self, fields: (list[str] | None)=None) -> dict:
+ fields = (fields or list(self.__subs))
+ return dict(zip(fields, await asyncio.gather(*[
+ self.__subs[field].get_state()
+ for field in fields
+ ])))
+
+ async def trigger_state(self) -> None:
+ await asyncio.gather(*[
+ sub.trigger_state()
+ for sub in self.__subs.values()
+ ])
+
+ async def poll_state(self) -> AsyncGenerator[dict, None]:
+ while True:
+ (field, value) = await self.__queue.get()
+ yield {field: value}
+
+ async def systask(self) -> None:
+ tasks = [
+ asyncio.create_task(self.__poller(field))
+ for field in self.__subs
+ ]
+ try:
+ await asyncio.gather(*tasks)
+ except Exception:
+ for task in tasks:
+ task.cancel()
+ await asyncio.gather(*tasks, return_exceptions=True)
+ raise
+
+ async def __poller(self, field: str) -> None:
+ async for state in self.__subs[field].poll_state():
+ self.__queue.put_nowait((field, state))
diff --git a/kvmd/apps/kvmd/info/auth.py b/kvmd/apps/kvmd/info/auth.py
index 2077afff..301cffe3 100644
--- a/kvmd/apps/kvmd/info/auth.py
+++ b/kvmd/apps/kvmd/info/auth.py
@@ -20,6 +20,10 @@
# ========================================================================== #
+from typing import AsyncGenerator
+
+from .... import aiotools
+
from .base import BaseInfoSubmanager
@@ -27,6 +31,15 @@ from .base import BaseInfoSubmanager
class AuthInfoSubmanager(BaseInfoSubmanager):
def __init__(self, enabled: bool) -> None:
self.__enabled = enabled
+ self.__notifier = aiotools.AioNotifier()
async def get_state(self) -> dict:
return {"enabled": self.__enabled}
+
+ async def trigger_state(self) -> None:
+ self.__notifier.notify()
+
+ async def poll_state(self) -> AsyncGenerator[(dict | None), None]:
+ while True:
+ await self.__notifier.wait()
+ yield (await self.get_state())
diff --git a/kvmd/apps/kvmd/info/base.py b/kvmd/apps/kvmd/info/base.py
index 87494e70..d090ed34 100644
--- a/kvmd/apps/kvmd/info/base.py
+++ b/kvmd/apps/kvmd/info/base.py
@@ -20,7 +20,17 @@
# ========================================================================== #
+from typing import AsyncGenerator
+
+
# =====
class BaseInfoSubmanager:
async def get_state(self) -> (dict | None):
raise NotImplementedError
+
+ async def trigger_state(self) -> None:
+ raise NotImplementedError
+
+ async def poll_state(self) -> AsyncGenerator[(dict | None), None]:
+ yield None
+ raise NotImplementedError
diff --git a/kvmd/apps/kvmd/info/extras.py b/kvmd/apps/kvmd/info/extras.py
index 07225013..a855ad15 100644
--- a/kvmd/apps/kvmd/info/extras.py
+++ b/kvmd/apps/kvmd/info/extras.py
@@ -24,6 +24,8 @@ import os
import re
import asyncio
+from typing import AsyncGenerator
+
from ....logging import get_logger
from ....yamlconf import Section
@@ -41,6 +43,7 @@ from .base import BaseInfoSubmanager
class ExtrasInfoSubmanager(BaseInfoSubmanager):
def __init__(self, global_config: Section) -> None:
self.__global_config = global_config
+ self.__notifier = aiotools.AioNotifier()
async def get_state(self) -> (dict | None):
try:
@@ -65,6 +68,14 @@ class ExtrasInfoSubmanager(BaseInfoSubmanager):
if sui is not None:
await aiotools.shield_fg(sui.close())
+ async def trigger_state(self) -> None:
+ self.__notifier.notify()
+
+ async def poll_state(self) -> AsyncGenerator[(dict | None), None]:
+ while True:
+ await self.__notifier.wait()
+ yield (await self.get_state())
+
def __get_extras_path(self, *parts: str) -> str:
return os.path.join(self.__global_config.kvmd.info.extras, *parts)
diff --git a/kvmd/apps/kvmd/info/fan.py b/kvmd/apps/kvmd/info/fan.py
index 247aff7d..8f3f69c8 100644
--- a/kvmd/apps/kvmd/info/fan.py
+++ b/kvmd/apps/kvmd/info/fan.py
@@ -21,7 +21,6 @@
import copy
-import asyncio
from typing import AsyncGenerator
@@ -53,6 +52,8 @@ class FanInfoSubmanager(BaseInfoSubmanager):
self.__timeout = timeout
self.__state_poll = state_poll
+ self.__notifier = aiotools.AioNotifier()
+
async def get_state(self) -> dict:
monitored = await self.__get_monitored()
return {
@@ -60,24 +61,28 @@ class FanInfoSubmanager(BaseInfoSubmanager):
"state": ((await self.__get_fan_state() if monitored else None)),
}
- async def poll_state(self) -> AsyncGenerator[dict, None]:
- prev_state: dict = {}
+ async def trigger_state(self) -> None:
+ self.__notifier.notify(1)
+
+ async def poll_state(self) -> AsyncGenerator[(dict | None), None]:
+ prev: dict = {}
while True:
if self.__unix_path:
- pure = state = await self.get_state()
+ if (await self.__notifier.wait(timeout=self.__state_poll)) > 0:
+ prev = {}
+ new = await self.get_state()
+ pure = copy.deepcopy(new)
if pure["state"] is not None:
try:
- pure = copy.deepcopy(state)
pure["state"]["service"]["now_ts"] = 0
except Exception:
pass
- if pure != prev_state:
- yield state
- prev_state = pure
- await asyncio.sleep(self.__state_poll)
+ if pure != prev:
+ prev = pure
+ yield new
else:
+ await self.__notifier.wait()
yield (await self.get_state())
- await aiotools.wait_infinite()
# =====
diff --git a/kvmd/apps/kvmd/info/hw.py b/kvmd/apps/kvmd/info/hw.py
index 458bc1ec..81cd1af6 100644
--- a/kvmd/apps/kvmd/info/hw.py
+++ b/kvmd/apps/kvmd/info/hw.py
@@ -22,6 +22,7 @@
import os
import asyncio
+import copy
from typing import Callable
from typing import AsyncGenerator
@@ -60,6 +61,8 @@ class HwInfoSubmanager(BaseInfoSubmanager):
self.__dt_cache: dict[str, str] = {}
+ self.__notifier = aiotools.AioNotifier()
+
async def get_state(self) -> dict:
(
base,
@@ -97,14 +100,18 @@ class HwInfoSubmanager(BaseInfoSubmanager):
},
}
+ async def trigger_state(self) -> None:
+ self.__notifier.notify(1)
+
async def poll_state(self) -> AsyncGenerator[dict, None]:
- prev_state: dict = {}
+ prev: dict = {}
while True:
- state = await self.get_state()
- if state != prev_state:
- yield state
- prev_state = state
- await asyncio.sleep(self.__state_poll)
+ if (await self.__notifier.wait(timeout=self.__state_poll)) > 0:
+ prev = {}
+ new = await self.get_state()
+ if new != prev:
+ prev = copy.deepcopy(new)
+ yield new
# =====
diff --git a/kvmd/apps/kvmd/info/meta.py b/kvmd/apps/kvmd/info/meta.py
index b66b76b1..996e648a 100644
--- a/kvmd/apps/kvmd/info/meta.py
+++ b/kvmd/apps/kvmd/info/meta.py
@@ -20,6 +20,8 @@
# ========================================================================== #
+from typing import AsyncGenerator
+
from ....logging import get_logger
from ....yamlconf.loader import load_yaml_file
@@ -33,6 +35,7 @@ from .base import BaseInfoSubmanager
class MetaInfoSubmanager(BaseInfoSubmanager):
def __init__(self, meta_path: str) -> None:
self.__meta_path = meta_path
+ self.__notifier = aiotools.AioNotifier()
async def get_state(self) -> (dict | None):
try:
@@ -40,3 +43,11 @@ class MetaInfoSubmanager(BaseInfoSubmanager):
except Exception:
get_logger(0).exception("Can't parse meta")
return None
+
+ async def trigger_state(self) -> None:
+ self.__notifier.notify()
+
+ async def poll_state(self) -> AsyncGenerator[(dict | None), None]:
+ while True:
+ await self.__notifier.wait()
+ yield (await self.get_state())
diff --git a/kvmd/apps/kvmd/info/system.py b/kvmd/apps/kvmd/info/system.py
index 462b52c9..d4a450de 100644
--- a/kvmd/apps/kvmd/info/system.py
+++ b/kvmd/apps/kvmd/info/system.py
@@ -24,8 +24,11 @@ import os
import asyncio
import platform
+from typing import AsyncGenerator
+
from ....logging import get_logger
+from .... import aiotools
from .... import aioproc
from .... import __version__
@@ -37,6 +40,7 @@ from .base import BaseInfoSubmanager
class SystemInfoSubmanager(BaseInfoSubmanager):
def __init__(self, streamer_cmd: list[str]) -> None:
self.__streamer_cmd = streamer_cmd
+ self.__notifier = aiotools.AioNotifier()
async def get_state(self) -> dict:
streamer_info = await self.__get_streamer_info()
@@ -50,6 +54,14 @@ class SystemInfoSubmanager(BaseInfoSubmanager):
},
}
+ async def trigger_state(self) -> None:
+ self.__notifier.notify()
+
+ async def poll_state(self) -> AsyncGenerator[(dict | None), None]:
+ while True:
+ await self.__notifier.wait()
+ yield (await self.get_state())
+
# =====
async def __get_streamer_info(self) -> dict:
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
index 7e3c7d48..50642725 100644
--- a/kvmd/apps/kvmd/server.py
+++ b/kvmd/apps/kvmd/server.py
@@ -150,6 +150,7 @@ class _Subsystem:
class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-instance-attributes
__EV_GPIO_STATE = "gpio_state"
+ __EV_INFO_STATE = "info_state"
def __init__( # pylint: disable=too-many-arguments,too-many-locals
self,
@@ -200,15 +201,12 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
self.__subsystems = [
_Subsystem.make(auth_manager, "Auth manager"),
- _Subsystem.make(user_gpio, "User-GPIO", self.__EV_GPIO_STATE),
- _Subsystem.make(hid, "HID", "hid_state").add_source("hid_keymaps_state", self.__hid_api.get_keymaps, None, None),
- _Subsystem.make(atx, "ATX", "atx_state"),
- _Subsystem.make(msd, "MSD", "msd_state"),
- _Subsystem.make(streamer, "Streamer", "streamer_state").add_source("streamer_ocr_state", self.__streamer_api.get_ocr, None, None),
- *[
- _Subsystem.make(info_manager.get_submanager(sub), f"Info manager ({sub})", f"info_{sub}_state",)
- for sub in sorted(info_manager.get_subs())
- ],
+ _Subsystem.make(user_gpio, "User-GPIO", self.__EV_GPIO_STATE),
+ _Subsystem.make(hid, "HID", "hid_state").add_source("hid_keymaps_state", self.__hid_api.get_keymaps, None, None),
+ _Subsystem.make(atx, "ATX", "atx_state"),
+ _Subsystem.make(msd, "MSD", "msd_state"),
+ _Subsystem.make(streamer, "Streamer", "streamer_state").add_source("streamer_ocr_state", self.__streamer_api.get_ocr, None, None),
+ _Subsystem.make(info_manager, "Info manager", self.__EV_INFO_STATE),
]
self.__streamer_notifier = aiotools.AioNotifier()
@@ -251,6 +249,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
stream = valid_bool(req.query.get("stream", True))
legacy = valid_bool(req.query.get("legacy", True))
async with self._ws_session(req, stream=stream, legacy=legacy) as ws:
+ await ws.send_event("loop", {})
states = [
(event_type, src.get_state())
for sub in self.__subsystems
@@ -269,7 +268,6 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
for src in sub.sources.values():
if src.trigger_state:
await src.trigger_state()
- await ws.send_event("loop", {})
return (await self._ws_loop(ws))
@exposed_ws("ping")
@@ -366,6 +364,8 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
async def __poll_state(self, event_type: str, poller: AsyncGenerator[dict, None]) -> None:
if event_type == self.__EV_GPIO_STATE:
await self.__poll_gpio_state(poller)
+ elif event_type == self.__EV_INFO_STATE:
+ await self.__poll_info_state(poller)
else:
async for state in poller:
await self._broadcast_ws_event(event_type, state)
@@ -381,3 +381,9 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
prev["state"]["inputs"].update(state["state"].get("inputs", {}))
prev["state"]["outputs"].update(state["state"].get("outputs", {}))
await self._broadcast_ws_event(self.__EV_GPIO_STATE, prev["state"], legacy=True)
+
+ async def __poll_info_state(self, poller: AsyncGenerator[dict, None]) -> None:
+ async for state in poller:
+ await self._broadcast_ws_event(self.__EV_INFO_STATE, state, legacy=False)
+ for (key, value) in state.items():
+ await self._broadcast_ws_event(f"info_{key}_state", value, legacy=True)
diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py
index c365e19d..c0a56ad1 100644
--- a/kvmd/apps/kvmd/streamer.py
+++ b/kvmd/apps/kvmd/streamer.py
@@ -26,6 +26,7 @@ import asyncio
import asyncio.subprocess
import dataclasses
import functools
+import copy
from typing import AsyncGenerator
from typing import Any
@@ -136,7 +137,7 @@ class _StreamerParams:
}
def get_limits(self) -> dict:
- limits = dict(self.__limits)
+ limits = copy.deepcopy(self.__limits)
if self.__has_resolution:
limits[self.__AVAILABLE_RESOLUTIONS] = list(limits[self.__AVAILABLE_RESOLUTIONS])
return limits
@@ -323,6 +324,9 @@ class Streamer: # pylint: disable=too-many-instance-attributes
"features": self.__params.get_features(),
}
+ async def trigger_state(self) -> None:
+ self.__notifier.notify(1)
+
async def poll_state(self) -> AsyncGenerator[dict, None]:
def signal_handler(*_: Any) -> None:
get_logger(0).info("Got SIGUSR2, checking the stream state ...")
@@ -331,21 +335,14 @@ class Streamer: # pylint: disable=too-many-instance-attributes
get_logger(0).info("Installing SIGUSR2 streamer handler ...")
asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler)
- waiter_task: (asyncio.Task | None) = None
- prev_state: dict = {}
+ prev: dict = {}
while True:
- state = await self.get_state()
- if state != prev_state:
- yield state
- prev_state = state
-
- if waiter_task is None:
- waiter_task = asyncio.create_task(self.__notifier.wait())
- if waiter_task in (await aiotools.wait_first(
- asyncio.ensure_future(asyncio.sleep(self.__state_poll)),
- waiter_task,
- ))[0]:
- waiter_task = None
+ if (await self.__notifier.wait(timeout=self.__state_poll)) > 0:
+ prev = {}
+ new = await self.get_state()
+ if new != prev:
+ prev = copy.deepcopy(new)
+ yield new
# =====
diff --git a/kvmd/apps/kvmd/ugpio.py b/kvmd/apps/kvmd/ugpio.py
index d893d2a0..11c60777 100644
--- a/kvmd/apps/kvmd/ugpio.py
+++ b/kvmd/apps/kvmd/ugpio.py
@@ -234,7 +234,6 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
class UserGpio:
def __init__(self, config: Section, otg_config: Section) -> None:
self.__notifier = aiotools.AioNotifier()
- self.__full_state_requested = True
self.__drivers = {
driver: get_ugpio_driver_class(drv_config.type)(
@@ -269,14 +268,12 @@ class UserGpio:
}
async def trigger_state(self) -> None:
- self.__full_state_requested = True
- self.__notifier.notify()
+ self.__notifier.notify(1)
async def poll_state(self) -> AsyncGenerator[dict, None]:
prev: dict = {"inputs": {}, "outputs": {}}
while True: # pylint: disable=too-many-nested-blocks
- if self.__full_state_requested:
- self.__full_state_requested = False
+ if (await self.__notifier.wait()) > 0:
full = await self.get_state()
prev = copy.deepcopy(full["state"])
yield full
@@ -285,14 +282,13 @@ class UserGpio:
diff: dict = {}
for sub in ["inputs", "outputs"]:
for ch in new[sub]:
- if new[sub][ch] != prev[sub][ch]:
+ if new[sub][ch] != prev[sub].get(ch):
if sub not in diff:
diff[sub] = {}
diff[sub][ch] = new[sub][ch]
if diff:
prev = copy.deepcopy(new)
yield {"state": diff}
- await self.__notifier.wait()
async def __get_io_state(self) -> dict:
return {
diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py
index f8e97050..a6e89311 100644
--- a/kvmd/apps/vnc/server.py
+++ b/kvmd/apps/vnc/server.py
@@ -175,17 +175,18 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
self.__kvmd_ws = None
async def __process_ws_event(self, event_type: str, event: dict) -> None:
- if event_type == "info_meta_state":
- try:
- host = event["server"]["host"]
- except Exception:
- host = None
- else:
- if isinstance(host, str):
- name = f"PiKVM: {host}"
- if self._encodings.has_rename:
- await self._send_rename(name)
- self.__shared_params.name = name
+ if event_type == "info_state":
+ if "meta" in event:
+ try:
+ host = event["meta"]["server"]["host"]
+ except Exception:
+ host = None
+ else:
+ if isinstance(host, str):
+ name = f"PiKVM: {host}"
+ if self._encodings.has_rename:
+ await self._send_rename(name)
+ self.__shared_params.name = name
elif event_type == "hid_state":
if self._encodings.has_leds_state:
diff --git a/kvmd/clients/kvmd.py b/kvmd/clients/kvmd.py
index ff8581ef..ddf0d024 100644
--- a/kvmd/clients/kvmd.py
+++ b/kvmd/clients/kvmd.py
@@ -222,7 +222,7 @@ class KvmdClientSession:
@contextlib.asynccontextmanager
async def ws(self) -> AsyncGenerator[KvmdClientWs, None]:
session = self.__ensure_http_session()
- async with session.ws_connect(self.__make_url("ws")) as ws:
+ async with session.ws_connect(self.__make_url("ws"), params={"legacy": 0}) as ws:
yield KvmdClientWs(ws)
def __ensure_http_session(self) -> aiohttp.ClientSession:
@@ -267,16 +267,15 @@ class KvmdClient:
)
def __make_http_session(self, user: str, passwd: str) -> aiohttp.ClientSession:
- kwargs: dict = {
- "headers": {
+ return aiohttp.ClientSession(
+ headers={
"X-KVMD-User": user,
"X-KVMD-Passwd": passwd,
"User-Agent": self.__user_agent,
},
- "connector": aiohttp.UnixConnector(path=self.__unix_path),
- "timeout": aiohttp.ClientTimeout(total=self.__timeout),
- }
- return aiohttp.ClientSession(**kwargs)
+ connector=aiohttp.UnixConnector(path=self.__unix_path),
+ timeout=aiohttp.ClientTimeout(total=self.__timeout),
+ )
def __make_url(self, handle: str) -> str:
assert not handle.startswith("/"), handle
diff --git a/kvmd/plugins/atx/__init__.py b/kvmd/plugins/atx/__init__.py
index e9496a65..7545b030 100644
--- a/kvmd/plugins/atx/__init__.py
+++ b/kvmd/plugins/atx/__init__.py
@@ -48,6 +48,9 @@ class BaseAtx(BasePlugin):
async def get_state(self) -> dict:
raise NotImplementedError
+ async def trigger_state(self) -> None:
+ raise NotImplementedError
+
async def poll_state(self) -> AsyncGenerator[dict, None]:
yield {}
raise NotImplementedError
diff --git a/kvmd/plugins/atx/disabled.py b/kvmd/plugins/atx/disabled.py
index d9abec00..60c2fa5b 100644
--- a/kvmd/plugins/atx/disabled.py
+++ b/kvmd/plugins/atx/disabled.py
@@ -36,6 +36,9 @@ class AtxDisabledError(AtxOperationError):
# =====
class Plugin(BaseAtx):
+ def __init__(self) -> None:
+ self.__notifier = aiotools.AioNotifier()
+
async def get_state(self) -> dict:
return {
"enabled": False,
@@ -46,10 +49,13 @@ class Plugin(BaseAtx):
},
}
+ async def trigger_state(self) -> None:
+ self.__notifier.notify()
+
async def poll_state(self) -> AsyncGenerator[dict, None]:
while True:
+ await self.__notifier.wait()
yield (await self.get_state())
- await aiotools.wait_infinite()
# =====
diff --git a/kvmd/plugins/atx/gpio.py b/kvmd/plugins/atx/gpio.py
index 538aafaf..e42b3959 100644
--- a/kvmd/plugins/atx/gpio.py
+++ b/kvmd/plugins/atx/gpio.py
@@ -21,6 +21,7 @@
import asyncio
+import copy
from typing import AsyncGenerator
@@ -130,14 +131,18 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
},
}
+ async def trigger_state(self) -> None:
+ self.__notifier.notify(1)
+
async def poll_state(self) -> AsyncGenerator[dict, None]:
- prev_state: dict = {}
+ prev: dict = {}
while True:
- state = await self.get_state()
- if state != prev_state:
- yield state
- prev_state = state
- await self.__notifier.wait()
+ if (await self.__notifier.wait()) > 0:
+ prev = {}
+ new = await self.get_state()
+ if new != prev:
+ prev = copy.deepcopy(new)
+ yield new
async def systask(self) -> None:
await self.__reader.poll()
diff --git a/kvmd/plugins/hid/__init__.py b/kvmd/plugins/hid/__init__.py
index 1c2efeec..3cba01f4 100644
--- a/kvmd/plugins/hid/__init__.py
+++ b/kvmd/plugins/hid/__init__.py
@@ -63,6 +63,9 @@ class BaseHid(BasePlugin):
async def get_state(self) -> dict:
raise NotImplementedError
+ async def trigger_state(self) -> None:
+ raise NotImplementedError
+
async def poll_state(self) -> AsyncGenerator[dict, None]:
yield {}
raise NotImplementedError
diff --git a/kvmd/plugins/hid/_mcu/__init__.py b/kvmd/plugins/hid/_mcu/__init__.py
index 53665fb2..e058fd6c 100644
--- a/kvmd/plugins/hid/_mcu/__init__.py
+++ b/kvmd/plugins/hid/_mcu/__init__.py
@@ -23,6 +23,7 @@
import multiprocessing
import contextlib
import queue
+import copy
import time
from typing import Iterable
@@ -232,14 +233,18 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
**self._get_jiggler_state(),
}
+ async def trigger_state(self) -> None:
+ self.__notifier.notify(1)
+
async def poll_state(self) -> AsyncGenerator[dict, None]:
- prev_state: dict = {}
+ prev: dict = {}
while True:
- state = await self.get_state()
- if state != prev_state:
- yield state
- prev_state = state
- await self.__notifier.wait()
+ if (await self.__notifier.wait()) > 0:
+ prev = {}
+ new = await self.get_state()
+ if new != prev:
+ prev = copy.deepcopy(new)
+ yield new
async def reset(self) -> None:
self.__reset_required_event.set()
diff --git a/kvmd/plugins/hid/bt/__init__.py b/kvmd/plugins/hid/bt/__init__.py
index ece6b59b..bca8f9a5 100644
--- a/kvmd/plugins/hid/bt/__init__.py
+++ b/kvmd/plugins/hid/bt/__init__.py
@@ -21,6 +21,7 @@
import multiprocessing
+import copy
import time
from typing import Iterable
@@ -158,14 +159,18 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
**self._get_jiggler_state(),
}
+ async def trigger_state(self) -> None:
+ self.__notifier.notify(1)
+
async def poll_state(self) -> AsyncGenerator[dict, None]:
- prev_state: dict = {}
+ prev: dict = {}
while True:
- state = await self.get_state()
- if state != prev_state:
- yield state
- prev_state = state
- await self.__notifier.wait()
+ if (await self.__notifier.wait()) > 0:
+ prev = {}
+ new = await self.get_state()
+ if new != prev:
+ prev = copy.deepcopy(new)
+ yield new
async def reset(self) -> None:
self.clear_events()
diff --git a/kvmd/plugins/hid/ch9329/__init__.py b/kvmd/plugins/hid/ch9329/__init__.py
index 3245505d..f93be95c 100644
--- a/kvmd/plugins/hid/ch9329/__init__.py
+++ b/kvmd/plugins/hid/ch9329/__init__.py
@@ -22,6 +22,7 @@
import multiprocessing
import queue
+import copy
import time
from typing import Iterable
@@ -119,14 +120,18 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
**self._get_jiggler_state(),
}
+ async def trigger_state(self) -> None:
+ self.__notifier.notify(1)
+
async def poll_state(self) -> AsyncGenerator[dict, None]:
- prev_state: dict = {}
+ prev: dict = {}
while True:
- state = await self.get_state()
- if state != prev_state:
- yield state
- prev_state = state
- await self.__notifier.wait()
+ if (await self.__notifier.wait()) > 0:
+ prev = {}
+ new = await self.get_state()
+ if new != prev:
+ prev = copy.deepcopy(new)
+ yield new
async def reset(self) -> None:
self.__reset_required_event.set()
diff --git a/kvmd/plugins/hid/otg/__init__.py b/kvmd/plugins/hid/otg/__init__.py
index 3516546d..7686ebdd 100644
--- a/kvmd/plugins/hid/otg/__init__.py
+++ b/kvmd/plugins/hid/otg/__init__.py
@@ -20,6 +20,8 @@
# ========================================================================== #
+import copy
+
from typing import Iterable
from typing import AsyncGenerator
from typing import Any
@@ -150,14 +152,18 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
**self._get_jiggler_state(),
}
+ async def trigger_state(self) -> None:
+ self.__notifier.notify(1)
+
async def poll_state(self) -> AsyncGenerator[dict, None]:
- prev_state: dict = {}
+ prev: dict = {}
while True:
- state = await self.get_state()
- if state != prev_state:
- yield state
- prev_state = state
- await self.__notifier.wait()
+ if (await self.__notifier.wait()) > 0:
+ prev = {}
+ new = await self.get_state()
+ if new != prev:
+ prev = copy.deepcopy(new)
+ yield new
async def reset(self) -> None:
self.__keyboard_proc.send_reset_event()
diff --git a/kvmd/plugins/msd/__init__.py b/kvmd/plugins/msd/__init__.py
index e0210921..11cd8be5 100644
--- a/kvmd/plugins/msd/__init__.py
+++ b/kvmd/plugins/msd/__init__.py
@@ -117,6 +117,9 @@ class BaseMsd(BasePlugin):
async def get_state(self) -> dict:
raise NotImplementedError()
+ async def trigger_state(self) -> None:
+ raise NotImplementedError()
+
async def poll_state(self) -> AsyncGenerator[dict, None]:
if self is not None: # XXX: Vulture and pylint hack
raise NotImplementedError()
diff --git a/kvmd/plugins/msd/disabled.py b/kvmd/plugins/msd/disabled.py
index ad49875e..b9f14f6e 100644
--- a/kvmd/plugins/msd/disabled.py
+++ b/kvmd/plugins/msd/disabled.py
@@ -40,6 +40,9 @@ class MsdDisabledError(MsdOperationError):
# =====
class Plugin(BaseMsd):
+ def __init__(self) -> None:
+ self.__notifier = aiotools.AioNotifier()
+
async def get_state(self) -> dict:
return {
"enabled": False,
@@ -49,10 +52,13 @@ class Plugin(BaseMsd):
"drive": None,
}
+ async def trigger_state(self) -> None:
+ self.__notifier.notify()
+
async def poll_state(self) -> AsyncGenerator[dict, None]:
while True:
+ await self.__notifier.wait()
yield (await self.get_state())
- await aiotools.wait_infinite()
async def reset(self) -> None:
raise MsdDisabledError()
diff --git a/kvmd/plugins/msd/otg/__init__.py b/kvmd/plugins/msd/otg/__init__.py
index b652cdd0..0bb9f489 100644
--- a/kvmd/plugins/msd/otg/__init__.py
+++ b/kvmd/plugins/msd/otg/__init__.py
@@ -24,6 +24,7 @@ import asyncio
import contextlib
import dataclasses
import functools
+import copy
import time
from typing import AsyncGenerator
@@ -195,14 +196,18 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
"drive": vd,
}
+ async def trigger_state(self) -> None:
+ self.__notifier.notify(1)
+
async def poll_state(self) -> AsyncGenerator[dict, None]:
- prev_state: dict = {}
+ prev: dict = {}
while True:
- state = await self.get_state()
- if state != prev_state:
- yield state
- prev_state = state
- await self.__notifier.wait()
+ if (await self.__notifier.wait()) > 0:
+ prev = {}
+ new = await self.get_state()
+ if new != prev:
+ prev = copy.deepcopy(new)
+ yield new
async def systask(self) -> None:
await self.__watch_inotify()