diff options
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/apps/kvmd/streamer.py | 124 | ||||
-rw-r--r-- | kvmd/apps/vnc/__init__.py | 6 | ||||
-rw-r--r-- | kvmd/apps/vnc/server.py | 20 | ||||
-rw-r--r-- | kvmd/clients/__init__.py | 64 | ||||
-rw-r--r-- | kvmd/clients/kvmd.py | 121 | ||||
-rw-r--r-- | kvmd/clients/streamer.py | 133 |
6 files changed, 235 insertions, 233 deletions
diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py index c0a56ad1..5ddb7298 100644 --- a/kvmd/apps/kvmd/streamer.py +++ b/kvmd/apps/kvmd/streamer.py @@ -20,12 +20,10 @@ # ========================================================================== # -import io import signal import asyncio import asyncio.subprocess import dataclasses -import functools import copy from typing import AsyncGenerator @@ -33,10 +31,12 @@ from typing import Any import aiohttp -from PIL import Image as PilImage - from ...logging import get_logger +from ...clients.streamer import StreamerSnapshot +from ...clients.streamer import HttpStreamerClient +from ...clients.streamer import HttpStreamerClientSession + from ... import tools from ... import aiotools from ... import aioproc @@ -44,40 +44,6 @@ from ... import htclient # ===== [email protected](frozen=True) -class StreamerSnapshot: - online: bool - width: int - height: int - headers: tuple[tuple[str, str], ...] - data: bytes - - async def make_preview(self, max_width: int, max_height: int, quality: int) -> bytes: - assert max_width >= 0 - assert max_height >= 0 - assert quality > 0 - - if max_width == 0 and max_height == 0: - max_width = self.width // 5 - max_height = self.height // 5 - else: - max_width = min((max_width or self.width), self.width) - max_height = min((max_height or self.height), self.height) - - if (max_width, max_height) == (self.width, self.height): - return self.data - return (await aiotools.run_async(self.__inner_make_preview, max_width, max_height, quality)) - - @functools.lru_cache(maxsize=1) - def __inner_make_preview(self, max_width: int, max_height: int, quality: int) -> bytes: - with io.BytesIO(self.data) as snapshot_bio: - with io.BytesIO() as preview_bio: - with PilImage.open(snapshot_bio) as image: - image.thumbnail((max_width, max_height), PilImage.Resampling.LANCZOS) - image.save(preview_bio, format="jpeg", quality=quality) - return preview_bio.getvalue() - - class _StreamerParams: __DESIRED_FPS = "desired_fps" @@ -204,7 +170,6 @@ class Streamer: # pylint: disable=too-many-instance-attributes self.__state_poll = state_poll self.__unix_path = unix_path - self.__timeout = timeout self.__snapshot_timeout = snapshot_timeout self.__process_name_prefix = process_name_prefix @@ -221,7 +186,13 @@ class Streamer: # pylint: disable=too-many-instance-attributes self.__streamer_task: (asyncio.Task | None) = None self.__streamer_proc: (asyncio.subprocess.Process | None) = None # pylint: disable=no-member - self.__http_session: (aiohttp.ClientSession | None) = None + self.__client = HttpStreamerClient( + name="jpeg", + unix_path=self.__unix_path, + timeout=timeout, + user_agent=htclient.make_user_agent("KVMD"), + ) + self.__client_session: (HttpStreamerClientSession | None) = None self.__snapshot: (StreamerSnapshot | None) = None @@ -300,11 +271,9 @@ class Streamer: # pylint: disable=too-many-instance-attributes async def get_state(self) -> dict: streamer_state = None if self.__streamer_task: - session = self.__ensure_http_session() + session = self.__ensure_client_session() try: - async with session.get(self.__make_url("state")) as response: - htclient.raise_not_200(response) - streamer_state = (await response.json())["result"] + streamer_state = await session.get_state() except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError): pass except Exception: @@ -350,39 +319,15 @@ class Streamer: # pylint: disable=too-many-instance-attributes if load: return self.__snapshot logger = get_logger() - session = self.__ensure_http_session() + session = self.__ensure_client_session() try: - async with session.get( - self.__make_url("snapshot"), - timeout=aiohttp.ClientTimeout(total=self.__snapshot_timeout), - ) as response: - - htclient.raise_not_200(response) - online = (response.headers["X-UStreamer-Online"] == "true") - if online or allow_offline: - snapshot = StreamerSnapshot( - online=online, - width=int(response.headers["X-UStreamer-Width"]), - height=int(response.headers["X-UStreamer-Height"]), - headers=tuple( - (key, value) - for (key, value) in tools.sorted_kvs(dict(response.headers)) - if key.lower().startswith("x-ustreamer-") or key.lower() in [ - "x-timestamp", - "access-control-allow-origin", - "cache-control", - "pragma", - "expires", - ] - ), - data=bytes(await response.read()), - ) - if save: - self.__snapshot = snapshot - self.__notifier.notify() - return snapshot - logger.error("Stream is offline, no signal or so") - + snapshot = await session.take_snapshot(self.__snapshot_timeout) + if snapshot.online or allow_offline: + if save: + self.__snapshot = snapshot + self.__notifier.notify() + return snapshot + logger.error("Stream is offline, no signal or so") except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError) as ex: logger.error("Can't connect to streamer: %s", tools.efmt(ex)) except Exception: @@ -397,25 +342,14 @@ class Streamer: # pylint: disable=too-many-instance-attributes @aiotools.atomic_fg async def cleanup(self) -> None: await self.ensure_stop(immediately=True) - if self.__http_session: - await self.__http_session.close() - self.__http_session = None - - # ===== - - def __ensure_http_session(self) -> aiohttp.ClientSession: - if not self.__http_session: - kwargs: dict = { - "headers": {"User-Agent": htclient.make_user_agent("KVMD")}, - "connector": aiohttp.UnixConnector(path=self.__unix_path), - "timeout": aiohttp.ClientTimeout(total=self.__timeout), - } - self.__http_session = aiohttp.ClientSession(**kwargs) - return self.__http_session - - def __make_url(self, handle: str) -> str: - assert not handle.startswith("/"), handle - return f"http://localhost:0/{handle}" + if self.__client_session: + await self.__client_session.close() + self.__client_session = None + + def __ensure_client_session(self) -> HttpStreamerClientSession: + if not self.__client_session: + self.__client_session = self.__client.make_session() + return self.__client_session # ===== diff --git a/kvmd/apps/vnc/__init__.py b/kvmd/apps/vnc/__init__.py index a5d3c781..f41989d1 100644 --- a/kvmd/apps/vnc/__init__.py +++ b/kvmd/apps/vnc/__init__.py @@ -21,7 +21,7 @@ from ...clients.kvmd import KvmdClient -from ...clients.streamer import StreamFormats +from ...clients.streamer import StreamerFormats from ...clients.streamer import BaseStreamerClient from ...clients.streamer import HttpStreamerClient from ...clients.streamer import MemsinkStreamerClient @@ -51,8 +51,8 @@ def main(argv: (list[str] | None)=None) -> None: return None streamers: list[BaseStreamerClient] = list(filter(None, [ - make_memsink_streamer("h264", StreamFormats.H264), - make_memsink_streamer("jpeg", StreamFormats.JPEG), + make_memsink_streamer("h264", StreamerFormats.H264), + make_memsink_streamer("jpeg", StreamerFormats.JPEG), HttpStreamerClient(name="JPEG", user_agent=user_agent, **config.streamer._unpack()), ])) diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py index a6e89311..c14bb21f 100644 --- a/kvmd/apps/vnc/server.py +++ b/kvmd/apps/vnc/server.py @@ -42,7 +42,7 @@ from ...clients.kvmd import KvmdClient from ...clients.streamer import StreamerError from ...clients.streamer import StreamerPermError -from ...clients.streamer import StreamFormats +from ...clients.streamer import StreamerFormats from ...clients.streamer import BaseStreamerClient from ... import tools @@ -222,8 +222,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes def __get_preferred_streamer(self) -> BaseStreamerClient: formats = { - StreamFormats.JPEG: "has_tight", - StreamFormats.H264: "has_h264", + StreamerFormats.JPEG: "has_tight", + StreamerFormats.H264: "has_h264", } streamer: (BaseStreamerClient | None) = None for streamer in self.__streamers: @@ -249,7 +249,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes "data": (await make_text_jpeg(self._width, self._height, self._encodings.tight_jpeg_quality, text)), "width": self._width, "height": self._height, - "format": StreamFormats.JPEG, + "format": StreamerFormats.JPEG, } async def __fb_sender_task_loop(self) -> None: # pylint: disable=too-many-branches @@ -259,21 +259,21 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes frame = await self.__fb_queue.get() if ( last is None # pylint: disable=too-many-boolean-expressions - or frame["format"] == StreamFormats.JPEG + or frame["format"] == StreamerFormats.JPEG or last["format"] != frame["format"] - or (frame["format"] == StreamFormats.H264 and ( + or (frame["format"] == StreamerFormats.H264 and ( frame["key"] or last["width"] != frame["width"] or last["height"] != frame["height"] or len(last["data"]) + len(frame["data"]) > 4194304 )) ): - self.__fb_has_key = (frame["format"] == StreamFormats.H264 and frame["key"]) + self.__fb_has_key = (frame["format"] == StreamerFormats.H264 and frame["key"]) last = frame if self.__fb_queue.qsize() == 0: break continue - assert frame["format"] == StreamFormats.H264 + assert frame["format"] == StreamerFormats.H264 last["data"] += frame["data"] if self.__fb_queue.qsize() == 0: break @@ -295,9 +295,9 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes await self._send_fb_allow_again() continue - if last["format"] == StreamFormats.JPEG: + if last["format"] == StreamerFormats.JPEG: await self._send_fb_jpeg(last["data"]) - elif last["format"] == StreamFormats.H264: + elif last["format"] == StreamerFormats.H264: if not self._encodings.has_h264: raise RfbError("The client doesn't want to accept H264 anymore") if self.__fb_has_key: diff --git a/kvmd/clients/__init__.py b/kvmd/clients/__init__.py index 8d45fdfd..e917c9f6 100644 --- a/kvmd/clients/__init__.py +++ b/kvmd/clients/__init__.py @@ -18,3 +18,67 @@ # along with this program. If not, see <https://www.gnu.org/licenses/>. # # # # ========================================================================== # + + +import types + +from typing import Callable +from typing import Self + +import aiohttp + + +# ===== +class BaseHttpClientSession: + def __init__(self, make_http_session: Callable[[], aiohttp.ClientSession]) -> None: + self._make_http_session = make_http_session + self.__http_session: (aiohttp.ClientSession | None) = None + + def _ensure_http_session(self) -> aiohttp.ClientSession: + if not self.__http_session: + self.__http_session = self._make_http_session() + return self.__http_session + + async def close(self) -> None: + if self.__http_session: + await self.__http_session.close() + self.__http_session = None + + async def __aenter__(self) -> Self: + return self + + async def __aexit__( + self, + _exc_type: type[BaseException], + _exc: BaseException, + _tb: types.TracebackType, + ) -> None: + + await self.close() + + +class BaseHttpClient: + def __init__( + self, + unix_path: str, + timeout: float, + user_agent: str, + ) -> None: + + self.__unix_path = unix_path + self.__timeout = timeout + self.__user_agent = user_agent + + def make_session(self) -> BaseHttpClientSession: + raise NotImplementedError + + def _make_http_session(self, headers: (dict[str, str] | None)=None) -> aiohttp.ClientSession: + return aiohttp.ClientSession( + base_url="http://localhost:0", + headers={ + "User-Agent": self.__user_agent, + **(headers or {}), + }, + connector=aiohttp.UnixConnector(path=self.__unix_path), + timeout=aiohttp.ClientTimeout(total=self.__timeout), + ) diff --git a/kvmd/clients/kvmd.py b/kvmd/clients/kvmd.py index ddf0d024..a0412ce2 100644 --- a/kvmd/clients/kvmd.py +++ b/kvmd/clients/kvmd.py @@ -23,7 +23,6 @@ import asyncio import contextlib import struct -import types from typing import Callable from typing import AsyncGenerator @@ -34,22 +33,19 @@ from .. import aiotools from .. import htclient from .. import htserver +from . import BaseHttpClient +from . import BaseHttpClientSession + # ===== class _BaseApiPart: - def __init__( - self, - ensure_http_session: Callable[[], aiohttp.ClientSession], - make_url: Callable[[str], str], - ) -> None: - + def __init__(self, ensure_http_session: Callable[[], aiohttp.ClientSession]) -> None: self._ensure_http_session = ensure_http_session - self._make_url = make_url async def _set_params(self, handle: str, **params: (int | str | None)) -> None: session = self._ensure_http_session() async with session.post( - url=self._make_url(handle), + url=handle, params={ key: value for (key, value) in params.items() @@ -63,7 +59,7 @@ class _AuthApiPart(_BaseApiPart): async def check(self) -> bool: session = self._ensure_http_session() try: - async with session.get(self._make_url("auth/check")) as response: + async with session.get("/auth/check") as response: htclient.raise_not_200(response) return True except aiohttp.ClientResponseError as ex: @@ -75,13 +71,13 @@ class _AuthApiPart(_BaseApiPart): class _StreamerApiPart(_BaseApiPart): async def get_state(self) -> dict: session = self._ensure_http_session() - async with session.get(self._make_url("streamer")) as response: + async with session.get("/streamer") as response: htclient.raise_not_200(response) return (await response.json())["result"] async def set_params(self, quality: (int | None)=None, desired_fps: (int | None)=None) -> None: await self._set_params( - "streamer/set_params", + "/streamer/set_params", quality=quality, desired_fps=desired_fps, ) @@ -90,7 +86,7 @@ class _StreamerApiPart(_BaseApiPart): class _HidApiPart(_BaseApiPart): async def get_keymaps(self) -> tuple[str, set[str]]: session = self._ensure_http_session() - async with session.get(self._make_url("hid/keymaps")) as response: + async with session.get("/hid/keymaps") as response: htclient.raise_not_200(response) result = (await response.json())["result"] return (result["keymaps"]["default"], set(result["keymaps"]["available"])) @@ -98,7 +94,7 @@ class _HidApiPart(_BaseApiPart): async def print(self, text: str, limit: int, keymap_name: str) -> None: session = self._ensure_http_session() async with session.post( - url=self._make_url("hid/print"), + url="/hid/print", params={"limit": limit, "keymap": keymap_name}, data=text, ) as response: @@ -106,7 +102,7 @@ class _HidApiPart(_BaseApiPart): async def set_params(self, keyboard_output: (str | None)=None, mouse_output: (str | None)=None) -> None: await self._set_params( - "hid/set_params", + "/hid/set_params", keyboard_output=keyboard_output, mouse_output=mouse_output, ) @@ -115,7 +111,7 @@ class _HidApiPart(_BaseApiPart): class _AtxApiPart(_BaseApiPart): async def get_state(self) -> dict: session = self._ensure_http_session() - async with session.get(self._make_url("atx")) as response: + async with session.get("/atx") as response: htclient.raise_not_200(response) return (await response.json())["result"] @@ -123,7 +119,7 @@ class _AtxApiPart(_BaseApiPart): session = self._ensure_http_session() try: async with session.post( - url=self._make_url("atx/power"), + url="/atx/power", params={"action": action}, ) as response: htclient.raise_not_200(response) @@ -138,7 +134,6 @@ class _AtxApiPart(_BaseApiPart): class KvmdClientWs: def __init__(self, ws: aiohttp.ClientWebSocketResponse) -> None: self.__ws = ws - self.__writer_queue: "asyncio.Queue[tuple[str, dict] | bytes]" = asyncio.Queue() self.__communicated = False @@ -200,83 +195,25 @@ class KvmdClientWs: await self.__writer_queue.put(struct.pack(">bbbb", 5, 0, delta_x, delta_y)) -class KvmdClientSession: - def __init__( - self, - make_http_session: Callable[[], aiohttp.ClientSession], - make_url: Callable[[str], str], - ) -> None: - - self.__make_http_session = make_http_session - self.__make_url = make_url - - self.__http_session: (aiohttp.ClientSession | None) = None - - args = (self.__ensure_http_session, make_url) - - self.auth = _AuthApiPart(*args) - self.streamer = _StreamerApiPart(*args) - self.hid = _HidApiPart(*args) - self.atx = _AtxApiPart(*args) +class KvmdClientSession(BaseHttpClientSession): + def __init__(self, make_http_session: Callable[[], aiohttp.ClientSession]) -> None: + super().__init__(make_http_session) + self.auth = _AuthApiPart(self._ensure_http_session) + self.streamer = _StreamerApiPart(self._ensure_http_session) + self.hid = _HidApiPart(self._ensure_http_session) + self.atx = _AtxApiPart(self._ensure_http_session) @contextlib.asynccontextmanager async def ws(self) -> AsyncGenerator[KvmdClientWs, None]: - session = self.__ensure_http_session() - async with session.ws_connect(self.__make_url("ws"), params={"legacy": 0}) as ws: + session = self._ensure_http_session() + async with session.ws_connect("/ws", params={"legacy": "0"}) as ws: yield KvmdClientWs(ws) - def __ensure_http_session(self) -> aiohttp.ClientSession: - if not self.__http_session: - self.__http_session = self.__make_http_session() - return self.__http_session - - async def close(self) -> None: - if self.__http_session: - await self.__http_session.close() - self.__http_session = None - - async def __aenter__(self) -> "KvmdClientSession": - return self - - async def __aexit__( - self, - _exc_type: type[BaseException], - _exc: BaseException, - _tb: types.TracebackType, - ) -> None: - - await self.close() - - -class KvmdClient: - def __init__( - self, - unix_path: str, - timeout: float, - user_agent: str, - ) -> None: - - self.__unix_path = unix_path - self.__timeout = timeout - self.__user_agent = user_agent - - def make_session(self, user: str, passwd: str) -> KvmdClientSession: - return KvmdClientSession( - make_http_session=(lambda: self.__make_http_session(user, passwd)), - make_url=self.__make_url, - ) - - def __make_http_session(self, user: str, passwd: str) -> aiohttp.ClientSession: - return aiohttp.ClientSession( - headers={ - "X-KVMD-User": user, - "X-KVMD-Passwd": passwd, - "User-Agent": self.__user_agent, - }, - connector=aiohttp.UnixConnector(path=self.__unix_path), - timeout=aiohttp.ClientTimeout(total=self.__timeout), - ) - def __make_url(self, handle: str) -> str: - assert not handle.startswith("/"), handle - return f"http://localhost:0/{handle}" +class KvmdClient(BaseHttpClient): + def make_session(self, user: str="", passwd: str="") -> KvmdClientSession: + headers = { + "X-KVMD-User": user, + "X-KVMD-Passwd": passwd, + } + return KvmdClientSession(lambda: self._make_http_session(headers)) diff --git a/kvmd/clients/streamer.py b/kvmd/clients/streamer.py index fdc855bb..5369892e 100644 --- a/kvmd/clients/streamer.py +++ b/kvmd/clients/streamer.py @@ -20,7 +20,10 @@ # ========================================================================== # +import io import contextlib +import dataclasses +import functools import types from typing import Callable @@ -31,10 +34,15 @@ from typing import AsyncGenerator import aiohttp import ustreamer +from PIL import Image as PilImage + from .. import tools from .. import aiotools from .. import htclient +from . import BaseHttpClient +from . import BaseHttpClientSession + # ===== class StreamerError(Exception): @@ -50,7 +58,7 @@ class StreamerPermError(StreamerError): # ===== -class StreamFormats: +class StreamerFormats: JPEG = 1195724874 # V4L2_PIX_FMT_JPEG H264 = 875967048 # V4L2_PIX_FMT_H264 _MJPEG = 1196444237 # V4L2_PIX_FMT_MJPEG @@ -68,8 +76,76 @@ class BaseStreamerClient: # ===== [email protected](frozen=True) +class StreamerSnapshot: + online: bool + width: int + height: int + headers: tuple[tuple[str, str], ...] + data: bytes + + async def make_preview(self, max_width: int, max_height: int, quality: int) -> bytes: + assert max_width >= 0 + assert max_height >= 0 + assert quality > 0 + + if max_width == 0 and max_height == 0: + max_width = self.width // 5 + max_height = self.height // 5 + else: + max_width = min((max_width or self.width), self.width) + max_height = min((max_height or self.height), self.height) + + if (max_width, max_height) == (self.width, self.height): + return self.data + return (await aiotools.run_async(self.__inner_make_preview, max_width, max_height, quality)) + + @functools.lru_cache(maxsize=1) + def __inner_make_preview(self, max_width: int, max_height: int, quality: int) -> bytes: + with io.BytesIO(self.data) as snapshot_bio: + with io.BytesIO() as preview_bio: + with PilImage.open(snapshot_bio) as image: + image.thumbnail((max_width, max_height), PilImage.Resampling.LANCZOS) + image.save(preview_bio, format="jpeg", quality=quality) + return preview_bio.getvalue() + + +class HttpStreamerClientSession(BaseHttpClientSession): + async def get_state(self) -> dict: + session = self._ensure_http_session() + async with session.get("/state") as response: + htclient.raise_not_200(response) + return (await response.json())["result"] + + async def take_snapshot(self, timeout: float) -> StreamerSnapshot: + session = self._ensure_http_session() + async with session.get( + url="/snapshot", + timeout=aiohttp.ClientTimeout(total=timeout), + ) as response: + + htclient.raise_not_200(response) + return StreamerSnapshot( + online=(response.headers["X-UStreamer-Online"] == "true"), + width=int(response.headers["X-UStreamer-Width"]), + height=int(response.headers["X-UStreamer-Height"]), + headers=tuple( + (key, value) + for (key, value) in tools.sorted_kvs(dict(response.headers)) + if key.lower().startswith("x-ustreamer-") or key.lower() in [ + "x-timestamp", + "access-control-allow-origin", + "cache-control", + "pragma", + "expires", + ] + ), + data=bytes(await response.read()), + ) + + @contextlib.contextmanager -def _http_handle_errors() -> Generator[None, None, None]: +def _http_reading_handle_errors() -> Generator[None, None, None]: try: yield except Exception as ex: # Тут бывают и ассерты, и KeyError, и прочая херня @@ -78,7 +154,7 @@ def _http_handle_errors() -> Generator[None, None, None]: raise StreamerTempError(tools.efmt(ex)) -class HttpStreamerClient(BaseStreamerClient): +class HttpStreamerClient(BaseHttpClient, BaseStreamerClient): def __init__( self, name: str, @@ -87,29 +163,35 @@ class HttpStreamerClient(BaseStreamerClient): user_agent: str, ) -> None: + super().__init__(unix_path, timeout, user_agent) self.__name = name - self.__unix_path = unix_path - self.__timeout = timeout - self.__user_agent = user_agent + + def make_session(self) -> HttpStreamerClientSession: + return HttpStreamerClientSession(self._make_http_session) def get_format(self) -> int: - return StreamFormats.JPEG + return StreamerFormats.JPEG @contextlib.asynccontextmanager async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]: - with _http_handle_errors(): - async with self.__make_http_session() as session: + with _http_reading_handle_errors(): + async with self._make_http_session() as session: async with session.get( - url=self.__make_url("stream"), + url="/stream", params={"extra_headers": "1"}, + timeout=aiohttp.ClientTimeout( + connect=session.timeout.total, + sock_read=session.timeout.total, + ), ) as response: + htclient.raise_not_200(response) reader = aiohttp.MultipartReader.from_response(response) self.__patch_stream_reader(reader.resp.content) async def read_frame(key_required: bool) -> dict: _ = key_required - with _http_handle_errors(): + with _http_reading_handle_errors(): frame = await reader.next() # pylint: disable=not-callable if not isinstance(frame, aiohttp.BodyPartReader): raise StreamerTempError("Expected body part") @@ -123,26 +205,11 @@ class HttpStreamerClient(BaseStreamerClient): "width": int(frame.headers["X-UStreamer-Width"]), "height": int(frame.headers["X-UStreamer-Height"]), "data": data, - "format": StreamFormats.JPEG, + "format": StreamerFormats.JPEG, } yield read_frame - def __make_http_session(self) -> aiohttp.ClientSession: - kwargs: dict = { - "headers": {"User-Agent": self.__user_agent}, - "connector": aiohttp.UnixConnector(path=self.__unix_path), - "timeout": aiohttp.ClientTimeout( - connect=self.__timeout, - sock_read=self.__timeout, - ), - } - return aiohttp.ClientSession(**kwargs) - - def __make_url(self, handle: str) -> str: - assert not handle.startswith("/"), handle - return f"http://localhost:0/{handle}" - def __patch_stream_reader(self, reader: aiohttp.StreamReader) -> None: # https://github.com/pikvm/pikvm/issues/92 # Infinite looping in BodyPartReader.read() because _at_eof flag. @@ -162,7 +229,7 @@ class HttpStreamerClient(BaseStreamerClient): # ===== @contextlib.contextmanager -def _memsink_handle_errors() -> Generator[None, None, None]: +def _memsink_reading_handle_errors() -> Generator[None, None, None]: try: yield except StreamerPermError: @@ -198,11 +265,11 @@ class MemsinkStreamerClient(BaseStreamerClient): @contextlib.asynccontextmanager async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]: - with _memsink_handle_errors(): + with _memsink_reading_handle_errors(): with ustreamer.Memsink(**self.__kwargs) as sink: async def read_frame(key_required: bool) -> dict: - key_required = (key_required and self.__fmt == StreamFormats.H264) - with _memsink_handle_errors(): + key_required = (key_required and self.__fmt == StreamerFormats.H264) + with _memsink_reading_handle_errors(): while True: frame = await aiotools.run_async(sink.wait_frame, key_required) if frame is not None: @@ -211,8 +278,8 @@ class MemsinkStreamerClient(BaseStreamerClient): yield read_frame def __check_format(self, fmt: int) -> None: - if fmt == StreamFormats._MJPEG: # pylint: disable=protected-access - fmt = StreamFormats.JPEG + if fmt == StreamerFormats._MJPEG: # pylint: disable=protected-access + fmt = StreamerFormats.JPEG if fmt != self.__fmt: raise StreamerPermError("Invalid sink format") |