From ffeb626ef854a3ac2318f1a8693bbae4986b7eac Mon Sep 17 00:00:00 2001 From: Devaev Maxim Date: Thu, 4 Feb 2021 02:23:59 +0300 Subject: queue-based vnc fb task --- kvmd/apps/vnc/server.py | 131 ++++++++++++++++++++++++------------------------ 1 file changed, 66 insertions(+), 65 deletions(-) (limited to 'kvmd') diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py index 5c915abb..2b677867 100644 --- a/kvmd/apps/vnc/server.py +++ b/kvmd/apps/vnc/server.py @@ -26,7 +26,6 @@ import socket import dataclasses import contextlib -from typing import Tuple from typing import List from typing import Dict from typing import Union @@ -121,9 +120,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes self.__kvmd_session: Optional[KvmdClientSession] = None self.__kvmd_ws: Optional[KvmdClientWs] = None - self.__fb_requested = False - self.__fb_stub: Optional[Tuple[int, str]] = None - self.__fb_h264_data = b"" + self.__fb_notifier = aiotools.AioNotifier() + self.__fb_queue: "asyncio.Queue[Dict]" = asyncio.Queue() # Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события. # Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD @@ -141,6 +139,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes await self._run( kvmd=self.__kvmd_task_loop(), streamer=self.__streamer_task_loop(), + fb_sendeer=self.__fb_sender_task_loop(), ) finally: if self.__kvmd_session: @@ -201,16 +200,16 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes logger.info("[streamer] %s: Streaming ...", self._remote) streaming = True if frame["online"]: - await self.__send_fb_real(frame) + await self.__queue_fb_real(frame) else: - await self.__send_fb_stub("No signal") + await self.__queue_fb_stub("No signal") except StreamerError as err: if isinstance(err, StreamerPermError): streamer = self.__get_default_streamer() logger.info("[streamer] %s: Permanent error: %s; switching to %s ...", self._remote, err, streamer) else: logger.info("[streamer] %s: Waiting for stream: %s", self._remote, err) - await self.__send_fb_stub("Waiting for stream ...") + await self.__queue_fb_stub("Waiting for stream ...") await asyncio.sleep(1) def __get_preferred_streamer(self) -> BaseStreamerClient: @@ -230,65 +229,68 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes get_logger(0).info("[streamer] %s: Using default %s", self._remote, streamer) return streamer - async def __send_fb_real(self, frame: Dict) -> None: - async with self.__lock: - if self.__fb_requested: - if not (await self.__resize_fb_unsafe(frame)): - return + async def __queue_fb_real(self, frame: Dict) -> None: + await self.__fb_queue.put(frame) - if frame["format"] == StreamFormats.JPEG: - await self._send_fb_jpeg(frame["data"]) - elif frame["format"] == StreamFormats.H264: - if not self._encodings.has_h264: - self.__fb_h264_data = b"" - raise StreamerPermError("The client doesn't want to accept H264 anymore") - self.__append_h264_data(frame) - await self._send_fb_h264(self.__fb_h264_data) - else: - raise RuntimeError(f"Unknown format: {frame['format']}") - - self.__fb_stub = None - self.__fb_requested = False - self.__fb_h264_data = b"" - - elif self._encodings.has_h264 and frame["format"] == StreamFormats.H264: - self.__append_h264_data(frame) - - async def __resize_fb_unsafe(self, frame: Dict) -> bool: - width = frame["width"] - height = frame["height"] - if self._width != width or self._height != height: - self.__shared_params.width = width - self.__shared_params.height = height - if not self._encodings.has_resize: - msg = f"Resoultion changed: {self._width}x{self._height} -> {width}x{height}\nPlease reconnect" - await self.__send_fb_stub(msg, no_lock=True) - return False - await self._send_resize(width, height) - return True + async def __queue_fb_stub(self, text: str) -> None: + await self.__fb_queue.put(await self.__make_stub_frame(text)) - def __append_h264_data(self, frame: Dict) -> None: - if frame["key"]: - self.__fb_h264_data = frame["data"] - elif len(self.__fb_h264_data) + len(frame["data"]) > 4194304: # 4Mb - get_logger(0).info("Accumulated H264 buffer is too big; resetting ...") - self.__fb_h264_data = frame["data"] - else: - self.__fb_h264_data += frame["data"] + async def __make_stub_frame(self, text: str) -> Dict: + return { + "data": (await make_text_jpeg(self._width, self._height, self._encodings.tight_jpeg_quality, text)), + "width": self._width, + "height": self._height, + "format": StreamFormats.JPEG, + } - async def __send_fb_stub(self, text: str, no_lock: bool=False) -> None: - if not no_lock: - await self.__lock.acquire() - try: - quality = self._encodings.tight_jpeg_quality - if self.__fb_requested and self.__fb_stub != (quality, text): - await self._send_fb_jpeg(await make_text_jpeg(self._width, self._height, quality, text)) - self.__fb_stub = (quality, text) - self.__fb_requested = False - self.__fb_h264_data = b"" - finally: - if not no_lock: - self.__lock.release() + async def __fb_sender_task_loop(self) -> None: + while True: + await self.__fb_notifier.wait() + + last: Optional[Dict] = None + while True: + frame = await self.__fb_queue.get() + if ( + last is None # pylint: disable=too-many-boolean-expressions + or frame["format"] == StreamFormats.JPEG + or last["format"] != frame["format"] + or (frame["format"] == StreamFormats.H264 and ( + frame["key"] + or last["width"] != frame["width"] + or last["height"] != frame["height"] + or len(last["data"]) + len(frame["data"]) > 4194304 + )) + ): + last = frame + if self.__fb_queue.qsize() == 0: + break + continue + assert frame["format"] == StreamFormats.H264 + last["data"] += frame["data"] + if self.__fb_queue.qsize() == 0: + break + + async with self.__lock: + if self._width != last["width"] or self._height != last["height"]: + self.__shared_params.width = last["width"] + self.__shared_params.height = last["height"] + if not self._encodings.has_resize: + msg = ( + f"Resoultion changed: {self._width}x{self._height}" + f" -> {last['width']}x{last['height']}\nPlease reconnect" + ) + await self._send_fb_jpeg(await self.__make_stub_frame(msg)) + return + await self._send_resize(last["width"], last["height"]) + + if last["format"] == StreamFormats.JPEG: + await self._send_fb_jpeg(last["data"]) + elif last["format"] == StreamFormats.H264: + if not self._encodings.has_h264: + raise RfbError("The client doesn't want to accept H264 anymore") + await self._send_fb_h264(last["data"]) + else: + raise RuntimeError(f"Unknown format: {last['format']}") # ===== @@ -386,8 +388,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes await self.__kvmd_session.streamer.set_params(quality, self.__desired_fps) async def _on_fb_update_request(self) -> None: - async with self.__lock: - self.__fb_requested = True + await self.__fb_notifier.notify() # ===== -- cgit v1.2.3