diff options
author | Devaev Maxim <[email protected]> | 2020-05-23 15:57:02 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2020-05-23 15:57:02 +0300 |
commit | e9d86c058d715af343be355b1ab3d58b7eed43b9 (patch) | |
tree | 6ae712152e4e9d7c274c5e369755f7030269677e /kvmd | |
parent | a795fe5ed63b960cbf24c9130ed37f9f7f33280c (diff) |
major keymaps improvement
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/apps/__init__.py | 3 | ||||
-rw-r--r-- | kvmd/apps/kvmd/__init__.py | 4 | ||||
-rw-r--r-- | kvmd/apps/kvmd/api/hid.py | 53 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 4 | ||||
-rw-r--r-- | kvmd/apps/vnc/__init__.py | 4 | ||||
-rw-r--r-- | kvmd/apps/vnc/server.py | 27 | ||||
-rw-r--r-- | kvmd/clients/kvmd.py | 13 | ||||
-rw-r--r-- | kvmd/keyboard/keysym.py | 38 | ||||
-rw-r--r-- | kvmd/keyboard/printer.py | 79 | ||||
-rw-r--r-- | kvmd/plugins/hid/otg/__init__.py | 1 |
10 files changed, 140 insertions, 86 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index c1e9f958..ad235c03 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -226,6 +226,7 @@ def _get_config_scheme() -> Dict: "hid": { "type": Option("", type=valid_stripped_string_not_empty), + "keymap": Option("/usr/share/kvmd/keymaps/en-us", type=valid_abs_path), # Dynamic content }, @@ -324,7 +325,7 @@ def _get_config_scheme() -> Dict: "vnc": { "desired_fps": Option(30, type=valid_stream_fps), - "keymap": Option("", type=valid_abs_path), + "keymap": Option("/usr/share/kvmd/keymaps/en-us", type=valid_abs_path), "server": { "host": Option("::", type=valid_ip_or_host), diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py index 5aa3d687..5d690840 100644 --- a/kvmd/apps/kvmd/__init__.py +++ b/kvmd/apps/kvmd/__init__.py @@ -76,13 +76,15 @@ def main(argv: Optional[List[str]]=None) -> None: log_reader=LogReader(), wol=WakeOnLan(**config.wol._unpack()), - hid=get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type"])), + hid=get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type", "keymap"])), atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])), msd=get_msd_class(config.msd.type)(**msd_kwargs), streamer=Streamer(**config.streamer._unpack()), heartbeat=config.server.heartbeat, sync_chunk_size=config.server.sync_chunk_size, + + keymap_path=config.hid.keymap, ).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"])) get_logger(0).info("Bye-bye") diff --git a/kvmd/apps/kvmd/api/hid.py b/kvmd/apps/kvmd/api/hid.py index 8bdaac59..59824e11 100644 --- a/kvmd/apps/kvmd/api/hid.py +++ b/kvmd/apps/kvmd/api/hid.py @@ -20,9 +20,13 @@ # ========================================================================== # +import os +import stat import asyncio +import functools from typing import Dict +from typing import Set from aiohttp.web import Request from aiohttp.web import Response @@ -30,14 +34,21 @@ from aiohttp.web import WebSocketResponse from ....plugins.hid import BaseHid +from ....validators import raise_error + from ....validators.basic import valid_bool from ....validators.basic import valid_number +from ....validators.os import valid_printable_filename + from ....validators.kvm import valid_hid_key from ....validators.kvm import valid_hid_mouse_move from ....validators.kvm import valid_hid_mouse_button from ....validators.kvm import valid_hid_mouse_wheel +from ....keyboard.keysym import SymmapWebKey +from ....keyboard.keysym import build_symmap + from ....keyboard.printer import text_to_web_keys from ..http import exposed_http @@ -47,9 +58,14 @@ from ..http import make_json_response # ===== class HidApi: - def __init__(self, hid: BaseHid) -> None: + def __init__(self, hid: BaseHid, keymap_path: str) -> None: self.__hid = hid + self.__keymaps_dir_path = os.path.dirname(keymap_path) + self.__default_keymap_name = os.path.basename(keymap_path) + + self.__ensure_symmap(self.__default_keymap_name) + self.__key_lock = asyncio.Lock() # ===== @@ -63,17 +79,50 @@ class HidApi: await self.__hid.reset() return make_json_response() + # ===== + + @exposed_http("GET", "/hid/keymaps") + async def __keymaps_handler(self, _: Request) -> Response: + keymaps: Set[str] = set() + for keymap_name in os.listdir(self.__keymaps_dir_path): + path = os.path.join(self.__keymaps_dir_path, keymap_name) + if os.access(path, os.R_OK) and stat.S_ISREG(os.stat(path).st_mode): + keymaps.add(keymap_name) + return make_json_response({ + "keymaps": { + "default": self.__default_keymap_name, + "available": sorted(keymaps), + }, + }) + @exposed_http("POST", "/hid/print") async def __print_handler(self, request: Request) -> Response: text = await request.text() limit = int(valid_number(request.query.get("limit", "1024"), min=0, type=int)) if limit > 0: text = text[:limit] + symmap = self.__ensure_symmap(request.query.get("keymap", self.__default_keymap_name)) async with self.__key_lock: - for (key, state) in text_to_web_keys(text): + for (key, state) in text_to_web_keys(text, symmap): self.__hid.send_key_event(key, state) return make_json_response() + def __ensure_symmap(self, keymap_name: str) -> Dict[int, SymmapWebKey]: + keymap_name = valid_printable_filename(keymap_name, "keymap") + path = os.path.join(self.__keymaps_dir_path, keymap_name) + try: + st = os.stat(path) + if not (os.access(path, os.R_OK) and stat.S_ISREG(st.st_mode)): + raise_error(keymap_name, "keymap") + except Exception: + raise_error(keymap_name, "keymap") + return self.__inner_ensure_symmap(path, st.st_mtime) + + @functools.lru_cache(maxsize=10) + def __inner_ensure_symmap(self, path: str, mtime: int) -> Dict[int, SymmapWebKey]: + _ = mtime # For LRU + return build_symmap(path) + # ===== @exposed_ws("key") diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 3a309651..d88982aa 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -119,6 +119,8 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins heartbeat: float, sync_chunk_size: int, + + keymap_path: str, ) -> None: self.__auth_manager = auth_manager @@ -136,7 +138,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins self, LogApi(log_reader), WolApi(wol), - HidApi(hid), + HidApi(hid, keymap_path), AtxApi(atx), MsdApi(msd, sync_chunk_size), ] diff --git a/kvmd/apps/vnc/__init__.py b/kvmd/apps/vnc/__init__.py index 92fb073b..a88a3966 100644 --- a/kvmd/apps/vnc/__init__.py +++ b/kvmd/apps/vnc/__init__.py @@ -23,8 +23,6 @@ from typing import List from typing import Optional -from ...keyboard.keysym import build_symmap - from ...clients.kvmd import KvmdClient from ...clients.streamer import StreamerClient @@ -56,7 +54,7 @@ def main(argv: Optional[List[str]]=None) -> None: tls_timeout=config.server.tls.timeout, desired_fps=config.desired_fps, - symmap=build_symmap(config.keymap), + keymap_path=config.keymap, kvmd=KvmdClient( user_agent=user_agent, diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py index 0b9a26f6..189332c7 100644 --- a/kvmd/apps/vnc/server.py +++ b/kvmd/apps/vnc/server.py @@ -20,6 +20,7 @@ # ========================================================================== # +import os import asyncio import asyncio.queues import socket @@ -34,6 +35,9 @@ import aiohttp from ...logging import get_logger +from ...keyboard.keysym import SymmapWebKey +from ...keyboard.keysym import build_symmap + from ...clients.kvmd import KvmdClient from ...clients.streamer import StreamerError @@ -69,7 +73,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes tls_timeout: float, desired_fps: int, - symmap: Dict[int, str], + keymap_name: str, + symmap: Dict[int, SymmapWebKey], kvmd: KvmdClient, streamer: StreamerClient, @@ -92,6 +97,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes ) self.__desired_fps = desired_fps + self.__keymap_name = keymap_name self.__symmap = symmap self.__kvmd = kvmd @@ -249,10 +255,10 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes # ===== async def _on_key_event(self, code: int, state: bool) -> None: - if (web_name := self.__symmap.get(code)) is not None: + if (web_key := self.__symmap.get(code)) is not None: await self.__ws_writer_queue.put({ "event_type": "key", - "event": {"key": web_name, "state": state}, + "event": {"key": web_key.name, "state": state}, }) async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None: @@ -283,7 +289,14 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes logger = get_logger(0) logger.info("[main] Client %s: Printing %d characters ...", self._remote, len(text)) try: - await self.__kvmd.hid.print(user, passwd, text, 0) + (default, available) = await self.__kvmd.hid.get_keymaps(user, passwd) + await self.__kvmd.hid.print( + user=user, + passwd=passwd, + text=text, + limit=0, + keymap_name=(self.__keymap_name if self.__keymap_name in available else default), + ) except Exception: logger.exception("[main] Client %s: Can't print characters", self._remote) @@ -311,7 +324,7 @@ class VncServer: # pylint: disable=too-many-instance-attributes tls_timeout: float, desired_fps: int, - symmap: Dict[int, str], + keymap_path: str, kvmd: KvmdClient, streamer: StreamerClient, @@ -322,6 +335,9 @@ class VncServer: # pylint: disable=too-many-instance-attributes self.__port = port self.__max_clients = max_clients + keymap_name = os.path.basename(keymap_path) + symmap = build_symmap(keymap_path) + self.__vnc_auth_manager = vnc_auth_manager shared_params = _SharedParams() @@ -343,6 +359,7 @@ class VncServer: # pylint: disable=too-many-instance-attributes tls_ciphers=tls_ciphers, tls_timeout=tls_timeout, desired_fps=desired_fps, + keymap_name=keymap_name, symmap=symmap, kvmd=kvmd, streamer=streamer, diff --git a/kvmd/clients/kvmd.py b/kvmd/clients/kvmd.py index 678f104e..5e44811b 100644 --- a/kvmd/clients/kvmd.py +++ b/kvmd/clients/kvmd.py @@ -22,7 +22,9 @@ import contextlib +from typing import Tuple from typing import Dict +from typing import Set from typing import AsyncGenerator import aiohttp @@ -90,11 +92,18 @@ class _StreamerClientPart(_BaseClientPart): class _HidClientPart(_BaseClientPart): - async def print(self, user: str, passwd: str, text: str, limit: int) -> None: + async def get_keymaps(self, user: str, passwd: str) -> Tuple[str, Set[str]]: + async with self._make_session(user, passwd) as session: + async with session.get(self._make_url("hid/keymaps")) as response: + aiotools.raise_not_200(response) + result = (await response.json())["result"] + return (result["keymaps"]["default"], set(result["keymaps"]["available"])) + + async def print(self, user: str, passwd: str, text: str, limit: int, keymap_name: str) -> None: async with self._make_session(user, passwd) as session: async with session.post( url=self._make_url("hid/print"), - params={"limit": limit}, + params={"limit": limit, "keymap": keymap_name}, data=text, ) as response: aiotools.raise_not_200(response) diff --git a/kvmd/keyboard/keysym.py b/kvmd/keyboard/keysym.py index fbfeb913..d12e2b84 100644 --- a/kvmd/keyboard/keysym.py +++ b/kvmd/keyboard/keysym.py @@ -20,6 +20,7 @@ # ========================================================================== # +import dataclasses import pkgutil import functools @@ -29,22 +30,34 @@ import Xlib.keysymdef from ..logging import get_logger +from .mappings import At1Key from .mappings import X11_TO_AT1 from .mappings import AT1_TO_WEB # ===== -def build_symmap(path: str) -> Dict[int, str]: [email protected](frozen=True) +class SymmapWebKey: + name: str + shift: bool + + +def build_symmap(path: str) -> Dict[int, SymmapWebKey]: # https://github.com/qemu/qemu/blob/95a9457fd44ad97c518858a4e1586a5498f9773c/ui/keymaps.c - symmap: Dict[int, str] = {} + symmap: Dict[int, SymmapWebKey] = {} for (x11_code, at1_key) in X11_TO_AT1.items(): - symmap[x11_code] = AT1_TO_WEB[at1_key.code] - - for (x11_code, at1_code) in _read_keyboard_layout(path).items(): - if (web_name := AT1_TO_WEB.get(at1_code)) is not None: - # mypy bug - symmap[x11_code] = web_name # type: ignore + symmap[x11_code] = SymmapWebKey( + name=AT1_TO_WEB[at1_key.code], + shift=False, + ) + + for (x11_code, at1_key) in _read_keyboard_layout(path).items(): + if (web_name := AT1_TO_WEB.get(at1_key.code)) is not None: + symmap[x11_code] = SymmapWebKey( + name=web_name, + shift=at1_key.shift, + ) return symmap @@ -76,14 +89,14 @@ def _resolve_keysym(name: str) -> int: return 0 -def _read_keyboard_layout(path: str) -> Dict[int, int]: # Keysym to evdev (at1) +def _read_keyboard_layout(path: str) -> Dict[int, At1Key]: # Keysym to evdev (at1) logger = get_logger(0) logger.info("Reading keyboard layout %s ...", path) with open(path) as layout_file: lines = list(map(str.strip, layout_file.read().split("\n"))) - layout: Dict[int, int] = {} + layout: Dict[int, At1Key] = {} for (number, line) in enumerate(lines): if len(line) == 0 or line.startswith(("#", "map ", "include ")): continue @@ -92,7 +105,10 @@ def _read_keyboard_layout(path: str) -> Dict[int, int]: # Keysym to evdev (at1) if len(parts) >= 2: if (code := _resolve_keysym(parts[0])) != 0: try: - layout[code] = int(parts[1], 16) + layout[code] = At1Key( + code=int(parts[1], 16), + shift=bool(len(parts) == 3 and parts[2] == "shift"), + ) except ValueError as err: logger.error("Can't parse layout line #%d: %s", number, str(err)) return layout diff --git a/kvmd/keyboard/printer.py b/kvmd/keyboard/printer.py index f8086be3..2e26c232 100644 --- a/kvmd/keyboard/printer.py +++ b/kvmd/keyboard/printer.py @@ -20,84 +20,43 @@ # ========================================================================== # -import string - from typing import Tuple +from typing import Dict from typing import Generator -from .mappings import KEYMAP +from .keysym import SymmapWebKey # ===== -_LOWER_CHARS = { - "\n": "Enter", - "\t": "Tab", - " ": "Space", - "`": "Backquote", - "\\": "Backslash", - "[": "BracketLeft", - "]": "BracketLeft", - ",": "Comma", - ".": "Period", - "-": "Minus", - "'": "Quote", - ";": "Semicolon", - "/": "Slash", - "=": "Equal", - **{str(number): f"Digit{number}" for number in range(0, 10)}, - **{ch: f"Key{ch.upper()}" for ch in string.ascii_lowercase}, -} -assert not set(_LOWER_CHARS.values()).difference(KEYMAP) - -_UPPER_CHARS = { - "~": "Backquote", - "|": "Backslash", - "{": "BracketLeft", - "}": "BracketRight", - "<": "Comma", - ">": "Period", - "!": "Digit1", - "@": "Digit2", - "#": "Digit3", - "$": "Digit4", - "%": "Digit5", - "^": "Digit6", - "&": "Digit7", - "*": "Digit8", - "(": "Digit9", - ")": "Digit0", - "_": "Minus", - "\"": "Quote", - ":": "Semicolon", - "?": "Slash", - "+": "Equal", - **{ch: f"Key{ch}" for ch in string.ascii_uppercase}, -} -assert not set(_UPPER_CHARS.values()).difference(KEYMAP) +def text_to_web_keys( + text: str, + symmap: Dict[int, SymmapWebKey], + shift_key: str="ShiftLeft", +) -> Generator[Tuple[str, bool], None, None]: - -# ===== -def text_to_web_keys(text: str, shift_key: str="ShiftLeft") -> Generator[Tuple[str, bool], None, None]: assert shift_key in ["ShiftLeft", "ShiftRight"] shifted = False for ch in text: - upper = False - key = _LOWER_CHARS.get(ch) - if key is None: - if (key := _UPPER_CHARS.get(ch)) is None: + try: + code = ord(ch) + if not (0x20 <= code <= 0x7E): + # https://stackoverflow.com/questions/12343987/convert-ascii-character-to-x11-keycode + # https://www.ascii-code.com continue - upper = True + key = symmap[code] + except Exception: + continue - if upper and not shifted: + if key.shift and not shifted: yield (shift_key, True) shifted = True - elif not upper and shifted: + elif not key.shift and shifted: yield (shift_key, False) shifted = False - yield (key, True) - yield (key, False) + yield (key.name, True) + yield (key.name, False) if shifted: yield (shift_key, False) diff --git a/kvmd/plugins/hid/otg/__init__.py b/kvmd/plugins/hid/otg/__init__.py index 3dc49c72..3cc866ce 100644 --- a/kvmd/plugins/hid/otg/__init__.py +++ b/kvmd/plugins/hid/otg/__init__.py @@ -114,6 +114,7 @@ class Plugin(BaseHid): # ===== def send_key_event(self, key: str, state: bool) -> None: + print(key, int(state)) self.__keyboard_proc.send_key_event(key, state) def send_mouse_button_event(self, button: str, state: bool) -> None: |