summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kvmd/apps/vnc/rfb/__init__.py13
-rw-r--r--kvmd/apps/vnc/rfb/encodings.py2
-rw-r--r--kvmd/apps/vnc/server.py37
-rw-r--r--kvmd/clients/streamer.py113
4 files changed, 109 insertions, 56 deletions
diff --git a/kvmd/apps/vnc/rfb/__init__.py b/kvmd/apps/vnc/rfb/__init__.py
index f715f9b7..bbb3036c 100644
--- a/kvmd/apps/vnc/rfb/__init__.py
+++ b/kvmd/apps/vnc/rfb/__init__.py
@@ -159,6 +159,9 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
async def _on_fb_update_request(self) -> None:
raise NotImplementedError
+ async def _on_enable_cont_updates(self, enabled: bool) -> None:
+ raise NotImplementedError
+
# =====
async def _send_fb_jpeg(self, data: bytes) -> None:
@@ -398,6 +401,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
4: self.__handle_key_event,
5: self.__handle_pointer_event,
6: self.__handle_client_cut_text,
+ 150: self.__handle_enable_cont_updates,
255: self.__handle_qemu_event,
}
while True:
@@ -429,6 +433,9 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
logger.info("%s [main]: ... %s", self._remote, item)
self.__check_encodings()
+ if self._encodings.has_cont_updates:
+ await self._write_struct("allow ContUpdates", "B", 150)
+
if self._encodings.has_ext_keys: # Preferred method
await self._write_fb_update("ExtKeys FBUR", 0, 0, RfbEncodings.EXT_KEYS, drain=True)
await self._on_set_encodings()
@@ -473,6 +480,12 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
text = await self._read_text("cut text data", length)
await self._on_cut_event(text)
+ async def __handle_enable_cont_updates(self) -> None:
+ enabled = bool((await self._read_struct("enabled ContUpdates", "B HH HH"))[0])
+ await self._on_enable_cont_updates(enabled)
+ if not enabled:
+ await self._write_struct("disabled ContUpdates", "B", 150)
+
async def __handle_qemu_event(self) -> None:
(sub_type, state, code) = await self._read_struct("QEMU event (key?)", "B H xxxx L")
if sub_type != 0:
diff --git a/kvmd/apps/vnc/rfb/encodings.py b/kvmd/apps/vnc/rfb/encodings.py
index e153b5e8..597e3a92 100644
--- a/kvmd/apps/vnc/rfb/encodings.py
+++ b/kvmd/apps/vnc/rfb/encodings.py
@@ -31,6 +31,7 @@ class RfbEncodings:
RENAME = -307 # DesktopName Pseudo-encoding
LEDS_STATE = -261 # QEMU LED State Pseudo-encoding
EXT_KEYS = -258 # QEMU Extended Key Events Pseudo-encoding
+ CONT_UPDATES = -313 # ContinuousUpdates Pseudo-encoding
TIGHT = 7
TIGHT_JPEG_QUALITIES = dict(zip( # JPEG Quality Level Pseudo-encoding
@@ -53,6 +54,7 @@ class RfbClientEncodings: # pylint: disable=too-many-instance-attributes
has_rename: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.RENAME)) # noqa: E224
has_leds_state: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.LEDS_STATE)) # noqa: E224
has_ext_keys: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.EXT_KEYS)) # noqa: E224
+ has_cont_updates: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.CONT_UPDATES)) # noqa: E224
has_tight: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.TIGHT)) # noqa: E224
tight_jpeg_quality: int = dataclasses.field(default=0, metadata=_make_meta(frozenset(RfbEncodings.TIGHT_JPEG_QUALITIES))) # noqa: E224
diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py
index 25a6016f..0195cdd9 100644
--- a/kvmd/apps/vnc/server.py
+++ b/kvmd/apps/vnc/server.py
@@ -122,6 +122,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
self.__fb_notifier = aiotools.AioNotifier()
self.__fb_queue: "asyncio.Queue[dict]" = asyncio.Queue()
+ self.__fb_cont_updates = False
+ self.__fb_key_required = False
# Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события.
# Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD
@@ -198,14 +200,16 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
while True:
try:
streaming = False
- async for frame in streamer.read_stream():
- if not streaming:
- logger.info("%s [streamer]: Streaming ...", self._remote)
- streaming = True
- if frame["online"]:
- await self.__queue_frame(frame)
- else:
- await self.__queue_frame("No signal")
+ async with streamer.reading() as read_frame:
+ while True:
+ frame = await read_frame(self.__fb_key_required)
+ if not streaming:
+ logger.info("%s [streamer]: Streaming ...", self._remote)
+ streaming = True
+ if frame["online"]:
+ await self.__queue_frame(frame)
+ else:
+ await self.__queue_frame("No signal")
except StreamerError as err:
if isinstance(err, StreamerPermError):
streamer = self.__get_default_streamer()
@@ -284,6 +288,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
f" -> {last['width']}x{last['height']}\nPlease reconnect"
)
await self._send_fb_jpeg((await self.__make_text_frame(msg))["data"])
+ if self.__fb_cont_updates:
+ self.__fb_notifier.notify()
continue
await self._send_resize(last["width"], last["height"])
@@ -294,12 +300,18 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
if last["format"] == StreamFormats.JPEG:
await self._send_fb_jpeg(last["data"])
+ if self.__fb_cont_updates:
+ self.__fb_notifier.notify()
elif last["format"] == StreamFormats.H264:
if not self._encodings.has_h264:
raise RfbError("The client doesn't want to accept H264 anymore")
if has_h264_key:
+ self.__fb_key_required = False
await self._send_fb_h264(last["data"])
+ if self.__fb_cont_updates:
+ self.__fb_notifier.notify()
else:
+ self.__fb_key_required = True
self.__fb_notifier.notify()
else:
raise RuntimeError(f"Unknown format: {last['format']}")
@@ -413,7 +425,14 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
await self.__kvmd_session.streamer.set_params(quality, self.__desired_fps)
async def _on_fb_update_request(self) -> None:
- self.__fb_notifier.notify()
+ if not self.__fb_cont_updates:
+ self.__fb_notifier.notify()
+
+ async def _on_enable_cont_updates(self, enabled: bool) -> None:
+ get_logger(0).info("%s [main]: Applying ContUpdates=%s ...", self._remote, enabled)
+ self.__fb_cont_updates = enabled
+ if enabled:
+ self.__fb_notifier.notify()
# =====
diff --git a/kvmd/clients/streamer.py b/kvmd/clients/streamer.py
index b9847679..1d8cd601 100644
--- a/kvmd/clients/streamer.py
+++ b/kvmd/clients/streamer.py
@@ -20,16 +20,16 @@
# ========================================================================== #
+import contextlib
import types
+from typing import Callable
+from typing import Awaitable
+from typing import Generator
from typing import AsyncGenerator
import aiohttp
-
-try:
- import ustreamer
-except ImportError:
- ustreamer = None
+import ustreamer
from .. import tools
from .. import aiotools
@@ -60,12 +60,24 @@ class BaseStreamerClient:
def get_format(self) -> int:
raise NotImplementedError()
- async def read_stream(self) -> AsyncGenerator[dict, None]:
+ @contextlib.asynccontextmanager
+ async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]:
if self is not None: # XXX: Vulture and pylint hack
raise NotImplementedError()
yield
+# =====
+def _http_handle_errors() -> Generator[None, None, None]:
+ try:
+ yield
+ except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня
+ if isinstance(err, StreamerTempError):
+ raise
+ raise StreamerTempError(tools.efmt(err))
+
+
class HttpStreamerClient(BaseStreamerClient):
def __init__(
self,
@@ -83,8 +95,9 @@ class HttpStreamerClient(BaseStreamerClient):
def get_format(self) -> int:
return StreamFormats.JPEG
- async def read_stream(self) -> AsyncGenerator[dict, None]:
- try:
+ @contextlib.asynccontextmanager
+ async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]:
+ with _http_handle_errors():
async with self.__make_http_session() as session:
async with session.get(
url=self.__make_url("stream"),
@@ -94,27 +107,26 @@ class HttpStreamerClient(BaseStreamerClient):
reader = aiohttp.MultipartReader.from_response(response)
self.__patch_stream_reader(reader.resp.content)
- while True:
- frame = await reader.next() # pylint: disable=not-callable
- if not isinstance(frame, aiohttp.BodyPartReader):
- raise StreamerTempError("Expected body part")
-
- data = bytes(await frame.read())
- if not data:
- break
-
- yield {
- "online": (frame.headers["X-UStreamer-Online"] == "true"),
- "width": int(frame.headers["X-UStreamer-Width"]),
- "height": int(frame.headers["X-UStreamer-Height"]),
- "data": data,
- "format": StreamFormats.JPEG,
- }
- except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня
- if isinstance(err, StreamerTempError):
- raise
- raise StreamerTempError(tools.efmt(err))
- raise StreamerTempError("Reached EOF")
+ async def read_frame(key_required: bool) -> dict:
+ _ = key_required
+ with _http_handle_errors():
+ frame = await reader.next() # pylint: disable=not-callable
+ if not isinstance(frame, aiohttp.BodyPartReader):
+ raise StreamerTempError("Expected body part")
+
+ data = bytes(await frame.read())
+ if not data:
+ raise StreamerTempError("Reached EOF")
+
+ return {
+ "online": (frame.headers["X-UStreamer-Online"] == "true"),
+ "width": int(frame.headers["X-UStreamer-Width"]),
+ "height": int(frame.headers["X-UStreamer-Height"]),
+ "data": data,
+ "format": StreamFormats.JPEG,
+ }
+
+ yield read_frame
def __make_http_session(self) -> aiohttp.ClientSession:
kwargs: dict = {
@@ -148,6 +160,19 @@ class HttpStreamerClient(BaseStreamerClient):
return f"HttpStreamerClient({self.__name})"
+# =====
+def _memsink_handle_errors() -> Generator[None, None, None]:
+ try:
+ yield
+ except StreamerPermError:
+ raise
+ except FileNotFoundError as err:
+ raise StreamerTempError(tools.efmt(err))
+ except Exception as err:
+ raise StreamerPermError(tools.efmt(err))
+
+
class MemsinkStreamerClient(BaseStreamerClient):
def __init__(
self,
@@ -171,25 +196,19 @@ class MemsinkStreamerClient(BaseStreamerClient):
def get_format(self) -> int:
return self.__fmt
- async def read_stream(self) -> AsyncGenerator[dict, None]:
- if ustreamer is None:
- raise StreamerPermError("Missing ustreamer library")
- try:
+ @contextlib.asynccontextmanager
+ async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]:
+ with _memsink_handle_errors():
with ustreamer.Memsink(**self.__kwargs) as sink:
- key_required = (self.__fmt == StreamFormats.H264)
- while True:
- frame = await aiotools.run_async(sink.wait_frame, key_required)
- if frame is not None:
- self.__check_format(frame["format"])
- if frame["key"]:
- key_required = False
- yield frame
- except StreamerPermError:
- raise
- except FileNotFoundError as err:
- raise StreamerTempError(tools.efmt(err))
- except Exception as err:
- raise StreamerPermError(tools.efmt(err))
+ async def read_frame(key_required: bool) -> dict:
+ key_required = (key_required and self.__fmt == StreamFormats.H264)
+ with _memsink_handle_errors():
+ while True:
+ frame = await aiotools.run_async(sink.wait_frame, key_required)
+ if frame is not None:
+ self.__check_format(frame["format"])
+ return frame
+ yield read_frame
def __check_format(self, fmt: int) -> None:
if fmt == StreamFormats._MJPEG: # pylint: disable=protected-access