summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
authorMaxim Devaev <[email protected]>2024-10-19 08:59:52 +0300
committerMaxim Devaev <[email protected]>2024-10-19 08:59:52 +0300
commit90d8e745e39b5e49fcb1d50c1efa95148593496b (patch)
treef098894e16275d8fe80b8cb16bb615ff3a106e5e /kvmd
parent3852d0a4568c3726d0182f3b00a9a3e8bb7e8af9 (diff)
gpio diff events mode
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/apps/kvmd/api/ugpio.py5
-rw-r--r--kvmd/apps/kvmd/server.py49
-rw-r--r--kvmd/apps/kvmd/ugpio.py122
-rw-r--r--kvmd/htserver.py3
4 files changed, 118 insertions, 61 deletions
diff --git a/kvmd/apps/kvmd/api/ugpio.py b/kvmd/apps/kvmd/api/ugpio.py
index ddaefefa..b2ac02be 100644
--- a/kvmd/apps/kvmd/api/ugpio.py
+++ b/kvmd/apps/kvmd/api/ugpio.py
@@ -42,10 +42,7 @@ class UserGpioApi:
@exposed_http("GET", "/gpio")
async def __state_handler(self, _: Request) -> Response:
- return make_json_response({
- "model": (await self.__user_gpio.get_model()),
- "state": (await self.__user_gpio.get_state()),
- })
+ return make_json_response(await self.__user_gpio.get_state())
@exposed_http("POST", "/gpio/switch")
async def __switch_handler(self, req: Request) -> Response:
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
index 366958ce..7e3c7d48 100644
--- a/kvmd/apps/kvmd/server.py
+++ b/kvmd/apps/kvmd/server.py
@@ -100,8 +100,9 @@ class StreamerH264NotSupported(OperationError):
# =====
@dataclasses.dataclass(frozen=True)
class _SubsystemEventSource:
- get_state: (Callable[[], Coroutine[Any, Any, dict]] | None) = None
- poll_state: (Callable[[], AsyncGenerator[dict, None]] | None) = None
+ get_state: (Callable[[], Coroutine[Any, Any, dict]] | None) = None
+ trigger_state: (Callable[[], Coroutine[Any, Any, None]] | None) = None
+ poll_state: (Callable[[], AsyncGenerator[dict, None]] | None) = None
@dataclasses.dataclass
@@ -127,6 +128,7 @@ class _Subsystem:
sub.add_source(
event_type=event_type,
get_state=getattr(obj, "get_state", None),
+ trigger_state=getattr(obj, "trigger_state", None),
poll_state=getattr(obj, "poll_state", None),
)
return sub
@@ -135,17 +137,20 @@ class _Subsystem:
self,
event_type: str,
get_state: (Callable[[], Coroutine[Any, Any, dict]] | None),
+ trigger_state: (Callable[[], Coroutine[Any, Any, None]] | None),
poll_state: (Callable[[], AsyncGenerator[dict, None]] | None),
) -> "_Subsystem":
assert event_type
assert event_type not in self.sources, (self, event_type)
assert get_state or poll_state, (self, event_type)
- self.sources[event_type] = _SubsystemEventSource(get_state, poll_state)
+ self.sources[event_type] = _SubsystemEventSource(get_state, trigger_state, poll_state)
return self
class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-instance-attributes
+ __EV_GPIO_STATE = "gpio_state"
+
def __init__( # pylint: disable=too-many-arguments,too-many-locals
self,
auth_manager: AuthManager,
@@ -195,11 +200,11 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
self.__subsystems = [
_Subsystem.make(auth_manager, "Auth manager"),
- _Subsystem.make(user_gpio, "User-GPIO", "gpio_state").add_source("gpio_model_state", user_gpio.get_model, None),
- _Subsystem.make(hid, "HID", "hid_state").add_source("hid_keymaps_state", self.__hid_api.get_keymaps, None),
+ _Subsystem.make(user_gpio, "User-GPIO", self.__EV_GPIO_STATE),
+ _Subsystem.make(hid, "HID", "hid_state").add_source("hid_keymaps_state", self.__hid_api.get_keymaps, None, None),
_Subsystem.make(atx, "ATX", "atx_state"),
_Subsystem.make(msd, "MSD", "msd_state"),
- _Subsystem.make(streamer, "Streamer", "streamer_state").add_source("streamer_ocr_state", self.__streamer_api.get_ocr, None),
+ _Subsystem.make(streamer, "Streamer", "streamer_state").add_source("streamer_ocr_state", self.__streamer_api.get_ocr, None, None),
*[
_Subsystem.make(info_manager.get_submanager(sub), f"Info manager ({sub})", f"info_{sub}_state",)
for sub in sorted(info_manager.get_subs())
@@ -244,12 +249,13 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
@exposed_http("GET", "/ws")
async def __ws_handler(self, req: Request) -> WebSocketResponse:
stream = valid_bool(req.query.get("stream", True))
- async with self._ws_session(req, stream=stream) as ws:
+ legacy = valid_bool(req.query.get("legacy", True))
+ async with self._ws_session(req, stream=stream, legacy=legacy) as ws:
states = [
(event_type, src.get_state())
for sub in self.__subsystems
for (event_type, src) in sub.sources.items()
- if src.get_state
+ if src.get_state and not src.trigger_state
]
events = dict(zip(
map(operator.itemgetter(0), states),
@@ -259,6 +265,10 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
ws.send_event(event_type, events.pop(event_type))
for (event_type, _) in states
])
+ for sub in self.__subsystems:
+ for src in sub.sources.values():
+ if src.trigger_state:
+ await src.trigger_state()
await ws.send_event("loop", {})
return (await self._ws_loop(ws))
@@ -347,12 +357,27 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
prev = cur
await self.__streamer_notifier.wait()
- async def __poll_state(self, event_type: str, poller: AsyncGenerator[dict, None]) -> None:
- async for state in poller:
- await self._broadcast_ws_event(event_type, state)
-
async def __stream_snapshoter(self) -> None:
await self.__snapshoter.run(
is_live=self.__has_stream_clients,
notifier=self.__streamer_notifier,
)
+
+ async def __poll_state(self, event_type: str, poller: AsyncGenerator[dict, None]) -> None:
+ if event_type == self.__EV_GPIO_STATE:
+ await self.__poll_gpio_state(poller)
+ else:
+ async for state in poller:
+ await self._broadcast_ws_event(event_type, state)
+
+ async def __poll_gpio_state(self, poller: AsyncGenerator[dict, None]) -> None:
+ prev: dict = {"state": {"inputs": {}, "outputs": {}}}
+ async for state in poller:
+ await self._broadcast_ws_event(self.__EV_GPIO_STATE, state, legacy=False)
+ if "model" in state: # We have only "model"+"state" or "model" event
+ prev = state
+ await self._broadcast_ws_event("gpio_model_state", prev["model"], legacy=True)
+ else:
+ prev["state"]["inputs"].update(state["state"].get("inputs", {}))
+ prev["state"]["outputs"].update(state["state"].get("outputs", {}))
+ await self._broadcast_ws_event(self.__EV_GPIO_STATE, prev["state"], legacy=True)
diff --git a/kvmd/apps/kvmd/ugpio.py b/kvmd/apps/kvmd/ugpio.py
index 872d5f6c..d893d2a0 100644
--- a/kvmd/apps/kvmd/ugpio.py
+++ b/kvmd/apps/kvmd/ugpio.py
@@ -21,6 +21,7 @@
import asyncio
+import copy
from typing import AsyncGenerator
from typing import Callable
@@ -68,12 +69,12 @@ class GpioChannelIsBusyError(IsBusyError, GpioError):
class _GpioInput:
def __init__(
self,
- channel: str,
+ ch: str,
config: Section,
driver: BaseUserGpioDriver,
) -> None:
- self.__channel = channel
+ self.__ch = ch
self.__pin: str = str(config.pin)
self.__inverted: bool = config.inverted
@@ -100,7 +101,7 @@ class _GpioInput:
}
def __str__(self) -> str:
- return f"Input({self.__channel}, driver={self.__driver}, pin={self.__pin})"
+ return f"Input({self.__ch}, driver={self.__driver}, pin={self.__pin})"
__repr__ = __str__
@@ -108,13 +109,13 @@ class _GpioInput:
class _GpioOutput: # pylint: disable=too-many-instance-attributes
def __init__(
self,
- channel: str,
+ ch: str,
config: Section,
driver: BaseUserGpioDriver,
notifier: aiotools.AioNotifier,
) -> None:
- self.__channel = channel
+ self.__ch = ch
self.__pin: str = str(config.pin)
self.__inverted: bool = config.inverted
@@ -224,7 +225,7 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
await self.__driver.write(self.__pin, (state ^ self.__inverted))
def __str__(self) -> str:
- return f"Output({self.__channel}, driver={self.__driver}, pin={self.__pin})"
+ return f"Output({self.__ch}, driver={self.__driver}, pin={self.__pin})"
__repr__ = __str__
@@ -232,9 +233,8 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
# =====
class UserGpio:
def __init__(self, config: Section, otg_config: Section) -> None:
- self.__view = config.view
-
self.__notifier = aiotools.AioNotifier()
+ self.__full_state_requested = True
self.__drivers = {
driver: get_ugpio_driver_class(drv_config.type)(
@@ -249,45 +249,64 @@ class UserGpio:
self.__inputs: dict[str, _GpioInput] = {}
self.__outputs: dict[str, _GpioOutput] = {}
- for (channel, ch_config) in tools.sorted_kvs(config.scheme):
+ for (ch, ch_config) in tools.sorted_kvs(config.scheme):
driver = self.__drivers[ch_config.driver]
if ch_config.mode == UserGpioModes.INPUT:
- self.__inputs[channel] = _GpioInput(channel, ch_config, driver)
+ self.__inputs[ch] = _GpioInput(ch, ch_config, driver)
else: # output:
- self.__outputs[channel] = _GpioOutput(channel, ch_config, driver, self.__notifier)
+ self.__outputs[ch] = _GpioOutput(ch, ch_config, driver, self.__notifier)
+
+ self.__scheme = self.__make_scheme()
+ self.__view = self.__make_view(config.view)
- async def get_model(self) -> dict:
+ async def get_state(self) -> dict:
return {
- "scheme": {
- "inputs": {channel: gin.get_scheme() for (channel, gin) in self.__inputs.items()},
- "outputs": {
- channel: gout.get_scheme()
- for (channel, gout) in self.__outputs.items()
- if not gout.is_const()
- },
+ "model": {
+ "scheme": copy.deepcopy(self.__scheme),
+ "view": copy.deepcopy(self.__view),
},
- "view": self.__make_view(),
+ "state": (await self.__get_io_state()),
}
- async def get_state(self) -> dict:
+ async def trigger_state(self) -> None:
+ self.__full_state_requested = True
+ self.__notifier.notify()
+
+ async def poll_state(self) -> AsyncGenerator[dict, None]:
+ prev: dict = {"inputs": {}, "outputs": {}}
+ while True: # pylint: disable=too-many-nested-blocks
+ if self.__full_state_requested:
+ self.__full_state_requested = False
+ full = await self.get_state()
+ prev = copy.deepcopy(full["state"])
+ yield full
+ else:
+ new = await self.__get_io_state()
+ diff: dict = {}
+ for sub in ["inputs", "outputs"]:
+ for ch in new[sub]:
+ if new[sub][ch] != prev[sub][ch]:
+ if sub not in diff:
+ diff[sub] = {}
+ diff[sub][ch] = new[sub][ch]
+ if diff:
+ prev = copy.deepcopy(new)
+ yield {"state": diff}
+ await self.__notifier.wait()
+
+ async def __get_io_state(self) -> dict:
return {
- "inputs": {channel: await gin.get_state() for (channel, gin) in self.__inputs.items()},
+ "inputs": {
+ ch: (await gin.get_state())
+ for (ch, gin) in self.__inputs.items()
+ },
"outputs": {
- channel: await gout.get_state()
- for (channel, gout) in self.__outputs.items()
+ ch: (await gout.get_state())
+ for (ch, gout) in self.__outputs.items()
if not gout.is_const()
},
}
- async def poll_state(self) -> AsyncGenerator[dict, None]:
- prev_state: dict = {}
- while True:
- state = await self.get_state()
- if state != prev_state:
- yield state
- prev_state = state
- await self.__notifier.wait()
-
def sysprep(self) -> None:
get_logger(0).info("Preparing User-GPIO drivers ...")
for (_, driver) in tools.sorted_kvs(self.__drivers):
@@ -307,28 +326,43 @@ class UserGpio:
except Exception:
get_logger().exception("Can't cleanup driver %s", driver)
- async def switch(self, channel: str, state: bool, wait: bool) -> None:
- gout = self.__outputs.get(channel)
+ async def switch(self, ch: str, state: bool, wait: bool) -> None:
+ gout = self.__outputs.get(ch)
if gout is None:
raise GpioChannelNotFoundError()
await gout.switch(state, wait)
- async def pulse(self, channel: str, delay: float, wait: bool) -> None:
- gout = self.__outputs.get(channel)
+ async def pulse(self, ch: str, delay: float, wait: bool) -> None:
+ gout = self.__outputs.get(ch)
if gout is None:
raise GpioChannelNotFoundError()
await gout.pulse(delay, wait)
# =====
- def __make_view(self) -> dict:
+ def __make_scheme(self) -> dict:
+ return {
+ "inputs": {
+ ch: gin.get_scheme()
+ for (ch, gin) in self.__inputs.items()
+ },
+ "outputs": {
+ ch: gout.get_scheme()
+ for (ch, gout) in self.__outputs.items()
+ if not gout.is_const()
+ },
+ }
+
+ # =====
+
+ def __make_view(self, view: dict) -> dict:
return {
- "header": {"title": self.__make_view_title()},
- "table": self.__make_view_table(),
+ "header": {"title": self.__make_view_title(view)},
+ "table": self.__make_view_table(view),
}
- def __make_view_title(self) -> list[dict]:
- raw_title = self.__view["header"]["title"]
+ def __make_view_title(self, view: dict) -> list[dict]:
+ raw_title = view["header"]["title"]
title: list[dict] = []
if isinstance(raw_title, list):
for item in raw_title:
@@ -342,9 +376,9 @@ class UserGpio:
title.append(self.__make_item_label(f"#{raw_title}"))
return title
- def __make_view_table(self) -> list[list[dict] | None]:
+ def __make_view_table(self, view: dict) -> list[list[dict] | None]:
table: list[list[dict] | None] = []
- for row in self.__view["table"]:
+ for row in view["table"]:
if len(row) == 0:
table.append(None)
continue
diff --git a/kvmd/htserver.py b/kvmd/htserver.py
index b0d78e82..351c1328 100644
--- a/kvmd/htserver.py
+++ b/kvmd/htserver.py
@@ -384,7 +384,7 @@ class HttpServer:
break
return ws.wsr
- async def _broadcast_ws_event(self, event_type: str, event: (dict | None)) -> None:
+ async def _broadcast_ws_event(self, event_type: str, event: (dict | None), legacy: (bool | None)=None) -> None:
if self.__ws_sessions:
await asyncio.gather(*[
ws.send_event(event_type, event)
@@ -393,6 +393,7 @@ class HttpServer:
not ws.wsr.closed
and ws.wsr._req is not None # pylint: disable=protected-access
and ws.wsr._req.transport is not None # pylint: disable=protected-access
+ and (legacy is None or ws.kwargs.get("legacy") == legacy)
)
], return_exceptions=True)