diff options
author | Maxim Devaev <[email protected]> | 2024-12-18 06:39:18 +0200 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2024-12-18 06:39:18 +0200 |
commit | af2ee26a2f022bff01ca446814a35c4aea14d5ca (patch) | |
tree | 8826291c60ea78935a113f5f2a17f11a288e6ab1 /kvmd/apps | |
parent | 596334735e1e7a0ebd685ff7df2a8dbd776763f0 (diff) |
kvmd-media server
Diffstat (limited to 'kvmd/apps')
-rw-r--r-- | kvmd/apps/__init__.py | 26 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 4 | ||||
-rw-r--r-- | kvmd/apps/media/__init__.py | 48 | ||||
-rw-r--r-- | kvmd/apps/media/__main__.py | 24 | ||||
-rw-r--r-- | kvmd/apps/media/server.py | 190 | ||||
-rw-r--r-- | kvmd/apps/pst/server.py | 4 |
6 files changed, 292 insertions, 4 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index 2090e5c6..8a1fc96e 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -509,6 +509,32 @@ def _get_config_scheme() -> dict: }, }, + "media": { + "server": { + "unix": Option("/run/kvmd/media.sock", type=valid_abs_path, unpack_as="unix_path"), + "unix_rm": Option(True, type=valid_bool), + "unix_mode": Option(0o660, type=valid_unix_mode), + "heartbeat": Option(15.0, type=valid_float_f01), + "access_log_format": Option("[%P / %{X-Real-IP}i] '%r' => %s; size=%b ---" + " referer='%{Referer}i'; user_agent='%{User-Agent}i'"), + }, + + "memsink": { + "jpeg": { + "sink": Option("", unpack_as="obj"), + "lock_timeout": Option(1.0, type=valid_float_f01), + "wait_timeout": Option(1.0, type=valid_float_f01), + "drop_same_frames": Option(0.0, type=valid_float_f0), + }, + "h264": { + "sink": Option("", unpack_as="obj"), + "lock_timeout": Option(1.0, type=valid_float_f01), + "wait_timeout": Option(1.0, type=valid_float_f01), + "drop_same_frames": Option(0.0, type=valid_float_f0), + }, + }, + }, + "pst": { "server": { "unix": Option("/run/kvmd/pst.sock", type=valid_abs_path, unpack_as="unix_path"), diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 92eb496c..8e1f4adc 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -298,10 +298,10 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins logger.exception("Cleanup error on %s", sub.name) logger.info("On-Cleanup complete") - async def _on_ws_opened(self) -> None: + async def _on_ws_opened(self, _: WsSession) -> None: self.__streamer_notifier.notify() - async def _on_ws_closed(self) -> None: + async def _on_ws_closed(self, _: WsSession) -> None: self.__hid.clear_events() self.__streamer_notifier.notify() diff --git a/kvmd/apps/media/__init__.py b/kvmd/apps/media/__init__.py new file mode 100644 index 00000000..325a817c --- /dev/null +++ b/kvmd/apps/media/__init__.py @@ -0,0 +1,48 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from ...clients.streamer import StreamerFormats +from ...clients.streamer import MemsinkStreamerClient + +from .. import init + +from .server import MediaServer + + +# ===== +def main(argv: (list[str] | None)=None) -> None: + config = init( + prog="kvmd-media", + description="The media proxy", + check_run=True, + argv=argv, + )[2].media + + def make_streamer(name: str, fmt: int) -> (MemsinkStreamerClient | None): + if getattr(config.memsink, name).sink: + return MemsinkStreamerClient(name.upper(), fmt, **getattr(config.memsink, name)._unpack()) + return None + + MediaServer( + h264_streamer=make_streamer("h264", StreamerFormats.H264), + jpeg_streamer=make_streamer("jpeg", StreamerFormats.JPEG), + ).run(**config.server._unpack()) diff --git a/kvmd/apps/media/__main__.py b/kvmd/apps/media/__main__.py new file mode 100644 index 00000000..ab578e06 --- /dev/null +++ b/kvmd/apps/media/__main__.py @@ -0,0 +1,24 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from . import main +main() diff --git a/kvmd/apps/media/server.py b/kvmd/apps/media/server.py new file mode 100644 index 00000000..2763ffa0 --- /dev/null +++ b/kvmd/apps/media/server.py @@ -0,0 +1,190 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import asyncio +import dataclasses + +from aiohttp.web import Request +from aiohttp.web import WebSocketResponse + +from ...logging import get_logger + +from ... import tools +from ... import aiotools + +from ...htserver import exposed_http +from ...htserver import exposed_ws +from ...htserver import WsSession +from ...htserver import HttpServer + +from ...clients.streamer import StreamerError +from ...clients.streamer import StreamerPermError +from ...clients.streamer import StreamerFormats +from ...clients.streamer import BaseStreamerClient + + +# ===== +class _Source: + kind: str + fmt: str + streamer: BaseStreamerClient + meta: dict = dataclasses.field(default_factory=dict) + clients: dict[WsSession, "_Client"] = dataclasses.field(default_factory=dict) + key_required: bool = dataclasses.field(default=False) + + +class _Client: + ws: WsSession + src: _Source + queue: asyncio.Queue[dict] + sender: (asyncio.Task | None) = dataclasses.field(default=None) + + +class MediaServer(HttpServer): + __K_VIDEO = "video" + + __F_H264 = "h264" + __F_JPEG = "jpeg" + + __Q_SIZE = 32 + + def __init__( + self, + h264_streamer: (BaseStreamerClient | None), + jpeg_streamer: (BaseStreamerClient | None), + ) -> None: + + super().__init__() + + self.__srcs: list[_Source] = [] + if h264_streamer: + self.__srcs.append(_Source(self.__K_VIDEO, self.__F_H264, h264_streamer, {"profile_level_id": "42E01F"})) + if jpeg_streamer: + self.__srcs.append(_Source(self.__K_VIDEO, self.__F_JPEG, jpeg_streamer)) + + # ===== + + @exposed_http("GET", "/ws") + async def __ws_handler(self, req: Request) -> WebSocketResponse: + async with self._ws_session(req) as ws: + media: dict = {self.__K_VIDEO: {}} + for src in self.__srcs: + media[src.kind][src.fmt] = src.meta + await ws.send_event("media", media) + return (await self._ws_loop(ws)) + + @exposed_ws(0) + async def __ws_bin_ping_handler(self, ws: WsSession, _: bytes) -> None: + await ws.send_bin(255, b"") # Ping-pong + + @exposed_ws("start") + async def __ws_start_handler(self, ws: WsSession, event: dict) -> None: + try: + kind = str(event.get("kind")) + fmt = str(event.get("format")) + except Exception: + return + src: (_Source | None) = None + for cand in self.__srcs: + if ws in cand.clients: + return # Don't allow any double streaming + if (cand.kind, cand.fmt) == (kind, fmt): + src = cand + if src: + client = _Client(ws, src, asyncio.Queue(self.__Q_SIZE)) + client.sender = aiotools.create_deadly_task(str(ws), self.__sender(client)) + src.clients[ws] = client + get_logger(0).info("Streaming %s to %s ...", src.streamer, ws) + + # ===== + + async def _init_app(self) -> None: + logger = get_logger(0) + for src in self.__srcs: + logger.info("Starting streamer %s ...", src.streamer) + aiotools.create_deadly_task(str(src.streamer), self.__streamer(src)) + self._add_exposed(self) + + async def _on_shutdown(self) -> None: + logger = get_logger(0) + logger.info("Stopping system tasks ...") + await aiotools.stop_all_deadly_tasks() + logger.info("Disconnecting clients ...") + await self._close_all_wss() + logger.info("On-Shutdown complete") + + async def _on_ws_closed(self, ws: WsSession) -> None: + for src in self.__srcs: + client = src.clients.pop(ws, None) + if client and client.sender: + get_logger(0).info("Closed stream for %s", ws) + client.sender.cancel() + return + + # ===== + + async def __sender(self, client: _Client) -> None: + need_key = StreamerFormats.is_diff(client.src.streamer.get_format()) + if need_key: + client.src.key_required = True + has_key = False + while True: + frame = await client.queue.get() + has_key = (not need_key or has_key or frame["key"]) + if has_key: + try: + await client.ws.send_bin(1, frame["key"].to_bytes() + frame["data"]) + except Exception: + pass + + async def __streamer(self, src: _Source) -> None: + logger = get_logger(0) + while True: + if len(src.clients) == 0: + await asyncio.sleep(1) + continue + try: + async with src.streamer.reading() as read_frame: + while len(src.clients) > 0: + frame = await read_frame(src.key_required) + if frame["key"]: + src.key_required = False + for client in src.clients.values(): + try: + client.queue.put_nowait(frame) + except asyncio.QueueFull: + # Если какой-то из клиентов не справляется, очищаем ему очередь и запрашиваем кейфрейм. + # Я вижу у такой логики кучу минусов, хз как себя покажет, но лучше пока ничего не придумал. + tools.clear_queue(client.queue) + src.key_required = True + except Exception: + pass + except StreamerError as ex: + if isinstance(ex, StreamerPermError): + logger.exception("Streamer failed: %s", src.streamer) + else: + logger.error("Streamer error: %s: %s", src.streamer, tools.efmt(ex)) + except Exception: + get_logger(0).exception("Unexpected streamer error: %s", src.streamer) + await asyncio.sleep(1) diff --git a/kvmd/apps/pst/server.py b/kvmd/apps/pst/server.py index 8d8bf9d4..d96043b3 100644 --- a/kvmd/apps/pst/server.py +++ b/kvmd/apps/pst/server.py @@ -104,10 +104,10 @@ class PstServer(HttpServer): # pylint: disable=too-many-arguments,too-many-inst await self.__remount_storage(rw=False) logger.info("On-Cleanup complete") - async def _on_ws_opened(self) -> None: + async def _on_ws_opened(self, _: WsSession) -> None: self.__notifier.notify() - async def _on_ws_closed(self) -> None: + async def _on_ws_closed(self, _: WsSession) -> None: self.__notifier.notify() # ===== SYSTEM TASKS |