# ========================================================================== # # # # KVMD - The main PiKVM daemon. # # # # Copyright (C) 2020 Maxim Devaev # # # # This program is free software: you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation, either version 3 of the License, or # # (at your option) any later version. # # # # This program is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # # GNU General Public License for more details. # # # # You should have received a copy of the GNU General Public License # # along with this program. If not, see . # # # # ========================================================================== # import io import contextlib import dataclasses import functools import types from typing import Callable from typing import Awaitable from typing import Generator from typing import AsyncGenerator import aiohttp import ustreamer from PIL import Image as PilImage from .. import tools from .. import aiotools from .. import htclient from . import BaseHttpClient from . import BaseHttpClientSession # ===== class StreamerError(Exception): pass class StreamerTempError(StreamerError): pass class StreamerPermError(StreamerError): pass # ===== class StreamerFormats: JPEG = 1195724874 # V4L2_PIX_FMT_JPEG H264 = 875967048 # V4L2_PIX_FMT_H264 _MJPEG = 1196444237 # V4L2_PIX_FMT_MJPEG @classmethod def is_diff(cls, fmt: int) -> bool: return (fmt == cls.H264) class BaseStreamerClient: def get_format(self) -> int: raise NotImplementedError() @contextlib.asynccontextmanager async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]: if self is not None: # XXX: Vulture and pylint hack raise NotImplementedError() yield # ===== @dataclasses.dataclass(frozen=True) class StreamerSnapshot: online: bool width: int height: int headers: tuple[tuple[str, str], ...] data: bytes async def make_preview(self, max_width: int, max_height: int, quality: int) -> bytes: assert max_width >= 0 assert max_height >= 0 assert quality > 0 if max_width == 0 and max_height == 0: max_width = self.width // 5 max_height = self.height // 5 else: max_width = min((max_width or self.width), self.width) max_height = min((max_height or self.height), self.height) if (max_width, max_height) == (self.width, self.height): return self.data return (await aiotools.run_async(self.__inner_make_preview, max_width, max_height, quality)) @functools.lru_cache(maxsize=1) def __inner_make_preview(self, max_width: int, max_height: int, quality: int) -> bytes: with io.BytesIO(self.data) as snapshot_bio: with io.BytesIO() as preview_bio: with PilImage.open(snapshot_bio) as image: image.thumbnail((max_width, max_height), PilImage.Resampling.LANCZOS) image.save(preview_bio, format="jpeg", quality=quality) return preview_bio.getvalue() class HttpStreamerClientSession(BaseHttpClientSession): async def get_state(self) -> dict: session = self._ensure_http_session() async with session.get("/state") as response: htclient.raise_not_200(response) return (await response.json())["result"] async def take_snapshot(self, timeout: float) -> StreamerSnapshot: session = self._ensure_http_session() async with session.get( url="/snapshot", timeout=aiohttp.ClientTimeout(total=timeout), ) as response: htclient.raise_not_200(response) return StreamerSnapshot( online=(response.headers["X-UStreamer-Online"] == "true"), width=int(response.headers["X-UStreamer-Width"]), height=int(response.headers["X-UStreamer-Height"]), headers=tuple( (key, value) for (key, value) in tools.sorted_kvs(dict(response.headers)) if key.lower().startswith("x-ustreamer-") or key.lower() in [ "x-timestamp", "access-control-allow-origin", "cache-control", "pragma", "expires", ] ), data=bytes(await response.read()), ) @contextlib.contextmanager def _http_reading_handle_errors() -> Generator[None, None, None]: try: yield except Exception as ex: # Тут бывают и ассерты, и KeyError, и прочая херня if isinstance(ex, StreamerTempError): raise raise StreamerTempError(tools.efmt(ex)) class HttpStreamerClient(BaseHttpClient, BaseStreamerClient): def __init__( self, name: str, unix_path: str, timeout: float, user_agent: str, ) -> None: super().__init__(unix_path, timeout, user_agent) self.__name = name def make_session(self) -> HttpStreamerClientSession: return HttpStreamerClientSession(self._make_http_session) def get_format(self) -> int: return StreamerFormats.JPEG @contextlib.asynccontextmanager async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]: with _http_reading_handle_errors(): async with self._make_http_session() as session: async with session.get( url="/stream", params={"extra_headers": "1"}, timeout=aiohttp.ClientTimeout( connect=session.timeout.total, sock_read=session.timeout.total, ), ) as response: htclient.raise_not_200(response) reader = aiohttp.MultipartReader.from_response(response) self.__patch_stream_reader(reader.resp.content) async def read_frame(key_required: bool) -> dict: _ = key_required with _http_reading_handle_errors(): frame = await reader.next() # pylint: disable=not-callable if not isinstance(frame, aiohttp.BodyPartReader): raise StreamerTempError("Expected body part") data = bytes(await frame.read()) if not data: raise StreamerTempError("Reached EOF") return { "online": (frame.headers["X-UStreamer-Online"] == "true"), "width": int(frame.headers["X-UStreamer-Width"]), "height": int(frame.headers["X-UStreamer-Height"]), "data": data, "format": StreamerFormats.JPEG, } yield read_frame def __patch_stream_reader(self, reader: aiohttp.StreamReader) -> None: # https://github.com/pikvm/pikvm/issues/92 # Infinite looping in BodyPartReader.read() because _at_eof flag. orig_read = reader.read async def read(self: aiohttp.StreamReader, n: int=-1) -> bytes: # pylint: disable=invalid-name if self.is_eof(): raise StreamerTempError("StreamReader.read(): Reached EOF") return (await orig_read(n)) reader.read = types.MethodType(read, reader) # type: ignore def __str__(self) -> str: return f"HttpStreamerClient({self.__name})" # ===== @contextlib.contextmanager def _memsink_reading_handle_errors() -> Generator[None, None, None]: try: yield except StreamerPermError: raise except FileNotFoundError as ex: raise StreamerTempError(tools.efmt(ex)) except Exception as ex: raise StreamerPermError(tools.efmt(ex)) class MemsinkStreamerClient(BaseStreamerClient): def __init__( self, name: str, fmt: int, obj: str, lock_timeout: float, wait_timeout: float, drop_same_frames: float, ) -> None: self.__name = name self.__fmt = fmt self.__kwargs: dict = { "obj": obj, "lock_timeout": lock_timeout, "wait_timeout": wait_timeout, "drop_same_frames": drop_same_frames, } def get_format(self) -> int: return self.__fmt @contextlib.asynccontextmanager async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]: with _memsink_reading_handle_errors(): with ustreamer.Memsink(**self.__kwargs) as sink: async def read_frame(key_required: bool) -> dict: key_required = (key_required and self.__fmt == StreamerFormats.H264) with _memsink_reading_handle_errors(): while True: frame = await aiotools.run_async(sink.wait_frame, key_required) if frame is not None: self.__check_format(frame["format"]) return frame yield read_frame def __check_format(self, fmt: int) -> None: if fmt == StreamerFormats._MJPEG: # pylint: disable=protected-access fmt = StreamerFormats.JPEG if fmt != self.__fmt: raise StreamerPermError("Invalid sink format") def __str__(self) -> str: return f"MemsinkStreamerClient({self.__name})"