summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2021-01-22 04:26:04 +0300
committerDevaev Maxim <[email protected]>2021-01-22 04:26:04 +0300
commitebe40697a5935d5d35a36d0825715ca8c3824c6a (patch)
tree7190cd7443b05efbd07660987b465afbcaf57d02
parent7c39b3facd6cc930dbf8994c75b24f48c71902b5 (diff)
sink source for vnc
-rw-r--r--PKGBUILD2
-rw-r--r--kvmd/apps/__init__.py9
-rw-r--r--kvmd/apps/vnc/__init__.py9
-rw-r--r--kvmd/apps/vnc/rfb/__init__.py2
-rw-r--r--kvmd/apps/vnc/server.py37
-rw-r--r--kvmd/clients/streamer.py65
-rw-r--r--testenv/Dockerfile2
7 files changed, 105 insertions, 21 deletions
diff --git a/PKGBUILD b/PKGBUILD
index 75fd6f9c..03be889c 100644
--- a/PKGBUILD
+++ b/PKGBUILD
@@ -67,7 +67,7 @@ depends=(
iproute2
dnsmasq
"raspberrypi-io-access>=0.5"
- "ustreamer>=1.19"
+ "ustreamer>=3.12"
# Avoid dhcpcd stack trace
dhclient
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py
index a4a41403..517cff72 100644
--- a/kvmd/apps/__init__.py
+++ b/kvmd/apps/__init__.py
@@ -549,6 +549,15 @@ def _get_config_scheme() -> Dict:
"timeout": Option(5.0, type=valid_float_f01),
},
+ "memsink": {
+ "jpeg": {
+ "sink": Option("", unpack_as="obj"),
+ "lock_timeout": Option(1.0, type=valid_float_f01),
+ "wait_timeout": Option(1.0, type=valid_float_f01),
+ "drop_same_frames": Option(1.0, type=valid_float_f0),
+ },
+ },
+
"auth": {
"vncauth": {
"enabled": Option(False, type=valid_bool),
diff --git a/kvmd/apps/vnc/__init__.py b/kvmd/apps/vnc/__init__.py
index f252e404..96199fca 100644
--- a/kvmd/apps/vnc/__init__.py
+++ b/kvmd/apps/vnc/__init__.py
@@ -24,7 +24,8 @@ from typing import List
from typing import Optional
from ...clients.kvmd import KvmdClient
-from ...clients.streamer import StreamerClient
+from ...clients.streamer import StreamerHttpClient
+from ...clients.streamer import StreamerMemsinkClient
from ... import htclient
@@ -62,10 +63,14 @@ def main(argv: Optional[List[str]]=None) -> None:
user_agent=user_agent,
**config.kvmd._unpack(),
),
- streamer=StreamerClient(
+ streamer_http=StreamerHttpClient(
user_agent=user_agent,
**config.streamer._unpack(),
),
+ streamer_memsink_jpeg=(
+ StreamerMemsinkClient(**config.memsink.jpeg._unpack())
+ if config.memsink.jpeg.sink else None
+ ),
vnc_auth_manager=VncAuthManager(**config.auth.vncauth._unpack()),
**config.server.keepalive._unpack(),
diff --git a/kvmd/apps/vnc/rfb/__init__.py b/kvmd/apps/vnc/rfb/__init__.py
index 8ca93c8a..ce2b0daf 100644
--- a/kvmd/apps/vnc/rfb/__init__.py
+++ b/kvmd/apps/vnc/rfb/__init__.py
@@ -151,7 +151,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
# =====
- async def _send_fb(self, jpeg: bytes) -> None:
+ async def _send_fb_jpeg(self, jpeg: bytes) -> None:
assert self._encodings.has_tight
assert self._encodings.tight_jpeg_quality > 0
assert len(jpeg) <= 4194303, len(jpeg)
diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py
index ece2fcea..14617311 100644
--- a/kvmd/apps/vnc/server.py
+++ b/kvmd/apps/vnc/server.py
@@ -45,7 +45,10 @@ from ...clients.kvmd import KvmdClientSession
from ...clients.kvmd import KvmdClient
from ...clients.streamer import StreamerError
-from ...clients.streamer import StreamerClient
+from ...clients.streamer import StreamerPermError
+from ...clients.streamer import BaseStreamerClient
+from ...clients.streamer import StreamerHttpClient
+from ...clients.streamer import StreamerMemsinkClient
from .rfb import RfbClient
from .rfb.stream import rfb_format_remote
@@ -79,7 +82,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
symmap: Dict[int, Dict[int, str]],
kvmd: KvmdClient,
- streamer: StreamerClient,
+ streamer_http: StreamerHttpClient,
+ streamer_memsink_jpeg: Optional[StreamerMemsinkClient],
vnc_credentials: Dict[str, VncAuthKvmdCredentials],
none_auth_only: bool,
@@ -103,7 +107,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
self.__symmap = symmap
self.__kvmd = kvmd
- self.__streamer = streamer
+ self.__streamer_http = streamer_http
+ self.__streamer_memsink_jpeg = streamer_memsink_jpeg
self.__shared_params = shared_params
@@ -178,19 +183,29 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
async def __streamer_task_loop(self) -> None:
logger = get_logger(0)
await self.__ws_connected
+
+ name = "streamer_http"
+ streamer: BaseStreamerClient = self.__streamer_http
+ if self.__streamer_memsink_jpeg:
+ (name, streamer) = ("streamer_memsink_jpeg", self.__streamer_memsink_jpeg)
+
while True:
try:
streaming = False
- async for (online, width, height, jpeg) in self.__streamer.read_stream():
+ async for (online, width, height, jpeg) in streamer.read_stream():
if not streaming:
- logger.info("[streamer] %s: Streaming ...", self._remote)
+ logger.info("[%s] %s: Streaming ...", name, self._remote)
streaming = True
if online:
await self.__send_fb_real(width, height, jpeg)
else:
await self.__send_fb_stub("No signal")
except StreamerError as err:
- logger.info("[streamer] %s: Waiting for stream: %s", self._remote, err)
+ if isinstance(err, StreamerPermError):
+ logger.info("[%s] %s: Permanent error: %s; switching to HTTP ...", name, self._remote, err)
+ (name, streamer) = ("streamer_http", self.__streamer_http)
+ else:
+ logger.info("[%s] %s: Waiting for stream: %s", name, self._remote, err)
await self.__send_fb_stub("Waiting for stream ...")
await asyncio.sleep(1)
@@ -205,7 +220,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
await self.__send_fb_stub(msg, no_lock=True)
return
await self._send_resize(width, height)
- await self._send_fb(jpeg)
+ await self._send_fb_jpeg(jpeg)
self.__fb_stub_text = ""
self.__fb_stub_quality = 0
self.__fb_requested = False
@@ -215,7 +230,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
await self.__lock.acquire()
try:
if self.__fb_requested and (self.__fb_stub_text != text or self.__fb_stub_quality != self._encodings.tight_jpeg_quality):
- await self._send_fb(await make_text_jpeg(self._width, self._height, self._encodings.tight_jpeg_quality, text))
+ await self._send_fb_jpeg(await make_text_jpeg(self._width, self._height, self._encodings.tight_jpeg_quality, text))
self.__fb_stub_text = text
self.__fb_stub_quality = self._encodings.tight_jpeg_quality
self.__fb_requested = False
@@ -344,7 +359,8 @@ class VncServer: # pylint: disable=too-many-instance-attributes
keymap_path: str,
kvmd: KvmdClient,
- streamer: StreamerClient,
+ streamer_http: StreamerHttpClient,
+ streamer_memsink_jpeg: Optional[StreamerMemsinkClient],
vnc_auth_manager: VncAuthManager,
) -> None:
@@ -393,7 +409,8 @@ class VncServer: # pylint: disable=too-many-instance-attributes
keymap_name=keymap_name,
symmap=symmap,
kvmd=kvmd,
- streamer=streamer,
+ streamer_http=streamer_http,
+ streamer_memsink_jpeg=streamer_memsink_jpeg,
vnc_credentials=(await self.__vnc_auth_manager.read_credentials())[0],
none_auth_only=none_auth_only,
shared_params=shared_params,
diff --git a/kvmd/clients/streamer.py b/kvmd/clients/streamer.py
index 1a86b4f9..a955b407 100644
--- a/kvmd/clients/streamer.py
+++ b/kvmd/clients/streamer.py
@@ -28,6 +28,12 @@ from typing import AsyncGenerator
import aiohttp
+try:
+ import ustreamer
+except ImportError:
+ ustreamer = None
+
+from .. import aiotools
from .. import htclient
@@ -36,6 +42,22 @@ class StreamerError(Exception):
pass
+class StreamerTempError(StreamerError):
+ pass
+
+
+class StreamerPermError(StreamerError):
+ pass
+
+
+# =====
+class BaseStreamerClient:
+ async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]:
+ if self is not None: # XXX: Vulture and pylint hack
+ raise NotImplementedError()
+ yield
+
+
# =====
def _patch_stream_reader(reader: aiohttp.StreamReader) -> None:
# https://github.com/pikvm/pikvm/issues/92
@@ -45,13 +67,13 @@ def _patch_stream_reader(reader: aiohttp.StreamReader) -> None:
async def read(self: aiohttp.StreamReader, n: int=-1) -> bytes: # pylint: disable=invalid-name
if self.is_eof():
- raise StreamerError("StreamReader.read(): Reached EOF")
+ raise StreamerTempError("StreamReader.read(): Reached EOF")
return (await orig_read(n))
reader.read = types.MethodType(read, reader) # type: ignore
-class StreamerClient:
+class StreamerHttpClient(BaseStreamerClient):
def __init__(
self,
host: str,
@@ -82,7 +104,7 @@ class StreamerClient:
while True:
frame = await reader.next() # pylint: disable=not-callable
if not isinstance(frame, aiohttp.BodyPartReader):
- raise StreamerError("Expected body part")
+ raise StreamerTempError("Expected body part")
data = bytes(await frame.read())
if not data:
@@ -95,10 +117,10 @@ class StreamerClient:
data,
)
except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня
- if isinstance(err, StreamerError):
+ if isinstance(err, StreamerTempError):
raise
- raise StreamerError(f"{type(err).__name__}: {err}")
- raise StreamerError("Reached EOF")
+ raise StreamerTempError(f"{type(err).__name__}: {err}")
+ raise StreamerTempError("Reached EOF")
def __make_http_session(self) -> aiohttp.ClientSession:
kwargs: Dict = {
@@ -115,3 +137,34 @@ class StreamerClient:
def __make_url(self, handle: str) -> str:
assert not handle.startswith("/"), handle
return f"http://{self.__host}:{self.__port}/{handle}"
+
+
+class StreamerMemsinkClient(BaseStreamerClient):
+ def __init__(
+ self,
+ obj: str,
+ lock_timeout: float,
+ wait_timeout: float,
+ drop_same_frames: float,
+ ) -> None:
+
+ self.__kwargs: Dict = {
+ "obj": obj,
+ "lock_timeout": lock_timeout,
+ "wait_timeout": wait_timeout,
+ "drop_same_frames": drop_same_frames,
+ }
+
+ async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]:
+ if ustreamer is None:
+ raise StreamerPermError("Missing ustreamer library")
+ try:
+ with ustreamer.Memsink(**self.__kwargs) as sink:
+ while True:
+ frame = await aiotools.run_async(sink.wait_frame)
+ if frame is not None:
+ yield (frame["online"], frame["width"], frame["height"], frame["data"])
+ except FileNotFoundError as err:
+ raise StreamerTempError(f"{type(err).__name__}: {err}")
+ except Exception as err:
+ raise StreamerPermError(f"{type(err).__name__}: {err}")
diff --git a/testenv/Dockerfile b/testenv/Dockerfile
index d146248b..b8fcaf31 100644
--- a/testenv/Dockerfile
+++ b/testenv/Dockerfile
@@ -64,7 +64,7 @@ ENV USTREAMER_MIN_VERSION $USTREAMER_MIN_VERSION
RUN echo $USTREAMER_MIN_VERSION
RUN git clone https://github.com/pikvm/ustreamer \
&& cd ustreamer \
- && make PREFIX=/usr install \
+ && make WITH_PYTHON=1 PREFIX=/usr DESTDIR=/ install \
&& cd - \
&& rm -rf ustreamer