diff options
-rw-r--r-- | PKGBUILD | 2 | ||||
-rw-r--r-- | kvmd/apps/__init__.py | 9 | ||||
-rw-r--r-- | kvmd/apps/vnc/__init__.py | 9 | ||||
-rw-r--r-- | kvmd/apps/vnc/rfb/__init__.py | 2 | ||||
-rw-r--r-- | kvmd/apps/vnc/server.py | 37 | ||||
-rw-r--r-- | kvmd/clients/streamer.py | 65 | ||||
-rw-r--r-- | testenv/Dockerfile | 2 |
7 files changed, 105 insertions, 21 deletions
@@ -67,7 +67,7 @@ depends=( iproute2 dnsmasq "raspberrypi-io-access>=0.5" - "ustreamer>=1.19" + "ustreamer>=3.12" # Avoid dhcpcd stack trace dhclient diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index a4a41403..517cff72 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -549,6 +549,15 @@ def _get_config_scheme() -> Dict: "timeout": Option(5.0, type=valid_float_f01), }, + "memsink": { + "jpeg": { + "sink": Option("", unpack_as="obj"), + "lock_timeout": Option(1.0, type=valid_float_f01), + "wait_timeout": Option(1.0, type=valid_float_f01), + "drop_same_frames": Option(1.0, type=valid_float_f0), + }, + }, + "auth": { "vncauth": { "enabled": Option(False, type=valid_bool), diff --git a/kvmd/apps/vnc/__init__.py b/kvmd/apps/vnc/__init__.py index f252e404..96199fca 100644 --- a/kvmd/apps/vnc/__init__.py +++ b/kvmd/apps/vnc/__init__.py @@ -24,7 +24,8 @@ from typing import List from typing import Optional from ...clients.kvmd import KvmdClient -from ...clients.streamer import StreamerClient +from ...clients.streamer import StreamerHttpClient +from ...clients.streamer import StreamerMemsinkClient from ... import htclient @@ -62,10 +63,14 @@ def main(argv: Optional[List[str]]=None) -> None: user_agent=user_agent, **config.kvmd._unpack(), ), - streamer=StreamerClient( + streamer_http=StreamerHttpClient( user_agent=user_agent, **config.streamer._unpack(), ), + streamer_memsink_jpeg=( + StreamerMemsinkClient(**config.memsink.jpeg._unpack()) + if config.memsink.jpeg.sink else None + ), vnc_auth_manager=VncAuthManager(**config.auth.vncauth._unpack()), **config.server.keepalive._unpack(), diff --git a/kvmd/apps/vnc/rfb/__init__.py b/kvmd/apps/vnc/rfb/__init__.py index 8ca93c8a..ce2b0daf 100644 --- a/kvmd/apps/vnc/rfb/__init__.py +++ b/kvmd/apps/vnc/rfb/__init__.py @@ -151,7 +151,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute # ===== - async def _send_fb(self, jpeg: bytes) -> None: + async def _send_fb_jpeg(self, jpeg: bytes) -> None: assert self._encodings.has_tight assert self._encodings.tight_jpeg_quality > 0 assert len(jpeg) <= 4194303, len(jpeg) diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py index ece2fcea..14617311 100644 --- a/kvmd/apps/vnc/server.py +++ b/kvmd/apps/vnc/server.py @@ -45,7 +45,10 @@ from ...clients.kvmd import KvmdClientSession from ...clients.kvmd import KvmdClient from ...clients.streamer import StreamerError -from ...clients.streamer import StreamerClient +from ...clients.streamer import StreamerPermError +from ...clients.streamer import BaseStreamerClient +from ...clients.streamer import StreamerHttpClient +from ...clients.streamer import StreamerMemsinkClient from .rfb import RfbClient from .rfb.stream import rfb_format_remote @@ -79,7 +82,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes symmap: Dict[int, Dict[int, str]], kvmd: KvmdClient, - streamer: StreamerClient, + streamer_http: StreamerHttpClient, + streamer_memsink_jpeg: Optional[StreamerMemsinkClient], vnc_credentials: Dict[str, VncAuthKvmdCredentials], none_auth_only: bool, @@ -103,7 +107,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes self.__symmap = symmap self.__kvmd = kvmd - self.__streamer = streamer + self.__streamer_http = streamer_http + self.__streamer_memsink_jpeg = streamer_memsink_jpeg self.__shared_params = shared_params @@ -178,19 +183,29 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes async def __streamer_task_loop(self) -> None: logger = get_logger(0) await self.__ws_connected + + name = "streamer_http" + streamer: BaseStreamerClient = self.__streamer_http + if self.__streamer_memsink_jpeg: + (name, streamer) = ("streamer_memsink_jpeg", self.__streamer_memsink_jpeg) + while True: try: streaming = False - async for (online, width, height, jpeg) in self.__streamer.read_stream(): + async for (online, width, height, jpeg) in streamer.read_stream(): if not streaming: - logger.info("[streamer] %s: Streaming ...", self._remote) + logger.info("[%s] %s: Streaming ...", name, self._remote) streaming = True if online: await self.__send_fb_real(width, height, jpeg) else: await self.__send_fb_stub("No signal") except StreamerError as err: - logger.info("[streamer] %s: Waiting for stream: %s", self._remote, err) + if isinstance(err, StreamerPermError): + logger.info("[%s] %s: Permanent error: %s; switching to HTTP ...", name, self._remote, err) + (name, streamer) = ("streamer_http", self.__streamer_http) + else: + logger.info("[%s] %s: Waiting for stream: %s", name, self._remote, err) await self.__send_fb_stub("Waiting for stream ...") await asyncio.sleep(1) @@ -205,7 +220,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes await self.__send_fb_stub(msg, no_lock=True) return await self._send_resize(width, height) - await self._send_fb(jpeg) + await self._send_fb_jpeg(jpeg) self.__fb_stub_text = "" self.__fb_stub_quality = 0 self.__fb_requested = False @@ -215,7 +230,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes await self.__lock.acquire() try: if self.__fb_requested and (self.__fb_stub_text != text or self.__fb_stub_quality != self._encodings.tight_jpeg_quality): - await self._send_fb(await make_text_jpeg(self._width, self._height, self._encodings.tight_jpeg_quality, text)) + await self._send_fb_jpeg(await make_text_jpeg(self._width, self._height, self._encodings.tight_jpeg_quality, text)) self.__fb_stub_text = text self.__fb_stub_quality = self._encodings.tight_jpeg_quality self.__fb_requested = False @@ -344,7 +359,8 @@ class VncServer: # pylint: disable=too-many-instance-attributes keymap_path: str, kvmd: KvmdClient, - streamer: StreamerClient, + streamer_http: StreamerHttpClient, + streamer_memsink_jpeg: Optional[StreamerMemsinkClient], vnc_auth_manager: VncAuthManager, ) -> None: @@ -393,7 +409,8 @@ class VncServer: # pylint: disable=too-many-instance-attributes keymap_name=keymap_name, symmap=symmap, kvmd=kvmd, - streamer=streamer, + streamer_http=streamer_http, + streamer_memsink_jpeg=streamer_memsink_jpeg, vnc_credentials=(await self.__vnc_auth_manager.read_credentials())[0], none_auth_only=none_auth_only, shared_params=shared_params, diff --git a/kvmd/clients/streamer.py b/kvmd/clients/streamer.py index 1a86b4f9..a955b407 100644 --- a/kvmd/clients/streamer.py +++ b/kvmd/clients/streamer.py @@ -28,6 +28,12 @@ from typing import AsyncGenerator import aiohttp +try: + import ustreamer +except ImportError: + ustreamer = None + +from .. import aiotools from .. import htclient @@ -36,6 +42,22 @@ class StreamerError(Exception): pass +class StreamerTempError(StreamerError): + pass + + +class StreamerPermError(StreamerError): + pass + + +# ===== +class BaseStreamerClient: + async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]: + if self is not None: # XXX: Vulture and pylint hack + raise NotImplementedError() + yield + + # ===== def _patch_stream_reader(reader: aiohttp.StreamReader) -> None: # https://github.com/pikvm/pikvm/issues/92 @@ -45,13 +67,13 @@ def _patch_stream_reader(reader: aiohttp.StreamReader) -> None: async def read(self: aiohttp.StreamReader, n: int=-1) -> bytes: # pylint: disable=invalid-name if self.is_eof(): - raise StreamerError("StreamReader.read(): Reached EOF") + raise StreamerTempError("StreamReader.read(): Reached EOF") return (await orig_read(n)) reader.read = types.MethodType(read, reader) # type: ignore -class StreamerClient: +class StreamerHttpClient(BaseStreamerClient): def __init__( self, host: str, @@ -82,7 +104,7 @@ class StreamerClient: while True: frame = await reader.next() # pylint: disable=not-callable if not isinstance(frame, aiohttp.BodyPartReader): - raise StreamerError("Expected body part") + raise StreamerTempError("Expected body part") data = bytes(await frame.read()) if not data: @@ -95,10 +117,10 @@ class StreamerClient: data, ) except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня - if isinstance(err, StreamerError): + if isinstance(err, StreamerTempError): raise - raise StreamerError(f"{type(err).__name__}: {err}") - raise StreamerError("Reached EOF") + raise StreamerTempError(f"{type(err).__name__}: {err}") + raise StreamerTempError("Reached EOF") def __make_http_session(self) -> aiohttp.ClientSession: kwargs: Dict = { @@ -115,3 +137,34 @@ class StreamerClient: def __make_url(self, handle: str) -> str: assert not handle.startswith("/"), handle return f"http://{self.__host}:{self.__port}/{handle}" + + +class StreamerMemsinkClient(BaseStreamerClient): + def __init__( + self, + obj: str, + lock_timeout: float, + wait_timeout: float, + drop_same_frames: float, + ) -> None: + + self.__kwargs: Dict = { + "obj": obj, + "lock_timeout": lock_timeout, + "wait_timeout": wait_timeout, + "drop_same_frames": drop_same_frames, + } + + async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]: + if ustreamer is None: + raise StreamerPermError("Missing ustreamer library") + try: + with ustreamer.Memsink(**self.__kwargs) as sink: + while True: + frame = await aiotools.run_async(sink.wait_frame) + if frame is not None: + yield (frame["online"], frame["width"], frame["height"], frame["data"]) + except FileNotFoundError as err: + raise StreamerTempError(f"{type(err).__name__}: {err}") + except Exception as err: + raise StreamerPermError(f"{type(err).__name__}: {err}") diff --git a/testenv/Dockerfile b/testenv/Dockerfile index d146248b..b8fcaf31 100644 --- a/testenv/Dockerfile +++ b/testenv/Dockerfile @@ -64,7 +64,7 @@ ENV USTREAMER_MIN_VERSION $USTREAMER_MIN_VERSION RUN echo $USTREAMER_MIN_VERSION RUN git clone https://github.com/pikvm/ustreamer \ && cd ustreamer \ - && make PREFIX=/usr install \ + && make WITH_PYTHON=1 PREFIX=/usr DESTDIR=/ install \ && cd - \ && rm -rf ustreamer |