diff options
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/apps/vnc/server.py | 94 | ||||
-rw-r--r-- | kvmd/clients/kvmd.py | 75 |
2 files changed, 101 insertions, 68 deletions
diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py index 270072db..c7d376d8 100644 --- a/kvmd/apps/vnc/server.py +++ b/kvmd/apps/vnc/server.py @@ -22,11 +22,9 @@ import os import asyncio -import asyncio.queues import socket import dataclasses import contextlib -import json from typing import Dict from typing import Optional @@ -38,14 +36,13 @@ from ...logging import get_logger from ...keyboard.keysym import SymmapWebKey from ...keyboard.keysym import build_symmap +from ...clients.kvmd import KvmdClientWs from ...clients.kvmd import KvmdClientSession from ...clients.kvmd import KvmdClient from ...clients.streamer import StreamerError from ...clients.streamer import StreamerClient -from ... import aiotools - from .rfb import RfbClient from .rfb.stream import rfb_format_remote from .rfb.stream import rfb_close_writer @@ -106,10 +103,10 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes self.__shared_params = shared_params - self.__kvmd_session: Optional[KvmdClientSession] = None self.__authorized = asyncio.Future() # type: ignore self.__ws_connected = asyncio.Future() # type: ignore - self.__ws_writer_queue: asyncio.queues.Queue = asyncio.Queue() + self.__kvmd_session: Optional[KvmdClientSession] = None + self.__kvmd_ws: Optional[KvmdClientWs] = None self.__fb_requested = False self.__fb_stub_text = "" @@ -133,48 +130,23 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes finally: if self.__kvmd_session: await self.__kvmd_session.close() + self.__kvmd_session = None # ===== async def __kvmd_task_loop(self) -> None: logger = get_logger(0) - await self.__authorized assert self.__kvmd_session - - async with self.__kvmd_session.ws() as ws: - logger.info("[kvmd] Client %s: Connected to KVMD websocket", self._remote) - self.__ws_connected.set_result(None) - - receive_task: Optional[asyncio.Task] = None - writer_task: Optional[asyncio.Task] = None - try: - while True: - if receive_task is None: - receive_task = asyncio.create_task(ws.receive()) - if writer_task is None: - writer_task = asyncio.create_task(self.__ws_writer_queue.get()) - - done = (await aiotools.wait_first(receive_task, writer_task))[0] - - if receive_task in done: - msg = receive_task.result() - if msg.type == aiohttp.WSMsgType.TEXT: - await self.__process_ws_event(json.loads(msg.data)) - elif msg.type == aiohttp.WSMsgType.CLOSE: - raise RfbError("KVMD closed the wesocket (it may have been stopped)") - else: - raise RuntimeError(f"Unhandled WS message type: {msg!r}") - receive_task = None - - if writer_task in done: - await ws.send_str(json.dumps(writer_task.result())) - writer_task = None - finally: - if receive_task: - receive_task.cancel() - if writer_task: - writer_task.cancel() + try: + async with self.__kvmd_session.ws() as self.__kvmd_ws: + logger.info("[kvmd] Client %s: Connected to KVMD websocket", self._remote) + self.__ws_connected.set_result(None) + async for event in self.__kvmd_ws.communicate(): + await self.__process_ws_event(event) + raise RfbError("KVMD closes the websocket (the server may have been stopped)") + finally: + self.__kvmd_ws = None async def __process_ws_event(self, event: Dict) -> None: if event["event_type"] == "info_state": @@ -262,33 +234,23 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes # ===== async def _on_key_event(self, code: int, state: bool) -> None: - if (web_key := self.__symmap.get(code)) is not None: - await self.__ws_writer_queue.put({ - "event_type": "key", - "event": {"key": web_key.name, "state": state}, - }) + if self.__kvmd_ws: + if (web_key := self.__symmap.get(code)) is not None: + await self.__kvmd_ws.send_key_event(web_key.name, state) async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None: - for (button, state) in buttons.items(): - if self.__mouse_buttons[button] != state: - await self.__ws_writer_queue.put({ - "event_type": "mouse_button", - "event": {"button": button, "state": state}, - }) - self.__mouse_buttons[button] = state - - if wheel["x"] or wheel["y"]: - await self.__ws_writer_queue.put({ - "event_type": "mouse_wheel", - "event": {"delta": wheel}, - }) - - if self.__mouse_move != move: - await self.__ws_writer_queue.put({ - "event_type": "mouse_move", - "event": {"to": move}, - }) - self.__mouse_move = move + if self.__kvmd_ws: + for (button, state) in buttons.items(): + if self.__mouse_buttons[button] != state: + await self.__kvmd_ws.send_mouse_button_event(button, state) + self.__mouse_buttons[button] = state + + if wheel["x"] or wheel["y"]: + await self.__kvmd_ws.send_mouse_wheel_event(wheel["x"], wheel["y"]) + + if self.__mouse_move != move: + await self.__kvmd_ws.send_mouse_move_event(move["x"], move["y"]) + self.__mouse_move = move async def _on_cut_event(self, text: str) -> None: assert self.__authorized.done() diff --git a/kvmd/clients/kvmd.py b/kvmd/clients/kvmd.py index fd46f7cb..70fa7661 100644 --- a/kvmd/clients/kvmd.py +++ b/kvmd/clients/kvmd.py @@ -20,7 +20,10 @@ # ========================================================================== # +import asyncio +import asyncio.queues import contextlib +import json import types from typing import Tuple @@ -111,6 +114,73 @@ class _AtxApiPart(_BaseApiPart): raise +# ===== +class KvmdClientWs: + def __init__(self, ws: aiohttp.ClientWebSocketResponse) -> None: + self.__ws = ws + + self.__writer_queue: asyncio.queues.Queue = asyncio.Queue() + self.__communicated = False + + async def communicate(self) -> AsyncGenerator[Dict, None]: + assert not self.__communicated + self.__communicated = True + receive_task: Optional[asyncio.Task] = None + writer_task: Optional[asyncio.Task] = None + try: + while True: + if receive_task is None: + receive_task = asyncio.create_task(self.__ws.receive()) + if writer_task is None: + writer_task = asyncio.create_task(self.__writer_queue.get()) + + done = (await aiotools.wait_first(receive_task, writer_task))[0] + + if receive_task in done: + msg = receive_task.result() + if msg.type == aiohttp.WSMsgType.TEXT: + yield json.loads(msg.data) + elif msg.type == aiohttp.WSMsgType.CLOSE: + break + else: + raise RuntimeError(f"Unhandled WS message type: {msg!r}") + receive_task = None + + if writer_task in done: + await self.__ws.send_str(json.dumps(writer_task.result())) + writer_task = None + finally: + if receive_task: + receive_task.cancel() + if writer_task: + writer_task.cancel() + self.__communicated = False + + async def send_key_event(self, key: str, state: bool) -> None: + await self.__writer_queue.put({ + "event_type": "key", + "event": {"key": key, "state": state}, + }) + + async def send_mouse_button_event(self, button: str, state: bool) -> None: + await self.__writer_queue.put({ + "event_type": "mouse_button", + "event": {"button": button, "state": state}, + }) + + async def send_mouse_move_event(self, to_x: int, to_y: int) -> None: + await self.__writer_queue.put({ + "event_type": "mouse_move", + "event": {"to": {"x": to_x, "y": to_y}}, + }) + + async def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None: + await self.__writer_queue.put({ + "event_type": "mouse_wheel", + "event": {"delta": {"x": delta_x, "y": delta_y}}, + }) + + class KvmdClientSession: def __init__( self, @@ -124,16 +194,17 @@ class KvmdClientSession: self.__http_session: Optional[aiohttp.ClientSession] = None args = (self.__ensure_http_session, make_url) + self.auth = _AuthApiPart(*args) self.streamer = _StreamerApiPart(*args) self.hid = _HidApiPart(*args) self.atx = _AtxApiPart(*args) @contextlib.asynccontextmanager - async def ws(self) -> AsyncGenerator[aiohttp.ClientWebSocketResponse, None]: + async def ws(self) -> AsyncGenerator[KvmdClientWs, None]: session = self.__ensure_http_session() async with session.ws_connect(self.__make_url("ws")) as ws: - yield ws + yield KvmdClientWs(ws) def __ensure_http_session(self) -> aiohttp.ClientSession: if not self.__http_session: |