diff options
-rw-r--r-- | kvmd/apps/vnc/rfb/__init__.py | 13 | ||||
-rw-r--r-- | kvmd/apps/vnc/rfb/encodings.py | 2 | ||||
-rw-r--r-- | kvmd/apps/vnc/server.py | 37 | ||||
-rw-r--r-- | kvmd/clients/streamer.py | 113 |
4 files changed, 109 insertions, 56 deletions
diff --git a/kvmd/apps/vnc/rfb/__init__.py b/kvmd/apps/vnc/rfb/__init__.py index f715f9b7..bbb3036c 100644 --- a/kvmd/apps/vnc/rfb/__init__.py +++ b/kvmd/apps/vnc/rfb/__init__.py @@ -159,6 +159,9 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute async def _on_fb_update_request(self) -> None: raise NotImplementedError + async def _on_enable_cont_updates(self, enabled: bool) -> None: + raise NotImplementedError + # ===== async def _send_fb_jpeg(self, data: bytes) -> None: @@ -398,6 +401,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute 4: self.__handle_key_event, 5: self.__handle_pointer_event, 6: self.__handle_client_cut_text, + 150: self.__handle_enable_cont_updates, 255: self.__handle_qemu_event, } while True: @@ -429,6 +433,9 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute logger.info("%s [main]: ... %s", self._remote, item) self.__check_encodings() + if self._encodings.has_cont_updates: + await self._write_struct("allow ContUpdates", "B", 150) + if self._encodings.has_ext_keys: # Preferred method await self._write_fb_update("ExtKeys FBUR", 0, 0, RfbEncodings.EXT_KEYS, drain=True) await self._on_set_encodings() @@ -473,6 +480,12 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute text = await self._read_text("cut text data", length) await self._on_cut_event(text) + async def __handle_enable_cont_updates(self) -> None: + enabled = bool((await self._read_struct("enabled ContUpdates", "B HH HH"))[0]) + await self._on_enable_cont_updates(enabled) + if not enabled: + await self._write_struct("disabled ContUpdates", "B", 150) + async def __handle_qemu_event(self) -> None: (sub_type, state, code) = await self._read_struct("QEMU event (key?)", "B H xxxx L") if sub_type != 0: diff --git a/kvmd/apps/vnc/rfb/encodings.py b/kvmd/apps/vnc/rfb/encodings.py index e153b5e8..597e3a92 100644 --- a/kvmd/apps/vnc/rfb/encodings.py +++ b/kvmd/apps/vnc/rfb/encodings.py @@ -31,6 +31,7 @@ class RfbEncodings: RENAME = -307 # DesktopName Pseudo-encoding LEDS_STATE = -261 # QEMU LED State Pseudo-encoding EXT_KEYS = -258 # QEMU Extended Key Events Pseudo-encoding + CONT_UPDATES = -313 # ContinuousUpdates Pseudo-encoding TIGHT = 7 TIGHT_JPEG_QUALITIES = dict(zip( # JPEG Quality Level Pseudo-encoding @@ -53,6 +54,7 @@ class RfbClientEncodings: # pylint: disable=too-many-instance-attributes has_rename: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.RENAME)) # noqa: E224 has_leds_state: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.LEDS_STATE)) # noqa: E224 has_ext_keys: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.EXT_KEYS)) # noqa: E224 + has_cont_updates: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.CONT_UPDATES)) # noqa: E224 has_tight: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.TIGHT)) # noqa: E224 tight_jpeg_quality: int = dataclasses.field(default=0, metadata=_make_meta(frozenset(RfbEncodings.TIGHT_JPEG_QUALITIES))) # noqa: E224 diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py index 25a6016f..0195cdd9 100644 --- a/kvmd/apps/vnc/server.py +++ b/kvmd/apps/vnc/server.py @@ -122,6 +122,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes self.__fb_notifier = aiotools.AioNotifier() self.__fb_queue: "asyncio.Queue[dict]" = asyncio.Queue() + self.__fb_cont_updates = False + self.__fb_key_required = False # Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события. # Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD @@ -198,14 +200,16 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes while True: try: streaming = False - async for frame in streamer.read_stream(): - if not streaming: - logger.info("%s [streamer]: Streaming ...", self._remote) - streaming = True - if frame["online"]: - await self.__queue_frame(frame) - else: - await self.__queue_frame("No signal") + async with streamer.reading() as read_frame: + while True: + frame = await read_frame(self.__fb_key_required) + if not streaming: + logger.info("%s [streamer]: Streaming ...", self._remote) + streaming = True + if frame["online"]: + await self.__queue_frame(frame) + else: + await self.__queue_frame("No signal") except StreamerError as err: if isinstance(err, StreamerPermError): streamer = self.__get_default_streamer() @@ -284,6 +288,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes f" -> {last['width']}x{last['height']}\nPlease reconnect" ) await self._send_fb_jpeg((await self.__make_text_frame(msg))["data"]) + if self.__fb_cont_updates: + self.__fb_notifier.notify() continue await self._send_resize(last["width"], last["height"]) @@ -294,12 +300,18 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes if last["format"] == StreamFormats.JPEG: await self._send_fb_jpeg(last["data"]) + if self.__fb_cont_updates: + self.__fb_notifier.notify() elif last["format"] == StreamFormats.H264: if not self._encodings.has_h264: raise RfbError("The client doesn't want to accept H264 anymore") if has_h264_key: + self.__fb_key_required = False await self._send_fb_h264(last["data"]) + if self.__fb_cont_updates: + self.__fb_notifier.notify() else: + self.__fb_key_required = True self.__fb_notifier.notify() else: raise RuntimeError(f"Unknown format: {last['format']}") @@ -413,7 +425,14 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes await self.__kvmd_session.streamer.set_params(quality, self.__desired_fps) async def _on_fb_update_request(self) -> None: - self.__fb_notifier.notify() + if not self.__fb_cont_updates: + self.__fb_notifier.notify() + + async def _on_enable_cont_updates(self, enabled: bool) -> None: + get_logger(0).info("%s [main]: Applying ContUpdates=%s ...", self._remote, enabled) + self.__fb_cont_updates = enabled + if enabled: + self.__fb_notifier.notify() # ===== diff --git a/kvmd/clients/streamer.py b/kvmd/clients/streamer.py index b9847679..1d8cd601 100644 --- a/kvmd/clients/streamer.py +++ b/kvmd/clients/streamer.py @@ -20,16 +20,16 @@ # ========================================================================== # +import contextlib import types +from typing import Callable +from typing import Awaitable +from typing import Generator from typing import AsyncGenerator import aiohttp - -try: - import ustreamer -except ImportError: - ustreamer = None +import ustreamer from .. import tools from .. import aiotools @@ -60,12 +60,24 @@ class BaseStreamerClient: def get_format(self) -> int: raise NotImplementedError() - async def read_stream(self) -> AsyncGenerator[dict, None]: + @contextlib.asynccontextmanager + async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]: if self is not None: # XXX: Vulture and pylint hack raise NotImplementedError() yield +# ===== +def _http_handle_errors() -> Generator[None, None, None]: + try: + yield + except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня + if isinstance(err, StreamerTempError): + raise + raise StreamerTempError(tools.efmt(err)) + + class HttpStreamerClient(BaseStreamerClient): def __init__( self, @@ -83,8 +95,9 @@ class HttpStreamerClient(BaseStreamerClient): def get_format(self) -> int: return StreamFormats.JPEG - async def read_stream(self) -> AsyncGenerator[dict, None]: - try: + @contextlib.asynccontextmanager + async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]: + with _http_handle_errors(): async with self.__make_http_session() as session: async with session.get( url=self.__make_url("stream"), @@ -94,27 +107,26 @@ class HttpStreamerClient(BaseStreamerClient): reader = aiohttp.MultipartReader.from_response(response) self.__patch_stream_reader(reader.resp.content) - while True: - frame = await reader.next() # pylint: disable=not-callable - if not isinstance(frame, aiohttp.BodyPartReader): - raise StreamerTempError("Expected body part") - - data = bytes(await frame.read()) - if not data: - break - - yield { - "online": (frame.headers["X-UStreamer-Online"] == "true"), - "width": int(frame.headers["X-UStreamer-Width"]), - "height": int(frame.headers["X-UStreamer-Height"]), - "data": data, - "format": StreamFormats.JPEG, - } - except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня - if isinstance(err, StreamerTempError): - raise - raise StreamerTempError(tools.efmt(err)) - raise StreamerTempError("Reached EOF") + async def read_frame(key_required: bool) -> dict: + _ = key_required + with _http_handle_errors(): + frame = await reader.next() # pylint: disable=not-callable + if not isinstance(frame, aiohttp.BodyPartReader): + raise StreamerTempError("Expected body part") + + data = bytes(await frame.read()) + if not data: + raise StreamerTempError("Reached EOF") + + return { + "online": (frame.headers["X-UStreamer-Online"] == "true"), + "width": int(frame.headers["X-UStreamer-Width"]), + "height": int(frame.headers["X-UStreamer-Height"]), + "data": data, + "format": StreamFormats.JPEG, + } + + yield read_frame def __make_http_session(self) -> aiohttp.ClientSession: kwargs: dict = { @@ -148,6 +160,19 @@ class HttpStreamerClient(BaseStreamerClient): return f"HttpStreamerClient({self.__name})" +# ===== +def _memsink_handle_errors() -> Generator[None, None, None]: + try: + yield + except StreamerPermError: + raise + except FileNotFoundError as err: + raise StreamerTempError(tools.efmt(err)) + except Exception as err: + raise StreamerPermError(tools.efmt(err)) + + class MemsinkStreamerClient(BaseStreamerClient): def __init__( self, @@ -171,25 +196,19 @@ class MemsinkStreamerClient(BaseStreamerClient): def get_format(self) -> int: return self.__fmt - async def read_stream(self) -> AsyncGenerator[dict, None]: - if ustreamer is None: - raise StreamerPermError("Missing ustreamer library") - try: + @contextlib.asynccontextmanager + async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]: + with _memsink_handle_errors(): with ustreamer.Memsink(**self.__kwargs) as sink: - key_required = (self.__fmt == StreamFormats.H264) - while True: - frame = await aiotools.run_async(sink.wait_frame, key_required) - if frame is not None: - self.__check_format(frame["format"]) - if frame["key"]: - key_required = False - yield frame - except StreamerPermError: - raise - except FileNotFoundError as err: - raise StreamerTempError(tools.efmt(err)) - except Exception as err: - raise StreamerPermError(tools.efmt(err)) + async def read_frame(key_required: bool) -> dict: + key_required = (key_required and self.__fmt == StreamFormats.H264) + with _memsink_handle_errors(): + while True: + frame = await aiotools.run_async(sink.wait_frame, key_required) + if frame is not None: + self.__check_format(frame["format"]) + return frame + yield read_frame def __check_format(self, fmt: int) -> None: if fmt == StreamFormats._MJPEG: # pylint: disable=protected-access |