diff options
author | Devaev Maxim <[email protected]> | 2021-01-24 12:02:04 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2021-01-24 12:02:04 +0300 |
commit | 3a2ffca6b767b9732759100158dbe34eb67a4d6c (patch) | |
tree | da1a25030613b79aa93d61ec9239ae5440df41ec /kvmd | |
parent | dc87f8d259b1a1e3dd92569c1548b1b5ea3b9738 (diff) |
vnc h264
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/apps/__init__.py | 6 | ||||
-rw-r--r-- | kvmd/apps/vnc/__init__.py | 23 | ||||
-rw-r--r-- | kvmd/apps/vnc/rfb/__init__.py | 18 | ||||
-rw-r--r-- | kvmd/apps/vnc/rfb/encodings.py | 8 | ||||
-rw-r--r-- | kvmd/apps/vnc/server.py | 82 | ||||
-rw-r--r-- | kvmd/clients/streamer.py | 25 |
6 files changed, 101 insertions, 61 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index 517cff72..a8543a83 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -556,6 +556,12 @@ def _get_config_scheme() -> Dict: "wait_timeout": Option(1.0, type=valid_float_f01), "drop_same_frames": Option(1.0, type=valid_float_f0), }, + "h264": { + "sink": Option("", unpack_as="obj"), + "lock_timeout": Option(1.0, type=valid_float_f01), + "wait_timeout": Option(1.0, type=valid_float_f01), + "drop_same_frames": Option(0.0, type=valid_float_f0), + }, }, "auth": { diff --git a/kvmd/apps/vnc/__init__.py b/kvmd/apps/vnc/__init__.py index 96199fca..07be2abd 100644 --- a/kvmd/apps/vnc/__init__.py +++ b/kvmd/apps/vnc/__init__.py @@ -46,6 +46,11 @@ def main(argv: Optional[List[str]]=None) -> None: user_agent = htclient.make_user_agent("KVMD-VNC") + def make_memsink(name: str) -> Optional[StreamerMemsinkClient]: + if getattr(config.memsink, name).sink: + return StreamerMemsinkClient(name=name, **getattr(config.memsink, name)._unpack()) + return None + VncServer( host=config.server.host, port=config.server.port, @@ -59,18 +64,12 @@ def main(argv: Optional[List[str]]=None) -> None: desired_fps=config.desired_fps, keymap_path=config.keymap, - kvmd=KvmdClient( - user_agent=user_agent, - **config.kvmd._unpack(), - ), - streamer_http=StreamerHttpClient( - user_agent=user_agent, - **config.streamer._unpack(), - ), - streamer_memsink_jpeg=( - StreamerMemsinkClient(**config.memsink.jpeg._unpack()) - if config.memsink.jpeg.sink else None - ), + kvmd=KvmdClient(user_agent=user_agent, **config.kvmd._unpack()), + streamers=list(filter(None, [ + make_memsink("h264"), + make_memsink("jpeg"), + StreamerHttpClient(name="jpeg", user_agent=user_agent, **config.streamer._unpack()), + ])), vnc_auth_manager=VncAuthManager(**config.auth.vncauth._unpack()), **config.server.keepalive._unpack(), diff --git a/kvmd/apps/vnc/rfb/__init__.py b/kvmd/apps/vnc/rfb/__init__.py index f0a42223..d4702d74 100644 --- a/kvmd/apps/vnc/rfb/__init__.py +++ b/kvmd/apps/vnc/rfb/__init__.py @@ -22,6 +22,7 @@ import asyncio import ssl +import dataclasses from typing import Tuple from typing import List @@ -164,6 +165,13 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute else: await self._write_struct("", bytes([0b10011111, length & 0x7F | 0x80, length >> 7 & 0x7F | 0x80, length >> 14 & 0xFF]), data) + async def _send_fb_h264(self, data: bytes) -> None: + assert self._encodings.has_h264 + assert len(data) <= 0xFFFFFFFF, len(data) + await self._write_fb_update(self._width, self._height, RfbEncodings.H264, drain=False) + await self._write_struct("LL", len(data), 0, drain=False) + await self._write_struct("", data) + async def _send_resize(self, width: int, height: int) -> None: assert self._encodings.has_resize await self._write_fb_update(width, height, RfbEncodings.RESIZE) @@ -380,14 +388,18 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute raise RfbError(f"Requested unsupported bits_per_pixel={bits_per_pixel} for Tight JPEG; required 16 or 32") async def __handle_set_encodings(self) -> None: + logger = get_logger(0) + encodings_count = (await self._read_struct("x H"))[0] if encodings_count > 1024: raise RfbError(f"Too many encodings: {encodings_count}") + self._encodings = RfbClientEncodings(frozenset(await self._read_struct("l" * encodings_count))) - get_logger(0).info("[main] %s: Features: resize=%d, rename=%d, leds=%d, extkeys=%d", - self._remote, self._encodings.has_resize, self._encodings.has_rename, - self._encodings.has_leds_state, self._encodings.has_ext_keys) + logger.info("[main] %s: Client features (SetEncodings): ...", self._remote) + for (key, value) in dataclasses.asdict(self._encodings).items(): + logger.info("[main] %s: ... %s=%s", self._remote, key, value) self.__check_tight_jpeg() + if self._encodings.has_ext_keys: # Preferred method await self._write_fb_update(0, 0, RfbEncodings.EXT_KEYS, drain=True) await self._on_set_encodings() diff --git a/kvmd/apps/vnc/rfb/encodings.py b/kvmd/apps/vnc/rfb/encodings.py index ad83f726..6ecfaa76 100644 --- a/kvmd/apps/vnc/rfb/encodings.py +++ b/kvmd/apps/vnc/rfb/encodings.py @@ -39,9 +39,11 @@ class RfbEncodings: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100], )) + H264 = 0xCAFE0101 # Pi-KVM H264 Encoding + @dataclasses.dataclass(frozen=True) -class RfbClientEncodings: +class RfbClientEncodings: # pylint: disable=too-many-instance-attributes encodings: FrozenSet[int] has_resize: bool = dataclasses.field(default=False) @@ -52,6 +54,8 @@ class RfbClientEncodings: has_tight: bool = dataclasses.field(default=False) tight_jpeg_quality: int = dataclasses.field(default=0) + has_h264: bool = dataclasses.field(default=False) + def __post_init__(self) -> None: self.__set("has_resize", (RfbEncodings.RESIZE in self.encodings)) self.__set("has_rename", (RfbEncodings.RENAME in self.encodings)) @@ -61,6 +65,8 @@ class RfbClientEncodings: self.__set("has_tight", (RfbEncodings.TIGHT in self.encodings)) self.__set("tight_jpeg_quality", self.__get_tight_jpeg_quality()) + self.__set("has_h264", (RfbEncodings.H264 in self.encodings)) + def __set(self, key: str, value: Any) -> None: object.__setattr__(self, key, value) diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py index e6bfe3e2..dbb82d5e 100644 --- a/kvmd/apps/vnc/server.py +++ b/kvmd/apps/vnc/server.py @@ -26,6 +26,8 @@ import socket import dataclasses import contextlib +from typing import Tuple +from typing import List from typing import Dict from typing import Union from typing import Optional @@ -47,8 +49,6 @@ from ...clients.kvmd import KvmdClient from ...clients.streamer import StreamerError from ...clients.streamer import StreamerPermError from ...clients.streamer import BaseStreamerClient -from ...clients.streamer import StreamerHttpClient -from ...clients.streamer import StreamerMemsinkClient from ... import tools @@ -84,8 +84,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes symmap: Dict[int, Dict[int, str]], kvmd: KvmdClient, - streamer_http: StreamerHttpClient, - streamer_memsink_jpeg: Optional[StreamerMemsinkClient], + streamers: List[BaseStreamerClient], vnc_credentials: Dict[str, VncAuthKvmdCredentials], none_auth_only: bool, @@ -109,19 +108,19 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes self.__symmap = symmap self.__kvmd = kvmd - self.__streamer_http = streamer_http - self.__streamer_memsink_jpeg = streamer_memsink_jpeg + self.__streamers = streamers self.__shared_params = shared_params - self.__authorized = asyncio.Future() # type: ignore - self.__ws_connected = asyncio.Future() # type: ignore + self.__stage1_authorized = asyncio.Future() # type: ignore + self.__stage2_encodings_accepted = asyncio.Future() # type: ignore + self.__stage3_ws_connected = asyncio.Future() # type: ignore + self.__kvmd_session: Optional[KvmdClientSession] = None self.__kvmd_ws: Optional[KvmdClientWs] = None self.__fb_requested = False - self.__fb_stub_text = "" - self.__fb_stub_quality = 0 + self.__fb_stub: Optional[Tuple[int, str]] = None # Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события. # Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD @@ -149,12 +148,19 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes async def __kvmd_task_loop(self) -> None: logger = get_logger(0) - await self.__authorized + await self.__stage1_authorized + + logger.info("[kvmd] %s: Waiting for the SetEncodings message ...", self._remote) + try: + await asyncio.wait_for(self.__stage2_encodings_accepted, timeout=5) + except asyncio.TimeoutError: + raise RfbError("No SetEncodings message recieved from the clienta in 5 secs") + assert self.__kvmd_session try: async with self.__kvmd_session.ws() as self.__kvmd_ws: logger.info("[kvmd] %s: Connected to KVMD websocket", self._remote) - self.__ws_connected.set_result(None) + self.__stage3_ws_connected.set_result(None) async for event in self.__kvmd_ws.communicate(): await self.__process_ws_event(event) raise RfbError("KVMD closes the websocket (the server may have been stopped)") @@ -184,34 +190,29 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes async def __streamer_task_loop(self) -> None: logger = get_logger(0) - await self.__ws_connected - - name = "streamer_http" - streamer: BaseStreamerClient = self.__streamer_http - if self.__streamer_memsink_jpeg: - (name, streamer) = ("streamer_memsink_jpeg", self.__streamer_memsink_jpeg) - + await self.__stage3_ws_connected + streamer = self.__streamers[0] while True: try: streaming = False - async for (online, width, height, data) in streamer.read_stream(): + async for (online, width, height, data, h264) in streamer.read_stream(): if not streaming: - logger.info("[%s] %s: Streaming ...", name, self._remote) + logger.info("[streamer] %s: Streaming from %s ...", self._remote, streamer) streaming = True if online: - await self.__send_fb_real(width, height, data) + await self.__send_fb_real(width, height, data, h264) else: await self.__send_fb_stub("No signal") except StreamerError as err: if isinstance(err, StreamerPermError): - logger.info("[%s] %s: Permanent error: %s; switching to HTTP ...", name, self._remote, err) - (name, streamer) = ("streamer_http", self.__streamer_http) + streamer = self.__streamers[-1] + logger.info("[streamer] %s: Permanent error: %s; switching to %s ...", self._remote, err, streamer) else: - logger.info("[%s] %s: Waiting for stream: %s", name, self._remote, err) + logger.info("[streamer] %s: Waiting for stream: %s", self._remote, err) await self.__send_fb_stub("Waiting for stream ...") await asyncio.sleep(1) - async def __send_fb_real(self, width: int, height: int, data: bytes) -> None: + async def __send_fb_real(self, width: int, height: int, data: bytes, h264: bool) -> None: async with self.__lock: if self.__fb_requested: if self._width != width or self._height != height: @@ -222,19 +223,18 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes await self.__send_fb_stub(msg, no_lock=True) return await self._send_resize(width, height) - await self._send_fb_jpeg(data) - self.__fb_stub_text = "" - self.__fb_stub_quality = 0 + await (self._send_fb_h264 if h264 else self._send_fb_jpeg)(data) + self.__fb_stub = None self.__fb_requested = False async def __send_fb_stub(self, text: str, no_lock: bool=False) -> None: if not no_lock: await self.__lock.acquire() try: - if self.__fb_requested and (self.__fb_stub_text != text or self.__fb_stub_quality != self._encodings.tight_jpeg_quality): - await self._send_fb_jpeg(await make_text_jpeg(self._width, self._height, self._encodings.tight_jpeg_quality, text)) - self.__fb_stub_text = text - self.__fb_stub_quality = self._encodings.tight_jpeg_quality + quality = self._encodings.tight_jpeg_quality + if self.__fb_requested and self.__fb_stub != (quality, text): + await self._send_fb_jpeg(await make_text_jpeg(self._width, self._height, quality, text)) + self.__fb_stub = (quality, text) self.__fb_requested = False finally: if not no_lock: @@ -245,7 +245,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes async def _authorize_userpass(self, user: str, passwd: str) -> bool: self.__kvmd_session = self.__kvmd.make_session(user, passwd) if (await self.__kvmd_session.auth.check()): - self.__authorized.set_result(None) + self.__stage1_authorized.set_result(None) return True return False @@ -312,7 +312,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes self.__mouse_move = move async def _on_cut_event(self, text: str) -> None: - assert self.__authorized.done() + assert self.__stage1_authorized.done() assert self.__kvmd_session logger = get_logger(0) logger.info("[main] %s: Printing %d characters ...", self._remote, len(text)) @@ -327,11 +327,13 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes logger.exception("[main] %s: Can't print characters", self._remote) async def _on_set_encodings(self) -> None: - assert self.__authorized.done() + assert self.__stage1_authorized.done() assert self.__kvmd_session + if not self.__stage2_encodings_accepted.done(): + self.__stage2_encodings_accepted.set_result(None) has_quality = (await self.__kvmd_session.streamer.get_state())["features"]["quality"] quality = (self._encodings.tight_jpeg_quality if has_quality else None) - get_logger(0).info("[main] %s: Applying streamer params: quality=%s; desired_fps=%d ...", + get_logger(0).info("[main] %s: Applying streamer params: jpeg_quality=%s; desired_fps=%d ...", self._remote, quality, self.__desired_fps) await self.__kvmd_session.streamer.set_params(quality, self.__desired_fps) @@ -361,8 +363,7 @@ class VncServer: # pylint: disable=too-many-instance-attributes keymap_path: str, kvmd: KvmdClient, - streamer_http: StreamerHttpClient, - streamer_memsink_jpeg: Optional[StreamerMemsinkClient], + streamers: List[BaseStreamerClient], vnc_auth_manager: VncAuthManager, ) -> None: @@ -411,8 +412,7 @@ class VncServer: # pylint: disable=too-many-instance-attributes keymap_name=keymap_name, symmap=symmap, kvmd=kvmd, - streamer_http=streamer_http, - streamer_memsink_jpeg=streamer_memsink_jpeg, + streamers=streamers, vnc_credentials=(await self.__vnc_auth_manager.read_credentials())[0], none_auth_only=none_auth_only, shared_params=shared_params, diff --git a/kvmd/clients/streamer.py b/kvmd/clients/streamer.py index ea08500e..e5430ef7 100644 --- a/kvmd/clients/streamer.py +++ b/kvmd/clients/streamer.py @@ -53,7 +53,7 @@ class StreamerPermError(StreamerError): # ===== class BaseStreamerClient: - async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]: + async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes, bool], None]: if self is not None: # XXX: Vulture and pylint hack raise NotImplementedError() yield @@ -77,6 +77,7 @@ def _patch_stream_reader(reader: aiohttp.StreamReader) -> None: class StreamerHttpClient(BaseStreamerClient): def __init__( self, + name: str, host: str, port: int, unix_path: str, @@ -85,13 +86,14 @@ class StreamerHttpClient(BaseStreamerClient): ) -> None: assert port or unix_path + self.__name = name self.__host = host self.__port = port self.__unix_path = unix_path self.__timeout = timeout self.__user_agent = user_agent - async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]: + async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes, bool], None]: try: async with self.__make_http_session() as session: async with session.get( @@ -116,6 +118,7 @@ class StreamerHttpClient(BaseStreamerClient): int(frame.headers["X-UStreamer-Width"]), int(frame.headers["X-UStreamer-Height"]), data, + False, ) except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня if isinstance(err, StreamerTempError): @@ -139,16 +142,21 @@ class StreamerHttpClient(BaseStreamerClient): assert not handle.startswith("/"), handle return f"http://{self.__host}:{self.__port}/{handle}" + def __str__(self) -> str: + return f"StreamerHttpClient({self.__name})" + class StreamerMemsinkClient(BaseStreamerClient): def __init__( self, + name: str, obj: str, lock_timeout: float, wait_timeout: float, drop_same_frames: float, ) -> None: + self.__name = name self.__kwargs: Dict = { "obj": obj, "lock_timeout": lock_timeout, @@ -156,7 +164,7 @@ class StreamerMemsinkClient(BaseStreamerClient): "drop_same_frames": drop_same_frames, } - async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]: + async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes, bool], None]: if ustreamer is None: raise StreamerPermError("Missing ustreamer library") try: @@ -164,8 +172,17 @@ class StreamerMemsinkClient(BaseStreamerClient): while True: frame = await aiotools.run_async(sink.wait_frame) if frame is not None: - yield (frame["online"], frame["width"], frame["height"], frame["data"]) + yield ( + frame["online"], + frame["width"], + frame["height"], + frame["data"], + (frame["format"] == 875967048), # V4L2_PIX_FMT_H264 + ) except FileNotFoundError as err: raise StreamerTempError(tools.efmt(err)) except Exception as err: raise StreamerPermError(tools.efmt(err)) + + def __str__(self) -> str: + return f"StreamerMemsinkClient({self.__name})" |