summaryrefslogtreecommitdiff
path: root/kvmd/clients/streamer.py
diff options
context:
space:
mode:
authorMaxim Devaev <[email protected]>2024-10-22 00:51:03 +0300
committerMaxim Devaev <[email protected]>2024-10-22 05:39:18 +0300
commit0e4a70e7b9cdde53b1063686a896057d3c940e35 (patch)
treefd88a4f7a8c06341ff600dc27461437b90fb5bd3 /kvmd/clients/streamer.py
parentcda32a083faf3e7326cfe317336e473c905c6dfd (diff)
refactoring
Diffstat (limited to 'kvmd/clients/streamer.py')
-rw-r--r--kvmd/clients/streamer.py133
1 files changed, 100 insertions, 33 deletions
diff --git a/kvmd/clients/streamer.py b/kvmd/clients/streamer.py
index fdc855bb..5369892e 100644
--- a/kvmd/clients/streamer.py
+++ b/kvmd/clients/streamer.py
@@ -20,7 +20,10 @@
# ========================================================================== #
+import io
import contextlib
+import dataclasses
+import functools
import types
from typing import Callable
@@ -31,10 +34,15 @@ from typing import AsyncGenerator
import aiohttp
import ustreamer
+from PIL import Image as PilImage
+
from .. import tools
from .. import aiotools
from .. import htclient
+from . import BaseHttpClient
+from . import BaseHttpClientSession
+
# =====
class StreamerError(Exception):
@@ -50,7 +58,7 @@ class StreamerPermError(StreamerError):
# =====
-class StreamFormats:
+class StreamerFormats:
JPEG = 1195724874 # V4L2_PIX_FMT_JPEG
H264 = 875967048 # V4L2_PIX_FMT_H264
_MJPEG = 1196444237 # V4L2_PIX_FMT_MJPEG
@@ -68,8 +76,76 @@ class BaseStreamerClient:
# =====
[email protected](frozen=True)
+class StreamerSnapshot:
+ online: bool
+ width: int
+ height: int
+ headers: tuple[tuple[str, str], ...]
+ data: bytes
+
+ async def make_preview(self, max_width: int, max_height: int, quality: int) -> bytes:
+ assert max_width >= 0
+ assert max_height >= 0
+ assert quality > 0
+
+ if max_width == 0 and max_height == 0:
+ max_width = self.width // 5
+ max_height = self.height // 5
+ else:
+ max_width = min((max_width or self.width), self.width)
+ max_height = min((max_height or self.height), self.height)
+
+ if (max_width, max_height) == (self.width, self.height):
+ return self.data
+ return (await aiotools.run_async(self.__inner_make_preview, max_width, max_height, quality))
+
+ @functools.lru_cache(maxsize=1)
+ def __inner_make_preview(self, max_width: int, max_height: int, quality: int) -> bytes:
+ with io.BytesIO(self.data) as snapshot_bio:
+ with io.BytesIO() as preview_bio:
+ with PilImage.open(snapshot_bio) as image:
+ image.thumbnail((max_width, max_height), PilImage.Resampling.LANCZOS)
+ image.save(preview_bio, format="jpeg", quality=quality)
+ return preview_bio.getvalue()
+
+
+class HttpStreamerClientSession(BaseHttpClientSession):
+ async def get_state(self) -> dict:
+ session = self._ensure_http_session()
+ async with session.get("/state") as response:
+ htclient.raise_not_200(response)
+ return (await response.json())["result"]
+
+ async def take_snapshot(self, timeout: float) -> StreamerSnapshot:
+ session = self._ensure_http_session()
+ async with session.get(
+ url="/snapshot",
+ timeout=aiohttp.ClientTimeout(total=timeout),
+ ) as response:
+
+ htclient.raise_not_200(response)
+ return StreamerSnapshot(
+ online=(response.headers["X-UStreamer-Online"] == "true"),
+ width=int(response.headers["X-UStreamer-Width"]),
+ height=int(response.headers["X-UStreamer-Height"]),
+ headers=tuple(
+ (key, value)
+ for (key, value) in tools.sorted_kvs(dict(response.headers))
+ if key.lower().startswith("x-ustreamer-") or key.lower() in [
+ "x-timestamp",
+ "access-control-allow-origin",
+ "cache-control",
+ "pragma",
+ "expires",
+ ]
+ ),
+ data=bytes(await response.read()),
+ )
+
+
@contextlib.contextmanager
-def _http_handle_errors() -> Generator[None, None, None]:
+def _http_reading_handle_errors() -> Generator[None, None, None]:
try:
yield
except Exception as ex: # Тут бывают и ассерты, и KeyError, и прочая херня
@@ -78,7 +154,7 @@ def _http_handle_errors() -> Generator[None, None, None]:
raise StreamerTempError(tools.efmt(ex))
-class HttpStreamerClient(BaseStreamerClient):
+class HttpStreamerClient(BaseHttpClient, BaseStreamerClient):
def __init__(
self,
name: str,
@@ -87,29 +163,35 @@ class HttpStreamerClient(BaseStreamerClient):
user_agent: str,
) -> None:
+ super().__init__(unix_path, timeout, user_agent)
self.__name = name
- self.__unix_path = unix_path
- self.__timeout = timeout
- self.__user_agent = user_agent
+
+ def make_session(self) -> HttpStreamerClientSession:
+ return HttpStreamerClientSession(self._make_http_session)
def get_format(self) -> int:
- return StreamFormats.JPEG
+ return StreamerFormats.JPEG
@contextlib.asynccontextmanager
async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]:
- with _http_handle_errors():
- async with self.__make_http_session() as session:
+ with _http_reading_handle_errors():
+ async with self._make_http_session() as session:
async with session.get(
- url=self.__make_url("stream"),
+ url="/stream",
params={"extra_headers": "1"},
+ timeout=aiohttp.ClientTimeout(
+ connect=session.timeout.total,
+ sock_read=session.timeout.total,
+ ),
) as response:
+
htclient.raise_not_200(response)
reader = aiohttp.MultipartReader.from_response(response)
self.__patch_stream_reader(reader.resp.content)
async def read_frame(key_required: bool) -> dict:
_ = key_required
- with _http_handle_errors():
+ with _http_reading_handle_errors():
frame = await reader.next() # pylint: disable=not-callable
if not isinstance(frame, aiohttp.BodyPartReader):
raise StreamerTempError("Expected body part")
@@ -123,26 +205,11 @@ class HttpStreamerClient(BaseStreamerClient):
"width": int(frame.headers["X-UStreamer-Width"]),
"height": int(frame.headers["X-UStreamer-Height"]),
"data": data,
- "format": StreamFormats.JPEG,
+ "format": StreamerFormats.JPEG,
}
yield read_frame
- def __make_http_session(self) -> aiohttp.ClientSession:
- kwargs: dict = {
- "headers": {"User-Agent": self.__user_agent},
- "connector": aiohttp.UnixConnector(path=self.__unix_path),
- "timeout": aiohttp.ClientTimeout(
- connect=self.__timeout,
- sock_read=self.__timeout,
- ),
- }
- return aiohttp.ClientSession(**kwargs)
-
- def __make_url(self, handle: str) -> str:
- assert not handle.startswith("/"), handle
- return f"http://localhost:0/{handle}"
-
def __patch_stream_reader(self, reader: aiohttp.StreamReader) -> None:
# https://github.com/pikvm/pikvm/issues/92
# Infinite looping in BodyPartReader.read() because _at_eof flag.
@@ -162,7 +229,7 @@ class HttpStreamerClient(BaseStreamerClient):
# =====
@contextlib.contextmanager
-def _memsink_handle_errors() -> Generator[None, None, None]:
+def _memsink_reading_handle_errors() -> Generator[None, None, None]:
try:
yield
except StreamerPermError:
@@ -198,11 +265,11 @@ class MemsinkStreamerClient(BaseStreamerClient):
@contextlib.asynccontextmanager
async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]:
- with _memsink_handle_errors():
+ with _memsink_reading_handle_errors():
with ustreamer.Memsink(**self.__kwargs) as sink:
async def read_frame(key_required: bool) -> dict:
- key_required = (key_required and self.__fmt == StreamFormats.H264)
- with _memsink_handle_errors():
+ key_required = (key_required and self.__fmt == StreamerFormats.H264)
+ with _memsink_reading_handle_errors():
while True:
frame = await aiotools.run_async(sink.wait_frame, key_required)
if frame is not None:
@@ -211,8 +278,8 @@ class MemsinkStreamerClient(BaseStreamerClient):
yield read_frame
def __check_format(self, fmt: int) -> None:
- if fmt == StreamFormats._MJPEG: # pylint: disable=protected-access
- fmt = StreamFormats.JPEG
+ if fmt == StreamerFormats._MJPEG: # pylint: disable=protected-access
+ fmt = StreamerFormats.JPEG
if fmt != self.__fmt:
raise StreamerPermError("Invalid sink format")