summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2021-02-04 02:23:59 +0300
committerDevaev Maxim <[email protected]>2021-02-04 02:23:59 +0300
commitffeb626ef854a3ac2318f1a8693bbae4986b7eac (patch)
treeee3897c39ef8d86194a25ba910a16bd53777630d
parent32bd2453eb55b5a2516a2b6bf66739e975b867f4 (diff)
queue-based vnc fb task
-rw-r--r--kvmd/apps/vnc/server.py131
1 files changed, 66 insertions, 65 deletions
diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py
index 5c915abb..2b677867 100644
--- a/kvmd/apps/vnc/server.py
+++ b/kvmd/apps/vnc/server.py
@@ -26,7 +26,6 @@ import socket
import dataclasses
import contextlib
-from typing import Tuple
from typing import List
from typing import Dict
from typing import Union
@@ -121,9 +120,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
self.__kvmd_session: Optional[KvmdClientSession] = None
self.__kvmd_ws: Optional[KvmdClientWs] = None
- self.__fb_requested = False
- self.__fb_stub: Optional[Tuple[int, str]] = None
- self.__fb_h264_data = b""
+ self.__fb_notifier = aiotools.AioNotifier()
+ self.__fb_queue: "asyncio.Queue[Dict]" = asyncio.Queue()
# Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события.
# Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD
@@ -141,6 +139,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
await self._run(
kvmd=self.__kvmd_task_loop(),
streamer=self.__streamer_task_loop(),
+ fb_sendeer=self.__fb_sender_task_loop(),
)
finally:
if self.__kvmd_session:
@@ -201,16 +200,16 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
logger.info("[streamer] %s: Streaming ...", self._remote)
streaming = True
if frame["online"]:
- await self.__send_fb_real(frame)
+ await self.__queue_fb_real(frame)
else:
- await self.__send_fb_stub("No signal")
+ await self.__queue_fb_stub("No signal")
except StreamerError as err:
if isinstance(err, StreamerPermError):
streamer = self.__get_default_streamer()
logger.info("[streamer] %s: Permanent error: %s; switching to %s ...", self._remote, err, streamer)
else:
logger.info("[streamer] %s: Waiting for stream: %s", self._remote, err)
- await self.__send_fb_stub("Waiting for stream ...")
+ await self.__queue_fb_stub("Waiting for stream ...")
await asyncio.sleep(1)
def __get_preferred_streamer(self) -> BaseStreamerClient:
@@ -230,65 +229,68 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
get_logger(0).info("[streamer] %s: Using default %s", self._remote, streamer)
return streamer
- async def __send_fb_real(self, frame: Dict) -> None:
- async with self.__lock:
- if self.__fb_requested:
- if not (await self.__resize_fb_unsafe(frame)):
- return
+ async def __queue_fb_real(self, frame: Dict) -> None:
+ await self.__fb_queue.put(frame)
- if frame["format"] == StreamFormats.JPEG:
- await self._send_fb_jpeg(frame["data"])
- elif frame["format"] == StreamFormats.H264:
- if not self._encodings.has_h264:
- self.__fb_h264_data = b""
- raise StreamerPermError("The client doesn't want to accept H264 anymore")
- self.__append_h264_data(frame)
- await self._send_fb_h264(self.__fb_h264_data)
- else:
- raise RuntimeError(f"Unknown format: {frame['format']}")
-
- self.__fb_stub = None
- self.__fb_requested = False
- self.__fb_h264_data = b""
-
- elif self._encodings.has_h264 and frame["format"] == StreamFormats.H264:
- self.__append_h264_data(frame)
-
- async def __resize_fb_unsafe(self, frame: Dict) -> bool:
- width = frame["width"]
- height = frame["height"]
- if self._width != width or self._height != height:
- self.__shared_params.width = width
- self.__shared_params.height = height
- if not self._encodings.has_resize:
- msg = f"Resoultion changed: {self._width}x{self._height} -> {width}x{height}\nPlease reconnect"
- await self.__send_fb_stub(msg, no_lock=True)
- return False
- await self._send_resize(width, height)
- return True
+ async def __queue_fb_stub(self, text: str) -> None:
+ await self.__fb_queue.put(await self.__make_stub_frame(text))
- def __append_h264_data(self, frame: Dict) -> None:
- if frame["key"]:
- self.__fb_h264_data = frame["data"]
- elif len(self.__fb_h264_data) + len(frame["data"]) > 4194304: # 4Mb
- get_logger(0).info("Accumulated H264 buffer is too big; resetting ...")
- self.__fb_h264_data = frame["data"]
- else:
- self.__fb_h264_data += frame["data"]
+ async def __make_stub_frame(self, text: str) -> Dict:
+ return {
+ "data": (await make_text_jpeg(self._width, self._height, self._encodings.tight_jpeg_quality, text)),
+ "width": self._width,
+ "height": self._height,
+ "format": StreamFormats.JPEG,
+ }
- async def __send_fb_stub(self, text: str, no_lock: bool=False) -> None:
- if not no_lock:
- await self.__lock.acquire()
- try:
- quality = self._encodings.tight_jpeg_quality
- if self.__fb_requested and self.__fb_stub != (quality, text):
- await self._send_fb_jpeg(await make_text_jpeg(self._width, self._height, quality, text))
- self.__fb_stub = (quality, text)
- self.__fb_requested = False
- self.__fb_h264_data = b""
- finally:
- if not no_lock:
- self.__lock.release()
+ async def __fb_sender_task_loop(self) -> None:
+ while True:
+ await self.__fb_notifier.wait()
+
+ last: Optional[Dict] = None
+ while True:
+ frame = await self.__fb_queue.get()
+ if (
+ last is None # pylint: disable=too-many-boolean-expressions
+ or frame["format"] == StreamFormats.JPEG
+ or last["format"] != frame["format"]
+ or (frame["format"] == StreamFormats.H264 and (
+ frame["key"]
+ or last["width"] != frame["width"]
+ or last["height"] != frame["height"]
+ or len(last["data"]) + len(frame["data"]) > 4194304
+ ))
+ ):
+ last = frame
+ if self.__fb_queue.qsize() == 0:
+ break
+ continue
+ assert frame["format"] == StreamFormats.H264
+ last["data"] += frame["data"]
+ if self.__fb_queue.qsize() == 0:
+ break
+
+ async with self.__lock:
+ if self._width != last["width"] or self._height != last["height"]:
+ self.__shared_params.width = last["width"]
+ self.__shared_params.height = last["height"]
+ if not self._encodings.has_resize:
+ msg = (
+ f"Resoultion changed: {self._width}x{self._height}"
+ f" -> {last['width']}x{last['height']}\nPlease reconnect"
+ )
+ await self._send_fb_jpeg(await self.__make_stub_frame(msg))
+ return
+ await self._send_resize(last["width"], last["height"])
+
+ if last["format"] == StreamFormats.JPEG:
+ await self._send_fb_jpeg(last["data"])
+ elif last["format"] == StreamFormats.H264:
+ if not self._encodings.has_h264:
+ raise RfbError("The client doesn't want to accept H264 anymore")
+ await self._send_fb_h264(last["data"])
+ else:
+ raise RuntimeError(f"Unknown format: {last['format']}")
# =====
@@ -386,8 +388,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
await self.__kvmd_session.streamer.set_params(quality, self.__desired_fps)
async def _on_fb_update_request(self) -> None:
- async with self.__lock:
- self.__fb_requested = True
+ await self.__fb_notifier.notify()
# =====