diff options
Diffstat (limited to 'kvmd/clients')
-rw-r--r-- | kvmd/clients/streamer.py | 113 |
1 files changed, 66 insertions, 47 deletions
diff --git a/kvmd/clients/streamer.py b/kvmd/clients/streamer.py index b9847679..1d8cd601 100644 --- a/kvmd/clients/streamer.py +++ b/kvmd/clients/streamer.py @@ -20,16 +20,16 @@ # ========================================================================== # +import contextlib import types +from typing import Callable +from typing import Awaitable +from typing import Generator from typing import AsyncGenerator import aiohttp - -try: - import ustreamer -except ImportError: - ustreamer = None +import ustreamer from .. import tools from .. import aiotools @@ -60,12 +60,24 @@ class BaseStreamerClient: def get_format(self) -> int: raise NotImplementedError() - async def read_stream(self) -> AsyncGenerator[dict, None]: + @contextlib.asynccontextmanager + async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]: if self is not None: # XXX: Vulture and pylint hack raise NotImplementedError() yield +# ===== +def _http_handle_errors() -> Generator[None, None, None]: + try: + yield + except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня + if isinstance(err, StreamerTempError): + raise + raise StreamerTempError(tools.efmt(err)) + + class HttpStreamerClient(BaseStreamerClient): def __init__( self, @@ -83,8 +95,9 @@ class HttpStreamerClient(BaseStreamerClient): def get_format(self) -> int: return StreamFormats.JPEG - async def read_stream(self) -> AsyncGenerator[dict, None]: - try: + @contextlib.asynccontextmanager + async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]: + with _http_handle_errors(): async with self.__make_http_session() as session: async with session.get( url=self.__make_url("stream"), @@ -94,27 +107,26 @@ class HttpStreamerClient(BaseStreamerClient): reader = aiohttp.MultipartReader.from_response(response) self.__patch_stream_reader(reader.resp.content) - while True: - frame = await reader.next() # pylint: disable=not-callable - if not isinstance(frame, aiohttp.BodyPartReader): - raise StreamerTempError("Expected body part") - - data = bytes(await frame.read()) - if not data: - break - - yield { - "online": (frame.headers["X-UStreamer-Online"] == "true"), - "width": int(frame.headers["X-UStreamer-Width"]), - "height": int(frame.headers["X-UStreamer-Height"]), - "data": data, - "format": StreamFormats.JPEG, - } - except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня - if isinstance(err, StreamerTempError): - raise - raise StreamerTempError(tools.efmt(err)) - raise StreamerTempError("Reached EOF") + async def read_frame(key_required: bool) -> dict: + _ = key_required + with _http_handle_errors(): + frame = await reader.next() # pylint: disable=not-callable + if not isinstance(frame, aiohttp.BodyPartReader): + raise StreamerTempError("Expected body part") + + data = bytes(await frame.read()) + if not data: + raise StreamerTempError("Reached EOF") + + return { + "online": (frame.headers["X-UStreamer-Online"] == "true"), + "width": int(frame.headers["X-UStreamer-Width"]), + "height": int(frame.headers["X-UStreamer-Height"]), + "data": data, + "format": StreamFormats.JPEG, + } + + yield read_frame def __make_http_session(self) -> aiohttp.ClientSession: kwargs: dict = { @@ -148,6 +160,19 @@ class HttpStreamerClient(BaseStreamerClient): return f"HttpStreamerClient({self.__name})" +# ===== +def _memsink_handle_errors() -> Generator[None, None, None]: + try: + yield + except StreamerPermError: + raise + except FileNotFoundError as err: + raise StreamerTempError(tools.efmt(err)) + except Exception as err: + raise StreamerPermError(tools.efmt(err)) + + class MemsinkStreamerClient(BaseStreamerClient): def __init__( self, @@ -171,25 +196,19 @@ class MemsinkStreamerClient(BaseStreamerClient): def get_format(self) -> int: return self.__fmt - async def read_stream(self) -> AsyncGenerator[dict, None]: - if ustreamer is None: - raise StreamerPermError("Missing ustreamer library") - try: + @contextlib.asynccontextmanager + async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]: + with _memsink_handle_errors(): with ustreamer.Memsink(**self.__kwargs) as sink: - key_required = (self.__fmt == StreamFormats.H264) - while True: - frame = await aiotools.run_async(sink.wait_frame, key_required) - if frame is not None: - self.__check_format(frame["format"]) - if frame["key"]: - key_required = False - yield frame - except StreamerPermError: - raise - except FileNotFoundError as err: - raise StreamerTempError(tools.efmt(err)) - except Exception as err: - raise StreamerPermError(tools.efmt(err)) + async def read_frame(key_required: bool) -> dict: + key_required = (key_required and self.__fmt == StreamFormats.H264) + with _memsink_handle_errors(): + while True: + frame = await aiotools.run_async(sink.wait_frame, key_required) + if frame is not None: + self.__check_format(frame["format"]) + return frame + yield read_frame def __check_format(self, fmt: int) -> None: if fmt == StreamFormats._MJPEG: # pylint: disable=protected-access |