summaryrefslogtreecommitdiff
path: root/kvmd/apps/vnc/server.py
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2020-03-20 03:07:27 +0300
committerDevaev Maxim <[email protected]>2020-03-20 03:07:27 +0300
commitd5ae32b1326fc5ac9207193d7679b34e0ceec4c7 (patch)
tree43bb961fd3006c06dffec900a2c84fb8387302c0 /kvmd/apps/vnc/server.py
parentab6264bd5e65497121139eab6deae353e06d592f (diff)
vnc
Diffstat (limited to 'kvmd/apps/vnc/server.py')
-rw-r--r--kvmd/apps/vnc/server.py307
1 files changed, 307 insertions, 0 deletions
diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py
new file mode 100644
index 00000000..8ce78d41
--- /dev/null
+++ b/kvmd/apps/vnc/server.py
@@ -0,0 +1,307 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2020 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import asyncio
+import asyncio.queues
+import socket
+import dataclasses
+import json
+
+from typing import Dict
+from typing import Optional
+
+import aiohttp
+
+from ...logging import get_logger
+
+from ... import aiotools
+
+from .rfb import RfbClient
+
+from .kvmd import KvmdClient
+
+from .streamer import StreamerError
+from .streamer import StreamerClient
+
+from .render import make_text_jpeg
+
+
+# =====
+class _SharedParams:
+ width: int = dataclasses.field(default=800)
+ height: int = dataclasses.field(default=600)
+ name: str = dataclasses.field(default="Pi-KVM")
+
+
+class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
+ def __init__(
+ self,
+ reader: asyncio.StreamReader,
+ writer: asyncio.StreamWriter,
+
+ kvmd: KvmdClient,
+ streamer: StreamerClient,
+
+ symmap: Dict[int, str],
+
+ shared_params: _SharedParams,
+ ) -> None:
+
+ super().__init__(reader, writer, **dataclasses.asdict(shared_params))
+
+ self.__kvmd = kvmd
+ self.__streamer = streamer
+ self.__symmap = symmap
+ self.__shared_params = shared_params
+
+ self.__authorized = asyncio.Future() # type: ignore
+ self.__ws_connected = asyncio.Future() # type: ignore
+ self.__ws_writer_queue: asyncio.queues.Queue = asyncio.Queue()
+
+ self.__fb_requested = False
+ self.__fb_stub_text = ""
+ self.__fb_stub_quality = 0
+
+ # Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события.
+ # Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD
+ self.__mouse_buttons: Dict[str, Optional[bool]] = {"left": None, "right": None, "middle": None}
+ self.__mouse_move = {"x": -1, "y": -1}
+
+ # =====
+
+ async def run(self) -> None:
+ await self._run(
+ kvmd=self.__kvmd_task_loop(),
+ streamer=self.__streamer_task_loop(),
+ )
+
+ # =====
+
+ async def __kvmd_task_loop(self) -> None:
+ logger = get_logger(0)
+
+ await self.__authorized
+ (user, passwd) = self.__authorized.result()
+
+ async with self.__kvmd.ws(user, passwd) as ws:
+ logger.info("[kvmd] Client %s: Connected to KVMD websocket", self._remote)
+ self.__ws_connected.set_result(None)
+
+ receive_task: Optional[asyncio.Task] = None
+ writer_task: Optional[asyncio.Task] = None
+ try:
+ while True:
+ if receive_task is None:
+ receive_task = asyncio.create_task(ws.receive())
+ if writer_task is None:
+ writer_task = asyncio.create_task(self.__ws_writer_queue.get())
+
+ done = (await aiotools.wait_first(receive_task, writer_task))[0]
+
+ if receive_task in done:
+ msg = receive_task.result()
+ if msg.type == aiohttp.WSMsgType.TEXT:
+ await self.__process_ws_event(json.loads(msg.data))
+ else:
+ raise RuntimeError(f"Unknown WS message type: {msg!r}")
+ receive_task = None
+
+ if writer_task in done:
+ await ws.send_str(json.dumps(writer_task.result()))
+ writer_task = None
+ finally:
+ if receive_task:
+ receive_task.cancel()
+ if writer_task:
+ writer_task.cancel()
+
+ async def __process_ws_event(self, event: Dict) -> None:
+ if event["event_type"] == "info_state":
+ host = event["event"]["meta"].get("server", {}).get("host")
+ if isinstance(host, str):
+ name = f"Pi-KVM: {host}"
+ async with self._lock:
+ if self._encodings.has_rename:
+ await self._send_rename(name)
+ self.__shared_params.name = name
+
+ elif event["event_type"] == "hid_state":
+ async with self._lock:
+ if self._encodings.has_leds_state:
+ await self._send_leds_state(**event["event"]["keyboard"]["leds"])
+
+ # =====
+
+ async def __streamer_task_loop(self) -> None:
+ logger = get_logger(0)
+ await self.__ws_connected
+ while True:
+ try:
+ streaming = False
+ async for (online, width, height, jpeg) in self.__streamer.read():
+ if not streaming:
+ logger.info("[streamer] Client %s: Streaming ...", self._remote)
+ streaming = True
+ if online:
+ await self.__send_fb_real(width, height, jpeg)
+ else:
+ await self.__send_fb_stub("No signal")
+ except StreamerError as err:
+ logger.info("[streamer] Client %s: Waiting for stream: %s", self._remote, str(err))
+ await self.__send_fb_stub("Waiting for stream ...")
+ await asyncio.sleep(1)
+
+ async def __send_fb_real(self, width: int, height: int, jpeg: bytes) -> None:
+ async with self._lock:
+ if self.__fb_requested:
+ if (self._width, self._height) != (width, height):
+ self.__shared_params.width = width
+ self.__shared_params.height = height
+ if not self._encodings.has_resize:
+ msg = f"Resoultion changed: {self._width}x{self._height} -> {width}x{height}\nPlease reconnect"
+ await self.__send_fb_stub(msg, no_lock=True)
+ return
+ await self._send_resize(width, height)
+ await self._send_fb(jpeg)
+ self.__fb_stub_text = ""
+ self.__fb_stub_quality = 0
+ self.__fb_requested = False
+
+ async def __send_fb_stub(self, text: str, no_lock: bool=False) -> None:
+ if not no_lock:
+ await self._lock.acquire()
+ try:
+ if self.__fb_requested and (self.__fb_stub_text != text or self.__fb_stub_quality != self._encodings.tight_jpeg_quality):
+ await self._send_fb(await make_text_jpeg(self._width, self._height, self._encodings.tight_jpeg_quality, text))
+ self.__fb_stub_text = text
+ self.__fb_stub_quality = self._encodings.tight_jpeg_quality
+ self.__fb_requested = False
+ finally:
+ if not no_lock:
+ self._lock.release()
+
+ # =====
+
+ async def _authorize(self, user: str, passwd: str) -> bool:
+ if (await self.__kvmd.authorize(user, passwd)):
+ self.__authorized.set_result((user, passwd))
+ return True
+ return False
+
+ async def _on_key_event(self, code: int, state: bool) -> None:
+ print("KeyEvent", code, state, self.__symmap.get(code)) # TODO
+
+ async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None:
+ for (button, state) in buttons.items():
+ if self.__mouse_buttons[button] != state:
+ await self.__ws_writer_queue.put({
+ "event_type": "mouse_button",
+ "event": {"button": button, "state": state},
+ })
+ self.__mouse_buttons[button] = state
+
+ if wheel["x"] or wheel["y"]:
+ await self.__ws_writer_queue.put({
+ "event_type": "mouse_wheel",
+ "event": {"delta": wheel},
+ })
+
+ if self.__mouse_move != move:
+ await self.__ws_writer_queue.put({
+ "event_type": "mouse_move",
+ "event": {"to": move},
+ })
+ self.__mouse_move = move
+
+ async def _on_cut_event(self, text: str) -> None:
+ print("CutEvent", text) # TODO
+
+ async def _on_set_encodings(self) -> None:
+ assert self.__authorized.done()
+ (user, passwd) = self.__authorized.result()
+ (quality, desired_fps) = (self._encodings.tight_jpeg_quality, 30)
+ get_logger(0).info("[main] Client %s: Applying streamer params: quality=%d%%; desired_fps=%d ...",
+ self._remote, quality, desired_fps)
+ await self.__kvmd.set_streamer_params(user, passwd, quality=quality, desired_fps=desired_fps)
+
+ async def _on_fb_update_request(self) -> None:
+ self.__fb_requested = True
+
+
+# =====
+class VncServer:
+ def __init__(
+ self,
+ host: str,
+ port: int,
+ max_clients: int,
+
+ kvmd: KvmdClient,
+ streamer: StreamerClient,
+
+ symmap: Dict[int, str],
+ ) -> None:
+
+ self.__host = host
+ self.__port = port
+ self.__max_clients = max_clients
+
+ self.__kvmd = kvmd
+ self.__streamer = streamer
+
+ self.__symmap = symmap
+
+ self.__shared_params = _SharedParams()
+
+ def run(self) -> None:
+ logger = get_logger(0)
+ logger.info("Listening VNC on TCP [%s]:%d ...", self.__host, self.__port)
+
+ sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+ sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
+ sock.bind((self.__host, self.__port))
+
+ loop = asyncio.get_event_loop()
+ server = loop.run_until_complete(asyncio.start_server(
+ client_connected_cb=self.__handle_client,
+ sock=sock,
+ backlog=self.__max_clients,
+ loop=loop,
+ ))
+
+ try:
+ loop.run_forever()
+ except (SystemExit, KeyboardInterrupt):
+ pass
+ finally:
+ server.close()
+ loop.run_until_complete(server.wait_closed())
+ tasks = asyncio.Task.all_tasks()
+ for task in tasks:
+ task.cancel()
+ loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
+ loop.close()
+ logger.info("Bye-bye")
+
+ async def __handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
+ await _Client(reader, writer, self.__kvmd, self.__streamer, self.__symmap, self.__shared_params).run()