diff options
author | Devaev Maxim <[email protected]> | 2020-03-20 03:07:27 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2020-03-20 03:07:27 +0300 |
commit | d5ae32b1326fc5ac9207193d7679b34e0ceec4c7 (patch) | |
tree | 43bb961fd3006c06dffec900a2c84fb8387302c0 /kvmd/apps | |
parent | ab6264bd5e65497121139eab6deae353e06d592f (diff) |
vnc
Diffstat (limited to 'kvmd/apps')
-rw-r--r-- | kvmd/apps/__init__.py | 25 | ||||
-rw-r--r-- | kvmd/apps/vnc/__init__.py | 48 | ||||
-rw-r--r-- | kvmd/apps/vnc/__main__.py | 24 | ||||
-rw-r--r-- | kvmd/apps/vnc/fonts/Azbuka04.ttf | bin | 0 -> 71852 bytes | |||
-rw-r--r-- | kvmd/apps/vnc/keysym.py | 97 | ||||
-rw-r--r-- | kvmd/apps/vnc/kvmd.py | 110 | ||||
-rw-r--r-- | kvmd/apps/vnc/render.py | 53 | ||||
-rw-r--r-- | kvmd/apps/vnc/rfb.py | 437 | ||||
-rw-r--r-- | kvmd/apps/vnc/server.py | 307 | ||||
-rw-r--r-- | kvmd/apps/vnc/streamer.py | 83 |
10 files changed, 1184 insertions, 0 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index c3b3a5f2..8d0ce28f 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -320,4 +320,29 @@ def _get_config_scheme() -> Dict: "file": Option("/etc/kvmd/ipmipasswd", type=valid_abs_file, unpack_as="path"), }, }, + + "vnc": { + "keymap": Option("", type=valid_abs_path), + + "server": { + "host": Option("::", type=valid_ip_or_host), + "port": Option(5900, type=valid_port), + # TODO: timeout + "max_clients": Option(10, type=(lambda arg: valid_number(arg, min=1))), + }, + + "kvmd": { + "host": Option("localhost", type=valid_ip_or_host), + "port": Option(0, type=valid_port), + "unix": Option("", type=valid_abs_path, only_if="!port", unpack_as="unix_path"), + "timeout": Option(5.0, type=valid_float_f01), + }, + + "streamer": { + "host": Option("localhost", type=valid_ip_or_host), + "port": Option(0, type=valid_port), + "unix": Option("", type=valid_abs_path, only_if="!port", unpack_as="unix_path"), + "timeout": Option(5.0, type=valid_float_f01), + }, + }, } diff --git a/kvmd/apps/vnc/__init__.py b/kvmd/apps/vnc/__init__.py new file mode 100644 index 00000000..49f7fcbe --- /dev/null +++ b/kvmd/apps/vnc/__init__.py @@ -0,0 +1,48 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from typing import List +from typing import Optional + +from .. import init + +from .kvmd import KvmdClient +from .streamer import StreamerClient +from .server import VncServer +from .keysym import build_symmap + + +# ===== +def main(argv: Optional[List[str]]=None) -> None: + config = init( + prog="kvmd-vnc", + description="VNC to KVMD proxy", + argv=argv, + )[2].vnc + + # pylint: disable=protected-access + VncServer( + kvmd=KvmdClient(**config.kvmd._unpack()), + streamer=StreamerClient(**config.streamer._unpack()), + symmap=build_symmap(config.keymap), + **config.server._unpack(), + ).run() diff --git a/kvmd/apps/vnc/__main__.py b/kvmd/apps/vnc/__main__.py new file mode 100644 index 00000000..689fbbd1 --- /dev/null +++ b/kvmd/apps/vnc/__main__.py @@ -0,0 +1,24 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from . import main +main() diff --git a/kvmd/apps/vnc/fonts/Azbuka04.ttf b/kvmd/apps/vnc/fonts/Azbuka04.ttf Binary files differnew file mode 100644 index 00000000..16ade315 --- /dev/null +++ b/kvmd/apps/vnc/fonts/Azbuka04.ttf diff --git a/kvmd/apps/vnc/keysym.py b/kvmd/apps/vnc/keysym.py new file mode 100644 index 00000000..5f35c28b --- /dev/null +++ b/kvmd/apps/vnc/keysym.py @@ -0,0 +1,97 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import pkgutil +import functools + +from typing import Dict + +import Xlib.keysymdef + +from ...logging import get_logger + +from ... import keymap + + +# ===== +def build_symmap(path: str) -> Dict[int, str]: + # https://github.com/qemu/qemu/blob/95a9457fd44ad97c518858a4e1586a5498f9773c/ui/keymaps.c + + symmap: Dict[int, str] = {} + for (x11_code, at1_code) in keymap.X11_TO_AT1.items(): + symmap[x11_code] = keymap.AT1_TO_WEB[at1_code] + + for (x11_code, at1_code) in _read_keyboard_layout(path).items(): + if (web_name := keymap.AT1_TO_WEB.get(at1_code)) is not None: # noqa: E203,E231 + # mypy bug + symmap[x11_code] = web_name # type: ignore + return symmap + + +# ===== [email protected]_cache() +def _get_keysyms() -> Dict[str, int]: + keysyms: Dict[str, int] = {} + for (loader, module_name, _) in pkgutil.walk_packages(Xlib.keysymdef.__path__): + module = loader.find_module(module_name).load_module(module_name) + for keysym_name in dir(module): + if keysym_name.startswith("XK_"): + short_name = keysym_name[3:] + if short_name.startswith("XF86_"): + short_name = "XF86" + short_name[5:] + # assert short_name not in keysyms, short_name + keysyms[short_name] = int(getattr(module, keysym_name)) + return keysyms + + +def _resolve_keysym(name: str) -> int: + code = _get_keysyms().get(name) + if code is not None: + return code + if len(name) == 5 and name[0] == "U": # Try unicode Uxxxx + try: + return int(name[1:], 16) + except ValueError: + pass + return 0 + + +def _read_keyboard_layout(path: str) -> Dict[int, int]: # Keysym to evdev (at1) + logger = get_logger(0) + logger.info("Reading keyboard layout %s ...", path) + + with open(path) as layout_file: + lines = list(map(str.strip, layout_file.read().split("\n"))) + + layout: Dict[int, int] = {} + for (number, line) in enumerate(lines): + if len(line) == 0 or line.startswith(("#", "map ", "include ")): + continue + + parts = line.split() + if len(parts) >= 2: + if (code := _resolve_keysym(parts[0])) != 0: # noqa: E203,E231 + try: + layout[code] = int(parts[1], 16) + except ValueError as err: + logger.error("Can't parse layout line #%d: %s", number, str(err)) + return layout diff --git a/kvmd/apps/vnc/kvmd.py b/kvmd/apps/vnc/kvmd.py new file mode 100644 index 00000000..d921ad43 --- /dev/null +++ b/kvmd/apps/vnc/kvmd.py @@ -0,0 +1,110 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import contextlib + +from typing import Dict + +import aiohttp + +from ...logging import get_logger + +from ... import __version__ + + +# ===== +class KvmdClient: + def __init__( + self, + host: str, + port: int, + unix_path: str, + timeout: float, + ) -> None: + + assert port or unix_path + self.__host = host + self.__port = port + self.__unix_path = unix_path + self.__timeout = timeout + + # ===== + + async def authorize(self, user: str, passwd: str) -> bool: + try: + async with self.__make_session(user, passwd) as session: + async with session.get( + url=f"http://{self.__host}:{self.__port}/auth/check", + timeout=self.__timeout, + ) as response: + response.raise_for_status() + if response.status == 200: + return True + raise RuntimeError(f"Invalid OK response: {response.status} {await response.text()}") + except aiohttp.ClientResponseError as err: + if err.status in [401, 403]: + return False + get_logger(0).exception("Can't check user access") + except Exception: + get_logger(0).exception("Can't check user access") + return False + + @contextlib.asynccontextmanager + async def ws(self, user: str, passwd: str) -> aiohttp.ClientWebSocketResponse: # pylint: disable=invalid-name + async with self.__make_session(user, passwd) as session: + async with session.ws_connect( + url=f"http://{self.__host}:{self.__port}/ws", + timeout=self.__timeout, + ) as ws: + yield ws + + async def set_streamer_params(self, user: str, passwd: str, quality: int=-1, desired_fps: int=-1) -> None: + params = { + key: value + for (key, value) in [ + ("quality", quality), + ("desired_fps", desired_fps), + ] + if value >= 0 + } + if params: + async with self.__make_session(user, passwd) as session: + async with session.post( + url=f"http://{self.__host}:{self.__port}/streamer/set_params", + timeout=self.__timeout, + params=params, + ) as response: + response.raise_for_status() + + # ===== + + def __make_session(self, user: str, passwd: str) -> aiohttp.ClientSession: + kwargs: Dict = { + "headers": { + "X-KVMD-User": user, + "X-KVMD-Passwd": passwd, + "User-Agent": f"KVMD-VNC/{__version__}", + }, + } + if self.__unix_path: + kwargs["connector"] = aiohttp.UnixConnector(path=self.__unix_path) + return aiohttp.ClientSession(**kwargs) diff --git a/kvmd/apps/vnc/render.py b/kvmd/apps/vnc/render.py new file mode 100644 index 00000000..f65f1405 --- /dev/null +++ b/kvmd/apps/vnc/render.py @@ -0,0 +1,53 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import sys +import os +import io +import functools + +from PIL import Image +from PIL import ImageDraw +from PIL import ImageFont + +from ... import aiotools + + +# ===== +async def make_text_jpeg(width: int, height: int, quality: int, text: str) -> bytes: + return (await aiotools.run_async(_inner_make_text_jpeg, width, height, quality, text)) + + [email protected]_cache(maxsize=10) +def _inner_make_text_jpeg(width: int, height: int, quality: int, text: str) -> bytes: + image = Image.new("RGB", (width, height), color=(0, 0, 0)) + draw = ImageDraw.Draw(image) + draw.multiline_text((20, 20), text, font=_get_font(), fill=(255, 255, 255)) + bio = io.BytesIO() + image.save(bio, format="jpeg", quality=quality) + return bio.getvalue() + + [email protected]_cache() +def _get_font() -> ImageFont.FreeTypeFont: + path = os.path.join(os.path.dirname(sys.modules[__name__].__file__), "fonts", "Azbuka04.ttf") + return ImageFont.truetype(path, size=20) diff --git a/kvmd/apps/vnc/rfb.py b/kvmd/apps/vnc/rfb.py new file mode 100644 index 00000000..450024c4 --- /dev/null +++ b/kvmd/apps/vnc/rfb.py @@ -0,0 +1,437 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import asyncio +import struct +import dataclasses + +from typing import Tuple +from typing import Dict +from typing import FrozenSet +from typing import Coroutine +from typing import Any + +from ...logging import get_logger + +from ... import aiotools + + +# ===== +class RfbError(Exception): + pass + + +class RfbConnectionError(RfbError): + def __init__(self, err: Exception) -> None: + super().__init__(f"Gone ({type(err).__name__})") + + +# ===== +_ENCODING_RESIZE = -223 # DesktopSize Pseudo-encoding +_ENCODING_RENAME = -307 # DesktopName Pseudo-encoding +_ENCODING_LEDS_STATE = -261 # LED State Pseudo-encoding + +_ENCODING_TIGHT = 7 +_ENCODING_TIGHT_JPEG_QUALITIES = dict(zip( # JPEG Quality Level Pseudo-encoding + [-32, -31, -30, -29, -28, -27, -26, -25, -24, -23], + [10, 20, 30, 40, 50, 60, 70, 80, 90, 100], +)) + + [email protected](frozen=True) +class _Encodings: + encodings: FrozenSet[int] + + has_resize: bool = dataclasses.field(default=False) + has_rename: bool = dataclasses.field(default=False) + has_leds_state: bool = dataclasses.field(default=False) + + has_tight: bool = dataclasses.field(default=False) + tight_jpeg_quality: int = dataclasses.field(default=0) + + def __post_init__(self) -> None: + self.__set("has_resize", (_ENCODING_RESIZE in self.encodings)) + self.__set("has_rename", (_ENCODING_RENAME in self.encodings)) + self.__set("has_leds_state", (_ENCODING_LEDS_STATE in self.encodings)) + + self.__set("has_tight", (_ENCODING_TIGHT in self.encodings)) + self.__set("tight_jpeg_quality", self.__get_tight_jpeg_quality()) + + def __set(self, key: str, value: Any) -> None: + object.__setattr__(self, key, value) + + def __get_tight_jpeg_quality(self) -> int: + if _ENCODING_TIGHT in self.encodings: + qualities = self.encodings.intersection(_ENCODING_TIGHT_JPEG_QUALITIES) + if qualities: + return _ENCODING_TIGHT_JPEG_QUALITIES[max(qualities)] + return 0 + + +class RfbClient: # pylint: disable=too-many-instance-attributes + # https://github.com/rfbproto/rfbproto/blob/master/rfbproto.rst + # https://www.toptal.com/java/implementing-remote-framebuffer-server-java + # https://github.com/TigerVNC/tigervnc + + def __init__( + self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + + width: int, + height: int, + name: str, + ) -> None: + + self.__reader = reader + self.__writer = writer + + self._remote = "[%s]:%d" % (self.__writer.transport.get_extra_info("peername")[:2]) + + self._width = width + self._height = height + self._name = name + + self._encodings = _Encodings(frozenset()) + + self._lock = asyncio.Lock() + + get_logger(0).info("Connected client: %s", self._remote) + + # ===== + + async def _run(self, **coros: Coroutine) -> None: + tasks = list(map(asyncio.create_task, [ + self.__wrapper(name, coro) + for (name, coro) in {"main": self.__main_task_loop(), **coros}.items() + ])) + try: + await aiotools.wait_first(*tasks) + finally: + for task in tasks: + task.cancel() + + async def __wrapper(self, name: str, coro: Coroutine) -> None: + logger = get_logger(0) + try: + await coro + raise RuntimeError("Subtask just finished without any exception") + except asyncio.CancelledError: + logger.info("[%s] Client %s: Cancelling ...", name, self._remote) + raise + except RfbError as err: + logger.info("[%s] Client %s: %s: Disconnected", name, self._remote, str(err)) + except Exception: + logger.exception("[%s] Unhandled exception with client %s: Disconnected", name, self._remote) + + async def __main_task_loop(self) -> None: + try: + rfb_version = await self.__handshake_version() + await self.__handshake_security(rfb_version) + await self.__handshake_init() + await self.__main_loop() + finally: + try: + self.__writer.close() + except Exception: + pass + + # ===== + + async def _authorize(self, user: str, passwd: str) -> bool: + raise NotImplementedError + + async def _on_key_event(self, code: int, state: bool) -> None: + raise NotImplementedError + + async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None: + raise NotImplementedError + + async def _on_cut_event(self, text: str) -> None: + raise NotImplementedError + + async def _on_set_encodings(self) -> None: + raise NotImplementedError + + async def _on_fb_update_request(self) -> None: + raise NotImplementedError + + # ===== + + async def _send_fb(self, jpeg: bytes) -> None: + assert self._encodings.has_tight + assert self._encodings.tight_jpeg_quality > 0 + assert len(jpeg) <= 4194303, len(jpeg) + await self.__write_fb_update(self._width, self._height, _ENCODING_TIGHT, drain=False) + length = len(jpeg) + if length <= 127: + await self.__write_struct("", bytes([0b10011111, length & 0x7F]), jpeg) + elif length <= 16383: + await self.__write_struct("", bytes([0b10011111, length & 0x7F | 0x80, length >> 7 & 0x7F]), jpeg) + else: + await self.__write_struct("", bytes([0b10011111, length & 0x7F | 0x80, length >> 7 & 0x7F | 0x80, length >> 14 & 0xFF]), jpeg) + + async def _send_resize(self, width: int, height: int) -> None: + assert self._encodings.has_resize + await self.__write_fb_update(width, height, _ENCODING_RESIZE) + self._width = width + self._height = height + + async def _send_rename(self, name: str) -> None: + assert self._encodings.has_rename + await self.__write_fb_update(0, 0, _ENCODING_RENAME, drain=False) + await self.__write_reason(name) + self._name = name + + async def _send_leds_state(self, caps: bool, scroll: bool, num: bool) -> None: + assert self._encodings.has_leds_state + await self.__write_fb_update(0, 0, _ENCODING_LEDS_STATE, drain=False) + await self.__write_struct("B", 0x1 & scroll | 0x2 & num | 0x4 & caps) + + # ===== + + async def __handshake_version(self) -> int: + # The only published protocol versions at this time are 3.3, 3.7, 3.8. + # Version 3.5 was wrongly reported by some clients, but it should be + # interpreted by all servers as 3.3 + + await self.__write_struct("", b"RFB 003.008\n") + + response = await self.__read_text(12) + if ( + not response.startswith("RFB 003.00") + or not response.endswith("\n") + or response[-2] not in ["3", "5", "7", "8"] + ): + raise RfbError(f"Invalid version response: {response!r}") + + try: + version = int(response[-2]) + except ValueError: + raise RfbError(f"Invalid version response: {response!r}") + return (3 if version == 5 else version) + + # ===== + + async def __handshake_security(self, rfb_version: int) -> None: + if rfb_version == 3: + await self.__handshake_security_v3(rfb_version) + else: + await self.__handshake_security_v7_plus(rfb_version) + + async def __handshake_security_v3(self, rfb_version: int) -> None: + assert rfb_version == 3 + + await self.__write_struct("L", 0, drain=False) # Refuse old clients using the invalid security type + msg = "The client uses a very old protocol 3.3; required 3.7 at least" + await self.__write_reason(msg) + raise RfbError(msg) + + async def __handshake_security_v7_plus(self, rfb_version: int) -> None: + assert rfb_version >= 7 + + vencrypt = 19 + await self.__write_struct("B B", 1, vencrypt) # One security type, VeNCrypt + + security_type = await self.__read_number("B") + if security_type != vencrypt: + raise RfbError(f"Invalid security type: {security_type}; expected VeNCrypt({vencrypt})") + + # ----- + + await self.__write_struct("BB", 0, 2) # VeNCrypt 0.2 + + vencrypt_version = "%d.%d" % (await self.__read_struct("BB")) + if vencrypt_version != "0.2": + await self.__write_struct("B", 1) # Unsupported + raise RfbError(f"Unsupported VeNCrypt version: {vencrypt_version}") + + await self.__write_struct("B", 0) + + # ----- + + plain = 256 + await self.__write_struct("B L", 1, plain) # One auth subtype, plain + + auth_type = await self.__read_number("L") + if auth_type != plain: + raise RfbError(f"Invalid auth type: {auth_type}; expected Plain({plain})") + + # ----- + + (user_length, passwd_length) = await self.__read_struct("LL") + user = await self.__read_text(user_length) + passwd = await self.__read_text(passwd_length) + + if (await self._authorize(user, passwd)): + get_logger(0).info("[main] Client %s: Access granted for user %r", self._remote, user) + await self.__write_struct("L", 0) + else: + await self.__write_struct("L", 1, drain=(rfb_version < 8)) + if rfb_version >= 8: + await self.__write_reason("Invalid username or password") + raise RfbError(f"Access denied for user {user!r}") + + # ===== + + async def __handshake_init(self) -> None: + await self.__read_number("B") # Shared flag, ignored + + await self.__write_struct("HH", self._width, self._height, drain=False) + await self.__write_struct( + "BB?? HHH BBB xxx", + 32, # Bits per pixel + 24, # Depth + False, # Big endian + True, # True color + 255, # Red max + 255, # Green max + 255, # Blue max + 16, # Red shift + 8, # Green shift + 0, # Blue shift + drain=False, + ) + await self.__write_reason(self._name) + + # ===== + + async def __main_loop(self) -> None: + while True: + msg_type = await self.__read_number("B") + + async with self._lock: + if msg_type == 0: # SetPixelFormat + # JpegCompression may only be used when bits-per-pixel is either 16 or 32 + bits_per_pixel = (await self.__read_struct("xxx BB?? HHH BBB xxx"))[0] + if bits_per_pixel not in [16, 32]: + raise RfbError(f"Requested unsupported {bits_per_pixel=} for Tight JPEG; required 16 or 32") + + elif msg_type == 2: # SetEncodings + encodings_count = (await self.__read_struct("x H"))[0] + if encodings_count > 1024: + raise RfbError(f"Too many encodings: {encodings_count}") + self._encodings = _Encodings(frozenset(await self.__read_struct("l" * encodings_count))) + self.__check_tight_jpeg() + await self._on_set_encodings() + + elif msg_type == 3: # FramebufferUpdateRequest + self.__check_tight_jpeg() # If we don't receive SetEncodings from client + await self.__read_struct("? HH HH") # Ignore any arguments, just perform the full update + await self._on_fb_update_request() + + elif msg_type == 4: # KeyEvent + (state, code) = await self.__read_struct("? xx L") + await self._on_key_event(code, state) # type: ignore + + elif msg_type == 5: # PointerEvent + (buttons, to_x, to_y) = await self.__read_struct("B HH") + await self._on_pointer_event( + buttons={ + "left": bool(buttons & 0x1), + "right": bool(buttons & 0x4), + "middle": bool(buttons & 0x2), + }, + wheel={ + "x": (32 if buttons & 0x40 else (-32 if buttons & 0x20 else 0)), + "y": (32 if buttons & 0x10 else (-32 if buttons & 0x8 else 0)), + }, + move={ + "x": round(to_x / self._width * 65535 + -32768), + "y": round(to_y / self._width * 65535 + -32768), + }, + ) + + elif msg_type == 6: # ClientCutText + await self._on_cut_event(await self.__read_text((await self.__read_struct("xxx L"))[0])) + + else: + raise RfbError(f"Unknown message type: {msg_type}") + + def __check_tight_jpeg(self) -> None: + # JpegCompression may only be used when the client has advertized + # a quality level using the JPEG Quality Level Pseudo-encoding + if not self._encodings.has_tight or self._encodings.tight_jpeg_quality == 0: + raise RfbError(f"Tight JPEG encoding is not supported by client: {self._encodings}") + + # ===== + + async def __read_number(self, fmt: str) -> int: + assert len(fmt) == 1 + try: + if fmt == "B": + return (await self.__reader.readexactly(1))[0] + else: + fmt = f">{fmt}" + return struct.unpack(fmt, await self.__reader.readexactly(struct.calcsize(fmt)))[0] + except (ConnectionError, asyncio.IncompleteReadError) as err: + raise RfbConnectionError(err) + + async def __read_struct(self, fmt: str) -> Tuple[int, ...]: + assert len(fmt) > 1 + try: + fmt = f">{fmt}" + return struct.unpack(fmt, (await self.__reader.readexactly(struct.calcsize(fmt)))) + except (ConnectionError, asyncio.IncompleteReadError) as err: + raise RfbConnectionError(err) + + async def __read_text(self, length: int) -> str: + try: + return (await self.__reader.readexactly(length)).decode("utf-8", errors="ignore") + except (ConnectionError, asyncio.IncompleteReadError) as err: + raise RfbConnectionError(err) + + # ===== + + async def __write_struct(self, fmt: str, *values: Any, drain: bool=True) -> None: + try: + if not fmt: + for value in values: + self.__writer.write(value) + elif fmt == "B": + assert len(values) == 1 + self.__writer.write(bytes([values[0]])) + else: + self.__writer.write(struct.pack(f">{fmt}", *values)) + if drain: + await self.__writer.drain() + except ConnectionError as err: + raise RfbConnectionError(err) + + async def __write_reason(self, text: str, drain: bool=True) -> None: + encoded = text.encode("utf-8", errors="ignore") + await self.__write_struct("L", len(encoded), drain=False) + try: + self.__writer.write(encoded) + if drain: + await self.__writer.drain() + except ConnectionError as err: + raise RfbConnectionError(err) + + async def __write_fb_update(self, width: int, height: int, encoding: int, drain: bool=True) -> None: + await self.__write_struct( + "BxH HH HH l", + 0, # FB update + 1, # Number of rects + 0, 0, width, height, encoding, + drain=drain, + ) diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py new file mode 100644 index 00000000..8ce78d41 --- /dev/null +++ b/kvmd/apps/vnc/server.py @@ -0,0 +1,307 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import asyncio +import asyncio.queues +import socket +import dataclasses +import json + +from typing import Dict +from typing import Optional + +import aiohttp + +from ...logging import get_logger + +from ... import aiotools + +from .rfb import RfbClient + +from .kvmd import KvmdClient + +from .streamer import StreamerError +from .streamer import StreamerClient + +from .render import make_text_jpeg + + +# ===== +class _SharedParams: + width: int = dataclasses.field(default=800) + height: int = dataclasses.field(default=600) + name: str = dataclasses.field(default="Pi-KVM") + + +class _Client(RfbClient): # pylint: disable=too-many-instance-attributes + def __init__( + self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + + kvmd: KvmdClient, + streamer: StreamerClient, + + symmap: Dict[int, str], + + shared_params: _SharedParams, + ) -> None: + + super().__init__(reader, writer, **dataclasses.asdict(shared_params)) + + self.__kvmd = kvmd + self.__streamer = streamer + self.__symmap = symmap + self.__shared_params = shared_params + + self.__authorized = asyncio.Future() # type: ignore + self.__ws_connected = asyncio.Future() # type: ignore + self.__ws_writer_queue: asyncio.queues.Queue = asyncio.Queue() + + self.__fb_requested = False + self.__fb_stub_text = "" + self.__fb_stub_quality = 0 + + # Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события. + # Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD + self.__mouse_buttons: Dict[str, Optional[bool]] = {"left": None, "right": None, "middle": None} + self.__mouse_move = {"x": -1, "y": -1} + + # ===== + + async def run(self) -> None: + await self._run( + kvmd=self.__kvmd_task_loop(), + streamer=self.__streamer_task_loop(), + ) + + # ===== + + async def __kvmd_task_loop(self) -> None: + logger = get_logger(0) + + await self.__authorized + (user, passwd) = self.__authorized.result() + + async with self.__kvmd.ws(user, passwd) as ws: + logger.info("[kvmd] Client %s: Connected to KVMD websocket", self._remote) + self.__ws_connected.set_result(None) + + receive_task: Optional[asyncio.Task] = None + writer_task: Optional[asyncio.Task] = None + try: + while True: + if receive_task is None: + receive_task = asyncio.create_task(ws.receive()) + if writer_task is None: + writer_task = asyncio.create_task(self.__ws_writer_queue.get()) + + done = (await aiotools.wait_first(receive_task, writer_task))[0] + + if receive_task in done: + msg = receive_task.result() + if msg.type == aiohttp.WSMsgType.TEXT: + await self.__process_ws_event(json.loads(msg.data)) + else: + raise RuntimeError(f"Unknown WS message type: {msg!r}") + receive_task = None + + if writer_task in done: + await ws.send_str(json.dumps(writer_task.result())) + writer_task = None + finally: + if receive_task: + receive_task.cancel() + if writer_task: + writer_task.cancel() + + async def __process_ws_event(self, event: Dict) -> None: + if event["event_type"] == "info_state": + host = event["event"]["meta"].get("server", {}).get("host") + if isinstance(host, str): + name = f"Pi-KVM: {host}" + async with self._lock: + if self._encodings.has_rename: + await self._send_rename(name) + self.__shared_params.name = name + + elif event["event_type"] == "hid_state": + async with self._lock: + if self._encodings.has_leds_state: + await self._send_leds_state(**event["event"]["keyboard"]["leds"]) + + # ===== + + async def __streamer_task_loop(self) -> None: + logger = get_logger(0) + await self.__ws_connected + while True: + try: + streaming = False + async for (online, width, height, jpeg) in self.__streamer.read(): + if not streaming: + logger.info("[streamer] Client %s: Streaming ...", self._remote) + streaming = True + if online: + await self.__send_fb_real(width, height, jpeg) + else: + await self.__send_fb_stub("No signal") + except StreamerError as err: + logger.info("[streamer] Client %s: Waiting for stream: %s", self._remote, str(err)) + await self.__send_fb_stub("Waiting for stream ...") + await asyncio.sleep(1) + + async def __send_fb_real(self, width: int, height: int, jpeg: bytes) -> None: + async with self._lock: + if self.__fb_requested: + if (self._width, self._height) != (width, height): + self.__shared_params.width = width + self.__shared_params.height = height + if not self._encodings.has_resize: + msg = f"Resoultion changed: {self._width}x{self._height} -> {width}x{height}\nPlease reconnect" + await self.__send_fb_stub(msg, no_lock=True) + return + await self._send_resize(width, height) + await self._send_fb(jpeg) + self.__fb_stub_text = "" + self.__fb_stub_quality = 0 + self.__fb_requested = False + + async def __send_fb_stub(self, text: str, no_lock: bool=False) -> None: + if not no_lock: + await self._lock.acquire() + try: + if self.__fb_requested and (self.__fb_stub_text != text or self.__fb_stub_quality != self._encodings.tight_jpeg_quality): + await self._send_fb(await make_text_jpeg(self._width, self._height, self._encodings.tight_jpeg_quality, text)) + self.__fb_stub_text = text + self.__fb_stub_quality = self._encodings.tight_jpeg_quality + self.__fb_requested = False + finally: + if not no_lock: + self._lock.release() + + # ===== + + async def _authorize(self, user: str, passwd: str) -> bool: + if (await self.__kvmd.authorize(user, passwd)): + self.__authorized.set_result((user, passwd)) + return True + return False + + async def _on_key_event(self, code: int, state: bool) -> None: + print("KeyEvent", code, state, self.__symmap.get(code)) # TODO + + async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None: + for (button, state) in buttons.items(): + if self.__mouse_buttons[button] != state: + await self.__ws_writer_queue.put({ + "event_type": "mouse_button", + "event": {"button": button, "state": state}, + }) + self.__mouse_buttons[button] = state + + if wheel["x"] or wheel["y"]: + await self.__ws_writer_queue.put({ + "event_type": "mouse_wheel", + "event": {"delta": wheel}, + }) + + if self.__mouse_move != move: + await self.__ws_writer_queue.put({ + "event_type": "mouse_move", + "event": {"to": move}, + }) + self.__mouse_move = move + + async def _on_cut_event(self, text: str) -> None: + print("CutEvent", text) # TODO + + async def _on_set_encodings(self) -> None: + assert self.__authorized.done() + (user, passwd) = self.__authorized.result() + (quality, desired_fps) = (self._encodings.tight_jpeg_quality, 30) + get_logger(0).info("[main] Client %s: Applying streamer params: quality=%d%%; desired_fps=%d ...", + self._remote, quality, desired_fps) + await self.__kvmd.set_streamer_params(user, passwd, quality=quality, desired_fps=desired_fps) + + async def _on_fb_update_request(self) -> None: + self.__fb_requested = True + + +# ===== +class VncServer: + def __init__( + self, + host: str, + port: int, + max_clients: int, + + kvmd: KvmdClient, + streamer: StreamerClient, + + symmap: Dict[int, str], + ) -> None: + + self.__host = host + self.__port = port + self.__max_clients = max_clients + + self.__kvmd = kvmd + self.__streamer = streamer + + self.__symmap = symmap + + self.__shared_params = _SharedParams() + + def run(self) -> None: + logger = get_logger(0) + logger.info("Listening VNC on TCP [%s]:%d ...", self.__host, self.__port) + + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False) + sock.bind((self.__host, self.__port)) + + loop = asyncio.get_event_loop() + server = loop.run_until_complete(asyncio.start_server( + client_connected_cb=self.__handle_client, + sock=sock, + backlog=self.__max_clients, + loop=loop, + )) + + try: + loop.run_forever() + except (SystemExit, KeyboardInterrupt): + pass + finally: + server.close() + loop.run_until_complete(server.wait_closed()) + tasks = asyncio.Task.all_tasks() + for task in tasks: + task.cancel() + loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True)) + loop.close() + logger.info("Bye-bye") + + async def __handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: + await _Client(reader, writer, self.__kvmd, self.__streamer, self.__symmap, self.__shared_params).run() diff --git a/kvmd/apps/vnc/streamer.py b/kvmd/apps/vnc/streamer.py new file mode 100644 index 00000000..62fce849 --- /dev/null +++ b/kvmd/apps/vnc/streamer.py @@ -0,0 +1,83 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from typing import Tuple +from typing import Dict +from typing import AsyncGenerator + +import aiohttp + +from ... import __version__ + + +# ===== +class StreamerError(Exception): + pass + + +# ===== +class StreamerClient: + def __init__( + self, + host: str, + port: int, + unix_path: str, + timeout: float, + ) -> None: + + assert port or unix_path + self.__host = host + self.__port = port + self.__unix_path = unix_path + self.__timeout = timeout + + async def read(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]: + try: + async with self.__make_session() as session: + async with session.get( + url=f"http://{self.__host}:{self.__port}/stream", + params={"extra_headers": "1"}, + headers={"User-Agent": f"KVMD-VNC/{__version__}"}, + ) as response: + response.raise_for_status() + reader = aiohttp.MultipartReader.from_response(response) + while True: + frame = await reader.next() + yield ( + (frame.headers["X-UStreamer-Online"] == "true"), + int(frame.headers["X-UStreamer-Width"]), + int(frame.headers["X-UStreamer-Height"]), + bytes(await frame.read()), + ) + except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня из-за корявых исключений в MultipartReader + raise StreamerError(f"{type(err).__name__}: {str(err)}") + + def __make_session(self) -> aiohttp.ClientSession: + kwargs: Dict = { + "timeout": aiohttp.ClientTimeout( + connect=self.__timeout, + sock_read=self.__timeout, + ), + } + if self.__unix_path: + kwargs["connector"] = aiohttp.UnixConnector(path=self.__unix_path) + return aiohttp.ClientSession(**kwargs) |