diff options
author | Maxim Devaev <[email protected]> | 2024-12-18 06:39:18 +0200 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2024-12-18 06:39:18 +0200 |
commit | af2ee26a2f022bff01ca446814a35c4aea14d5ca (patch) | |
tree | 8826291c60ea78935a113f5f2a17f11a288e6ab1 /kvmd | |
parent | 596334735e1e7a0ebd685ff7df2a8dbd776763f0 (diff) |
kvmd-media server
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/aiotools.py | 2 | ||||
-rw-r--r-- | kvmd/apps/__init__.py | 26 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 4 | ||||
-rw-r--r-- | kvmd/apps/media/__init__.py | 48 | ||||
-rw-r--r-- | kvmd/apps/media/__main__.py | 24 | ||||
-rw-r--r-- | kvmd/apps/media/server.py | 190 | ||||
-rw-r--r-- | kvmd/apps/pst/server.py | 4 | ||||
-rw-r--r-- | kvmd/clients/streamer.py | 4 | ||||
-rw-r--r-- | kvmd/htserver.py | 37 | ||||
-rw-r--r-- | kvmd/tools.py | 5 |
10 files changed, 326 insertions, 18 deletions
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py index 6183690f..f400ad3c 100644 --- a/kvmd/aiotools.py +++ b/kvmd/aiotools.py @@ -171,7 +171,7 @@ def create_deadly_task(name: str, coro: Coroutine) -> asyncio.Task: except asyncio.CancelledError: pass except Exception: - logger.exception("Unhandled exception in deadly task, killing myself ...") + logger.exception("Unhandled exception in deadly task %r, killing myself ...", name) pid = os.getpid() if pid == 1: os._exit(1) # Docker workaround # pylint: disable=protected-access diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index 2090e5c6..8a1fc96e 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -509,6 +509,32 @@ def _get_config_scheme() -> dict: }, }, + "media": { + "server": { + "unix": Option("/run/kvmd/media.sock", type=valid_abs_path, unpack_as="unix_path"), + "unix_rm": Option(True, type=valid_bool), + "unix_mode": Option(0o660, type=valid_unix_mode), + "heartbeat": Option(15.0, type=valid_float_f01), + "access_log_format": Option("[%P / %{X-Real-IP}i] '%r' => %s; size=%b ---" + " referer='%{Referer}i'; user_agent='%{User-Agent}i'"), + }, + + "memsink": { + "jpeg": { + "sink": Option("", unpack_as="obj"), + "lock_timeout": Option(1.0, type=valid_float_f01), + "wait_timeout": Option(1.0, type=valid_float_f01), + "drop_same_frames": Option(0.0, type=valid_float_f0), + }, + "h264": { + "sink": Option("", unpack_as="obj"), + "lock_timeout": Option(1.0, type=valid_float_f01), + "wait_timeout": Option(1.0, type=valid_float_f01), + "drop_same_frames": Option(0.0, type=valid_float_f0), + }, + }, + }, + "pst": { "server": { "unix": Option("/run/kvmd/pst.sock", type=valid_abs_path, unpack_as="unix_path"), diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 92eb496c..8e1f4adc 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -298,10 +298,10 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins logger.exception("Cleanup error on %s", sub.name) logger.info("On-Cleanup complete") - async def _on_ws_opened(self) -> None: + async def _on_ws_opened(self, _: WsSession) -> None: self.__streamer_notifier.notify() - async def _on_ws_closed(self) -> None: + async def _on_ws_closed(self, _: WsSession) -> None: self.__hid.clear_events() self.__streamer_notifier.notify() diff --git a/kvmd/apps/media/__init__.py b/kvmd/apps/media/__init__.py new file mode 100644 index 00000000..325a817c --- /dev/null +++ b/kvmd/apps/media/__init__.py @@ -0,0 +1,48 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from ...clients.streamer import StreamerFormats +from ...clients.streamer import MemsinkStreamerClient + +from .. import init + +from .server import MediaServer + + +# ===== +def main(argv: (list[str] | None)=None) -> None: + config = init( + prog="kvmd-media", + description="The media proxy", + check_run=True, + argv=argv, + )[2].media + + def make_streamer(name: str, fmt: int) -> (MemsinkStreamerClient | None): + if getattr(config.memsink, name).sink: + return MemsinkStreamerClient(name.upper(), fmt, **getattr(config.memsink, name)._unpack()) + return None + + MediaServer( + h264_streamer=make_streamer("h264", StreamerFormats.H264), + jpeg_streamer=make_streamer("jpeg", StreamerFormats.JPEG), + ).run(**config.server._unpack()) diff --git a/kvmd/apps/media/__main__.py b/kvmd/apps/media/__main__.py new file mode 100644 index 00000000..ab578e06 --- /dev/null +++ b/kvmd/apps/media/__main__.py @@ -0,0 +1,24 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from . import main +main() diff --git a/kvmd/apps/media/server.py b/kvmd/apps/media/server.py new file mode 100644 index 00000000..2763ffa0 --- /dev/null +++ b/kvmd/apps/media/server.py @@ -0,0 +1,190 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import asyncio +import dataclasses + +from aiohttp.web import Request +from aiohttp.web import WebSocketResponse + +from ...logging import get_logger + +from ... import tools +from ... import aiotools + +from ...htserver import exposed_http +from ...htserver import exposed_ws +from ...htserver import WsSession +from ...htserver import HttpServer + +from ...clients.streamer import StreamerError +from ...clients.streamer import StreamerPermError +from ...clients.streamer import StreamerFormats +from ...clients.streamer import BaseStreamerClient + + +# ===== +class _Source: + kind: str + fmt: str + streamer: BaseStreamerClient + meta: dict = dataclasses.field(default_factory=dict) + clients: dict[WsSession, "_Client"] = dataclasses.field(default_factory=dict) + key_required: bool = dataclasses.field(default=False) + + +class _Client: + ws: WsSession + src: _Source + queue: asyncio.Queue[dict] + sender: (asyncio.Task | None) = dataclasses.field(default=None) + + +class MediaServer(HttpServer): + __K_VIDEO = "video" + + __F_H264 = "h264" + __F_JPEG = "jpeg" + + __Q_SIZE = 32 + + def __init__( + self, + h264_streamer: (BaseStreamerClient | None), + jpeg_streamer: (BaseStreamerClient | None), + ) -> None: + + super().__init__() + + self.__srcs: list[_Source] = [] + if h264_streamer: + self.__srcs.append(_Source(self.__K_VIDEO, self.__F_H264, h264_streamer, {"profile_level_id": "42E01F"})) + if jpeg_streamer: + self.__srcs.append(_Source(self.__K_VIDEO, self.__F_JPEG, jpeg_streamer)) + + # ===== + + @exposed_http("GET", "/ws") + async def __ws_handler(self, req: Request) -> WebSocketResponse: + async with self._ws_session(req) as ws: + media: dict = {self.__K_VIDEO: {}} + for src in self.__srcs: + media[src.kind][src.fmt] = src.meta + await ws.send_event("media", media) + return (await self._ws_loop(ws)) + + @exposed_ws(0) + async def __ws_bin_ping_handler(self, ws: WsSession, _: bytes) -> None: + await ws.send_bin(255, b"") # Ping-pong + + @exposed_ws("start") + async def __ws_start_handler(self, ws: WsSession, event: dict) -> None: + try: + kind = str(event.get("kind")) + fmt = str(event.get("format")) + except Exception: + return + src: (_Source | None) = None + for cand in self.__srcs: + if ws in cand.clients: + return # Don't allow any double streaming + if (cand.kind, cand.fmt) == (kind, fmt): + src = cand + if src: + client = _Client(ws, src, asyncio.Queue(self.__Q_SIZE)) + client.sender = aiotools.create_deadly_task(str(ws), self.__sender(client)) + src.clients[ws] = client + get_logger(0).info("Streaming %s to %s ...", src.streamer, ws) + + # ===== + + async def _init_app(self) -> None: + logger = get_logger(0) + for src in self.__srcs: + logger.info("Starting streamer %s ...", src.streamer) + aiotools.create_deadly_task(str(src.streamer), self.__streamer(src)) + self._add_exposed(self) + + async def _on_shutdown(self) -> None: + logger = get_logger(0) + logger.info("Stopping system tasks ...") + await aiotools.stop_all_deadly_tasks() + logger.info("Disconnecting clients ...") + await self._close_all_wss() + logger.info("On-Shutdown complete") + + async def _on_ws_closed(self, ws: WsSession) -> None: + for src in self.__srcs: + client = src.clients.pop(ws, None) + if client and client.sender: + get_logger(0).info("Closed stream for %s", ws) + client.sender.cancel() + return + + # ===== + + async def __sender(self, client: _Client) -> None: + need_key = StreamerFormats.is_diff(client.src.streamer.get_format()) + if need_key: + client.src.key_required = True + has_key = False + while True: + frame = await client.queue.get() + has_key = (not need_key or has_key or frame["key"]) + if has_key: + try: + await client.ws.send_bin(1, frame["key"].to_bytes() + frame["data"]) + except Exception: + pass + + async def __streamer(self, src: _Source) -> None: + logger = get_logger(0) + while True: + if len(src.clients) == 0: + await asyncio.sleep(1) + continue + try: + async with src.streamer.reading() as read_frame: + while len(src.clients) > 0: + frame = await read_frame(src.key_required) + if frame["key"]: + src.key_required = False + for client in src.clients.values(): + try: + client.queue.put_nowait(frame) + except asyncio.QueueFull: + # Если какой-то из клиентов не справляется, очищаем ему очередь и запрашиваем кейфрейм. + # Я вижу у такой логики кучу минусов, хз как себя покажет, но лучше пока ничего не придумал. + tools.clear_queue(client.queue) + src.key_required = True + except Exception: + pass + except StreamerError as ex: + if isinstance(ex, StreamerPermError): + logger.exception("Streamer failed: %s", src.streamer) + else: + logger.error("Streamer error: %s: %s", src.streamer, tools.efmt(ex)) + except Exception: + get_logger(0).exception("Unexpected streamer error: %s", src.streamer) + await asyncio.sleep(1) diff --git a/kvmd/apps/pst/server.py b/kvmd/apps/pst/server.py index 8d8bf9d4..d96043b3 100644 --- a/kvmd/apps/pst/server.py +++ b/kvmd/apps/pst/server.py @@ -104,10 +104,10 @@ class PstServer(HttpServer): # pylint: disable=too-many-arguments,too-many-inst await self.__remount_storage(rw=False) logger.info("On-Cleanup complete") - async def _on_ws_opened(self) -> None: + async def _on_ws_opened(self, _: WsSession) -> None: self.__notifier.notify() - async def _on_ws_closed(self) -> None: + async def _on_ws_closed(self, _: WsSession) -> None: self.__notifier.notify() # ===== SYSTEM TASKS diff --git a/kvmd/clients/streamer.py b/kvmd/clients/streamer.py index 5369892e..cd5f03ec 100644 --- a/kvmd/clients/streamer.py +++ b/kvmd/clients/streamer.py @@ -63,6 +63,10 @@ class StreamerFormats: H264 = 875967048 # V4L2_PIX_FMT_H264 _MJPEG = 1196444237 # V4L2_PIX_FMT_MJPEG + @classmethod + def is_diff(cls, fmt: int) -> bool: + return (fmt == cls.H264) + class BaseStreamerClient: def get_format(self) -> int: diff --git a/kvmd/htserver.py b/kvmd/htserver.py index 351c1328..63c82fcb 100644 --- a/kvmd/htserver.py +++ b/kvmd/htserver.py @@ -232,6 +232,16 @@ async def send_ws_event( })) +async def send_ws_bin( + wsr: (ClientWebSocketResponse | WebSocketResponse), + op: int, + data: bytes, +) -> None: + + assert 0 <= op <= 255 + await wsr.send_bytes(op.to_bytes() + data) + + def parse_ws_event(msg: str) -> tuple[str, dict]: data = json.loads(msg) if not isinstance(data, dict): @@ -264,14 +274,24 @@ def set_request_auth_info(req: BaseRequest, info: str) -> None: @dataclasses.dataclass(frozen=True) class WsSession: wsr: WebSocketResponse - kwargs: dict[str, Any] + kwargs: dict[str, Any] = dataclasses.field(hash=False) def __str__(self) -> str: return f"WsSession(id={id(self)}, {self.kwargs})" + def is_alive(self) -> bool: + return ( + not self.wsr.closed + and self.wsr._req is not None # pylint: disable=protected-access + and self.wsr._req.transport is not None # pylint: disable=protected-access + ) + async def send_event(self, event_type: str, event: (dict | None)) -> None: await send_ws_event(self.wsr, event_type, event) + async def send_bin(self, op: int, data: bytes) -> None: + await send_ws_bin(self.wsr, op, data) + class HttpServer: def __init__(self) -> None: @@ -353,7 +373,7 @@ class HttpServer: get_logger(2).info("Registered new client session: %s; clients now: %d", ws, len(self.__ws_sessions)) try: - await self._on_ws_opened() + await self._on_ws_opened(ws) yield ws finally: await aiotools.shield_fg(self.__close_ws(ws)) @@ -389,12 +409,7 @@ class HttpServer: await asyncio.gather(*[ ws.send_event(event_type, event) for ws in self.__ws_sessions - if ( - not ws.wsr.closed - and ws.wsr._req is not None # pylint: disable=protected-access - and ws.wsr._req.transport is not None # pylint: disable=protected-access - and (legacy is None or ws.kwargs.get("legacy") == legacy) - ) + if ws.is_alive() and (legacy is None or ws.kwargs.get("legacy") == legacy) ], return_exceptions=True) async def _close_all_wss(self) -> bool: @@ -414,7 +429,7 @@ class HttpServer: await ws.wsr.close() except Exception: pass - await self._on_ws_closed() + await self._on_ws_closed(ws) # ===== @@ -430,10 +445,10 @@ class HttpServer: async def _on_cleanup(self) -> None: pass - async def _on_ws_opened(self) -> None: + async def _on_ws_opened(self, ws: WsSession) -> None: pass - async def _on_ws_closed(self) -> None: + async def _on_ws_closed(self, ws: WsSession) -> None: pass # ===== diff --git a/kvmd/tools.py b/kvmd/tools.py index 14a58ab3..6dd7d2f9 100644 --- a/kvmd/tools.py +++ b/kvmd/tools.py @@ -20,6 +20,7 @@ # ========================================================================== # +import asyncio import operator import functools import multiprocessing.queues @@ -64,11 +65,11 @@ def swapped_kvs(dct: dict[_DictKeyT, _DictValueT]) -> dict[_DictValueT, _DictKey # ===== -def clear_queue(q: multiprocessing.queues.Queue) -> None: # pylint: disable=invalid-name +def clear_queue(q: (multiprocessing.queues.Queue | asyncio.Queue)) -> None: # pylint: disable=invalid-name for _ in range(q.qsize()): try: q.get_nowait() - except queue.Empty: + except (queue.Empty, asyncio.QueueEmpty): break |