summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2020-05-24 11:41:38 +0300
committerDevaev Maxim <[email protected]>2020-05-24 11:41:38 +0300
commitcf47e0c8805a27ef024f5024a75957ea159b0328 (patch)
tree93a6684b9432e219b36b6ac387d3b0bee53d98c7 /kvmd
parent6d7351502e268155c00cd0eb6b3bba37054f761d (diff)
commond kvmd ws client
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/apps/vnc/server.py94
-rw-r--r--kvmd/clients/kvmd.py75
2 files changed, 101 insertions, 68 deletions
diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py
index 270072db..c7d376d8 100644
--- a/kvmd/apps/vnc/server.py
+++ b/kvmd/apps/vnc/server.py
@@ -22,11 +22,9 @@
import os
import asyncio
-import asyncio.queues
import socket
import dataclasses
import contextlib
-import json
from typing import Dict
from typing import Optional
@@ -38,14 +36,13 @@ from ...logging import get_logger
from ...keyboard.keysym import SymmapWebKey
from ...keyboard.keysym import build_symmap
+from ...clients.kvmd import KvmdClientWs
from ...clients.kvmd import KvmdClientSession
from ...clients.kvmd import KvmdClient
from ...clients.streamer import StreamerError
from ...clients.streamer import StreamerClient
-from ... import aiotools
-
from .rfb import RfbClient
from .rfb.stream import rfb_format_remote
from .rfb.stream import rfb_close_writer
@@ -106,10 +103,10 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
self.__shared_params = shared_params
- self.__kvmd_session: Optional[KvmdClientSession] = None
self.__authorized = asyncio.Future() # type: ignore
self.__ws_connected = asyncio.Future() # type: ignore
- self.__ws_writer_queue: asyncio.queues.Queue = asyncio.Queue()
+ self.__kvmd_session: Optional[KvmdClientSession] = None
+ self.__kvmd_ws: Optional[KvmdClientWs] = None
self.__fb_requested = False
self.__fb_stub_text = ""
@@ -133,48 +130,23 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
finally:
if self.__kvmd_session:
await self.__kvmd_session.close()
+ self.__kvmd_session = None
# =====
async def __kvmd_task_loop(self) -> None:
logger = get_logger(0)
-
await self.__authorized
assert self.__kvmd_session
-
- async with self.__kvmd_session.ws() as ws:
- logger.info("[kvmd] Client %s: Connected to KVMD websocket", self._remote)
- self.__ws_connected.set_result(None)
-
- receive_task: Optional[asyncio.Task] = None
- writer_task: Optional[asyncio.Task] = None
- try:
- while True:
- if receive_task is None:
- receive_task = asyncio.create_task(ws.receive())
- if writer_task is None:
- writer_task = asyncio.create_task(self.__ws_writer_queue.get())
-
- done = (await aiotools.wait_first(receive_task, writer_task))[0]
-
- if receive_task in done:
- msg = receive_task.result()
- if msg.type == aiohttp.WSMsgType.TEXT:
- await self.__process_ws_event(json.loads(msg.data))
- elif msg.type == aiohttp.WSMsgType.CLOSE:
- raise RfbError("KVMD closed the wesocket (it may have been stopped)")
- else:
- raise RuntimeError(f"Unhandled WS message type: {msg!r}")
- receive_task = None
-
- if writer_task in done:
- await ws.send_str(json.dumps(writer_task.result()))
- writer_task = None
- finally:
- if receive_task:
- receive_task.cancel()
- if writer_task:
- writer_task.cancel()
+ try:
+ async with self.__kvmd_session.ws() as self.__kvmd_ws:
+ logger.info("[kvmd] Client %s: Connected to KVMD websocket", self._remote)
+ self.__ws_connected.set_result(None)
+ async for event in self.__kvmd_ws.communicate():
+ await self.__process_ws_event(event)
+ raise RfbError("KVMD closes the websocket (the server may have been stopped)")
+ finally:
+ self.__kvmd_ws = None
async def __process_ws_event(self, event: Dict) -> None:
if event["event_type"] == "info_state":
@@ -262,33 +234,23 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
# =====
async def _on_key_event(self, code: int, state: bool) -> None:
- if (web_key := self.__symmap.get(code)) is not None:
- await self.__ws_writer_queue.put({
- "event_type": "key",
- "event": {"key": web_key.name, "state": state},
- })
+ if self.__kvmd_ws:
+ if (web_key := self.__symmap.get(code)) is not None:
+ await self.__kvmd_ws.send_key_event(web_key.name, state)
async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None:
- for (button, state) in buttons.items():
- if self.__mouse_buttons[button] != state:
- await self.__ws_writer_queue.put({
- "event_type": "mouse_button",
- "event": {"button": button, "state": state},
- })
- self.__mouse_buttons[button] = state
-
- if wheel["x"] or wheel["y"]:
- await self.__ws_writer_queue.put({
- "event_type": "mouse_wheel",
- "event": {"delta": wheel},
- })
-
- if self.__mouse_move != move:
- await self.__ws_writer_queue.put({
- "event_type": "mouse_move",
- "event": {"to": move},
- })
- self.__mouse_move = move
+ if self.__kvmd_ws:
+ for (button, state) in buttons.items():
+ if self.__mouse_buttons[button] != state:
+ await self.__kvmd_ws.send_mouse_button_event(button, state)
+ self.__mouse_buttons[button] = state
+
+ if wheel["x"] or wheel["y"]:
+ await self.__kvmd_ws.send_mouse_wheel_event(wheel["x"], wheel["y"])
+
+ if self.__mouse_move != move:
+ await self.__kvmd_ws.send_mouse_move_event(move["x"], move["y"])
+ self.__mouse_move = move
async def _on_cut_event(self, text: str) -> None:
assert self.__authorized.done()
diff --git a/kvmd/clients/kvmd.py b/kvmd/clients/kvmd.py
index fd46f7cb..70fa7661 100644
--- a/kvmd/clients/kvmd.py
+++ b/kvmd/clients/kvmd.py
@@ -20,7 +20,10 @@
# ========================================================================== #
+import asyncio
+import asyncio.queues
import contextlib
+import json
import types
from typing import Tuple
@@ -111,6 +114,73 @@ class _AtxApiPart(_BaseApiPart):
raise
+# =====
+class KvmdClientWs:
+ def __init__(self, ws: aiohttp.ClientWebSocketResponse) -> None:
+ self.__ws = ws
+
+ self.__writer_queue: asyncio.queues.Queue = asyncio.Queue()
+ self.__communicated = False
+
+ async def communicate(self) -> AsyncGenerator[Dict, None]:
+ assert not self.__communicated
+ self.__communicated = True
+ receive_task: Optional[asyncio.Task] = None
+ writer_task: Optional[asyncio.Task] = None
+ try:
+ while True:
+ if receive_task is None:
+ receive_task = asyncio.create_task(self.__ws.receive())
+ if writer_task is None:
+ writer_task = asyncio.create_task(self.__writer_queue.get())
+
+ done = (await aiotools.wait_first(receive_task, writer_task))[0]
+
+ if receive_task in done:
+ msg = receive_task.result()
+ if msg.type == aiohttp.WSMsgType.TEXT:
+ yield json.loads(msg.data)
+ elif msg.type == aiohttp.WSMsgType.CLOSE:
+ break
+ else:
+ raise RuntimeError(f"Unhandled WS message type: {msg!r}")
+ receive_task = None
+
+ if writer_task in done:
+ await self.__ws.send_str(json.dumps(writer_task.result()))
+ writer_task = None
+ finally:
+ if receive_task:
+ receive_task.cancel()
+ if writer_task:
+ writer_task.cancel()
+ self.__communicated = False
+
+ async def send_key_event(self, key: str, state: bool) -> None:
+ await self.__writer_queue.put({
+ "event_type": "key",
+ "event": {"key": key, "state": state},
+ })
+
+ async def send_mouse_button_event(self, button: str, state: bool) -> None:
+ await self.__writer_queue.put({
+ "event_type": "mouse_button",
+ "event": {"button": button, "state": state},
+ })
+
+ async def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
+ await self.__writer_queue.put({
+ "event_type": "mouse_move",
+ "event": {"to": {"x": to_x, "y": to_y}},
+ })
+
+ async def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
+ await self.__writer_queue.put({
+ "event_type": "mouse_wheel",
+ "event": {"delta": {"x": delta_x, "y": delta_y}},
+ })
+
+
class KvmdClientSession:
def __init__(
self,
@@ -124,16 +194,17 @@ class KvmdClientSession:
self.__http_session: Optional[aiohttp.ClientSession] = None
args = (self.__ensure_http_session, make_url)
+
self.auth = _AuthApiPart(*args)
self.streamer = _StreamerApiPart(*args)
self.hid = _HidApiPart(*args)
self.atx = _AtxApiPart(*args)
@contextlib.asynccontextmanager
- async def ws(self) -> AsyncGenerator[aiohttp.ClientWebSocketResponse, None]:
+ async def ws(self) -> AsyncGenerator[KvmdClientWs, None]:
session = self.__ensure_http_session()
async with session.ws_connect(self.__make_url("ws")) as ws:
- yield ws
+ yield KvmdClientWs(ws)
def __ensure_http_session(self) -> aiohttp.ClientSession:
if not self.__http_session: