summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/apps/__init__.py25
-rw-r--r--kvmd/apps/vnc/__init__.py48
-rw-r--r--kvmd/apps/vnc/__main__.py24
-rw-r--r--kvmd/apps/vnc/fonts/Azbuka04.ttfbin0 -> 71852 bytes
-rw-r--r--kvmd/apps/vnc/keysym.py97
-rw-r--r--kvmd/apps/vnc/kvmd.py110
-rw-r--r--kvmd/apps/vnc/render.py53
-rw-r--r--kvmd/apps/vnc/rfb.py437
-rw-r--r--kvmd/apps/vnc/server.py307
-rw-r--r--kvmd/apps/vnc/streamer.py83
-rw-r--r--kvmd/keymap.py232
-rw-r--r--kvmd/keymap.py.mako19
12 files changed, 1434 insertions, 1 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py
index c3b3a5f2..8d0ce28f 100644
--- a/kvmd/apps/__init__.py
+++ b/kvmd/apps/__init__.py
@@ -320,4 +320,29 @@ def _get_config_scheme() -> Dict:
"file": Option("/etc/kvmd/ipmipasswd", type=valid_abs_file, unpack_as="path"),
},
},
+
+ "vnc": {
+ "keymap": Option("", type=valid_abs_path),
+
+ "server": {
+ "host": Option("::", type=valid_ip_or_host),
+ "port": Option(5900, type=valid_port),
+ # TODO: timeout
+ "max_clients": Option(10, type=(lambda arg: valid_number(arg, min=1))),
+ },
+
+ "kvmd": {
+ "host": Option("localhost", type=valid_ip_or_host),
+ "port": Option(0, type=valid_port),
+ "unix": Option("", type=valid_abs_path, only_if="!port", unpack_as="unix_path"),
+ "timeout": Option(5.0, type=valid_float_f01),
+ },
+
+ "streamer": {
+ "host": Option("localhost", type=valid_ip_or_host),
+ "port": Option(0, type=valid_port),
+ "unix": Option("", type=valid_abs_path, only_if="!port", unpack_as="unix_path"),
+ "timeout": Option(5.0, type=valid_float_f01),
+ },
+ },
}
diff --git a/kvmd/apps/vnc/__init__.py b/kvmd/apps/vnc/__init__.py
new file mode 100644
index 00000000..49f7fcbe
--- /dev/null
+++ b/kvmd/apps/vnc/__init__.py
@@ -0,0 +1,48 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2020 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from typing import List
+from typing import Optional
+
+from .. import init
+
+from .kvmd import KvmdClient
+from .streamer import StreamerClient
+from .server import VncServer
+from .keysym import build_symmap
+
+
+# =====
+def main(argv: Optional[List[str]]=None) -> None:
+ config = init(
+ prog="kvmd-vnc",
+ description="VNC to KVMD proxy",
+ argv=argv,
+ )[2].vnc
+
+ # pylint: disable=protected-access
+ VncServer(
+ kvmd=KvmdClient(**config.kvmd._unpack()),
+ streamer=StreamerClient(**config.streamer._unpack()),
+ symmap=build_symmap(config.keymap),
+ **config.server._unpack(),
+ ).run()
diff --git a/kvmd/apps/vnc/__main__.py b/kvmd/apps/vnc/__main__.py
new file mode 100644
index 00000000..689fbbd1
--- /dev/null
+++ b/kvmd/apps/vnc/__main__.py
@@ -0,0 +1,24 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2020 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from . import main
+main()
diff --git a/kvmd/apps/vnc/fonts/Azbuka04.ttf b/kvmd/apps/vnc/fonts/Azbuka04.ttf
new file mode 100644
index 00000000..16ade315
--- /dev/null
+++ b/kvmd/apps/vnc/fonts/Azbuka04.ttf
Binary files differ
diff --git a/kvmd/apps/vnc/keysym.py b/kvmd/apps/vnc/keysym.py
new file mode 100644
index 00000000..5f35c28b
--- /dev/null
+++ b/kvmd/apps/vnc/keysym.py
@@ -0,0 +1,97 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2020 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import pkgutil
+import functools
+
+from typing import Dict
+
+import Xlib.keysymdef
+
+from ...logging import get_logger
+
+from ... import keymap
+
+
+# =====
+def build_symmap(path: str) -> Dict[int, str]:
+ # https://github.com/qemu/qemu/blob/95a9457fd44ad97c518858a4e1586a5498f9773c/ui/keymaps.c
+
+ symmap: Dict[int, str] = {}
+ for (x11_code, at1_code) in keymap.X11_TO_AT1.items():
+ symmap[x11_code] = keymap.AT1_TO_WEB[at1_code]
+
+ for (x11_code, at1_code) in _read_keyboard_layout(path).items():
+ if (web_name := keymap.AT1_TO_WEB.get(at1_code)) is not None: # noqa: E203,E231
+ # mypy bug
+ symmap[x11_code] = web_name # type: ignore
+ return symmap
+
+
+# =====
+def _get_keysyms() -> Dict[str, int]:
+ keysyms: Dict[str, int] = {}
+ for (loader, module_name, _) in pkgutil.walk_packages(Xlib.keysymdef.__path__):
+ module = loader.find_module(module_name).load_module(module_name)
+ for keysym_name in dir(module):
+ if keysym_name.startswith("XK_"):
+ short_name = keysym_name[3:]
+ if short_name.startswith("XF86_"):
+ short_name = "XF86" + short_name[5:]
+ # assert short_name not in keysyms, short_name
+ keysyms[short_name] = int(getattr(module, keysym_name))
+ return keysyms
+
+
+def _resolve_keysym(name: str) -> int:
+ code = _get_keysyms().get(name)
+ if code is not None:
+ return code
+ if len(name) == 5 and name[0] == "U": # Try unicode Uxxxx
+ try:
+ return int(name[1:], 16)
+ except ValueError:
+ pass
+ return 0
+
+
+def _read_keyboard_layout(path: str) -> Dict[int, int]: # Keysym to evdev (at1)
+ logger = get_logger(0)
+ logger.info("Reading keyboard layout %s ...", path)
+
+ with open(path) as layout_file:
+ lines = list(map(str.strip, layout_file.read().split("\n")))
+
+ layout: Dict[int, int] = {}
+ for (number, line) in enumerate(lines):
+ if len(line) == 0 or line.startswith(("#", "map ", "include ")):
+ continue
+
+ parts = line.split()
+ if len(parts) >= 2:
+ if (code := _resolve_keysym(parts[0])) != 0: # noqa: E203,E231
+ try:
+ layout[code] = int(parts[1], 16)
+ except ValueError as err:
+ logger.error("Can't parse layout line #%d: %s", number, str(err))
+ return layout
diff --git a/kvmd/apps/vnc/kvmd.py b/kvmd/apps/vnc/kvmd.py
new file mode 100644
index 00000000..d921ad43
--- /dev/null
+++ b/kvmd/apps/vnc/kvmd.py
@@ -0,0 +1,110 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2020 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import contextlib
+
+from typing import Dict
+
+import aiohttp
+
+from ...logging import get_logger
+
+from ... import __version__
+
+
+# =====
+class KvmdClient:
+ def __init__(
+ self,
+ host: str,
+ port: int,
+ unix_path: str,
+ timeout: float,
+ ) -> None:
+
+ assert port or unix_path
+ self.__host = host
+ self.__port = port
+ self.__unix_path = unix_path
+ self.__timeout = timeout
+
+ # =====
+
+ async def authorize(self, user: str, passwd: str) -> bool:
+ try:
+ async with self.__make_session(user, passwd) as session:
+ async with session.get(
+ url=f"http://{self.__host}:{self.__port}/auth/check",
+ timeout=self.__timeout,
+ ) as response:
+ response.raise_for_status()
+ if response.status == 200:
+ return True
+ raise RuntimeError(f"Invalid OK response: {response.status} {await response.text()}")
+ except aiohttp.ClientResponseError as err:
+ if err.status in [401, 403]:
+ return False
+ get_logger(0).exception("Can't check user access")
+ except Exception:
+ get_logger(0).exception("Can't check user access")
+ return False
+
+ @contextlib.asynccontextmanager
+ async def ws(self, user: str, passwd: str) -> aiohttp.ClientWebSocketResponse: # pylint: disable=invalid-name
+ async with self.__make_session(user, passwd) as session:
+ async with session.ws_connect(
+ url=f"http://{self.__host}:{self.__port}/ws",
+ timeout=self.__timeout,
+ ) as ws:
+ yield ws
+
+ async def set_streamer_params(self, user: str, passwd: str, quality: int=-1, desired_fps: int=-1) -> None:
+ params = {
+ key: value
+ for (key, value) in [
+ ("quality", quality),
+ ("desired_fps", desired_fps),
+ ]
+ if value >= 0
+ }
+ if params:
+ async with self.__make_session(user, passwd) as session:
+ async with session.post(
+ url=f"http://{self.__host}:{self.__port}/streamer/set_params",
+ timeout=self.__timeout,
+ params=params,
+ ) as response:
+ response.raise_for_status()
+
+ # =====
+
+ def __make_session(self, user: str, passwd: str) -> aiohttp.ClientSession:
+ kwargs: Dict = {
+ "headers": {
+ "X-KVMD-User": user,
+ "X-KVMD-Passwd": passwd,
+ "User-Agent": f"KVMD-VNC/{__version__}",
+ },
+ }
+ if self.__unix_path:
+ kwargs["connector"] = aiohttp.UnixConnector(path=self.__unix_path)
+ return aiohttp.ClientSession(**kwargs)
diff --git a/kvmd/apps/vnc/render.py b/kvmd/apps/vnc/render.py
new file mode 100644
index 00000000..f65f1405
--- /dev/null
+++ b/kvmd/apps/vnc/render.py
@@ -0,0 +1,53 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2020 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import sys
+import os
+import io
+import functools
+
+from PIL import Image
+from PIL import ImageDraw
+from PIL import ImageFont
+
+from ... import aiotools
+
+
+# =====
+async def make_text_jpeg(width: int, height: int, quality: int, text: str) -> bytes:
+ return (await aiotools.run_async(_inner_make_text_jpeg, width, height, quality, text))
+
+
[email protected]_cache(maxsize=10)
+def _inner_make_text_jpeg(width: int, height: int, quality: int, text: str) -> bytes:
+ image = Image.new("RGB", (width, height), color=(0, 0, 0))
+ draw = ImageDraw.Draw(image)
+ draw.multiline_text((20, 20), text, font=_get_font(), fill=(255, 255, 255))
+ bio = io.BytesIO()
+ image.save(bio, format="jpeg", quality=quality)
+ return bio.getvalue()
+
+
+def _get_font() -> ImageFont.FreeTypeFont:
+ path = os.path.join(os.path.dirname(sys.modules[__name__].__file__), "fonts", "Azbuka04.ttf")
+ return ImageFont.truetype(path, size=20)
diff --git a/kvmd/apps/vnc/rfb.py b/kvmd/apps/vnc/rfb.py
new file mode 100644
index 00000000..450024c4
--- /dev/null
+++ b/kvmd/apps/vnc/rfb.py
@@ -0,0 +1,437 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2020 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import asyncio
+import struct
+import dataclasses
+
+from typing import Tuple
+from typing import Dict
+from typing import FrozenSet
+from typing import Coroutine
+from typing import Any
+
+from ...logging import get_logger
+
+from ... import aiotools
+
+
+# =====
+class RfbError(Exception):
+ pass
+
+
+class RfbConnectionError(RfbError):
+ def __init__(self, err: Exception) -> None:
+ super().__init__(f"Gone ({type(err).__name__})")
+
+
+# =====
+_ENCODING_RESIZE = -223 # DesktopSize Pseudo-encoding
+_ENCODING_RENAME = -307 # DesktopName Pseudo-encoding
+_ENCODING_LEDS_STATE = -261 # LED State Pseudo-encoding
+
+_ENCODING_TIGHT = 7
+_ENCODING_TIGHT_JPEG_QUALITIES = dict(zip( # JPEG Quality Level Pseudo-encoding
+ [-32, -31, -30, -29, -28, -27, -26, -25, -24, -23],
+ [10, 20, 30, 40, 50, 60, 70, 80, 90, 100],
+))
+
+
[email protected](frozen=True)
+class _Encodings:
+ encodings: FrozenSet[int]
+
+ has_resize: bool = dataclasses.field(default=False)
+ has_rename: bool = dataclasses.field(default=False)
+ has_leds_state: bool = dataclasses.field(default=False)
+
+ has_tight: bool = dataclasses.field(default=False)
+ tight_jpeg_quality: int = dataclasses.field(default=0)
+
+ def __post_init__(self) -> None:
+ self.__set("has_resize", (_ENCODING_RESIZE in self.encodings))
+ self.__set("has_rename", (_ENCODING_RENAME in self.encodings))
+ self.__set("has_leds_state", (_ENCODING_LEDS_STATE in self.encodings))
+
+ self.__set("has_tight", (_ENCODING_TIGHT in self.encodings))
+ self.__set("tight_jpeg_quality", self.__get_tight_jpeg_quality())
+
+ def __set(self, key: str, value: Any) -> None:
+ object.__setattr__(self, key, value)
+
+ def __get_tight_jpeg_quality(self) -> int:
+ if _ENCODING_TIGHT in self.encodings:
+ qualities = self.encodings.intersection(_ENCODING_TIGHT_JPEG_QUALITIES)
+ if qualities:
+ return _ENCODING_TIGHT_JPEG_QUALITIES[max(qualities)]
+ return 0
+
+
+class RfbClient: # pylint: disable=too-many-instance-attributes
+ # https://github.com/rfbproto/rfbproto/blob/master/rfbproto.rst
+ # https://www.toptal.com/java/implementing-remote-framebuffer-server-java
+ # https://github.com/TigerVNC/tigervnc
+
+ def __init__(
+ self,
+ reader: asyncio.StreamReader,
+ writer: asyncio.StreamWriter,
+
+ width: int,
+ height: int,
+ name: str,
+ ) -> None:
+
+ self.__reader = reader
+ self.__writer = writer
+
+ self._remote = "[%s]:%d" % (self.__writer.transport.get_extra_info("peername")[:2])
+
+ self._width = width
+ self._height = height
+ self._name = name
+
+ self._encodings = _Encodings(frozenset())
+
+ self._lock = asyncio.Lock()
+
+ get_logger(0).info("Connected client: %s", self._remote)
+
+ # =====
+
+ async def _run(self, **coros: Coroutine) -> None:
+ tasks = list(map(asyncio.create_task, [
+ self.__wrapper(name, coro)
+ for (name, coro) in {"main": self.__main_task_loop(), **coros}.items()
+ ]))
+ try:
+ await aiotools.wait_first(*tasks)
+ finally:
+ for task in tasks:
+ task.cancel()
+
+ async def __wrapper(self, name: str, coro: Coroutine) -> None:
+ logger = get_logger(0)
+ try:
+ await coro
+ raise RuntimeError("Subtask just finished without any exception")
+ except asyncio.CancelledError:
+ logger.info("[%s] Client %s: Cancelling ...", name, self._remote)
+ raise
+ except RfbError as err:
+ logger.info("[%s] Client %s: %s: Disconnected", name, self._remote, str(err))
+ except Exception:
+ logger.exception("[%s] Unhandled exception with client %s: Disconnected", name, self._remote)
+
+ async def __main_task_loop(self) -> None:
+ try:
+ rfb_version = await self.__handshake_version()
+ await self.__handshake_security(rfb_version)
+ await self.__handshake_init()
+ await self.__main_loop()
+ finally:
+ try:
+ self.__writer.close()
+ except Exception:
+ pass
+
+ # =====
+
+ async def _authorize(self, user: str, passwd: str) -> bool:
+ raise NotImplementedError
+
+ async def _on_key_event(self, code: int, state: bool) -> None:
+ raise NotImplementedError
+
+ async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None:
+ raise NotImplementedError
+
+ async def _on_cut_event(self, text: str) -> None:
+ raise NotImplementedError
+
+ async def _on_set_encodings(self) -> None:
+ raise NotImplementedError
+
+ async def _on_fb_update_request(self) -> None:
+ raise NotImplementedError
+
+ # =====
+
+ async def _send_fb(self, jpeg: bytes) -> None:
+ assert self._encodings.has_tight
+ assert self._encodings.tight_jpeg_quality > 0
+ assert len(jpeg) <= 4194303, len(jpeg)
+ await self.__write_fb_update(self._width, self._height, _ENCODING_TIGHT, drain=False)
+ length = len(jpeg)
+ if length <= 127:
+ await self.__write_struct("", bytes([0b10011111, length & 0x7F]), jpeg)
+ elif length <= 16383:
+ await self.__write_struct("", bytes([0b10011111, length & 0x7F | 0x80, length >> 7 & 0x7F]), jpeg)
+ else:
+ await self.__write_struct("", bytes([0b10011111, length & 0x7F | 0x80, length >> 7 & 0x7F | 0x80, length >> 14 & 0xFF]), jpeg)
+
+ async def _send_resize(self, width: int, height: int) -> None:
+ assert self._encodings.has_resize
+ await self.__write_fb_update(width, height, _ENCODING_RESIZE)
+ self._width = width
+ self._height = height
+
+ async def _send_rename(self, name: str) -> None:
+ assert self._encodings.has_rename
+ await self.__write_fb_update(0, 0, _ENCODING_RENAME, drain=False)
+ await self.__write_reason(name)
+ self._name = name
+
+ async def _send_leds_state(self, caps: bool, scroll: bool, num: bool) -> None:
+ assert self._encodings.has_leds_state
+ await self.__write_fb_update(0, 0, _ENCODING_LEDS_STATE, drain=False)
+ await self.__write_struct("B", 0x1 & scroll | 0x2 & num | 0x4 & caps)
+
+ # =====
+
+ async def __handshake_version(self) -> int:
+ # The only published protocol versions at this time are 3.3, 3.7, 3.8.
+ # Version 3.5 was wrongly reported by some clients, but it should be
+ # interpreted by all servers as 3.3
+
+ await self.__write_struct("", b"RFB 003.008\n")
+
+ response = await self.__read_text(12)
+ if (
+ not response.startswith("RFB 003.00")
+ or not response.endswith("\n")
+ or response[-2] not in ["3", "5", "7", "8"]
+ ):
+ raise RfbError(f"Invalid version response: {response!r}")
+
+ try:
+ version = int(response[-2])
+ except ValueError:
+ raise RfbError(f"Invalid version response: {response!r}")
+ return (3 if version == 5 else version)
+
+ # =====
+
+ async def __handshake_security(self, rfb_version: int) -> None:
+ if rfb_version == 3:
+ await self.__handshake_security_v3(rfb_version)
+ else:
+ await self.__handshake_security_v7_plus(rfb_version)
+
+ async def __handshake_security_v3(self, rfb_version: int) -> None:
+ assert rfb_version == 3
+
+ await self.__write_struct("L", 0, drain=False) # Refuse old clients using the invalid security type
+ msg = "The client uses a very old protocol 3.3; required 3.7 at least"
+ await self.__write_reason(msg)
+ raise RfbError(msg)
+
+ async def __handshake_security_v7_plus(self, rfb_version: int) -> None:
+ assert rfb_version >= 7
+
+ vencrypt = 19
+ await self.__write_struct("B B", 1, vencrypt) # One security type, VeNCrypt
+
+ security_type = await self.__read_number("B")
+ if security_type != vencrypt:
+ raise RfbError(f"Invalid security type: {security_type}; expected VeNCrypt({vencrypt})")
+
+ # -----
+
+ await self.__write_struct("BB", 0, 2) # VeNCrypt 0.2
+
+ vencrypt_version = "%d.%d" % (await self.__read_struct("BB"))
+ if vencrypt_version != "0.2":
+ await self.__write_struct("B", 1) # Unsupported
+ raise RfbError(f"Unsupported VeNCrypt version: {vencrypt_version}")
+
+ await self.__write_struct("B", 0)
+
+ # -----
+
+ plain = 256
+ await self.__write_struct("B L", 1, plain) # One auth subtype, plain
+
+ auth_type = await self.__read_number("L")
+ if auth_type != plain:
+ raise RfbError(f"Invalid auth type: {auth_type}; expected Plain({plain})")
+
+ # -----
+
+ (user_length, passwd_length) = await self.__read_struct("LL")
+ user = await self.__read_text(user_length)
+ passwd = await self.__read_text(passwd_length)
+
+ if (await self._authorize(user, passwd)):
+ get_logger(0).info("[main] Client %s: Access granted for user %r", self._remote, user)
+ await self.__write_struct("L", 0)
+ else:
+ await self.__write_struct("L", 1, drain=(rfb_version < 8))
+ if rfb_version >= 8:
+ await self.__write_reason("Invalid username or password")
+ raise RfbError(f"Access denied for user {user!r}")
+
+ # =====
+
+ async def __handshake_init(self) -> None:
+ await self.__read_number("B") # Shared flag, ignored
+
+ await self.__write_struct("HH", self._width, self._height, drain=False)
+ await self.__write_struct(
+ "BB?? HHH BBB xxx",
+ 32, # Bits per pixel
+ 24, # Depth
+ False, # Big endian
+ True, # True color
+ 255, # Red max
+ 255, # Green max
+ 255, # Blue max
+ 16, # Red shift
+ 8, # Green shift
+ 0, # Blue shift
+ drain=False,
+ )
+ await self.__write_reason(self._name)
+
+ # =====
+
+ async def __main_loop(self) -> None:
+ while True:
+ msg_type = await self.__read_number("B")
+
+ async with self._lock:
+ if msg_type == 0: # SetPixelFormat
+ # JpegCompression may only be used when bits-per-pixel is either 16 or 32
+ bits_per_pixel = (await self.__read_struct("xxx BB?? HHH BBB xxx"))[0]
+ if bits_per_pixel not in [16, 32]:
+ raise RfbError(f"Requested unsupported {bits_per_pixel=} for Tight JPEG; required 16 or 32")
+
+ elif msg_type == 2: # SetEncodings
+ encodings_count = (await self.__read_struct("x H"))[0]
+ if encodings_count > 1024:
+ raise RfbError(f"Too many encodings: {encodings_count}")
+ self._encodings = _Encodings(frozenset(await self.__read_struct("l" * encodings_count)))
+ self.__check_tight_jpeg()
+ await self._on_set_encodings()
+
+ elif msg_type == 3: # FramebufferUpdateRequest
+ self.__check_tight_jpeg() # If we don't receive SetEncodings from client
+ await self.__read_struct("? HH HH") # Ignore any arguments, just perform the full update
+ await self._on_fb_update_request()
+
+ elif msg_type == 4: # KeyEvent
+ (state, code) = await self.__read_struct("? xx L")
+ await self._on_key_event(code, state) # type: ignore
+
+ elif msg_type == 5: # PointerEvent
+ (buttons, to_x, to_y) = await self.__read_struct("B HH")
+ await self._on_pointer_event(
+ buttons={
+ "left": bool(buttons & 0x1),
+ "right": bool(buttons & 0x4),
+ "middle": bool(buttons & 0x2),
+ },
+ wheel={
+ "x": (32 if buttons & 0x40 else (-32 if buttons & 0x20 else 0)),
+ "y": (32 if buttons & 0x10 else (-32 if buttons & 0x8 else 0)),
+ },
+ move={
+ "x": round(to_x / self._width * 65535 + -32768),
+ "y": round(to_y / self._width * 65535 + -32768),
+ },
+ )
+
+ elif msg_type == 6: # ClientCutText
+ await self._on_cut_event(await self.__read_text((await self.__read_struct("xxx L"))[0]))
+
+ else:
+ raise RfbError(f"Unknown message type: {msg_type}")
+
+ def __check_tight_jpeg(self) -> None:
+ # JpegCompression may only be used when the client has advertized
+ # a quality level using the JPEG Quality Level Pseudo-encoding
+ if not self._encodings.has_tight or self._encodings.tight_jpeg_quality == 0:
+ raise RfbError(f"Tight JPEG encoding is not supported by client: {self._encodings}")
+
+ # =====
+
+ async def __read_number(self, fmt: str) -> int:
+ assert len(fmt) == 1
+ try:
+ if fmt == "B":
+ return (await self.__reader.readexactly(1))[0]
+ else:
+ fmt = f">{fmt}"
+ return struct.unpack(fmt, await self.__reader.readexactly(struct.calcsize(fmt)))[0]
+ except (ConnectionError, asyncio.IncompleteReadError) as err:
+ raise RfbConnectionError(err)
+
+ async def __read_struct(self, fmt: str) -> Tuple[int, ...]:
+ assert len(fmt) > 1
+ try:
+ fmt = f">{fmt}"
+ return struct.unpack(fmt, (await self.__reader.readexactly(struct.calcsize(fmt))))
+ except (ConnectionError, asyncio.IncompleteReadError) as err:
+ raise RfbConnectionError(err)
+
+ async def __read_text(self, length: int) -> str:
+ try:
+ return (await self.__reader.readexactly(length)).decode("utf-8", errors="ignore")
+ except (ConnectionError, asyncio.IncompleteReadError) as err:
+ raise RfbConnectionError(err)
+
+ # =====
+
+ async def __write_struct(self, fmt: str, *values: Any, drain: bool=True) -> None:
+ try:
+ if not fmt:
+ for value in values:
+ self.__writer.write(value)
+ elif fmt == "B":
+ assert len(values) == 1
+ self.__writer.write(bytes([values[0]]))
+ else:
+ self.__writer.write(struct.pack(f">{fmt}", *values))
+ if drain:
+ await self.__writer.drain()
+ except ConnectionError as err:
+ raise RfbConnectionError(err)
+
+ async def __write_reason(self, text: str, drain: bool=True) -> None:
+ encoded = text.encode("utf-8", errors="ignore")
+ await self.__write_struct("L", len(encoded), drain=False)
+ try:
+ self.__writer.write(encoded)
+ if drain:
+ await self.__writer.drain()
+ except ConnectionError as err:
+ raise RfbConnectionError(err)
+
+ async def __write_fb_update(self, width: int, height: int, encoding: int, drain: bool=True) -> None:
+ await self.__write_struct(
+ "BxH HH HH l",
+ 0, # FB update
+ 1, # Number of rects
+ 0, 0, width, height, encoding,
+ drain=drain,
+ )
diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py
new file mode 100644
index 00000000..8ce78d41
--- /dev/null
+++ b/kvmd/apps/vnc/server.py
@@ -0,0 +1,307 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2020 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import asyncio
+import asyncio.queues
+import socket
+import dataclasses
+import json
+
+from typing import Dict
+from typing import Optional
+
+import aiohttp
+
+from ...logging import get_logger
+
+from ... import aiotools
+
+from .rfb import RfbClient
+
+from .kvmd import KvmdClient
+
+from .streamer import StreamerError
+from .streamer import StreamerClient
+
+from .render import make_text_jpeg
+
+
+# =====
+class _SharedParams:
+ width: int = dataclasses.field(default=800)
+ height: int = dataclasses.field(default=600)
+ name: str = dataclasses.field(default="Pi-KVM")
+
+
+class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
+ def __init__(
+ self,
+ reader: asyncio.StreamReader,
+ writer: asyncio.StreamWriter,
+
+ kvmd: KvmdClient,
+ streamer: StreamerClient,
+
+ symmap: Dict[int, str],
+
+ shared_params: _SharedParams,
+ ) -> None:
+
+ super().__init__(reader, writer, **dataclasses.asdict(shared_params))
+
+ self.__kvmd = kvmd
+ self.__streamer = streamer
+ self.__symmap = symmap
+ self.__shared_params = shared_params
+
+ self.__authorized = asyncio.Future() # type: ignore
+ self.__ws_connected = asyncio.Future() # type: ignore
+ self.__ws_writer_queue: asyncio.queues.Queue = asyncio.Queue()
+
+ self.__fb_requested = False
+ self.__fb_stub_text = ""
+ self.__fb_stub_quality = 0
+
+ # Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события.
+ # Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD
+ self.__mouse_buttons: Dict[str, Optional[bool]] = {"left": None, "right": None, "middle": None}
+ self.__mouse_move = {"x": -1, "y": -1}
+
+ # =====
+
+ async def run(self) -> None:
+ await self._run(
+ kvmd=self.__kvmd_task_loop(),
+ streamer=self.__streamer_task_loop(),
+ )
+
+ # =====
+
+ async def __kvmd_task_loop(self) -> None:
+ logger = get_logger(0)
+
+ await self.__authorized
+ (user, passwd) = self.__authorized.result()
+
+ async with self.__kvmd.ws(user, passwd) as ws:
+ logger.info("[kvmd] Client %s: Connected to KVMD websocket", self._remote)
+ self.__ws_connected.set_result(None)
+
+ receive_task: Optional[asyncio.Task] = None
+ writer_task: Optional[asyncio.Task] = None
+ try:
+ while True:
+ if receive_task is None:
+ receive_task = asyncio.create_task(ws.receive())
+ if writer_task is None:
+ writer_task = asyncio.create_task(self.__ws_writer_queue.get())
+
+ done = (await aiotools.wait_first(receive_task, writer_task))[0]
+
+ if receive_task in done:
+ msg = receive_task.result()
+ if msg.type == aiohttp.WSMsgType.TEXT:
+ await self.__process_ws_event(json.loads(msg.data))
+ else:
+ raise RuntimeError(f"Unknown WS message type: {msg!r}")
+ receive_task = None
+
+ if writer_task in done:
+ await ws.send_str(json.dumps(writer_task.result()))
+ writer_task = None
+ finally:
+ if receive_task:
+ receive_task.cancel()
+ if writer_task:
+ writer_task.cancel()
+
+ async def __process_ws_event(self, event: Dict) -> None:
+ if event["event_type"] == "info_state":
+ host = event["event"]["meta"].get("server", {}).get("host")
+ if isinstance(host, str):
+ name = f"Pi-KVM: {host}"
+ async with self._lock:
+ if self._encodings.has_rename:
+ await self._send_rename(name)
+ self.__shared_params.name = name
+
+ elif event["event_type"] == "hid_state":
+ async with self._lock:
+ if self._encodings.has_leds_state:
+ await self._send_leds_state(**event["event"]["keyboard"]["leds"])
+
+ # =====
+
+ async def __streamer_task_loop(self) -> None:
+ logger = get_logger(0)
+ await self.__ws_connected
+ while True:
+ try:
+ streaming = False
+ async for (online, width, height, jpeg) in self.__streamer.read():
+ if not streaming:
+ logger.info("[streamer] Client %s: Streaming ...", self._remote)
+ streaming = True
+ if online:
+ await self.__send_fb_real(width, height, jpeg)
+ else:
+ await self.__send_fb_stub("No signal")
+ except StreamerError as err:
+ logger.info("[streamer] Client %s: Waiting for stream: %s", self._remote, str(err))
+ await self.__send_fb_stub("Waiting for stream ...")
+ await asyncio.sleep(1)
+
+ async def __send_fb_real(self, width: int, height: int, jpeg: bytes) -> None:
+ async with self._lock:
+ if self.__fb_requested:
+ if (self._width, self._height) != (width, height):
+ self.__shared_params.width = width
+ self.__shared_params.height = height
+ if not self._encodings.has_resize:
+ msg = f"Resoultion changed: {self._width}x{self._height} -> {width}x{height}\nPlease reconnect"
+ await self.__send_fb_stub(msg, no_lock=True)
+ return
+ await self._send_resize(width, height)
+ await self._send_fb(jpeg)
+ self.__fb_stub_text = ""
+ self.__fb_stub_quality = 0
+ self.__fb_requested = False
+
+ async def __send_fb_stub(self, text: str, no_lock: bool=False) -> None:
+ if not no_lock:
+ await self._lock.acquire()
+ try:
+ if self.__fb_requested and (self.__fb_stub_text != text or self.__fb_stub_quality != self._encodings.tight_jpeg_quality):
+ await self._send_fb(await make_text_jpeg(self._width, self._height, self._encodings.tight_jpeg_quality, text))
+ self.__fb_stub_text = text
+ self.__fb_stub_quality = self._encodings.tight_jpeg_quality
+ self.__fb_requested = False
+ finally:
+ if not no_lock:
+ self._lock.release()
+
+ # =====
+
+ async def _authorize(self, user: str, passwd: str) -> bool:
+ if (await self.__kvmd.authorize(user, passwd)):
+ self.__authorized.set_result((user, passwd))
+ return True
+ return False
+
+ async def _on_key_event(self, code: int, state: bool) -> None:
+ print("KeyEvent", code, state, self.__symmap.get(code)) # TODO
+
+ async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None:
+ for (button, state) in buttons.items():
+ if self.__mouse_buttons[button] != state:
+ await self.__ws_writer_queue.put({
+ "event_type": "mouse_button",
+ "event": {"button": button, "state": state},
+ })
+ self.__mouse_buttons[button] = state
+
+ if wheel["x"] or wheel["y"]:
+ await self.__ws_writer_queue.put({
+ "event_type": "mouse_wheel",
+ "event": {"delta": wheel},
+ })
+
+ if self.__mouse_move != move:
+ await self.__ws_writer_queue.put({
+ "event_type": "mouse_move",
+ "event": {"to": move},
+ })
+ self.__mouse_move = move
+
+ async def _on_cut_event(self, text: str) -> None:
+ print("CutEvent", text) # TODO
+
+ async def _on_set_encodings(self) -> None:
+ assert self.__authorized.done()
+ (user, passwd) = self.__authorized.result()
+ (quality, desired_fps) = (self._encodings.tight_jpeg_quality, 30)
+ get_logger(0).info("[main] Client %s: Applying streamer params: quality=%d%%; desired_fps=%d ...",
+ self._remote, quality, desired_fps)
+ await self.__kvmd.set_streamer_params(user, passwd, quality=quality, desired_fps=desired_fps)
+
+ async def _on_fb_update_request(self) -> None:
+ self.__fb_requested = True
+
+
+# =====
+class VncServer:
+ def __init__(
+ self,
+ host: str,
+ port: int,
+ max_clients: int,
+
+ kvmd: KvmdClient,
+ streamer: StreamerClient,
+
+ symmap: Dict[int, str],
+ ) -> None:
+
+ self.__host = host
+ self.__port = port
+ self.__max_clients = max_clients
+
+ self.__kvmd = kvmd
+ self.__streamer = streamer
+
+ self.__symmap = symmap
+
+ self.__shared_params = _SharedParams()
+
+ def run(self) -> None:
+ logger = get_logger(0)
+ logger.info("Listening VNC on TCP [%s]:%d ...", self.__host, self.__port)
+
+ sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+ sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
+ sock.bind((self.__host, self.__port))
+
+ loop = asyncio.get_event_loop()
+ server = loop.run_until_complete(asyncio.start_server(
+ client_connected_cb=self.__handle_client,
+ sock=sock,
+ backlog=self.__max_clients,
+ loop=loop,
+ ))
+
+ try:
+ loop.run_forever()
+ except (SystemExit, KeyboardInterrupt):
+ pass
+ finally:
+ server.close()
+ loop.run_until_complete(server.wait_closed())
+ tasks = asyncio.Task.all_tasks()
+ for task in tasks:
+ task.cancel()
+ loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
+ loop.close()
+ logger.info("Bye-bye")
+
+ async def __handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
+ await _Client(reader, writer, self.__kvmd, self.__streamer, self.__symmap, self.__shared_params).run()
diff --git a/kvmd/apps/vnc/streamer.py b/kvmd/apps/vnc/streamer.py
new file mode 100644
index 00000000..62fce849
--- /dev/null
+++ b/kvmd/apps/vnc/streamer.py
@@ -0,0 +1,83 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2020 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from typing import Tuple
+from typing import Dict
+from typing import AsyncGenerator
+
+import aiohttp
+
+from ... import __version__
+
+
+# =====
+class StreamerError(Exception):
+ pass
+
+
+# =====
+class StreamerClient:
+ def __init__(
+ self,
+ host: str,
+ port: int,
+ unix_path: str,
+ timeout: float,
+ ) -> None:
+
+ assert port or unix_path
+ self.__host = host
+ self.__port = port
+ self.__unix_path = unix_path
+ self.__timeout = timeout
+
+ async def read(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]:
+ try:
+ async with self.__make_session() as session:
+ async with session.get(
+ url=f"http://{self.__host}:{self.__port}/stream",
+ params={"extra_headers": "1"},
+ headers={"User-Agent": f"KVMD-VNC/{__version__}"},
+ ) as response:
+ response.raise_for_status()
+ reader = aiohttp.MultipartReader.from_response(response)
+ while True:
+ frame = await reader.next()
+ yield (
+ (frame.headers["X-UStreamer-Online"] == "true"),
+ int(frame.headers["X-UStreamer-Width"]),
+ int(frame.headers["X-UStreamer-Height"]),
+ bytes(await frame.read()),
+ )
+ except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня из-за корявых исключений в MultipartReader
+ raise StreamerError(f"{type(err).__name__}: {str(err)}")
+
+ def __make_session(self) -> aiohttp.ClientSession:
+ kwargs: Dict = {
+ "timeout": aiohttp.ClientTimeout(
+ connect=self.__timeout,
+ sock_read=self.__timeout,
+ ),
+ }
+ if self.__unix_path:
+ kwargs["connector"] = aiohttp.UnixConnector(path=self.__unix_path)
+ return aiohttp.ClientSession(**kwargs)
diff --git a/kvmd/keymap.py b/kvmd/keymap.py
index a263ea01..6e466f2f 100644
--- a/kvmd/keymap.py
+++ b/kvmd/keymap.py
@@ -398,3 +398,235 @@ KEYMAP: Dict[str, Key] = {
otg=OtgKey(code=101, is_modifier=False),
),
}
+
+
+# =====
+X11_TO_AT1 = {
+ 65307: 1,
+ 33: 2,
+ 49: 2,
+ 50: 3,
+ 64: 3,
+ 35: 4,
+ 51: 4,
+ 36: 5,
+ 52: 5,
+ 37: 6,
+ 53: 6,
+ 54: 7,
+ 94: 7,
+ 38: 8,
+ 55: 8,
+ 42: 9,
+ 56: 9,
+ 40: 10,
+ 57: 10,
+ 41: 11,
+ 48: 11,
+ 45: 12,
+ 95: 12,
+ 43: 13,
+ 61: 13,
+ 65288: 14,
+ 65289: 15,
+ 81: 16,
+ 113: 16,
+ 87: 17,
+ 119: 17,
+ 69: 18,
+ 101: 18,
+ 82: 19,
+ 114: 19,
+ 84: 20,
+ 116: 20,
+ 89: 21,
+ 121: 21,
+ 85: 22,
+ 117: 22,
+ 73: 23,
+ 105: 23,
+ 79: 24,
+ 111: 24,
+ 80: 25,
+ 112: 25,
+ 91: 26,
+ 123: 26,
+ 93: 27,
+ 125: 27,
+ 65293: 28,
+ 65507: 29,
+ 65: 30,
+ 97: 30,
+ 83: 31,
+ 115: 31,
+ 68: 32,
+ 100: 32,
+ 70: 33,
+ 102: 33,
+ 71: 34,
+ 103: 34,
+ 72: 35,
+ 104: 35,
+ 74: 36,
+ 106: 36,
+ 75: 37,
+ 107: 37,
+ 76: 38,
+ 108: 38,
+ 58: 39,
+ 59: 39,
+ 34: 40,
+ 39: 40,
+ 96: 41,
+ 126: 41,
+ 65505: 42,
+ 92: 43,
+ 124: 43,
+ 90: 44,
+ 122: 44,
+ 88: 45,
+ 120: 45,
+ 67: 46,
+ 99: 46,
+ 86: 47,
+ 118: 47,
+ 66: 48,
+ 98: 48,
+ 78: 49,
+ 110: 49,
+ 77: 50,
+ 109: 50,
+ 44: 51,
+ 60: 51,
+ 46: 52,
+ 62: 52,
+ 47: 53,
+ 63: 53,
+ 65506: 54,
+ 65513: 56,
+ 32: 57,
+ 65509: 58,
+ 65470: 59,
+ 65471: 60,
+ 65472: 61,
+ 65473: 62,
+ 65474: 63,
+ 65475: 64,
+ 65476: 65,
+ 65477: 66,
+ 65478: 67,
+ 65479: 68,
+ 65407: 69,
+ 65300: 70,
+ 65301: 84,
+ 65480: 87,
+ 65481: 88,
+ 65508: 57373,
+ 65514: 57400,
+ 65299: 57414,
+ 65360: 57415,
+ 65362: 57416,
+ 65365: 57417,
+ 65361: 57419,
+ 65363: 57421,
+ 65367: 57423,
+ 65364: 57424,
+ 65366: 57425,
+ 65379: 57426,
+ 65535: 57427,
+ 65511: 57435,
+ 65512: 57436,
+ 65383: 57437,
+}
+
+
+AT1_TO_WEB = {
+ 1: "Escape",
+ 2: "Digit1",
+ 3: "Digit2",
+ 4: "Digit3",
+ 5: "Digit4",
+ 6: "Digit5",
+ 7: "Digit6",
+ 8: "Digit7",
+ 9: "Digit8",
+ 10: "Digit9",
+ 11: "Digit0",
+ 12: "Minus",
+ 13: "Equal",
+ 14: "Backspace",
+ 15: "Tab",
+ 16: "KeyQ",
+ 17: "KeyW",
+ 18: "KeyE",
+ 19: "KeyR",
+ 20: "KeyT",
+ 21: "KeyY",
+ 22: "KeyU",
+ 23: "KeyI",
+ 24: "KeyO",
+ 25: "KeyP",
+ 26: "BracketLeft",
+ 27: "BracketRight",
+ 28: "Enter",
+ 29: "ControlLeft",
+ 30: "KeyA",
+ 31: "KeyS",
+ 32: "KeyD",
+ 33: "KeyF",
+ 34: "KeyG",
+ 35: "KeyH",
+ 36: "KeyJ",
+ 37: "KeyK",
+ 38: "KeyL",
+ 39: "Semicolon",
+ 40: "Quote",
+ 41: "Backquote",
+ 42: "ShiftLeft",
+ 43: "Backslash",
+ 44: "KeyZ",
+ 45: "KeyX",
+ 46: "KeyC",
+ 47: "KeyV",
+ 48: "KeyB",
+ 49: "KeyN",
+ 50: "KeyM",
+ 51: "Comma",
+ 52: "Period",
+ 53: "Slash",
+ 54: "ShiftRight",
+ 56: "AltLeft",
+ 57: "Space",
+ 58: "CapsLock",
+ 59: "F1",
+ 60: "F2",
+ 61: "F3",
+ 62: "F4",
+ 63: "F5",
+ 64: "F6",
+ 65: "F7",
+ 66: "F8",
+ 67: "F9",
+ 68: "F10",
+ 69: "NumLock",
+ 70: "ScrollLock",
+ 84: "PrintScreen",
+ 87: "F11",
+ 88: "F12",
+ 57373: "ControlRight",
+ 57400: "AltRight",
+ 57414: "Pause",
+ 57415: "Home",
+ 57416: "ArrowUp",
+ 57417: "PageUp",
+ 57419: "ArrowLeft",
+ 57421: "ArrowRight",
+ 57423: "End",
+ 57424: "ArrowDown",
+ 57425: "PageDown",
+ 57426: "Insert",
+ 57427: "Delete",
+ 57435: "MetaLeft",
+ 57436: "MetaRight",
+ 57437: "ContextMenu",
+}
diff --git a/kvmd/keymap.py.mako b/kvmd/keymap.py.mako
index 219cb186..0f9e7ef0 100644
--- a/kvmd/keymap.py.mako
+++ b/kvmd/keymap.py.mako
@@ -46,9 +46,26 @@ class Key:
# =====
KEYMAP: Dict[str, Key] = {
% for km in sorted(keymap, key=operator.attrgetter("serial_code")):
- "${km.web_key}": Key(
+ "${km.web_name}": Key(
serial=SerialKey(code=${km.serial_code}),
otg=OtgKey(code=${km.otg_code}, is_modifier=${km.otg_is_modifier}),
),
% endfor
}
+
+
+# =====
+X11_TO_AT1 = {
+% for km in sorted(keymap, key=operator.attrgetter("at1_code")):
+ % for code in sorted(km.x11_codes):
+ ${code}: ${km.at1_code},
+ % endfor
+% endfor
+}
+
+
+AT1_TO_WEB = {
+% for km in sorted(keymap, key=operator.attrgetter("at1_code")):
+ ${km.at1_code}: "${km.web_name}",
+% endfor
+}