diff options
author | Maxim Devaev <[email protected]> | 2024-10-22 00:51:03 +0300 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2024-10-22 05:39:18 +0300 |
commit | 0e4a70e7b9cdde53b1063686a896057d3c940e35 (patch) | |
tree | fd88a4f7a8c06341ff600dc27461437b90fb5bd3 /kvmd/clients/streamer.py | |
parent | cda32a083faf3e7326cfe317336e473c905c6dfd (diff) |
refactoring
Diffstat (limited to 'kvmd/clients/streamer.py')
-rw-r--r-- | kvmd/clients/streamer.py | 133 |
1 files changed, 100 insertions, 33 deletions
diff --git a/kvmd/clients/streamer.py b/kvmd/clients/streamer.py index fdc855bb..5369892e 100644 --- a/kvmd/clients/streamer.py +++ b/kvmd/clients/streamer.py @@ -20,7 +20,10 @@ # ========================================================================== # +import io import contextlib +import dataclasses +import functools import types from typing import Callable @@ -31,10 +34,15 @@ from typing import AsyncGenerator import aiohttp import ustreamer +from PIL import Image as PilImage + from .. import tools from .. import aiotools from .. import htclient +from . import BaseHttpClient +from . import BaseHttpClientSession + # ===== class StreamerError(Exception): @@ -50,7 +58,7 @@ class StreamerPermError(StreamerError): # ===== -class StreamFormats: +class StreamerFormats: JPEG = 1195724874 # V4L2_PIX_FMT_JPEG H264 = 875967048 # V4L2_PIX_FMT_H264 _MJPEG = 1196444237 # V4L2_PIX_FMT_MJPEG @@ -68,8 +76,76 @@ class BaseStreamerClient: # ===== [email protected](frozen=True) +class StreamerSnapshot: + online: bool + width: int + height: int + headers: tuple[tuple[str, str], ...] + data: bytes + + async def make_preview(self, max_width: int, max_height: int, quality: int) -> bytes: + assert max_width >= 0 + assert max_height >= 0 + assert quality > 0 + + if max_width == 0 and max_height == 0: + max_width = self.width // 5 + max_height = self.height // 5 + else: + max_width = min((max_width or self.width), self.width) + max_height = min((max_height or self.height), self.height) + + if (max_width, max_height) == (self.width, self.height): + return self.data + return (await aiotools.run_async(self.__inner_make_preview, max_width, max_height, quality)) + + @functools.lru_cache(maxsize=1) + def __inner_make_preview(self, max_width: int, max_height: int, quality: int) -> bytes: + with io.BytesIO(self.data) as snapshot_bio: + with io.BytesIO() as preview_bio: + with PilImage.open(snapshot_bio) as image: + image.thumbnail((max_width, max_height), PilImage.Resampling.LANCZOS) + image.save(preview_bio, format="jpeg", quality=quality) + return preview_bio.getvalue() + + +class HttpStreamerClientSession(BaseHttpClientSession): + async def get_state(self) -> dict: + session = self._ensure_http_session() + async with session.get("/state") as response: + htclient.raise_not_200(response) + return (await response.json())["result"] + + async def take_snapshot(self, timeout: float) -> StreamerSnapshot: + session = self._ensure_http_session() + async with session.get( + url="/snapshot", + timeout=aiohttp.ClientTimeout(total=timeout), + ) as response: + + htclient.raise_not_200(response) + return StreamerSnapshot( + online=(response.headers["X-UStreamer-Online"] == "true"), + width=int(response.headers["X-UStreamer-Width"]), + height=int(response.headers["X-UStreamer-Height"]), + headers=tuple( + (key, value) + for (key, value) in tools.sorted_kvs(dict(response.headers)) + if key.lower().startswith("x-ustreamer-") or key.lower() in [ + "x-timestamp", + "access-control-allow-origin", + "cache-control", + "pragma", + "expires", + ] + ), + data=bytes(await response.read()), + ) + + @contextlib.contextmanager -def _http_handle_errors() -> Generator[None, None, None]: +def _http_reading_handle_errors() -> Generator[None, None, None]: try: yield except Exception as ex: # Тут бывают и ассерты, и KeyError, и прочая херня @@ -78,7 +154,7 @@ def _http_handle_errors() -> Generator[None, None, None]: raise StreamerTempError(tools.efmt(ex)) -class HttpStreamerClient(BaseStreamerClient): +class HttpStreamerClient(BaseHttpClient, BaseStreamerClient): def __init__( self, name: str, @@ -87,29 +163,35 @@ class HttpStreamerClient(BaseStreamerClient): user_agent: str, ) -> None: + super().__init__(unix_path, timeout, user_agent) self.__name = name - self.__unix_path = unix_path - self.__timeout = timeout - self.__user_agent = user_agent + + def make_session(self) -> HttpStreamerClientSession: + return HttpStreamerClientSession(self._make_http_session) def get_format(self) -> int: - return StreamFormats.JPEG + return StreamerFormats.JPEG @contextlib.asynccontextmanager async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]: - with _http_handle_errors(): - async with self.__make_http_session() as session: + with _http_reading_handle_errors(): + async with self._make_http_session() as session: async with session.get( - url=self.__make_url("stream"), + url="/stream", params={"extra_headers": "1"}, + timeout=aiohttp.ClientTimeout( + connect=session.timeout.total, + sock_read=session.timeout.total, + ), ) as response: + htclient.raise_not_200(response) reader = aiohttp.MultipartReader.from_response(response) self.__patch_stream_reader(reader.resp.content) async def read_frame(key_required: bool) -> dict: _ = key_required - with _http_handle_errors(): + with _http_reading_handle_errors(): frame = await reader.next() # pylint: disable=not-callable if not isinstance(frame, aiohttp.BodyPartReader): raise StreamerTempError("Expected body part") @@ -123,26 +205,11 @@ class HttpStreamerClient(BaseStreamerClient): "width": int(frame.headers["X-UStreamer-Width"]), "height": int(frame.headers["X-UStreamer-Height"]), "data": data, - "format": StreamFormats.JPEG, + "format": StreamerFormats.JPEG, } yield read_frame - def __make_http_session(self) -> aiohttp.ClientSession: - kwargs: dict = { - "headers": {"User-Agent": self.__user_agent}, - "connector": aiohttp.UnixConnector(path=self.__unix_path), - "timeout": aiohttp.ClientTimeout( - connect=self.__timeout, - sock_read=self.__timeout, - ), - } - return aiohttp.ClientSession(**kwargs) - - def __make_url(self, handle: str) -> str: - assert not handle.startswith("/"), handle - return f"http://localhost:0/{handle}" - def __patch_stream_reader(self, reader: aiohttp.StreamReader) -> None: # https://github.com/pikvm/pikvm/issues/92 # Infinite looping in BodyPartReader.read() because _at_eof flag. @@ -162,7 +229,7 @@ class HttpStreamerClient(BaseStreamerClient): # ===== @contextlib.contextmanager -def _memsink_handle_errors() -> Generator[None, None, None]: +def _memsink_reading_handle_errors() -> Generator[None, None, None]: try: yield except StreamerPermError: @@ -198,11 +265,11 @@ class MemsinkStreamerClient(BaseStreamerClient): @contextlib.asynccontextmanager async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]: - with _memsink_handle_errors(): + with _memsink_reading_handle_errors(): with ustreamer.Memsink(**self.__kwargs) as sink: async def read_frame(key_required: bool) -> dict: - key_required = (key_required and self.__fmt == StreamFormats.H264) - with _memsink_handle_errors(): + key_required = (key_required and self.__fmt == StreamerFormats.H264) + with _memsink_reading_handle_errors(): while True: frame = await aiotools.run_async(sink.wait_frame, key_required) if frame is not None: @@ -211,8 +278,8 @@ class MemsinkStreamerClient(BaseStreamerClient): yield read_frame def __check_format(self, fmt: int) -> None: - if fmt == StreamFormats._MJPEG: # pylint: disable=protected-access - fmt = StreamFormats.JPEG + if fmt == StreamerFormats._MJPEG: # pylint: disable=protected-access + fmt = StreamerFormats.JPEG if fmt != self.__fmt: raise StreamerPermError("Invalid sink format") |