summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMaxim Devaev <[email protected]>2024-12-18 06:39:18 +0200
committerMaxim Devaev <[email protected]>2024-12-18 06:39:18 +0200
commitaf2ee26a2f022bff01ca446814a35c4aea14d5ca (patch)
tree8826291c60ea78935a113f5f2a17f11a288e6ab1
parent596334735e1e7a0ebd685ff7df2a8dbd776763f0 (diff)
kvmd-media server
-rw-r--r--configs/kvmd/main/v0-hdmi-zero2w.yaml6
-rw-r--r--configs/kvmd/main/v1-hdmi-rpi3.yaml6
-rw-r--r--configs/kvmd/main/v1-hdmi-zero2w.yaml6
-rw-r--r--configs/kvmd/main/v2-hdmi-rpi3.yaml6
-rw-r--r--configs/kvmd/main/v2-hdmi-rpi4.yaml6
-rw-r--r--configs/kvmd/main/v2-hdmi-zero2w.yaml6
-rw-r--r--configs/kvmd/main/v3-hdmi-rpi4.yaml6
-rw-r--r--configs/kvmd/main/v4mini-hdmi-rpi4.yaml6
-rw-r--r--configs/kvmd/main/v4plus-hdmi-rpi4.yaml6
-rw-r--r--configs/os/services/kvmd-media.service16
-rw-r--r--configs/os/sysusers.conf6
-rw-r--r--extras/media/manifest.yaml5
-rw-r--r--extras/media/nginx.ctx-http.conf3
-rw-r--r--extras/media/nginx.ctx-server.conf7
-rw-r--r--kvmd/aiotools.py2
-rw-r--r--kvmd/apps/__init__.py26
-rw-r--r--kvmd/apps/kvmd/server.py4
-rw-r--r--kvmd/apps/media/__init__.py48
-rw-r--r--kvmd/apps/media/__main__.py24
-rw-r--r--kvmd/apps/media/server.py190
-rw-r--r--kvmd/apps/pst/server.py4
-rw-r--r--kvmd/clients/streamer.py4
-rw-r--r--kvmd/htserver.py37
-rw-r--r--kvmd/tools.py5
-rwxr-xr-xsetup.py2
25 files changed, 419 insertions, 18 deletions
diff --git a/configs/kvmd/main/v0-hdmi-zero2w.yaml b/configs/kvmd/main/v0-hdmi-zero2w.yaml
index 96e89761..a187ff97 100644
--- a/configs/kvmd/main/v0-hdmi-zero2w.yaml
+++ b/configs/kvmd/main/v0-hdmi-zero2w.yaml
@@ -47,6 +47,12 @@ kvmd:
- "--h264-gop={h264_gop}"
+media:
+ memsink:
+ h264:
+ sink: "kvmd::ustreamer::h264"
+
+
vnc:
memsink:
jpeg:
diff --git a/configs/kvmd/main/v1-hdmi-rpi3.yaml b/configs/kvmd/main/v1-hdmi-rpi3.yaml
index e8e442f3..cf427f2f 100644
--- a/configs/kvmd/main/v1-hdmi-rpi3.yaml
+++ b/configs/kvmd/main/v1-hdmi-rpi3.yaml
@@ -56,6 +56,12 @@ kvmd:
- "--h264-gop={h264_gop}"
+media:
+ memsink:
+ h264:
+ sink: "kvmd::ustreamer::h264"
+
+
vnc:
memsink:
jpeg:
diff --git a/configs/kvmd/main/v1-hdmi-zero2w.yaml b/configs/kvmd/main/v1-hdmi-zero2w.yaml
index e8e442f3..cf427f2f 100644
--- a/configs/kvmd/main/v1-hdmi-zero2w.yaml
+++ b/configs/kvmd/main/v1-hdmi-zero2w.yaml
@@ -56,6 +56,12 @@ kvmd:
- "--h264-gop={h264_gop}"
+media:
+ memsink:
+ h264:
+ sink: "kvmd::ustreamer::h264"
+
+
vnc:
memsink:
jpeg:
diff --git a/configs/kvmd/main/v2-hdmi-rpi3.yaml b/configs/kvmd/main/v2-hdmi-rpi3.yaml
index 3bfc000f..55de21db 100644
--- a/configs/kvmd/main/v2-hdmi-rpi3.yaml
+++ b/configs/kvmd/main/v2-hdmi-rpi3.yaml
@@ -47,6 +47,12 @@ kvmd:
- "--h264-gop={h264_gop}"
+media:
+ memsink:
+ h264:
+ sink: "kvmd::ustreamer::h264"
+
+
vnc:
memsink:
jpeg:
diff --git a/configs/kvmd/main/v2-hdmi-rpi4.yaml b/configs/kvmd/main/v2-hdmi-rpi4.yaml
index ea3c8c10..532b30b0 100644
--- a/configs/kvmd/main/v2-hdmi-rpi4.yaml
+++ b/configs/kvmd/main/v2-hdmi-rpi4.yaml
@@ -48,6 +48,12 @@ kvmd:
- "--h264-gop={h264_gop}"
+media:
+ memsink:
+ h264:
+ sink: "kvmd::ustreamer::h264"
+
+
vnc:
memsink:
jpeg:
diff --git a/configs/kvmd/main/v2-hdmi-zero2w.yaml b/configs/kvmd/main/v2-hdmi-zero2w.yaml
index 3bfc000f..55de21db 100644
--- a/configs/kvmd/main/v2-hdmi-zero2w.yaml
+++ b/configs/kvmd/main/v2-hdmi-zero2w.yaml
@@ -47,6 +47,12 @@ kvmd:
- "--h264-gop={h264_gop}"
+media:
+ memsink:
+ h264:
+ sink: "kvmd::ustreamer::h264"
+
+
vnc:
memsink:
jpeg:
diff --git a/configs/kvmd/main/v3-hdmi-rpi4.yaml b/configs/kvmd/main/v3-hdmi-rpi4.yaml
index 50b140b0..6494d327 100644
--- a/configs/kvmd/main/v3-hdmi-rpi4.yaml
+++ b/configs/kvmd/main/v3-hdmi-rpi4.yaml
@@ -60,6 +60,12 @@ kvmd:
pulse: false
+media:
+ memsink:
+ h264:
+ sink: "kvmd::ustreamer::h264"
+
+
vnc:
memsink:
jpeg:
diff --git a/configs/kvmd/main/v4mini-hdmi-rpi4.yaml b/configs/kvmd/main/v4mini-hdmi-rpi4.yaml
index 410544d7..0ab4412f 100644
--- a/configs/kvmd/main/v4mini-hdmi-rpi4.yaml
+++ b/configs/kvmd/main/v4mini-hdmi-rpi4.yaml
@@ -85,6 +85,12 @@ kvmd:
pulse: false
+media:
+ memsink:
+ h264:
+ sink: "kvmd::ustreamer::h264"
+
+
vnc:
memsink:
jpeg:
diff --git a/configs/kvmd/main/v4plus-hdmi-rpi4.yaml b/configs/kvmd/main/v4plus-hdmi-rpi4.yaml
index c59be781..484b728a 100644
--- a/configs/kvmd/main/v4plus-hdmi-rpi4.yaml
+++ b/configs/kvmd/main/v4plus-hdmi-rpi4.yaml
@@ -88,6 +88,12 @@ kvmd:
pulse: false
+media:
+ memsink:
+ h264:
+ sink: "kvmd::ustreamer::h264"
+
+
vnc:
memsink:
jpeg:
diff --git a/configs/os/services/kvmd-media.service b/configs/os/services/kvmd-media.service
new file mode 100644
index 00000000..610d4859
--- /dev/null
+++ b/configs/os/services/kvmd-media.service
@@ -0,0 +1,16 @@
+[Unit]
+Description=PiKVM - Media proxy server
+After=kvmd.service
+
+[Service]
+User=kvmd-media
+Group=kvmd-media
+Type=simple
+Restart=always
+RestartSec=3
+
+ExecStart=/usr/bin/kvmd-media --run
+TimeoutStopSec=3
+
+[Install]
+WantedBy=multi-user.target
diff --git a/configs/os/sysusers.conf b/configs/os/sysusers.conf
index 0359974d..4ab263b5 100644
--- a/configs/os/sysusers.conf
+++ b/configs/os/sysusers.conf
@@ -1,4 +1,5 @@
g kvmd - -
+g kvmd-media - -
g kvmd-pst - -
g kvmd-ipmi - -
g kvmd-vnc - -
@@ -7,6 +8,7 @@ g kvmd-janus - -
g kvmd-certbot - -
u kvmd - "PiKVM - The main daemon" -
+u kvmd-media - "PiKVM - The media proxy"
u kvmd-pst - "PiKVM - Persistent storage" -
u kvmd-ipmi - "PiKVM - IPMI to KVMD proxy" -
u kvmd-vnc - "PiKVM - VNC to KVMD/Streamer proxy" -
@@ -19,8 +21,11 @@ m kvmd gpio
m kvmd uucp
m kvmd spi
m kvmd systemd-journal
+m kvmd kvmd-media
m kvmd kvmd-pst
+m kvmd-media kvmd
+
m kvmd-pst kvmd
m kvmd-ipmi kvmd
@@ -32,6 +37,7 @@ m kvmd-janus kvmd
m kvmd-janus audio
m kvmd-nginx kvmd
+m kvmd-nginx kvmd-media
m kvmd-nginx kvmd-janus
m kvmd-nginx kvmd-certbot
diff --git a/extras/media/manifest.yaml b/extras/media/manifest.yaml
new file mode 100644
index 00000000..f81c1bbf
--- /dev/null
+++ b/extras/media/manifest.yaml
@@ -0,0 +1,5 @@
+name: Media
+description: KVMD Media Proxy
+path: media
+daemon: kvmd-media
+place: -1
diff --git a/extras/media/nginx.ctx-http.conf b/extras/media/nginx.ctx-http.conf
new file mode 100644
index 00000000..d4ff7ac3
--- /dev/null
+++ b/extras/media/nginx.ctx-http.conf
@@ -0,0 +1,3 @@
+upstream media {
+ server unix:/run/kvmd/media.sock fail_timeout=0s max_fails=0;
+}
diff --git a/extras/media/nginx.ctx-server.conf b/extras/media/nginx.ctx-server.conf
new file mode 100644
index 00000000..67f877fc
--- /dev/null
+++ b/extras/media/nginx.ctx-server.conf
@@ -0,0 +1,7 @@
+location /media/ws {
+ rewrite ^/media/ws$ /ws break;
+ rewrite ^/media/ws\?(.*)$ /ws?$1 break;
+ proxy_pass http://media;
+ include /etc/kvmd/nginx/loc-proxy.conf;
+ include /etc/kvmd/nginx/loc-websocket.conf;
+}
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py
index 6183690f..f400ad3c 100644
--- a/kvmd/aiotools.py
+++ b/kvmd/aiotools.py
@@ -171,7 +171,7 @@ def create_deadly_task(name: str, coro: Coroutine) -> asyncio.Task:
except asyncio.CancelledError:
pass
except Exception:
- logger.exception("Unhandled exception in deadly task, killing myself ...")
+ logger.exception("Unhandled exception in deadly task %r, killing myself ...", name)
pid = os.getpid()
if pid == 1:
os._exit(1) # Docker workaround # pylint: disable=protected-access
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py
index 2090e5c6..8a1fc96e 100644
--- a/kvmd/apps/__init__.py
+++ b/kvmd/apps/__init__.py
@@ -509,6 +509,32 @@ def _get_config_scheme() -> dict:
},
},
+ "media": {
+ "server": {
+ "unix": Option("/run/kvmd/media.sock", type=valid_abs_path, unpack_as="unix_path"),
+ "unix_rm": Option(True, type=valid_bool),
+ "unix_mode": Option(0o660, type=valid_unix_mode),
+ "heartbeat": Option(15.0, type=valid_float_f01),
+ "access_log_format": Option("[%P / %{X-Real-IP}i] '%r' => %s; size=%b ---"
+ " referer='%{Referer}i'; user_agent='%{User-Agent}i'"),
+ },
+
+ "memsink": {
+ "jpeg": {
+ "sink": Option("", unpack_as="obj"),
+ "lock_timeout": Option(1.0, type=valid_float_f01),
+ "wait_timeout": Option(1.0, type=valid_float_f01),
+ "drop_same_frames": Option(0.0, type=valid_float_f0),
+ },
+ "h264": {
+ "sink": Option("", unpack_as="obj"),
+ "lock_timeout": Option(1.0, type=valid_float_f01),
+ "wait_timeout": Option(1.0, type=valid_float_f01),
+ "drop_same_frames": Option(0.0, type=valid_float_f0),
+ },
+ },
+ },
+
"pst": {
"server": {
"unix": Option("/run/kvmd/pst.sock", type=valid_abs_path, unpack_as="unix_path"),
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
index 92eb496c..8e1f4adc 100644
--- a/kvmd/apps/kvmd/server.py
+++ b/kvmd/apps/kvmd/server.py
@@ -298,10 +298,10 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
logger.exception("Cleanup error on %s", sub.name)
logger.info("On-Cleanup complete")
- async def _on_ws_opened(self) -> None:
+ async def _on_ws_opened(self, _: WsSession) -> None:
self.__streamer_notifier.notify()
- async def _on_ws_closed(self) -> None:
+ async def _on_ws_closed(self, _: WsSession) -> None:
self.__hid.clear_events()
self.__streamer_notifier.notify()
diff --git a/kvmd/apps/media/__init__.py b/kvmd/apps/media/__init__.py
new file mode 100644
index 00000000..325a817c
--- /dev/null
+++ b/kvmd/apps/media/__init__.py
@@ -0,0 +1,48 @@
+# ========================================================================== #
+# #
+# KVMD - The main PiKVM daemon. #
+# #
+# Copyright (C) 2020 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from ...clients.streamer import StreamerFormats
+from ...clients.streamer import MemsinkStreamerClient
+
+from .. import init
+
+from .server import MediaServer
+
+
+# =====
+def main(argv: (list[str] | None)=None) -> None:
+ config = init(
+ prog="kvmd-media",
+ description="The media proxy",
+ check_run=True,
+ argv=argv,
+ )[2].media
+
+ def make_streamer(name: str, fmt: int) -> (MemsinkStreamerClient | None):
+ if getattr(config.memsink, name).sink:
+ return MemsinkStreamerClient(name.upper(), fmt, **getattr(config.memsink, name)._unpack())
+ return None
+
+ MediaServer(
+ h264_streamer=make_streamer("h264", StreamerFormats.H264),
+ jpeg_streamer=make_streamer("jpeg", StreamerFormats.JPEG),
+ ).run(**config.server._unpack())
diff --git a/kvmd/apps/media/__main__.py b/kvmd/apps/media/__main__.py
new file mode 100644
index 00000000..ab578e06
--- /dev/null
+++ b/kvmd/apps/media/__main__.py
@@ -0,0 +1,24 @@
+# ========================================================================== #
+# #
+# KVMD - The main PiKVM daemon. #
+# #
+# Copyright (C) 2020 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from . import main
+main()
diff --git a/kvmd/apps/media/server.py b/kvmd/apps/media/server.py
new file mode 100644
index 00000000..2763ffa0
--- /dev/null
+++ b/kvmd/apps/media/server.py
@@ -0,0 +1,190 @@
+# ========================================================================== #
+# #
+# KVMD - The main PiKVM daemon. #
+# #
+# Copyright (C) 2020 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import asyncio
+import dataclasses
+
+from aiohttp.web import Request
+from aiohttp.web import WebSocketResponse
+
+from ...logging import get_logger
+
+from ... import tools
+from ... import aiotools
+
+from ...htserver import exposed_http
+from ...htserver import exposed_ws
+from ...htserver import WsSession
+from ...htserver import HttpServer
+
+from ...clients.streamer import StreamerError
+from ...clients.streamer import StreamerPermError
+from ...clients.streamer import StreamerFormats
+from ...clients.streamer import BaseStreamerClient
+
+
+# =====
+class _Source:
+ kind: str
+ fmt: str
+ streamer: BaseStreamerClient
+ meta: dict = dataclasses.field(default_factory=dict)
+ clients: dict[WsSession, "_Client"] = dataclasses.field(default_factory=dict)
+ key_required: bool = dataclasses.field(default=False)
+
+
+class _Client:
+ ws: WsSession
+ src: _Source
+ queue: asyncio.Queue[dict]
+ sender: (asyncio.Task | None) = dataclasses.field(default=None)
+
+
+class MediaServer(HttpServer):
+ __K_VIDEO = "video"
+
+ __F_H264 = "h264"
+ __F_JPEG = "jpeg"
+
+ __Q_SIZE = 32
+
+ def __init__(
+ self,
+ h264_streamer: (BaseStreamerClient | None),
+ jpeg_streamer: (BaseStreamerClient | None),
+ ) -> None:
+
+ super().__init__()
+
+ self.__srcs: list[_Source] = []
+ if h264_streamer:
+ self.__srcs.append(_Source(self.__K_VIDEO, self.__F_H264, h264_streamer, {"profile_level_id": "42E01F"}))
+ if jpeg_streamer:
+ self.__srcs.append(_Source(self.__K_VIDEO, self.__F_JPEG, jpeg_streamer))
+
+ # =====
+
+ @exposed_http("GET", "/ws")
+ async def __ws_handler(self, req: Request) -> WebSocketResponse:
+ async with self._ws_session(req) as ws:
+ media: dict = {self.__K_VIDEO: {}}
+ for src in self.__srcs:
+ media[src.kind][src.fmt] = src.meta
+ await ws.send_event("media", media)
+ return (await self._ws_loop(ws))
+
+ @exposed_ws(0)
+ async def __ws_bin_ping_handler(self, ws: WsSession, _: bytes) -> None:
+ await ws.send_bin(255, b"") # Ping-pong
+
+ @exposed_ws("start")
+ async def __ws_start_handler(self, ws: WsSession, event: dict) -> None:
+ try:
+ kind = str(event.get("kind"))
+ fmt = str(event.get("format"))
+ except Exception:
+ return
+ src: (_Source | None) = None
+ for cand in self.__srcs:
+ if ws in cand.clients:
+ return # Don't allow any double streaming
+ if (cand.kind, cand.fmt) == (kind, fmt):
+ src = cand
+ if src:
+ client = _Client(ws, src, asyncio.Queue(self.__Q_SIZE))
+ client.sender = aiotools.create_deadly_task(str(ws), self.__sender(client))
+ src.clients[ws] = client
+ get_logger(0).info("Streaming %s to %s ...", src.streamer, ws)
+
+ # =====
+
+ async def _init_app(self) -> None:
+ logger = get_logger(0)
+ for src in self.__srcs:
+ logger.info("Starting streamer %s ...", src.streamer)
+ aiotools.create_deadly_task(str(src.streamer), self.__streamer(src))
+ self._add_exposed(self)
+
+ async def _on_shutdown(self) -> None:
+ logger = get_logger(0)
+ logger.info("Stopping system tasks ...")
+ await aiotools.stop_all_deadly_tasks()
+ logger.info("Disconnecting clients ...")
+ await self._close_all_wss()
+ logger.info("On-Shutdown complete")
+
+ async def _on_ws_closed(self, ws: WsSession) -> None:
+ for src in self.__srcs:
+ client = src.clients.pop(ws, None)
+ if client and client.sender:
+ get_logger(0).info("Closed stream for %s", ws)
+ client.sender.cancel()
+ return
+
+ # =====
+
+ async def __sender(self, client: _Client) -> None:
+ need_key = StreamerFormats.is_diff(client.src.streamer.get_format())
+ if need_key:
+ client.src.key_required = True
+ has_key = False
+ while True:
+ frame = await client.queue.get()
+ has_key = (not need_key or has_key or frame["key"])
+ if has_key:
+ try:
+ await client.ws.send_bin(1, frame["key"].to_bytes() + frame["data"])
+ except Exception:
+ pass
+
+ async def __streamer(self, src: _Source) -> None:
+ logger = get_logger(0)
+ while True:
+ if len(src.clients) == 0:
+ await asyncio.sleep(1)
+ continue
+ try:
+ async with src.streamer.reading() as read_frame:
+ while len(src.clients) > 0:
+ frame = await read_frame(src.key_required)
+ if frame["key"]:
+ src.key_required = False
+ for client in src.clients.values():
+ try:
+ client.queue.put_nowait(frame)
+ except asyncio.QueueFull:
+ # Если какой-то из клиентов не справляется, очищаем ему очередь и запрашиваем кейфрейм.
+ # Я вижу у такой логики кучу минусов, хз как себя покажет, но лучше пока ничего не придумал.
+ tools.clear_queue(client.queue)
+ src.key_required = True
+ except Exception:
+ pass
+ except StreamerError as ex:
+ if isinstance(ex, StreamerPermError):
+ logger.exception("Streamer failed: %s", src.streamer)
+ else:
+ logger.error("Streamer error: %s: %s", src.streamer, tools.efmt(ex))
+ except Exception:
+ get_logger(0).exception("Unexpected streamer error: %s", src.streamer)
+ await asyncio.sleep(1)
diff --git a/kvmd/apps/pst/server.py b/kvmd/apps/pst/server.py
index 8d8bf9d4..d96043b3 100644
--- a/kvmd/apps/pst/server.py
+++ b/kvmd/apps/pst/server.py
@@ -104,10 +104,10 @@ class PstServer(HttpServer): # pylint: disable=too-many-arguments,too-many-inst
await self.__remount_storage(rw=False)
logger.info("On-Cleanup complete")
- async def _on_ws_opened(self) -> None:
+ async def _on_ws_opened(self, _: WsSession) -> None:
self.__notifier.notify()
- async def _on_ws_closed(self) -> None:
+ async def _on_ws_closed(self, _: WsSession) -> None:
self.__notifier.notify()
# ===== SYSTEM TASKS
diff --git a/kvmd/clients/streamer.py b/kvmd/clients/streamer.py
index 5369892e..cd5f03ec 100644
--- a/kvmd/clients/streamer.py
+++ b/kvmd/clients/streamer.py
@@ -63,6 +63,10 @@ class StreamerFormats:
H264 = 875967048 # V4L2_PIX_FMT_H264
_MJPEG = 1196444237 # V4L2_PIX_FMT_MJPEG
+ @classmethod
+ def is_diff(cls, fmt: int) -> bool:
+ return (fmt == cls.H264)
+
class BaseStreamerClient:
def get_format(self) -> int:
diff --git a/kvmd/htserver.py b/kvmd/htserver.py
index 351c1328..63c82fcb 100644
--- a/kvmd/htserver.py
+++ b/kvmd/htserver.py
@@ -232,6 +232,16 @@ async def send_ws_event(
}))
+async def send_ws_bin(
+ wsr: (ClientWebSocketResponse | WebSocketResponse),
+ op: int,
+ data: bytes,
+) -> None:
+
+ assert 0 <= op <= 255
+ await wsr.send_bytes(op.to_bytes() + data)
+
+
def parse_ws_event(msg: str) -> tuple[str, dict]:
data = json.loads(msg)
if not isinstance(data, dict):
@@ -264,14 +274,24 @@ def set_request_auth_info(req: BaseRequest, info: str) -> None:
@dataclasses.dataclass(frozen=True)
class WsSession:
wsr: WebSocketResponse
- kwargs: dict[str, Any]
+ kwargs: dict[str, Any] = dataclasses.field(hash=False)
def __str__(self) -> str:
return f"WsSession(id={id(self)}, {self.kwargs})"
+ def is_alive(self) -> bool:
+ return (
+ not self.wsr.closed
+ and self.wsr._req is not None # pylint: disable=protected-access
+ and self.wsr._req.transport is not None # pylint: disable=protected-access
+ )
+
async def send_event(self, event_type: str, event: (dict | None)) -> None:
await send_ws_event(self.wsr, event_type, event)
+ async def send_bin(self, op: int, data: bytes) -> None:
+ await send_ws_bin(self.wsr, op, data)
+
class HttpServer:
def __init__(self) -> None:
@@ -353,7 +373,7 @@ class HttpServer:
get_logger(2).info("Registered new client session: %s; clients now: %d", ws, len(self.__ws_sessions))
try:
- await self._on_ws_opened()
+ await self._on_ws_opened(ws)
yield ws
finally:
await aiotools.shield_fg(self.__close_ws(ws))
@@ -389,12 +409,7 @@ class HttpServer:
await asyncio.gather(*[
ws.send_event(event_type, event)
for ws in self.__ws_sessions
- if (
- not ws.wsr.closed
- and ws.wsr._req is not None # pylint: disable=protected-access
- and ws.wsr._req.transport is not None # pylint: disable=protected-access
- and (legacy is None or ws.kwargs.get("legacy") == legacy)
- )
+ if ws.is_alive() and (legacy is None or ws.kwargs.get("legacy") == legacy)
], return_exceptions=True)
async def _close_all_wss(self) -> bool:
@@ -414,7 +429,7 @@ class HttpServer:
await ws.wsr.close()
except Exception:
pass
- await self._on_ws_closed()
+ await self._on_ws_closed(ws)
# =====
@@ -430,10 +445,10 @@ class HttpServer:
async def _on_cleanup(self) -> None:
pass
- async def _on_ws_opened(self) -> None:
+ async def _on_ws_opened(self, ws: WsSession) -> None:
pass
- async def _on_ws_closed(self) -> None:
+ async def _on_ws_closed(self, ws: WsSession) -> None:
pass
# =====
diff --git a/kvmd/tools.py b/kvmd/tools.py
index 14a58ab3..6dd7d2f9 100644
--- a/kvmd/tools.py
+++ b/kvmd/tools.py
@@ -20,6 +20,7 @@
# ========================================================================== #
+import asyncio
import operator
import functools
import multiprocessing.queues
@@ -64,11 +65,11 @@ def swapped_kvs(dct: dict[_DictKeyT, _DictValueT]) -> dict[_DictValueT, _DictKey
# =====
-def clear_queue(q: multiprocessing.queues.Queue) -> None: # pylint: disable=invalid-name
+def clear_queue(q: (multiprocessing.queues.Queue | asyncio.Queue)) -> None: # pylint: disable=invalid-name
for _ in range(q.qsize()):
try:
q.get_nowait()
- except queue.Empty:
+ except (queue.Empty, asyncio.QueueEmpty):
break
diff --git a/setup.py b/setup.py
index 8d36376b..b75e41f7 100755
--- a/setup.py
+++ b/setup.py
@@ -86,6 +86,7 @@ def main() -> None:
"kvmd.apps.kvmd.switch",
"kvmd.apps.kvmd.info",
"kvmd.apps.kvmd.api",
+ "kvmd.apps.media",
"kvmd.apps.pst",
"kvmd.apps.pstrun",
"kvmd.apps.otg",
@@ -116,6 +117,7 @@ def main() -> None:
entry_points={
"console_scripts": [
"kvmd = kvmd.apps.kvmd:main",
+ "kvmd-media = kvmd.apps.media:main",
"kvmd-pst = kvmd.apps.pst:main",
"kvmd-pstrun = kvmd.apps.pstrun:main",
"kvmd-otg = kvmd.apps.otg:main",