diff options
Diffstat (limited to 'kvmd/clients')
-rw-r--r-- | kvmd/clients/streamer.py | 65 |
1 files changed, 59 insertions, 6 deletions
diff --git a/kvmd/clients/streamer.py b/kvmd/clients/streamer.py index 1a86b4f9..a955b407 100644 --- a/kvmd/clients/streamer.py +++ b/kvmd/clients/streamer.py @@ -28,6 +28,12 @@ from typing import AsyncGenerator import aiohttp +try: + import ustreamer +except ImportError: + ustreamer = None + +from .. import aiotools from .. import htclient @@ -36,6 +42,22 @@ class StreamerError(Exception): pass +class StreamerTempError(StreamerError): + pass + + +class StreamerPermError(StreamerError): + pass + + +# ===== +class BaseStreamerClient: + async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]: + if self is not None: # XXX: Vulture and pylint hack + raise NotImplementedError() + yield + + # ===== def _patch_stream_reader(reader: aiohttp.StreamReader) -> None: # https://github.com/pikvm/pikvm/issues/92 @@ -45,13 +67,13 @@ def _patch_stream_reader(reader: aiohttp.StreamReader) -> None: async def read(self: aiohttp.StreamReader, n: int=-1) -> bytes: # pylint: disable=invalid-name if self.is_eof(): - raise StreamerError("StreamReader.read(): Reached EOF") + raise StreamerTempError("StreamReader.read(): Reached EOF") return (await orig_read(n)) reader.read = types.MethodType(read, reader) # type: ignore -class StreamerClient: +class StreamerHttpClient(BaseStreamerClient): def __init__( self, host: str, @@ -82,7 +104,7 @@ class StreamerClient: while True: frame = await reader.next() # pylint: disable=not-callable if not isinstance(frame, aiohttp.BodyPartReader): - raise StreamerError("Expected body part") + raise StreamerTempError("Expected body part") data = bytes(await frame.read()) if not data: @@ -95,10 +117,10 @@ class StreamerClient: data, ) except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня - if isinstance(err, StreamerError): + if isinstance(err, StreamerTempError): raise - raise StreamerError(f"{type(err).__name__}: {err}") - raise StreamerError("Reached EOF") + raise StreamerTempError(f"{type(err).__name__}: {err}") + raise StreamerTempError("Reached EOF") def __make_http_session(self) -> aiohttp.ClientSession: kwargs: Dict = { @@ -115,3 +137,34 @@ class StreamerClient: def __make_url(self, handle: str) -> str: assert not handle.startswith("/"), handle return f"http://{self.__host}:{self.__port}/{handle}" + + +class StreamerMemsinkClient(BaseStreamerClient): + def __init__( + self, + obj: str, + lock_timeout: float, + wait_timeout: float, + drop_same_frames: float, + ) -> None: + + self.__kwargs: Dict = { + "obj": obj, + "lock_timeout": lock_timeout, + "wait_timeout": wait_timeout, + "drop_same_frames": drop_same_frames, + } + + async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]: + if ustreamer is None: + raise StreamerPermError("Missing ustreamer library") + try: + with ustreamer.Memsink(**self.__kwargs) as sink: + while True: + frame = await aiotools.run_async(sink.wait_frame) + if frame is not None: + yield (frame["online"], frame["width"], frame["height"], frame["data"]) + except FileNotFoundError as err: + raise StreamerTempError(f"{type(err).__name__}: {err}") + except Exception as err: + raise StreamerPermError(f"{type(err).__name__}: {err}") |