summaryrefslogtreecommitdiff
path: root/kvmd/apps
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd/apps')
-rw-r--r--kvmd/apps/__init__.py0
-rw-r--r--kvmd/apps/cleanup/__init__.py40
-rw-r--r--kvmd/apps/cleanup/__main__.py2
-rw-r--r--kvmd/apps/kvmd/__init__.py81
-rw-r--r--kvmd/apps/kvmd/__main__.py2
-rw-r--r--kvmd/apps/kvmd/atx.py63
-rw-r--r--kvmd/apps/kvmd/hid.py213
-rw-r--r--kvmd/apps/kvmd/msd.py322
-rw-r--r--kvmd/apps/kvmd/server.py471
-rw-r--r--kvmd/apps/kvmd/streamer.py175
-rw-r--r--kvmd/apps/wscli/__init__.py50
-rw-r--r--kvmd/apps/wscli/__main__.py2
12 files changed, 1421 insertions, 0 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/kvmd/apps/__init__.py
diff --git a/kvmd/apps/cleanup/__init__.py b/kvmd/apps/cleanup/__init__.py
new file mode 100644
index 00000000..60446c2b
--- /dev/null
+++ b/kvmd/apps/cleanup/__init__.py
@@ -0,0 +1,40 @@
+import os
+import subprocess
+import time
+
+from ...application import init
+from ...logging import get_logger
+
+from ... import gpio
+
+
+# =====
+def main() -> None:
+ config = init()
+ logger = get_logger(0)
+
+ logger.info("Cleaning up ...")
+ with gpio.bcm():
+ for (name, pin) in [
+ ("hid_reset", config["hid"]["pinout"]["reset"]),
+ ("msd_target", config["msd"]["pinout"]["target"]),
+ ("msd_reset", config["msd"]["pinout"]["reset"]),
+ ("atx_power_switch", config["atx"]["pinout"]["power_switch"]),
+ ("atx_reset_switch", config["atx"]["pinout"]["reset_switch"]),
+ ("streamer_cap", config["streamer"]["pinout"]["cap"]),
+ ("streamer_conv", config["streamer"]["pinout"]["conv"]),
+ ]:
+ if pin > 0:
+ logger.info("Writing value=0 to pin=%d (%s)", pin, name)
+ gpio.set_output(pin, initial=False)
+
+ streamer = os.path.basename(config["streamer"]["cmd"][0])
+ logger.info("Trying to find and kill %r ...", streamer)
+ try:
+ subprocess.check_output(["killall", streamer], stderr=subprocess.STDOUT)
+ time.sleep(3)
+ subprocess.check_output(["killall", "-9", streamer], stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError:
+ pass
+
+ logger.info("Bye-bye")
diff --git a/kvmd/apps/cleanup/__main__.py b/kvmd/apps/cleanup/__main__.py
new file mode 100644
index 00000000..031df43e
--- /dev/null
+++ b/kvmd/apps/cleanup/__main__.py
@@ -0,0 +1,2 @@
+from . import main
+main()
diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py
new file mode 100644
index 00000000..b58b6df3
--- /dev/null
+++ b/kvmd/apps/kvmd/__init__.py
@@ -0,0 +1,81 @@
+import asyncio
+
+from ...application import init
+from ...logging import get_logger
+from ...logging import Log
+
+from ... import gpio
+
+from .hid import Hid
+from .atx import Atx
+from .msd import MassStorageDevice
+from .streamer import Streamer
+from .server import Server
+
+
+# =====
+def main() -> None:
+ config = init()
+ with gpio.bcm():
+ loop = asyncio.get_event_loop()
+
+ log = Log(
+ services=list(config["log"]["services"]),
+ loop=loop,
+ )
+
+ hid = Hid(
+ reset=int(config["hid"]["pinout"]["reset"]),
+ device_path=str(config["hid"]["device"]),
+ speed=int(config["hid"]["speed"]),
+ reset_delay=float(config["hid"]["reset_delay"]),
+ )
+
+ atx = Atx(
+ power_led=int(config["atx"]["pinout"]["power_led"]),
+ hdd_led=int(config["atx"]["pinout"]["hdd_led"]),
+ power_switch=int(config["atx"]["pinout"]["power_switch"]),
+ reset_switch=int(config["atx"]["pinout"]["reset_switch"]),
+ click_delay=float(config["atx"]["click_delay"]),
+ long_click_delay=float(config["atx"]["long_click_delay"]),
+ )
+
+ msd = MassStorageDevice(
+ target=int(config["msd"]["pinout"]["target"]),
+ reset=int(config["msd"]["pinout"]["reset"]),
+ device_path=str(config["msd"]["device"]),
+ init_delay=float(config["msd"]["init_delay"]),
+ reset_delay=float(config["msd"]["reset_delay"]),
+ write_meta=bool(config["msd"]["write_meta"]),
+ loop=loop,
+ )
+
+ streamer = Streamer(
+ cap_power=int(config["streamer"]["pinout"]["cap"]),
+ conv_power=int(config["streamer"]["pinout"]["conv"]),
+ sync_delay=float(config["streamer"]["sync_delay"]),
+ init_delay=float(config["streamer"]["init_delay"]),
+ init_restart_after=float(config["streamer"]["init_restart_after"]),
+ quality=int(config["streamer"]["quality"]),
+ soft_fps=int(config["streamer"]["soft_fps"]),
+ cmd=list(map(str, config["streamer"]["cmd"])),
+ loop=loop,
+ )
+
+ Server(
+ log=log,
+ hid=hid,
+ atx=atx,
+ msd=msd,
+ streamer=streamer,
+ heartbeat=float(config["server"]["heartbeat"]),
+ atx_state_poll=float(config["atx"]["state_poll"]),
+ streamer_shutdown_delay=float(config["streamer"]["shutdown_delay"]),
+ msd_chunk_size=int(config["msd"]["chunk_size"]),
+ loop=loop,
+ ).run(
+ host=str(config["server"]["host"]),
+ port=int(config["server"]["port"]),
+ )
+
+ get_logger().info("Bye-bye")
diff --git a/kvmd/apps/kvmd/__main__.py b/kvmd/apps/kvmd/__main__.py
new file mode 100644
index 00000000..031df43e
--- /dev/null
+++ b/kvmd/apps/kvmd/__main__.py
@@ -0,0 +1,2 @@
+from . import main
+main()
diff --git a/kvmd/apps/kvmd/atx.py b/kvmd/apps/kvmd/atx.py
new file mode 100644
index 00000000..d49708e0
--- /dev/null
+++ b/kvmd/apps/kvmd/atx.py
@@ -0,0 +1,63 @@
+import asyncio
+
+from typing import Dict
+
+from ...logging import get_logger
+
+from ... import aioregion
+from ... import gpio
+
+
+# =====
+class AtxIsBusy(aioregion.RegionIsBusyError):
+ pass
+
+
+class Atx:
+ def __init__(
+ self,
+ power_led: int,
+ hdd_led: int,
+
+ power_switch: int,
+ reset_switch: int,
+ click_delay: float,
+ long_click_delay: float,
+ ) -> None:
+
+ self.__power_led = gpio.set_input(power_led)
+ self.__hdd_led = gpio.set_input(hdd_led)
+
+ self.__power_switch = gpio.set_output(power_switch)
+ self.__reset_switch = gpio.set_output(reset_switch)
+ self.__click_delay = click_delay
+ self.__long_click_delay = long_click_delay
+
+ self.__region = aioregion.AioExclusiveRegion(AtxIsBusy)
+
+ def get_state(self) -> Dict:
+ return {
+ "busy": self.__region.is_busy(),
+ "leds": {
+ "power": (not gpio.read(self.__power_led)),
+ "hdd": (not gpio.read(self.__hdd_led)),
+ },
+ }
+
+ async def click_power(self) -> None:
+ await self.__click(self.__power_switch, self.__click_delay)
+ get_logger().info("Clicked power")
+
+ async def click_power_long(self) -> None:
+ await self.__click(self.__power_switch, self.__long_click_delay)
+ get_logger().info("Clicked power (long press)")
+
+ async def click_reset(self) -> None:
+ await self.__click(self.__reset_switch, self.__click_delay)
+ get_logger().info("Clicked reset")
+
+ async def __click(self, pin: int, delay: float) -> None:
+ with self.__region:
+ for flag in (True, False):
+ gpio.write(pin, flag)
+ await asyncio.sleep(delay)
diff --git a/kvmd/apps/kvmd/hid.py b/kvmd/apps/kvmd/hid.py
new file mode 100644
index 00000000..d7eb47cf
--- /dev/null
+++ b/kvmd/apps/kvmd/hid.py
@@ -0,0 +1,213 @@
+import asyncio
+import multiprocessing
+import multiprocessing.queues
+import queue
+import struct
+import pkgutil
+import time
+
+from typing import Dict
+from typing import Set
+from typing import NamedTuple
+
+import yaml
+import serial
+import setproctitle
+
+from ...logging import get_logger
+
+from ... import gpio
+
+
+# =====
+def _get_keymap() -> Dict[str, int]:
+ return yaml.load(pkgutil.get_data("kvmd", "data/keymap.yaml").decode()) # type: ignore
+
+
+_KEYMAP = _get_keymap()
+
+
+class _KeyEvent(NamedTuple):
+ key: str
+ state: bool
+
+
+class _MouseMoveEvent(NamedTuple):
+ to_x: int
+ to_y: int
+
+
+class _MouseButtonEvent(NamedTuple):
+ button: str
+ state: bool
+
+
+class _MouseWheelEvent(NamedTuple):
+ delta_y: int
+
+
+class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attributes
+ def __init__(
+ self,
+ reset: int,
+ device_path: str,
+ speed: int,
+ reset_delay: float,
+ ) -> None:
+
+ super().__init__(daemon=True)
+
+ self.__reset = gpio.set_output(reset)
+ self.__device_path = device_path
+ self.__speed = speed
+ self.__reset_delay = reset_delay
+
+ self.__pressed_keys: Set[str] = set()
+ self.__pressed_mouse_buttons: Set[str] = set()
+ self.__lock = asyncio.Lock()
+ self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()
+
+ self.__stop_event = multiprocessing.Event()
+
+ def start(self) -> None:
+ get_logger().info("Starting HID daemon ...")
+ super().start()
+
+ async def reset(self) -> None:
+ async with self.__lock:
+ gpio.write(self.__reset, True)
+ await asyncio.sleep(self.__reset_delay)
+ gpio.write(self.__reset, False)
+
+ async def send_key_event(self, key: str, state: bool) -> None:
+ if not self.__stop_event.is_set():
+ async with self.__lock:
+ if state and key not in self.__pressed_keys:
+ self.__pressed_keys.add(key)
+ self.__queue.put(_KeyEvent(key, state))
+ elif not state and key in self.__pressed_keys:
+ self.__pressed_keys.remove(key)
+ self.__queue.put(_KeyEvent(key, state))
+
+ async def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
+ if not self.__stop_event.is_set():
+ async with self.__lock:
+ self.__queue.put(_MouseMoveEvent(to_x, to_y))
+
+ async def send_mouse_button_event(self, button: str, state: bool) -> None:
+ if not self.__stop_event.is_set():
+ async with self.__lock:
+ if state and button not in self.__pressed_mouse_buttons:
+ self.__pressed_mouse_buttons.add(button)
+ self.__queue.put(_MouseButtonEvent(button, state))
+ elif not state and button in self.__pressed_mouse_buttons:
+ self.__pressed_mouse_buttons.remove(button)
+ self.__queue.put(_MouseButtonEvent(button, state))
+
+ async def send_mouse_wheel_event(self, delta_y: int) -> None:
+ if not self.__stop_event.is_set():
+ async with self.__lock:
+ self.__queue.put(_MouseWheelEvent(delta_y))
+
+ async def clear_events(self) -> None:
+ if not self.__stop_event.is_set():
+ async with self.__lock:
+ self.__unsafe_clear_events()
+
+ async def cleanup(self) -> None:
+ async with self.__lock:
+ if self.is_alive():
+ self.__unsafe_clear_events()
+ get_logger().info("Stopping keyboard daemon ...")
+ self.__stop_event.set()
+ self.join()
+ else:
+ get_logger().warning("Emergency cleaning up HID events ...")
+ self.__emergency_clear_events()
+ gpio.write(self.__reset, False)
+
+ def __unsafe_clear_events(self) -> None:
+ for button in self.__pressed_mouse_buttons:
+ self.__queue.put(_MouseButtonEvent(button, False))
+ self.__pressed_mouse_buttons.clear()
+ for key in self.__pressed_keys:
+ self.__queue.put(_KeyEvent(key, False))
+ self.__pressed_keys.clear()
+
+ def __emergency_clear_events(self) -> None:
+ try:
+ with serial.Serial(self.__device_path, self.__speed) as tty:
+ self.__send_clear_hid(tty)
+ except Exception:
+ get_logger().exception("Can't execute emergency clear HID events")
+
+ def run(self) -> None: # pylint: disable=too-many-branches
+ setproctitle.setproctitle("[hid] " + setproctitle.getproctitle())
+ try:
+ with serial.Serial(self.__device_path, self.__speed) as tty:
+ hid_ready = False
+ while True:
+ if hid_ready:
+ try:
+ event = self.__queue.get(timeout=0.05)
+ except queue.Empty:
+ pass
+ else:
+ if isinstance(event, _KeyEvent):
+ self.__send_key_event(tty, event)
+ elif isinstance(event, _MouseMoveEvent):
+ self.__send_mouse_move_event(tty, event)
+ elif isinstance(event, _MouseButtonEvent):
+ self.__send_mouse_button_event(tty, event)
+ elif isinstance(event, _MouseWheelEvent):
+ self.__send_mouse_wheel_event(tty, event)
+ else:
+ raise RuntimeError("Unknown HID event")
+ hid_ready = False
+
+ if tty.in_waiting:
+ while tty.in_waiting:
+ tty.read(tty.in_waiting)
+ hid_ready = True
+ else:
+ time.sleep(0.05)
+
+ if self.__stop_event.is_set() and self.__queue.qsize() == 0:
+ break
+ except Exception:
+ get_logger().exception("Unhandled exception")
+ raise
+
+ def __send_key_event(self, tty: serial.Serial, event: _KeyEvent) -> None:
+ code = _KEYMAP.get(event.key)
+ if code:
+ key_bytes = bytes([code])
+ assert len(key_bytes) == 1, (event, key_bytes)
+ tty.write(
+ b"\01"
+ + key_bytes
+ + (b"\01" if event.state else b"\00")
+ + b"\00\00"
+ )
+
+ def __send_mouse_move_event(self, tty: serial.Serial, event: _MouseMoveEvent) -> None:
+ to_x = min(max(-32768, event.to_x), 32767)
+ to_y = min(max(-32768, event.to_y), 32767)
+ tty.write(b"\02" + struct.pack(">hh", to_x, to_y))
+
+ def __send_mouse_button_event(self, tty: serial.Serial, event: _MouseButtonEvent) -> None:
+ if event.button == "left":
+ code = (0b10000000 | (0b00001000 if event.state else 0))
+ elif event.button == "right":
+ code = (0b01000000 | (0b00000100 if event.state else 0))
+ else:
+ code = 0
+ if code:
+ tty.write(b"\03" + bytes([code]) + b"\00\00\00")
+
+ def __send_mouse_wheel_event(self, tty: serial.Serial, event: _MouseWheelEvent) -> None:
+ delta_y = min(max(-128, event.delta_y), 127)
+ tty.write(b"\04\00" + struct.pack(">b", delta_y) + b"\00\00")
+
+ def __send_clear_hid(self, tty: serial.Serial) -> None:
+ tty.write(b"\00\00\00\00\00")
diff --git a/kvmd/apps/kvmd/msd.py b/kvmd/apps/kvmd/msd.py
new file mode 100644
index 00000000..2cdb9050
--- /dev/null
+++ b/kvmd/apps/kvmd/msd.py
@@ -0,0 +1,322 @@
+import os
+import struct
+import asyncio
+import types
+
+from typing import Dict
+from typing import NamedTuple
+from typing import Callable
+from typing import Type
+from typing import Optional
+from typing import Any
+
+import pyudev
+
+import aiofiles
+import aiofiles.base
+
+from ...logging import get_logger
+
+from ... import aioregion
+from ... import gpio
+
+
+# =====
+class MsdError(Exception):
+ pass
+
+
+class MsdOperationError(MsdError):
+ pass
+
+
+class MsdIsNotOperationalError(MsdOperationError):
+ def __init__(self) -> None:
+ super().__init__("Missing path for mass-storage device")
+
+
+class MsdAlreadyConnectedToPcError(MsdOperationError):
+ def __init__(self) -> None:
+ super().__init__("Mass-storage is already connected to Server")
+
+
+class MsdAlreadyConnectedToKvmError(MsdOperationError):
+ def __init__(self) -> None:
+ super().__init__("Mass-storage is already connected to KVM")
+
+
+class MsdIsNotConnectedToKvmError(MsdOperationError):
+ def __init__(self) -> None:
+ super().__init__("Mass-storage is not connected to KVM")
+
+
+class MsdIsBusyError(MsdOperationError, aioregion.RegionIsBusyError):
+ pass
+
+
+# =====
+class _HardwareInfo(NamedTuple):
+ manufacturer: str
+ product: str
+ serial: str
+
+
+class _ImageInfo(NamedTuple):
+ name: str
+ size: int
+ complete: bool
+
+
+class _MassStorageDeviceInfo(NamedTuple):
+ path: str
+ real: str
+ size: int
+ hw: Optional[_HardwareInfo]
+ image: Optional[_ImageInfo]
+
+
+_IMAGE_INFO_SIZE = 4096
+_IMAGE_INFO_MAGIC_SIZE = 16
+_IMAGE_INFO_IMAGE_NAME_SIZE = 256
+_IMAGE_INFO_PADS_SIZE = _IMAGE_INFO_SIZE - _IMAGE_INFO_IMAGE_NAME_SIZE - 1 - 8 - _IMAGE_INFO_MAGIC_SIZE * 8
+_IMAGE_INFO_FORMAT = ">%dL%dc?Q%dx%dL" % (
+ _IMAGE_INFO_MAGIC_SIZE,
+ _IMAGE_INFO_IMAGE_NAME_SIZE,
+ _IMAGE_INFO_PADS_SIZE,
+ _IMAGE_INFO_MAGIC_SIZE,
+)
+_IMAGE_INFO_MAGIC = [0x1ACE1ACE] * _IMAGE_INFO_MAGIC_SIZE
+
+
+def _make_image_info_bytes(name: str, size: int, complete: bool) -> bytes:
+ return struct.pack(
+ _IMAGE_INFO_FORMAT,
+ *_IMAGE_INFO_MAGIC,
+ *memoryview(( # type: ignore
+ name.encode("utf-8")
+ + b"\x00" * _IMAGE_INFO_IMAGE_NAME_SIZE
+ )[:_IMAGE_INFO_IMAGE_NAME_SIZE]).cast("c"),
+ complete,
+ size,
+ *_IMAGE_INFO_MAGIC,
+ )
+
+
+def _parse_image_info_bytes(data: bytes) -> Optional[_ImageInfo]:
+ try:
+ parsed = list(struct.unpack(_IMAGE_INFO_FORMAT, data))
+ except struct.error:
+ pass
+ else:
+ magic_begin = parsed[:_IMAGE_INFO_MAGIC_SIZE]
+ magic_end = parsed[-_IMAGE_INFO_MAGIC_SIZE:]
+ if magic_begin == magic_end == _IMAGE_INFO_MAGIC:
+ image_name_bytes = b"".join(parsed[_IMAGE_INFO_MAGIC_SIZE:_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE])
+ return _ImageInfo(
+ name=image_name_bytes.decode("utf-8", errors="ignore").strip("\x00").strip(),
+ size=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE + 1],
+ complete=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE],
+ )
+ return None
+
+
+def _explore_device(device_path: str) -> Optional[_MassStorageDeviceInfo]:
+ # udevadm info -a -p $(udevadm info -q path -n /dev/sda)
+ ctx = pyudev.Context()
+
+ device = pyudev.Devices.from_device_file(ctx, device_path)
+ if device.subsystem != "block":
+ return None
+ try:
+ size = device.attributes.asint("size") * 512
+ except KeyError:
+ return None
+
+ hw_info: Optional[_HardwareInfo] = None
+ usb_device = device.find_parent("usb", "usb_device")
+ if usb_device:
+ hw_info = _HardwareInfo(**{
+ attr: usb_device.attributes.asstring(attr).strip()
+ for attr in ["manufacturer", "product", "serial"]
+ })
+
+ with open(device_path, "rb") as device_file:
+ device_file.seek(size - _IMAGE_INFO_SIZE)
+ image_info = _parse_image_info_bytes(device_file.read())
+
+ return _MassStorageDeviceInfo(
+ path=device_path,
+ real=os.path.realpath(device_path),
+ size=size,
+ image=image_info,
+ hw=hw_info,
+ )
+
+
+def _msd_operated(method: Callable) -> Callable:
+ async def wrap(self: "MassStorageDevice", *args: Any, **kwargs: Any) -> Any:
+ if not self._device_path: # pylint: disable=protected-access
+ MsdIsNotOperationalError()
+ return (await method(self, *args, **kwargs))
+ return wrap
+
+
+# =====
+class MassStorageDevice: # pylint: disable=too-many-instance-attributes
+ def __init__(
+ self,
+ target: int,
+ reset: int,
+
+ device_path: str,
+ init_delay: float,
+ reset_delay: float,
+ write_meta: bool,
+
+ loop: asyncio.AbstractEventLoop,
+ ) -> None:
+
+ self.__target = gpio.set_output(target)
+ self.__reset = gpio.set_output(reset)
+
+ self._device_path = device_path
+ self.__init_delay = init_delay
+ self.__reset_delay = reset_delay
+ self.__write_meta = write_meta
+
+ self.__loop = loop
+
+ self.__device_info: Optional[_MassStorageDeviceInfo] = None
+ self.__saved_device_info: Optional[_MassStorageDeviceInfo] = None
+ self.__region = aioregion.AioExclusiveRegion(MsdIsBusyError)
+ self.__device_file: Optional[aiofiles.base.AiofilesContextManager] = None
+ self.__written = 0
+
+ logger = get_logger(0)
+ if self._device_path:
+ logger.info("Using %r as mass-storage device", self._device_path)
+ try:
+ logger.info("Enabled image metadata writing")
+ loop.run_until_complete(self.connect_to_kvm(no_delay=True))
+ except Exception as err:
+ if isinstance(err, MsdError):
+ log = logger.error
+ else:
+ log = logger.exception
+ log("Mass-storage device is not operational: %s", err)
+ self._device_path = ""
+ else:
+ logger.warning("Mass-storage device is not operational")
+
+ @_msd_operated
+ async def connect_to_kvm(self, no_delay: bool=False) -> None:
+ with self.__region:
+ if self.__device_info:
+ raise MsdAlreadyConnectedToKvmError()
+ gpio.write(self.__target, False)
+ if not no_delay:
+ await asyncio.sleep(self.__init_delay)
+ await self.__load_device_info()
+ get_logger().info("Mass-storage device switched to KVM: %s", self.__device_info)
+
+ @_msd_operated
+ async def connect_to_pc(self) -> None:
+ with self.__region:
+ if not self.__device_info:
+ raise MsdAlreadyConnectedToPcError()
+ gpio.write(self.__target, True)
+ self.__device_info = None
+ get_logger().info("Mass-storage device switched to Server")
+
+ @_msd_operated
+ async def reset(self) -> None:
+ with self.__region:
+ gpio.write(self.__reset, True)
+ await asyncio.sleep(self.__reset_delay)
+ gpio.write(self.__reset, False)
+
+ def get_state(self) -> Dict:
+ info = (self.__saved_device_info._asdict() if self.__saved_device_info else None)
+ if info:
+ info["hw"] = (info["hw"]._asdict() if info["hw"] else None)
+ info["image"] = (info["image"]._asdict() if info["image"] else None)
+
+ connected_to: Optional[str] = None
+ if self._device_path:
+ connected_to = ("kvm" if self.__device_info else "server")
+
+ return {
+ "in_operate": bool(self._device_path),
+ "connected_to": connected_to,
+ "busy": bool(self.__device_file),
+ "written": self.__written,
+ "info": info,
+ }
+
+ async def cleanup(self) -> None:
+ await self.__close_device_file()
+ gpio.write(self.__target, False)
+ gpio.write(self.__reset, False)
+
+ @_msd_operated
+ async def __aenter__(self) -> "MassStorageDevice":
+ self.__region.enter()
+ try:
+ if not self.__device_info:
+ raise MsdIsNotConnectedToKvmError()
+ self.__device_file = await aiofiles.open(self.__device_info.path, mode="w+b", buffering=0)
+ self.__written = 0
+ return self
+ finally:
+ self.__region.exit()
+
+ async def write_image_info(self, name: str, complete: bool) -> None:
+ assert self.__device_file
+ assert self.__device_info
+ if self.__write_meta:
+ if self.__device_info.size - self.__written > _IMAGE_INFO_SIZE:
+ await self.__device_file.seek(self.__device_info.size - _IMAGE_INFO_SIZE)
+ await self.__write_to_device_file(_make_image_info_bytes(name, self.__written, complete))
+ await self.__device_file.seek(0)
+ await self.__load_device_info()
+ else:
+ get_logger().error("Can't write image info because device is full")
+
+ async def write_image_chunk(self, chunk: bytes) -> int:
+ await self.__write_to_device_file(chunk)
+ self.__written += len(chunk)
+ return self.__written
+
+ async def __aexit__(
+ self,
+ _exc_type: Type[BaseException],
+ _exc: BaseException,
+ _tb: types.TracebackType,
+ ) -> None:
+ try:
+ await self.__close_device_file()
+ finally:
+ self.__region.exit()
+
+ async def __write_to_device_file(self, data: bytes) -> None:
+ assert self.__device_file
+ await self.__device_file.write(data)
+ await self.__device_file.flush()
+ await self.__loop.run_in_executor(None, os.fsync, self.__device_file.fileno())
+
+ async def __load_device_info(self) -> None:
+ device_info = await self.__loop.run_in_executor(None, _explore_device, self._device_path)
+ if not device_info:
+ raise MsdError("Can't explore device %r" % (self._device_path))
+ self.__device_info = self.__saved_device_info = device_info
+
+ async def __close_device_file(self) -> None:
+ try:
+ if self.__device_file:
+ get_logger().info("Closing mass-storage device file ...")
+ await self.__device_file.close()
+ except Exception:
+ get_logger().exception("Can't close mass-storage device file")
+ await self.reset()
+ self.__device_file = None
+ self.__written = 0
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
new file mode 100644
index 00000000..8522cb16
--- /dev/null
+++ b/kvmd/apps/kvmd/server.py
@@ -0,0 +1,471 @@
+import os
+import signal
+import asyncio
+import json
+import time
+
+from typing import List
+from typing import Dict
+from typing import Set
+from typing import Callable
+from typing import Optional
+
+import aiohttp.web
+import setproctitle
+
+from ...logging import get_logger
+from ...logging import Log
+
+from ...aioregion import RegionIsBusyError
+
+from ... import __version__
+
+from .hid import Hid
+
+from .atx import Atx
+
+from .msd import MsdOperationError
+from .msd import MassStorageDevice
+
+from .streamer import Streamer
+
+
+# =====
+def _system_task(method: Callable) -> Callable:
+ async def wrap(self: "Server") -> None:
+ try:
+ await method(self)
+ except asyncio.CancelledError:
+ pass
+ except Exception:
+ get_logger().exception("Unhandled exception, killing myself ...")
+ os.kill(os.getpid(), signal.SIGTERM)
+ return wrap
+
+
+def _json(result: Optional[Dict]=None, status: int=200) -> aiohttp.web.Response:
+ return aiohttp.web.Response(
+ text=json.dumps({
+ "ok": (True if status == 200 else False),
+ "result": (result or {}),
+ }, sort_keys=True, indent=4),
+ status=status,
+ content_type="application/json",
+ )
+
+
+def _json_exception(msg: str, err: Exception, status: int) -> aiohttp.web.Response:
+ msg = "%s: %s" % (msg, err)
+ get_logger().error(msg)
+ return _json({
+ "error": type(err).__name__,
+ "error_msg": msg,
+ }, status=status)
+
+
+class BadRequest(Exception):
+ pass
+
+
+def _valid_bool(name: str, flag: Optional[str]) -> bool:
+ flag = str(flag).strip().lower()
+ if flag in ["1", "true", "yes"]:
+ return True
+ elif flag in ["0", "false", "no"]:
+ return False
+ raise BadRequest("Invalid param '%s'" % (name))
+
+
+def _valid_int(name: str, value: Optional[str], min_value: Optional[int]=None, max_value: Optional[int]=None) -> int:
+ try:
+ value_int = int(value) # type: ignore
+ if (
+ (min_value is not None and value_int < min_value)
+ or (max_value is not None and value_int > max_value)
+ ):
+ raise ValueError()
+ return value_int
+ except Exception:
+ raise BadRequest("Invalid param %r" % (name))
+
+
+def _wrap_exceptions_for_web(msg: str) -> Callable:
+ def make_wrapper(method: Callable) -> Callable:
+ async def wrap(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.Response:
+ try:
+ return (await method(self, request))
+ except RegionIsBusyError as err:
+ return _json_exception(msg, err, 409)
+ except (BadRequest, MsdOperationError) as err:
+ return _json_exception(msg, err, 400)
+ return wrap
+ return make_wrapper
+
+
+class Server: # pylint: disable=too-many-instance-attributes
+ def __init__( # pylint: disable=too-many-arguments
+ self,
+ log: Log,
+ hid: Hid,
+ atx: Atx,
+ msd: MassStorageDevice,
+ streamer: Streamer,
+
+ heartbeat: float,
+ atx_state_poll: float,
+ streamer_shutdown_delay: float,
+ msd_chunk_size: int,
+
+ loop: asyncio.AbstractEventLoop,
+ ) -> None:
+
+ self.__log = log
+ self.__hid = hid
+ self.__atx = atx
+ self.__msd = msd
+ self.__streamer = streamer
+
+ self.__heartbeat = heartbeat
+ self.__streamer_shutdown_delay = streamer_shutdown_delay
+ self.__atx_state_poll = atx_state_poll
+ self.__msd_chunk_size = msd_chunk_size
+
+ self.__loop = loop
+
+ self.__sockets: Set[aiohttp.web.WebSocketResponse] = set()
+ self.__sockets_lock = asyncio.Lock()
+
+ self.__system_tasks: List[asyncio.Task] = []
+
+ self.__reset_streamer = False
+ self.__streamer_quality = streamer.get_current_quality()
+ self.__streamer_soft_fps = streamer.get_current_soft_fps()
+
+ def run(self, host: str, port: int) -> None:
+ self.__hid.start()
+
+ setproctitle.setproctitle("[main] " + setproctitle.getproctitle())
+
+ app = aiohttp.web.Application(loop=self.__loop)
+
+ app.router.add_get("/info", self.__info_handler)
+ app.router.add_get("/log", self.__log_handler)
+
+ app.router.add_get("/ws", self.__ws_handler)
+
+ app.router.add_post("/hid/reset", self.__hid_reset_handler)
+
+ app.router.add_get("/atx", self.__atx_state_handler)
+ app.router.add_post("/atx/click", self.__atx_click_handler)
+
+ app.router.add_get("/msd", self.__msd_state_handler)
+ app.router.add_post("/msd/connect", self.__msd_connect_handler)
+ app.router.add_post("/msd/write", self.__msd_write_handler)
+ app.router.add_post("/msd/reset", self.__msd_reset_handler)
+
+ app.router.add_get("/streamer", self.__streamer_state_handler)
+ app.router.add_post("/streamer/set_params", self.__streamer_set_params_handler)
+ app.router.add_post("/streamer/reset", self.__streamer_reset_handler)
+
+ app.on_shutdown.append(self.__on_shutdown)
+ app.on_cleanup.append(self.__on_cleanup)
+
+ self.__system_tasks.extend([
+ self.__loop.create_task(self.__hid_watchdog()),
+ self.__loop.create_task(self.__stream_controller()),
+ self.__loop.create_task(self.__poll_dead_sockets()),
+ self.__loop.create_task(self.__poll_atx_state()),
+ ])
+
+ aiohttp.web.run_app(app, host=host, port=port, print=self.__run_app_print)
+
+ # ===== SYSTEM
+
+ async def __info_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ return _json({
+ "version": {
+ "kvmd": __version__,
+ "streamer": await self.__streamer.get_version(),
+ },
+ "streamer": self.__streamer.get_app(),
+ })
+
+ @_wrap_exceptions_for_web("Log error")
+ async def __log_handler(self, request: aiohttp.web.Request) -> aiohttp.web.StreamResponse:
+ seek = _valid_int("seek", request.query.get("seek", "0"), 0)
+ follow = _valid_bool("follow", request.query.get("follow", "false"))
+ response = aiohttp.web.StreamResponse(status=200, reason="OK", headers={"Content-Type": "text/plain"})
+ await response.prepare(request)
+ async for record in self.__log.log(seek, follow):
+ await response.write(("[%s %s] --- %s" % (
+ record["dt"].strftime("%Y-%m-%d %H:%M:%S"),
+ record["service"],
+ record["msg"],
+ )).encode("utf-8") + b"\r\n")
+ return response
+
+ # ===== WEBSOCKET
+
+ async def __ws_handler(self, request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse:
+ logger = get_logger(0)
+ ws = aiohttp.web.WebSocketResponse(heartbeat=self.__heartbeat)
+ await ws.prepare(request)
+ await self.__register_socket(ws)
+ async for msg in ws:
+ if msg.type == aiohttp.web.WSMsgType.TEXT:
+ try:
+ event = json.loads(msg.data)
+ except Exception as err:
+ logger.error("Can't parse JSON event from websocket: %s", err)
+ else:
+ event_type = event.get("event_type")
+ if event_type == "ping":
+ await ws.send_str(json.dumps({"msg_type": "pong"}))
+ elif event_type == "key":
+ await self.__handle_ws_key_event(event)
+ elif event_type == "mouse_move":
+ await self.__handle_ws_mouse_move_event(event)
+ elif event_type == "mouse_button":
+ await self.__handle_ws_mouse_button_event(event)
+ elif event_type == "mouse_wheel":
+ await self.__handle_ws_mouse_wheel_event(event)
+ else:
+ logger.error("Unknown websocket event: %r", event)
+ else:
+ break
+ return ws
+
+ async def __handle_ws_key_event(self, event: Dict) -> None:
+ key = str(event.get("key", ""))[:64].strip()
+ state = event.get("state")
+ if key and state in [True, False]:
+ await self.__hid.send_key_event(key, state)
+
+ async def __handle_ws_mouse_move_event(self, event: Dict) -> None:
+ try:
+ to_x = int(event["to"]["x"])
+ to_y = int(event["to"]["y"])
+ except Exception:
+ return
+ await self.__hid.send_mouse_move_event(to_x, to_y)
+
+ async def __handle_ws_mouse_button_event(self, event: Dict) -> None:
+ button = str(event.get("button", ""))[:64].strip()
+ state = event.get("state")
+ if button and state in [True, False]:
+ await self.__hid.send_mouse_button_event(button, state)
+
+ async def __handle_ws_mouse_wheel_event(self, event: Dict) -> None:
+ try:
+ delta_y = int(event["delta"]["y"])
+ except Exception:
+ return
+ await self.__hid.send_mouse_wheel_event(delta_y)
+
+ # ===== HID
+
+ async def __hid_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ await self.__hid.reset()
+ return _json()
+
+ # ===== ATX
+
+ async def __atx_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ return _json(self.__atx.get_state())
+
+ @_wrap_exceptions_for_web("Click error")
+ async def __atx_click_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
+ button = request.query.get("button")
+ clicker = {
+ "power": self.__atx.click_power,
+ "power_long": self.__atx.click_power_long,
+ "reset": self.__atx.click_reset,
+ }.get(button)
+ if not clicker:
+ raise BadRequest("Invalid param 'button'")
+ await self.__broadcast_event("atx_click", button=button) # type: ignore
+ await clicker()
+ await self.__broadcast_event("atx_click", button=None) # type: ignore
+ return _json({"clicked": button})
+
+ # ===== MSD
+
+ async def __msd_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ return _json(self.__msd.get_state())
+
+ @_wrap_exceptions_for_web("Mass-storage error")
+ async def __msd_connect_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
+ to = request.query.get("to")
+ if to == "kvm":
+ await self.__msd.connect_to_kvm()
+ state = self.__msd.get_state()
+ await self.__broadcast_event("msd_state", **state)
+ elif to == "server":
+ await self.__msd.connect_to_pc()
+ state = self.__msd.get_state()
+ await self.__broadcast_event("msd_state", **state)
+ else:
+ raise BadRequest("Invalid param 'to'")
+ return _json(state)
+
+ @_wrap_exceptions_for_web("Can't write data to mass-storage device")
+ async def __msd_write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
+ logger = get_logger(0)
+ reader = await request.multipart()
+ written = 0
+ try:
+ field = await reader.next()
+ if not field or field.name != "image_name":
+ raise BadRequest("Missing 'image_name' field")
+ image_name = (await field.read()).decode("utf-8")[:256]
+
+ field = await reader.next()
+ if not field or field.name != "image_data":
+ raise BadRequest("Missing 'image_data' field")
+
+ async with self.__msd:
+ await self.__broadcast_event("msd_state", **self.__msd.get_state())
+ logger.info("Writing image %r to mass-storage device ...", image_name)
+ await self.__msd.write_image_info(image_name, False)
+ while True:
+ chunk = await field.read_chunk(self.__msd_chunk_size)
+ if not chunk:
+ break
+ written = await self.__msd.write_image_chunk(chunk)
+ await self.__msd.write_image_info(image_name, True)
+ finally:
+ await self.__broadcast_event("msd_state", **self.__msd.get_state())
+ if written != 0:
+ logger.info("Written %d bytes to mass-storage device", written)
+ return _json({"written": written})
+
+ @_wrap_exceptions_for_web("Mass-storage error")
+ async def __msd_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ await self.__msd.reset()
+ return _json()
+
+ # ===== STREAMER
+
+ async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ return _json(self.__streamer.get_state())
+
+ @_wrap_exceptions_for_web("Can't set stream params")
+ async def __streamer_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
+ quality = request.query.get("quality")
+ if quality:
+ self.__streamer_quality = _valid_int("quality", quality, 1, 100)
+ soft_fps = request.query.get("soft_fps")
+ if soft_fps:
+ self.__streamer_soft_fps = _valid_int("soft_fps", soft_fps, 1, 30)
+ return _json()
+
+ async def __streamer_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ self.__reset_streamer = True
+ return _json()
+
+ # =====
+
+ def __run_app_print(self, text: str) -> None:
+ logger = get_logger()
+ for line in text.strip().splitlines():
+ logger.info(line.strip())
+
+ async def __on_shutdown(self, _: aiohttp.web.Application) -> None:
+ logger = get_logger(0)
+
+ logger.info("Cancelling system tasks ...")
+ for task in self.__system_tasks:
+ task.cancel()
+ await asyncio.gather(*self.__system_tasks)
+
+ logger.info("Disconnecting clients ...")
+ for ws in list(self.__sockets):
+ await self.__remove_socket(ws)
+
+ async def __on_cleanup(self, _: aiohttp.web.Application) -> None:
+ await self.__hid.cleanup()
+ await self.__streamer.cleanup()
+ await self.__msd.cleanup()
+
+ @_system_task
+ async def __hid_watchdog(self) -> None:
+ while self.__hid.is_alive():
+ await asyncio.sleep(0.1)
+ raise RuntimeError("HID is dead")
+
+ @_system_task
+ async def __stream_controller(self) -> None:
+ prev = 0
+ shutdown_at = 0.0
+
+ while True:
+ cur = len(self.__sockets)
+ if prev == 0 and cur > 0:
+ if not self.__streamer.is_running():
+ await self.__streamer.start(self.__streamer_quality, self.__streamer_soft_fps)
+ await self.__broadcast_event("streamer_state", **self.__streamer.get_state())
+ elif prev > 0 and cur == 0:
+ shutdown_at = time.time() + self.__streamer_shutdown_delay
+ elif prev == 0 and cur == 0 and time.time() > shutdown_at:
+ if self.__streamer.is_running():
+ await self.__streamer.stop()
+ await self.__broadcast_event("streamer_state", **self.__streamer.get_state())
+
+ if (
+ self.__reset_streamer
+ or self.__streamer_quality != self.__streamer.get_current_quality()
+ or self.__streamer_soft_fps != self.__streamer.get_current_soft_fps()
+ ):
+ if self.__streamer.is_running():
+ await self.__streamer.stop()
+ await self.__streamer.start(self.__streamer_quality, self.__streamer_soft_fps, no_init_restart=True)
+ await self.__broadcast_event("streamer_state", **self.__streamer.get_state())
+ self.__reset_streamer = False
+
+ prev = cur
+ await asyncio.sleep(0.1)
+
+ @_system_task
+ async def __poll_dead_sockets(self) -> None:
+ while True:
+ for ws in list(self.__sockets):
+ if ws.closed or not ws._req.transport: # pylint: disable=protected-access
+ await self.__remove_socket(ws)
+ await asyncio.sleep(0.1)
+
+ @_system_task
+ async def __poll_atx_state(self) -> None:
+ while True:
+ if self.__sockets:
+ await self.__broadcast_event("atx_state", **self.__atx.get_state())
+ await asyncio.sleep(self.__atx_state_poll)
+
+ async def __broadcast_event(self, event: str, **kwargs: Dict) -> None:
+ await asyncio.gather(*[
+ ws.send_str(json.dumps({
+ "msg_type": "event",
+ "msg": {
+ "event": event,
+ "event_attrs": kwargs,
+ },
+ }))
+ for ws in list(self.__sockets)
+ if not ws.closed and ws._req.transport # pylint: disable=protected-access
+ ], return_exceptions=True)
+
+ async def __register_socket(self, ws: aiohttp.web.WebSocketResponse) -> None:
+ async with self.__sockets_lock:
+ self.__sockets.add(ws)
+ get_logger().info("Registered new client socket: remote=%s; id=%d; active=%d",
+ ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access
+
+ async def __remove_socket(self, ws: aiohttp.web.WebSocketResponse) -> None:
+ async with self.__sockets_lock:
+ await self.__hid.clear_events()
+ try:
+ self.__sockets.remove(ws)
+ get_logger().info("Removed client socket: remote=%s; id=%d; active=%d",
+ ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access
+ await ws.close()
+ except Exception:
+ pass
diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py
new file mode 100644
index 00000000..bfb77d28
--- /dev/null
+++ b/kvmd/apps/kvmd/streamer.py
@@ -0,0 +1,175 @@
+import os
+import asyncio
+import asyncio.subprocess
+
+from typing import List
+from typing import Dict
+from typing import Optional
+
+from ...logging import get_logger
+
+from ... import gpio
+
+
+# =====
+class Streamer: # pylint: disable=too-many-instance-attributes
+ def __init__(
+ self,
+ cap_power: int,
+ conv_power: int,
+ sync_delay: float,
+ init_delay: float,
+ init_restart_after: float,
+ quality: int,
+ soft_fps: int,
+ cmd: List[str],
+ loop: asyncio.AbstractEventLoop,
+ ) -> None:
+
+ self.__cap_power = (gpio.set_output(cap_power) if cap_power > 0 else cap_power)
+ self.__conv_power = (gpio.set_output(conv_power) if conv_power > 0 else conv_power)
+ self.__sync_delay = sync_delay
+ self.__init_delay = init_delay
+ self.__init_restart_after = init_restart_after
+ self.__quality = quality
+ self.__soft_fps = soft_fps
+ self.__cmd = cmd
+
+ self.__loop = loop
+
+ self.__proc_task: Optional[asyncio.Task] = None
+
+ async def start(self, quality: int, soft_fps: int, no_init_restart: bool=False) -> None:
+ logger = get_logger()
+ logger.info("Starting streamer ...")
+
+ assert 1 <= quality <= 100
+ self.__quality = quality
+
+ assert 1 <= soft_fps <= 30
+ self.__soft_fps = soft_fps
+
+ await self.__inner_start()
+ if self.__init_restart_after > 0.0 and not no_init_restart:
+ logger.info("Stopping streamer to restart ...")
+ await self.__inner_stop()
+ logger.info("Starting again ...")
+ await self.__inner_start()
+
+ async def stop(self) -> None:
+ get_logger().info("Stopping streamer ...")
+ await self.__inner_stop()
+
+ def is_running(self) -> bool:
+ return bool(self.__proc_task)
+
+ def get_current_quality(self) -> int:
+ return self.__quality
+
+ def get_current_soft_fps(self) -> int:
+ return self.__soft_fps
+
+ def get_state(self) -> Dict:
+ return {
+ "is_running": self.is_running(),
+ "quality": self.__quality,
+ "soft_fps": self.__soft_fps,
+ }
+
+ def get_app(self) -> str:
+ return os.path.basename(self.__cmd[0])
+
+ async def get_version(self) -> str:
+ proc = await asyncio.create_subprocess_exec(
+ *[self.__cmd[0], "--version"],
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.DEVNULL,
+ )
+ (stdout, _) = await proc.communicate()
+ return stdout.decode(errors="ignore").strip()
+
+ async def cleanup(self) -> None:
+ if self.is_running():
+ await self.stop()
+
+ async def __inner_start(self) -> None:
+ assert not self.__proc_task
+ await self.__set_hw_enabled(True)
+ self.__proc_task = self.__loop.create_task(self.__process())
+
+ async def __inner_stop(self) -> None:
+ assert self.__proc_task
+ self.__proc_task.cancel()
+ await asyncio.gather(self.__proc_task, return_exceptions=True)
+ await self.__set_hw_enabled(False)
+ self.__proc_task = None
+
+ async def __set_hw_enabled(self, enabled: bool) -> None:
+ # XXX: This sequence is very important to enable converter and cap board
+ if self.__cap_power > 0:
+ gpio.write(self.__cap_power, enabled)
+ if self.__conv_power > 0:
+ if enabled:
+ await asyncio.sleep(self.__sync_delay)
+ gpio.write(self.__conv_power, enabled)
+ if enabled:
+ await asyncio.sleep(self.__init_delay)
+
+ async def __process(self) -> None: # pylint: disable=too-many-branches
+ logger = get_logger(0)
+
+ while True: # pylint: disable=too-many-nested-blocks
+ proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member
+ try:
+ cmd = [part.format(quality=self.__quality, soft_fps=self.__soft_fps) for part in self.__cmd]
+ proc = await asyncio.create_subprocess_exec(
+ *cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.STDOUT,
+ )
+ logger.info("Started streamer pid=%d: %s", proc.pid, cmd)
+
+ empty = 0
+ async for line_bytes in proc.stdout: # type: ignore
+ line = line_bytes.decode(errors="ignore").strip()
+ if line:
+ logger.info("Streamer: %s", line)
+ empty = 0
+ else:
+ empty += 1
+ if empty == 100: # asyncio bug
+ raise RuntimeError("Streamer/asyncio: too many empty lines")
+
+ raise RuntimeError("Streamer unexpectedly died")
+
+ except asyncio.CancelledError:
+ break
+
+ except Exception as err:
+ if proc:
+ logger.exception("Unexpected streamer error: pid=%d", proc.pid)
+ else:
+ logger.exception("Can't start streamer: %s", err)
+ await asyncio.sleep(1)
+
+ finally:
+ if proc and proc.returncode is None:
+ await self.__kill(proc)
+
+ async def __kill(self, proc: asyncio.subprocess.Process) -> None: # pylint: disable=no-member
+ try:
+ proc.terminate()
+ await asyncio.sleep(1)
+ if proc.returncode is None:
+ try:
+ proc.kill()
+ except Exception:
+ if proc.returncode is not None:
+ raise
+ await proc.wait()
+ get_logger().info("Streamer killed: pid=%d; retcode=%d", proc.pid, proc.returncode)
+ except Exception:
+ if proc.returncode is None:
+ get_logger().exception("Can't kill streamer pid=%d", proc.pid)
+ else:
+ get_logger().info("Streamer killed: pid=%d; retcode=%d", proc.pid, proc.returncode)
diff --git a/kvmd/apps/wscli/__init__.py b/kvmd/apps/wscli/__init__.py
new file mode 100644
index 00000000..19d3181b
--- /dev/null
+++ b/kvmd/apps/wscli/__init__.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python3
+
+
+import sys
+import signal
+import asyncio
+import argparse
+import time
+
+import aiohttp
+
+
+# =====
+async def _run_client(loop: asyncio.AbstractEventLoop, url: str) -> None:
+ def stdin_callback() -> None:
+ line = sys.stdin.buffer.readline().decode()
+ if line:
+ asyncio.ensure_future(ws.send_str(line), loop=loop)
+ else:
+ loop.stop()
+
+ loop.add_reader(sys.stdin.fileno(), stdin_callback)
+
+ async def dispatch() -> None:
+ while True:
+ msg = await ws.receive()
+ if msg.type == aiohttp.WSMsgType.TEXT:
+ print("[%.5f] Received: %s" % (time.time(), msg.data.strip()))
+ else:
+ if msg.type == aiohttp.WSMsgType.CLOSE:
+ await ws.close()
+ elif msg.type == aiohttp.WSMsgType.ERROR:
+ print("[%.5f] Error during receive: %s" % (time.time(), ws.exception()))
+ elif msg.type == aiohttp.WSMsgType.CLOSED:
+ pass
+ break
+
+ async with aiohttp.ClientSession().ws_connect(url) as ws:
+ await dispatch()
+
+
+def main() -> None:
+ parser = argparse.ArgumentParser()
+ parser.add_argument("-u", "--url", default="http://127.0.0.1:8081/ws")
+ options = parser.parse_args()
+
+ loop = asyncio.get_event_loop()
+ loop.add_signal_handler(signal.SIGINT, loop.stop)
+ loop.create_task(_run_client(loop, options.url))
+ loop.run_forever()
diff --git a/kvmd/apps/wscli/__main__.py b/kvmd/apps/wscli/__main__.py
new file mode 100644
index 00000000..031df43e
--- /dev/null
+++ b/kvmd/apps/wscli/__main__.py
@@ -0,0 +1,2 @@
+from . import main
+main()