diff options
Diffstat (limited to 'kvmd/apps')
-rw-r--r-- | kvmd/apps/__init__.py | 0 | ||||
-rw-r--r-- | kvmd/apps/cleanup/__init__.py | 40 | ||||
-rw-r--r-- | kvmd/apps/cleanup/__main__.py | 2 | ||||
-rw-r--r-- | kvmd/apps/kvmd/__init__.py | 81 | ||||
-rw-r--r-- | kvmd/apps/kvmd/__main__.py | 2 | ||||
-rw-r--r-- | kvmd/apps/kvmd/atx.py | 63 | ||||
-rw-r--r-- | kvmd/apps/kvmd/hid.py | 213 | ||||
-rw-r--r-- | kvmd/apps/kvmd/msd.py | 322 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 471 | ||||
-rw-r--r-- | kvmd/apps/kvmd/streamer.py | 175 | ||||
-rw-r--r-- | kvmd/apps/wscli/__init__.py | 50 | ||||
-rw-r--r-- | kvmd/apps/wscli/__main__.py | 2 |
12 files changed, 1421 insertions, 0 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/kvmd/apps/__init__.py diff --git a/kvmd/apps/cleanup/__init__.py b/kvmd/apps/cleanup/__init__.py new file mode 100644 index 00000000..60446c2b --- /dev/null +++ b/kvmd/apps/cleanup/__init__.py @@ -0,0 +1,40 @@ +import os +import subprocess +import time + +from ...application import init +from ...logging import get_logger + +from ... import gpio + + +# ===== +def main() -> None: + config = init() + logger = get_logger(0) + + logger.info("Cleaning up ...") + with gpio.bcm(): + for (name, pin) in [ + ("hid_reset", config["hid"]["pinout"]["reset"]), + ("msd_target", config["msd"]["pinout"]["target"]), + ("msd_reset", config["msd"]["pinout"]["reset"]), + ("atx_power_switch", config["atx"]["pinout"]["power_switch"]), + ("atx_reset_switch", config["atx"]["pinout"]["reset_switch"]), + ("streamer_cap", config["streamer"]["pinout"]["cap"]), + ("streamer_conv", config["streamer"]["pinout"]["conv"]), + ]: + if pin > 0: + logger.info("Writing value=0 to pin=%d (%s)", pin, name) + gpio.set_output(pin, initial=False) + + streamer = os.path.basename(config["streamer"]["cmd"][0]) + logger.info("Trying to find and kill %r ...", streamer) + try: + subprocess.check_output(["killall", streamer], stderr=subprocess.STDOUT) + time.sleep(3) + subprocess.check_output(["killall", "-9", streamer], stderr=subprocess.STDOUT) + except subprocess.CalledProcessError: + pass + + logger.info("Bye-bye") diff --git a/kvmd/apps/cleanup/__main__.py b/kvmd/apps/cleanup/__main__.py new file mode 100644 index 00000000..031df43e --- /dev/null +++ b/kvmd/apps/cleanup/__main__.py @@ -0,0 +1,2 @@ +from . import main +main() diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py new file mode 100644 index 00000000..b58b6df3 --- /dev/null +++ b/kvmd/apps/kvmd/__init__.py @@ -0,0 +1,81 @@ +import asyncio + +from ...application import init +from ...logging import get_logger +from ...logging import Log + +from ... import gpio + +from .hid import Hid +from .atx import Atx +from .msd import MassStorageDevice +from .streamer import Streamer +from .server import Server + + +# ===== +def main() -> None: + config = init() + with gpio.bcm(): + loop = asyncio.get_event_loop() + + log = Log( + services=list(config["log"]["services"]), + loop=loop, + ) + + hid = Hid( + reset=int(config["hid"]["pinout"]["reset"]), + device_path=str(config["hid"]["device"]), + speed=int(config["hid"]["speed"]), + reset_delay=float(config["hid"]["reset_delay"]), + ) + + atx = Atx( + power_led=int(config["atx"]["pinout"]["power_led"]), + hdd_led=int(config["atx"]["pinout"]["hdd_led"]), + power_switch=int(config["atx"]["pinout"]["power_switch"]), + reset_switch=int(config["atx"]["pinout"]["reset_switch"]), + click_delay=float(config["atx"]["click_delay"]), + long_click_delay=float(config["atx"]["long_click_delay"]), + ) + + msd = MassStorageDevice( + target=int(config["msd"]["pinout"]["target"]), + reset=int(config["msd"]["pinout"]["reset"]), + device_path=str(config["msd"]["device"]), + init_delay=float(config["msd"]["init_delay"]), + reset_delay=float(config["msd"]["reset_delay"]), + write_meta=bool(config["msd"]["write_meta"]), + loop=loop, + ) + + streamer = Streamer( + cap_power=int(config["streamer"]["pinout"]["cap"]), + conv_power=int(config["streamer"]["pinout"]["conv"]), + sync_delay=float(config["streamer"]["sync_delay"]), + init_delay=float(config["streamer"]["init_delay"]), + init_restart_after=float(config["streamer"]["init_restart_after"]), + quality=int(config["streamer"]["quality"]), + soft_fps=int(config["streamer"]["soft_fps"]), + cmd=list(map(str, config["streamer"]["cmd"])), + loop=loop, + ) + + Server( + log=log, + hid=hid, + atx=atx, + msd=msd, + streamer=streamer, + heartbeat=float(config["server"]["heartbeat"]), + atx_state_poll=float(config["atx"]["state_poll"]), + streamer_shutdown_delay=float(config["streamer"]["shutdown_delay"]), + msd_chunk_size=int(config["msd"]["chunk_size"]), + loop=loop, + ).run( + host=str(config["server"]["host"]), + port=int(config["server"]["port"]), + ) + + get_logger().info("Bye-bye") diff --git a/kvmd/apps/kvmd/__main__.py b/kvmd/apps/kvmd/__main__.py new file mode 100644 index 00000000..031df43e --- /dev/null +++ b/kvmd/apps/kvmd/__main__.py @@ -0,0 +1,2 @@ +from . import main +main() diff --git a/kvmd/apps/kvmd/atx.py b/kvmd/apps/kvmd/atx.py new file mode 100644 index 00000000..d49708e0 --- /dev/null +++ b/kvmd/apps/kvmd/atx.py @@ -0,0 +1,63 @@ +import asyncio + +from typing import Dict + +from ...logging import get_logger + +from ... import aioregion +from ... import gpio + + +# ===== +class AtxIsBusy(aioregion.RegionIsBusyError): + pass + + +class Atx: + def __init__( + self, + power_led: int, + hdd_led: int, + + power_switch: int, + reset_switch: int, + click_delay: float, + long_click_delay: float, + ) -> None: + + self.__power_led = gpio.set_input(power_led) + self.__hdd_led = gpio.set_input(hdd_led) + + self.__power_switch = gpio.set_output(power_switch) + self.__reset_switch = gpio.set_output(reset_switch) + self.__click_delay = click_delay + self.__long_click_delay = long_click_delay + + self.__region = aioregion.AioExclusiveRegion(AtxIsBusy) + + def get_state(self) -> Dict: + return { + "busy": self.__region.is_busy(), + "leds": { + "power": (not gpio.read(self.__power_led)), + "hdd": (not gpio.read(self.__hdd_led)), + }, + } + + async def click_power(self) -> None: + await self.__click(self.__power_switch, self.__click_delay) + get_logger().info("Clicked power") + + async def click_power_long(self) -> None: + await self.__click(self.__power_switch, self.__long_click_delay) + get_logger().info("Clicked power (long press)") + + async def click_reset(self) -> None: + await self.__click(self.__reset_switch, self.__click_delay) + get_logger().info("Clicked reset") + + async def __click(self, pin: int, delay: float) -> None: + with self.__region: + for flag in (True, False): + gpio.write(pin, flag) + await asyncio.sleep(delay) diff --git a/kvmd/apps/kvmd/hid.py b/kvmd/apps/kvmd/hid.py new file mode 100644 index 00000000..d7eb47cf --- /dev/null +++ b/kvmd/apps/kvmd/hid.py @@ -0,0 +1,213 @@ +import asyncio +import multiprocessing +import multiprocessing.queues +import queue +import struct +import pkgutil +import time + +from typing import Dict +from typing import Set +from typing import NamedTuple + +import yaml +import serial +import setproctitle + +from ...logging import get_logger + +from ... import gpio + + +# ===== +def _get_keymap() -> Dict[str, int]: + return yaml.load(pkgutil.get_data("kvmd", "data/keymap.yaml").decode()) # type: ignore + + +_KEYMAP = _get_keymap() + + +class _KeyEvent(NamedTuple): + key: str + state: bool + + +class _MouseMoveEvent(NamedTuple): + to_x: int + to_y: int + + +class _MouseButtonEvent(NamedTuple): + button: str + state: bool + + +class _MouseWheelEvent(NamedTuple): + delta_y: int + + +class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attributes + def __init__( + self, + reset: int, + device_path: str, + speed: int, + reset_delay: float, + ) -> None: + + super().__init__(daemon=True) + + self.__reset = gpio.set_output(reset) + self.__device_path = device_path + self.__speed = speed + self.__reset_delay = reset_delay + + self.__pressed_keys: Set[str] = set() + self.__pressed_mouse_buttons: Set[str] = set() + self.__lock = asyncio.Lock() + self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue() + + self.__stop_event = multiprocessing.Event() + + def start(self) -> None: + get_logger().info("Starting HID daemon ...") + super().start() + + async def reset(self) -> None: + async with self.__lock: + gpio.write(self.__reset, True) + await asyncio.sleep(self.__reset_delay) + gpio.write(self.__reset, False) + + async def send_key_event(self, key: str, state: bool) -> None: + if not self.__stop_event.is_set(): + async with self.__lock: + if state and key not in self.__pressed_keys: + self.__pressed_keys.add(key) + self.__queue.put(_KeyEvent(key, state)) + elif not state and key in self.__pressed_keys: + self.__pressed_keys.remove(key) + self.__queue.put(_KeyEvent(key, state)) + + async def send_mouse_move_event(self, to_x: int, to_y: int) -> None: + if not self.__stop_event.is_set(): + async with self.__lock: + self.__queue.put(_MouseMoveEvent(to_x, to_y)) + + async def send_mouse_button_event(self, button: str, state: bool) -> None: + if not self.__stop_event.is_set(): + async with self.__lock: + if state and button not in self.__pressed_mouse_buttons: + self.__pressed_mouse_buttons.add(button) + self.__queue.put(_MouseButtonEvent(button, state)) + elif not state and button in self.__pressed_mouse_buttons: + self.__pressed_mouse_buttons.remove(button) + self.__queue.put(_MouseButtonEvent(button, state)) + + async def send_mouse_wheel_event(self, delta_y: int) -> None: + if not self.__stop_event.is_set(): + async with self.__lock: + self.__queue.put(_MouseWheelEvent(delta_y)) + + async def clear_events(self) -> None: + if not self.__stop_event.is_set(): + async with self.__lock: + self.__unsafe_clear_events() + + async def cleanup(self) -> None: + async with self.__lock: + if self.is_alive(): + self.__unsafe_clear_events() + get_logger().info("Stopping keyboard daemon ...") + self.__stop_event.set() + self.join() + else: + get_logger().warning("Emergency cleaning up HID events ...") + self.__emergency_clear_events() + gpio.write(self.__reset, False) + + def __unsafe_clear_events(self) -> None: + for button in self.__pressed_mouse_buttons: + self.__queue.put(_MouseButtonEvent(button, False)) + self.__pressed_mouse_buttons.clear() + for key in self.__pressed_keys: + self.__queue.put(_KeyEvent(key, False)) + self.__pressed_keys.clear() + + def __emergency_clear_events(self) -> None: + try: + with serial.Serial(self.__device_path, self.__speed) as tty: + self.__send_clear_hid(tty) + except Exception: + get_logger().exception("Can't execute emergency clear HID events") + + def run(self) -> None: # pylint: disable=too-many-branches + setproctitle.setproctitle("[hid] " + setproctitle.getproctitle()) + try: + with serial.Serial(self.__device_path, self.__speed) as tty: + hid_ready = False + while True: + if hid_ready: + try: + event = self.__queue.get(timeout=0.05) + except queue.Empty: + pass + else: + if isinstance(event, _KeyEvent): + self.__send_key_event(tty, event) + elif isinstance(event, _MouseMoveEvent): + self.__send_mouse_move_event(tty, event) + elif isinstance(event, _MouseButtonEvent): + self.__send_mouse_button_event(tty, event) + elif isinstance(event, _MouseWheelEvent): + self.__send_mouse_wheel_event(tty, event) + else: + raise RuntimeError("Unknown HID event") + hid_ready = False + + if tty.in_waiting: + while tty.in_waiting: + tty.read(tty.in_waiting) + hid_ready = True + else: + time.sleep(0.05) + + if self.__stop_event.is_set() and self.__queue.qsize() == 0: + break + except Exception: + get_logger().exception("Unhandled exception") + raise + + def __send_key_event(self, tty: serial.Serial, event: _KeyEvent) -> None: + code = _KEYMAP.get(event.key) + if code: + key_bytes = bytes([code]) + assert len(key_bytes) == 1, (event, key_bytes) + tty.write( + b"\01" + + key_bytes + + (b"\01" if event.state else b"\00") + + b"\00\00" + ) + + def __send_mouse_move_event(self, tty: serial.Serial, event: _MouseMoveEvent) -> None: + to_x = min(max(-32768, event.to_x), 32767) + to_y = min(max(-32768, event.to_y), 32767) + tty.write(b"\02" + struct.pack(">hh", to_x, to_y)) + + def __send_mouse_button_event(self, tty: serial.Serial, event: _MouseButtonEvent) -> None: + if event.button == "left": + code = (0b10000000 | (0b00001000 if event.state else 0)) + elif event.button == "right": + code = (0b01000000 | (0b00000100 if event.state else 0)) + else: + code = 0 + if code: + tty.write(b"\03" + bytes([code]) + b"\00\00\00") + + def __send_mouse_wheel_event(self, tty: serial.Serial, event: _MouseWheelEvent) -> None: + delta_y = min(max(-128, event.delta_y), 127) + tty.write(b"\04\00" + struct.pack(">b", delta_y) + b"\00\00") + + def __send_clear_hid(self, tty: serial.Serial) -> None: + tty.write(b"\00\00\00\00\00") diff --git a/kvmd/apps/kvmd/msd.py b/kvmd/apps/kvmd/msd.py new file mode 100644 index 00000000..2cdb9050 --- /dev/null +++ b/kvmd/apps/kvmd/msd.py @@ -0,0 +1,322 @@ +import os +import struct +import asyncio +import types + +from typing import Dict +from typing import NamedTuple +from typing import Callable +from typing import Type +from typing import Optional +from typing import Any + +import pyudev + +import aiofiles +import aiofiles.base + +from ...logging import get_logger + +from ... import aioregion +from ... import gpio + + +# ===== +class MsdError(Exception): + pass + + +class MsdOperationError(MsdError): + pass + + +class MsdIsNotOperationalError(MsdOperationError): + def __init__(self) -> None: + super().__init__("Missing path for mass-storage device") + + +class MsdAlreadyConnectedToPcError(MsdOperationError): + def __init__(self) -> None: + super().__init__("Mass-storage is already connected to Server") + + +class MsdAlreadyConnectedToKvmError(MsdOperationError): + def __init__(self) -> None: + super().__init__("Mass-storage is already connected to KVM") + + +class MsdIsNotConnectedToKvmError(MsdOperationError): + def __init__(self) -> None: + super().__init__("Mass-storage is not connected to KVM") + + +class MsdIsBusyError(MsdOperationError, aioregion.RegionIsBusyError): + pass + + +# ===== +class _HardwareInfo(NamedTuple): + manufacturer: str + product: str + serial: str + + +class _ImageInfo(NamedTuple): + name: str + size: int + complete: bool + + +class _MassStorageDeviceInfo(NamedTuple): + path: str + real: str + size: int + hw: Optional[_HardwareInfo] + image: Optional[_ImageInfo] + + +_IMAGE_INFO_SIZE = 4096 +_IMAGE_INFO_MAGIC_SIZE = 16 +_IMAGE_INFO_IMAGE_NAME_SIZE = 256 +_IMAGE_INFO_PADS_SIZE = _IMAGE_INFO_SIZE - _IMAGE_INFO_IMAGE_NAME_SIZE - 1 - 8 - _IMAGE_INFO_MAGIC_SIZE * 8 +_IMAGE_INFO_FORMAT = ">%dL%dc?Q%dx%dL" % ( + _IMAGE_INFO_MAGIC_SIZE, + _IMAGE_INFO_IMAGE_NAME_SIZE, + _IMAGE_INFO_PADS_SIZE, + _IMAGE_INFO_MAGIC_SIZE, +) +_IMAGE_INFO_MAGIC = [0x1ACE1ACE] * _IMAGE_INFO_MAGIC_SIZE + + +def _make_image_info_bytes(name: str, size: int, complete: bool) -> bytes: + return struct.pack( + _IMAGE_INFO_FORMAT, + *_IMAGE_INFO_MAGIC, + *memoryview(( # type: ignore + name.encode("utf-8") + + b"\x00" * _IMAGE_INFO_IMAGE_NAME_SIZE + )[:_IMAGE_INFO_IMAGE_NAME_SIZE]).cast("c"), + complete, + size, + *_IMAGE_INFO_MAGIC, + ) + + +def _parse_image_info_bytes(data: bytes) -> Optional[_ImageInfo]: + try: + parsed = list(struct.unpack(_IMAGE_INFO_FORMAT, data)) + except struct.error: + pass + else: + magic_begin = parsed[:_IMAGE_INFO_MAGIC_SIZE] + magic_end = parsed[-_IMAGE_INFO_MAGIC_SIZE:] + if magic_begin == magic_end == _IMAGE_INFO_MAGIC: + image_name_bytes = b"".join(parsed[_IMAGE_INFO_MAGIC_SIZE:_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE]) + return _ImageInfo( + name=image_name_bytes.decode("utf-8", errors="ignore").strip("\x00").strip(), + size=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE + 1], + complete=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE], + ) + return None + + +def _explore_device(device_path: str) -> Optional[_MassStorageDeviceInfo]: + # udevadm info -a -p $(udevadm info -q path -n /dev/sda) + ctx = pyudev.Context() + + device = pyudev.Devices.from_device_file(ctx, device_path) + if device.subsystem != "block": + return None + try: + size = device.attributes.asint("size") * 512 + except KeyError: + return None + + hw_info: Optional[_HardwareInfo] = None + usb_device = device.find_parent("usb", "usb_device") + if usb_device: + hw_info = _HardwareInfo(**{ + attr: usb_device.attributes.asstring(attr).strip() + for attr in ["manufacturer", "product", "serial"] + }) + + with open(device_path, "rb") as device_file: + device_file.seek(size - _IMAGE_INFO_SIZE) + image_info = _parse_image_info_bytes(device_file.read()) + + return _MassStorageDeviceInfo( + path=device_path, + real=os.path.realpath(device_path), + size=size, + image=image_info, + hw=hw_info, + ) + + +def _msd_operated(method: Callable) -> Callable: + async def wrap(self: "MassStorageDevice", *args: Any, **kwargs: Any) -> Any: + if not self._device_path: # pylint: disable=protected-access + MsdIsNotOperationalError() + return (await method(self, *args, **kwargs)) + return wrap + + +# ===== +class MassStorageDevice: # pylint: disable=too-many-instance-attributes + def __init__( + self, + target: int, + reset: int, + + device_path: str, + init_delay: float, + reset_delay: float, + write_meta: bool, + + loop: asyncio.AbstractEventLoop, + ) -> None: + + self.__target = gpio.set_output(target) + self.__reset = gpio.set_output(reset) + + self._device_path = device_path + self.__init_delay = init_delay + self.__reset_delay = reset_delay + self.__write_meta = write_meta + + self.__loop = loop + + self.__device_info: Optional[_MassStorageDeviceInfo] = None + self.__saved_device_info: Optional[_MassStorageDeviceInfo] = None + self.__region = aioregion.AioExclusiveRegion(MsdIsBusyError) + self.__device_file: Optional[aiofiles.base.AiofilesContextManager] = None + self.__written = 0 + + logger = get_logger(0) + if self._device_path: + logger.info("Using %r as mass-storage device", self._device_path) + try: + logger.info("Enabled image metadata writing") + loop.run_until_complete(self.connect_to_kvm(no_delay=True)) + except Exception as err: + if isinstance(err, MsdError): + log = logger.error + else: + log = logger.exception + log("Mass-storage device is not operational: %s", err) + self._device_path = "" + else: + logger.warning("Mass-storage device is not operational") + + @_msd_operated + async def connect_to_kvm(self, no_delay: bool=False) -> None: + with self.__region: + if self.__device_info: + raise MsdAlreadyConnectedToKvmError() + gpio.write(self.__target, False) + if not no_delay: + await asyncio.sleep(self.__init_delay) + await self.__load_device_info() + get_logger().info("Mass-storage device switched to KVM: %s", self.__device_info) + + @_msd_operated + async def connect_to_pc(self) -> None: + with self.__region: + if not self.__device_info: + raise MsdAlreadyConnectedToPcError() + gpio.write(self.__target, True) + self.__device_info = None + get_logger().info("Mass-storage device switched to Server") + + @_msd_operated + async def reset(self) -> None: + with self.__region: + gpio.write(self.__reset, True) + await asyncio.sleep(self.__reset_delay) + gpio.write(self.__reset, False) + + def get_state(self) -> Dict: + info = (self.__saved_device_info._asdict() if self.__saved_device_info else None) + if info: + info["hw"] = (info["hw"]._asdict() if info["hw"] else None) + info["image"] = (info["image"]._asdict() if info["image"] else None) + + connected_to: Optional[str] = None + if self._device_path: + connected_to = ("kvm" if self.__device_info else "server") + + return { + "in_operate": bool(self._device_path), + "connected_to": connected_to, + "busy": bool(self.__device_file), + "written": self.__written, + "info": info, + } + + async def cleanup(self) -> None: + await self.__close_device_file() + gpio.write(self.__target, False) + gpio.write(self.__reset, False) + + @_msd_operated + async def __aenter__(self) -> "MassStorageDevice": + self.__region.enter() + try: + if not self.__device_info: + raise MsdIsNotConnectedToKvmError() + self.__device_file = await aiofiles.open(self.__device_info.path, mode="w+b", buffering=0) + self.__written = 0 + return self + finally: + self.__region.exit() + + async def write_image_info(self, name: str, complete: bool) -> None: + assert self.__device_file + assert self.__device_info + if self.__write_meta: + if self.__device_info.size - self.__written > _IMAGE_INFO_SIZE: + await self.__device_file.seek(self.__device_info.size - _IMAGE_INFO_SIZE) + await self.__write_to_device_file(_make_image_info_bytes(name, self.__written, complete)) + await self.__device_file.seek(0) + await self.__load_device_info() + else: + get_logger().error("Can't write image info because device is full") + + async def write_image_chunk(self, chunk: bytes) -> int: + await self.__write_to_device_file(chunk) + self.__written += len(chunk) + return self.__written + + async def __aexit__( + self, + _exc_type: Type[BaseException], + _exc: BaseException, + _tb: types.TracebackType, + ) -> None: + try: + await self.__close_device_file() + finally: + self.__region.exit() + + async def __write_to_device_file(self, data: bytes) -> None: + assert self.__device_file + await self.__device_file.write(data) + await self.__device_file.flush() + await self.__loop.run_in_executor(None, os.fsync, self.__device_file.fileno()) + + async def __load_device_info(self) -> None: + device_info = await self.__loop.run_in_executor(None, _explore_device, self._device_path) + if not device_info: + raise MsdError("Can't explore device %r" % (self._device_path)) + self.__device_info = self.__saved_device_info = device_info + + async def __close_device_file(self) -> None: + try: + if self.__device_file: + get_logger().info("Closing mass-storage device file ...") + await self.__device_file.close() + except Exception: + get_logger().exception("Can't close mass-storage device file") + await self.reset() + self.__device_file = None + self.__written = 0 diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py new file mode 100644 index 00000000..8522cb16 --- /dev/null +++ b/kvmd/apps/kvmd/server.py @@ -0,0 +1,471 @@ +import os +import signal +import asyncio +import json +import time + +from typing import List +from typing import Dict +from typing import Set +from typing import Callable +from typing import Optional + +import aiohttp.web +import setproctitle + +from ...logging import get_logger +from ...logging import Log + +from ...aioregion import RegionIsBusyError + +from ... import __version__ + +from .hid import Hid + +from .atx import Atx + +from .msd import MsdOperationError +from .msd import MassStorageDevice + +from .streamer import Streamer + + +# ===== +def _system_task(method: Callable) -> Callable: + async def wrap(self: "Server") -> None: + try: + await method(self) + except asyncio.CancelledError: + pass + except Exception: + get_logger().exception("Unhandled exception, killing myself ...") + os.kill(os.getpid(), signal.SIGTERM) + return wrap + + +def _json(result: Optional[Dict]=None, status: int=200) -> aiohttp.web.Response: + return aiohttp.web.Response( + text=json.dumps({ + "ok": (True if status == 200 else False), + "result": (result or {}), + }, sort_keys=True, indent=4), + status=status, + content_type="application/json", + ) + + +def _json_exception(msg: str, err: Exception, status: int) -> aiohttp.web.Response: + msg = "%s: %s" % (msg, err) + get_logger().error(msg) + return _json({ + "error": type(err).__name__, + "error_msg": msg, + }, status=status) + + +class BadRequest(Exception): + pass + + +def _valid_bool(name: str, flag: Optional[str]) -> bool: + flag = str(flag).strip().lower() + if flag in ["1", "true", "yes"]: + return True + elif flag in ["0", "false", "no"]: + return False + raise BadRequest("Invalid param '%s'" % (name)) + + +def _valid_int(name: str, value: Optional[str], min_value: Optional[int]=None, max_value: Optional[int]=None) -> int: + try: + value_int = int(value) # type: ignore + if ( + (min_value is not None and value_int < min_value) + or (max_value is not None and value_int > max_value) + ): + raise ValueError() + return value_int + except Exception: + raise BadRequest("Invalid param %r" % (name)) + + +def _wrap_exceptions_for_web(msg: str) -> Callable: + def make_wrapper(method: Callable) -> Callable: + async def wrap(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.Response: + try: + return (await method(self, request)) + except RegionIsBusyError as err: + return _json_exception(msg, err, 409) + except (BadRequest, MsdOperationError) as err: + return _json_exception(msg, err, 400) + return wrap + return make_wrapper + + +class Server: # pylint: disable=too-many-instance-attributes + def __init__( # pylint: disable=too-many-arguments + self, + log: Log, + hid: Hid, + atx: Atx, + msd: MassStorageDevice, + streamer: Streamer, + + heartbeat: float, + atx_state_poll: float, + streamer_shutdown_delay: float, + msd_chunk_size: int, + + loop: asyncio.AbstractEventLoop, + ) -> None: + + self.__log = log + self.__hid = hid + self.__atx = atx + self.__msd = msd + self.__streamer = streamer + + self.__heartbeat = heartbeat + self.__streamer_shutdown_delay = streamer_shutdown_delay + self.__atx_state_poll = atx_state_poll + self.__msd_chunk_size = msd_chunk_size + + self.__loop = loop + + self.__sockets: Set[aiohttp.web.WebSocketResponse] = set() + self.__sockets_lock = asyncio.Lock() + + self.__system_tasks: List[asyncio.Task] = [] + + self.__reset_streamer = False + self.__streamer_quality = streamer.get_current_quality() + self.__streamer_soft_fps = streamer.get_current_soft_fps() + + def run(self, host: str, port: int) -> None: + self.__hid.start() + + setproctitle.setproctitle("[main] " + setproctitle.getproctitle()) + + app = aiohttp.web.Application(loop=self.__loop) + + app.router.add_get("/info", self.__info_handler) + app.router.add_get("/log", self.__log_handler) + + app.router.add_get("/ws", self.__ws_handler) + + app.router.add_post("/hid/reset", self.__hid_reset_handler) + + app.router.add_get("/atx", self.__atx_state_handler) + app.router.add_post("/atx/click", self.__atx_click_handler) + + app.router.add_get("/msd", self.__msd_state_handler) + app.router.add_post("/msd/connect", self.__msd_connect_handler) + app.router.add_post("/msd/write", self.__msd_write_handler) + app.router.add_post("/msd/reset", self.__msd_reset_handler) + + app.router.add_get("/streamer", self.__streamer_state_handler) + app.router.add_post("/streamer/set_params", self.__streamer_set_params_handler) + app.router.add_post("/streamer/reset", self.__streamer_reset_handler) + + app.on_shutdown.append(self.__on_shutdown) + app.on_cleanup.append(self.__on_cleanup) + + self.__system_tasks.extend([ + self.__loop.create_task(self.__hid_watchdog()), + self.__loop.create_task(self.__stream_controller()), + self.__loop.create_task(self.__poll_dead_sockets()), + self.__loop.create_task(self.__poll_atx_state()), + ]) + + aiohttp.web.run_app(app, host=host, port=port, print=self.__run_app_print) + + # ===== SYSTEM + + async def __info_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + return _json({ + "version": { + "kvmd": __version__, + "streamer": await self.__streamer.get_version(), + }, + "streamer": self.__streamer.get_app(), + }) + + @_wrap_exceptions_for_web("Log error") + async def __log_handler(self, request: aiohttp.web.Request) -> aiohttp.web.StreamResponse: + seek = _valid_int("seek", request.query.get("seek", "0"), 0) + follow = _valid_bool("follow", request.query.get("follow", "false")) + response = aiohttp.web.StreamResponse(status=200, reason="OK", headers={"Content-Type": "text/plain"}) + await response.prepare(request) + async for record in self.__log.log(seek, follow): + await response.write(("[%s %s] --- %s" % ( + record["dt"].strftime("%Y-%m-%d %H:%M:%S"), + record["service"], + record["msg"], + )).encode("utf-8") + b"\r\n") + return response + + # ===== WEBSOCKET + + async def __ws_handler(self, request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse: + logger = get_logger(0) + ws = aiohttp.web.WebSocketResponse(heartbeat=self.__heartbeat) + await ws.prepare(request) + await self.__register_socket(ws) + async for msg in ws: + if msg.type == aiohttp.web.WSMsgType.TEXT: + try: + event = json.loads(msg.data) + except Exception as err: + logger.error("Can't parse JSON event from websocket: %s", err) + else: + event_type = event.get("event_type") + if event_type == "ping": + await ws.send_str(json.dumps({"msg_type": "pong"})) + elif event_type == "key": + await self.__handle_ws_key_event(event) + elif event_type == "mouse_move": + await self.__handle_ws_mouse_move_event(event) + elif event_type == "mouse_button": + await self.__handle_ws_mouse_button_event(event) + elif event_type == "mouse_wheel": + await self.__handle_ws_mouse_wheel_event(event) + else: + logger.error("Unknown websocket event: %r", event) + else: + break + return ws + + async def __handle_ws_key_event(self, event: Dict) -> None: + key = str(event.get("key", ""))[:64].strip() + state = event.get("state") + if key and state in [True, False]: + await self.__hid.send_key_event(key, state) + + async def __handle_ws_mouse_move_event(self, event: Dict) -> None: + try: + to_x = int(event["to"]["x"]) + to_y = int(event["to"]["y"]) + except Exception: + return + await self.__hid.send_mouse_move_event(to_x, to_y) + + async def __handle_ws_mouse_button_event(self, event: Dict) -> None: + button = str(event.get("button", ""))[:64].strip() + state = event.get("state") + if button and state in [True, False]: + await self.__hid.send_mouse_button_event(button, state) + + async def __handle_ws_mouse_wheel_event(self, event: Dict) -> None: + try: + delta_y = int(event["delta"]["y"]) + except Exception: + return + await self.__hid.send_mouse_wheel_event(delta_y) + + # ===== HID + + async def __hid_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + await self.__hid.reset() + return _json() + + # ===== ATX + + async def __atx_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + return _json(self.__atx.get_state()) + + @_wrap_exceptions_for_web("Click error") + async def __atx_click_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: + button = request.query.get("button") + clicker = { + "power": self.__atx.click_power, + "power_long": self.__atx.click_power_long, + "reset": self.__atx.click_reset, + }.get(button) + if not clicker: + raise BadRequest("Invalid param 'button'") + await self.__broadcast_event("atx_click", button=button) # type: ignore + await clicker() + await self.__broadcast_event("atx_click", button=None) # type: ignore + return _json({"clicked": button}) + + # ===== MSD + + async def __msd_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + return _json(self.__msd.get_state()) + + @_wrap_exceptions_for_web("Mass-storage error") + async def __msd_connect_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: + to = request.query.get("to") + if to == "kvm": + await self.__msd.connect_to_kvm() + state = self.__msd.get_state() + await self.__broadcast_event("msd_state", **state) + elif to == "server": + await self.__msd.connect_to_pc() + state = self.__msd.get_state() + await self.__broadcast_event("msd_state", **state) + else: + raise BadRequest("Invalid param 'to'") + return _json(state) + + @_wrap_exceptions_for_web("Can't write data to mass-storage device") + async def __msd_write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: + logger = get_logger(0) + reader = await request.multipart() + written = 0 + try: + field = await reader.next() + if not field or field.name != "image_name": + raise BadRequest("Missing 'image_name' field") + image_name = (await field.read()).decode("utf-8")[:256] + + field = await reader.next() + if not field or field.name != "image_data": + raise BadRequest("Missing 'image_data' field") + + async with self.__msd: + await self.__broadcast_event("msd_state", **self.__msd.get_state()) + logger.info("Writing image %r to mass-storage device ...", image_name) + await self.__msd.write_image_info(image_name, False) + while True: + chunk = await field.read_chunk(self.__msd_chunk_size) + if not chunk: + break + written = await self.__msd.write_image_chunk(chunk) + await self.__msd.write_image_info(image_name, True) + finally: + await self.__broadcast_event("msd_state", **self.__msd.get_state()) + if written != 0: + logger.info("Written %d bytes to mass-storage device", written) + return _json({"written": written}) + + @_wrap_exceptions_for_web("Mass-storage error") + async def __msd_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + await self.__msd.reset() + return _json() + + # ===== STREAMER + + async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + return _json(self.__streamer.get_state()) + + @_wrap_exceptions_for_web("Can't set stream params") + async def __streamer_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: + quality = request.query.get("quality") + if quality: + self.__streamer_quality = _valid_int("quality", quality, 1, 100) + soft_fps = request.query.get("soft_fps") + if soft_fps: + self.__streamer_soft_fps = _valid_int("soft_fps", soft_fps, 1, 30) + return _json() + + async def __streamer_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + self.__reset_streamer = True + return _json() + + # ===== + + def __run_app_print(self, text: str) -> None: + logger = get_logger() + for line in text.strip().splitlines(): + logger.info(line.strip()) + + async def __on_shutdown(self, _: aiohttp.web.Application) -> None: + logger = get_logger(0) + + logger.info("Cancelling system tasks ...") + for task in self.__system_tasks: + task.cancel() + await asyncio.gather(*self.__system_tasks) + + logger.info("Disconnecting clients ...") + for ws in list(self.__sockets): + await self.__remove_socket(ws) + + async def __on_cleanup(self, _: aiohttp.web.Application) -> None: + await self.__hid.cleanup() + await self.__streamer.cleanup() + await self.__msd.cleanup() + + @_system_task + async def __hid_watchdog(self) -> None: + while self.__hid.is_alive(): + await asyncio.sleep(0.1) + raise RuntimeError("HID is dead") + + @_system_task + async def __stream_controller(self) -> None: + prev = 0 + shutdown_at = 0.0 + + while True: + cur = len(self.__sockets) + if prev == 0 and cur > 0: + if not self.__streamer.is_running(): + await self.__streamer.start(self.__streamer_quality, self.__streamer_soft_fps) + await self.__broadcast_event("streamer_state", **self.__streamer.get_state()) + elif prev > 0 and cur == 0: + shutdown_at = time.time() + self.__streamer_shutdown_delay + elif prev == 0 and cur == 0 and time.time() > shutdown_at: + if self.__streamer.is_running(): + await self.__streamer.stop() + await self.__broadcast_event("streamer_state", **self.__streamer.get_state()) + + if ( + self.__reset_streamer + or self.__streamer_quality != self.__streamer.get_current_quality() + or self.__streamer_soft_fps != self.__streamer.get_current_soft_fps() + ): + if self.__streamer.is_running(): + await self.__streamer.stop() + await self.__streamer.start(self.__streamer_quality, self.__streamer_soft_fps, no_init_restart=True) + await self.__broadcast_event("streamer_state", **self.__streamer.get_state()) + self.__reset_streamer = False + + prev = cur + await asyncio.sleep(0.1) + + @_system_task + async def __poll_dead_sockets(self) -> None: + while True: + for ws in list(self.__sockets): + if ws.closed or not ws._req.transport: # pylint: disable=protected-access + await self.__remove_socket(ws) + await asyncio.sleep(0.1) + + @_system_task + async def __poll_atx_state(self) -> None: + while True: + if self.__sockets: + await self.__broadcast_event("atx_state", **self.__atx.get_state()) + await asyncio.sleep(self.__atx_state_poll) + + async def __broadcast_event(self, event: str, **kwargs: Dict) -> None: + await asyncio.gather(*[ + ws.send_str(json.dumps({ + "msg_type": "event", + "msg": { + "event": event, + "event_attrs": kwargs, + }, + })) + for ws in list(self.__sockets) + if not ws.closed and ws._req.transport # pylint: disable=protected-access + ], return_exceptions=True) + + async def __register_socket(self, ws: aiohttp.web.WebSocketResponse) -> None: + async with self.__sockets_lock: + self.__sockets.add(ws) + get_logger().info("Registered new client socket: remote=%s; id=%d; active=%d", + ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access + + async def __remove_socket(self, ws: aiohttp.web.WebSocketResponse) -> None: + async with self.__sockets_lock: + await self.__hid.clear_events() + try: + self.__sockets.remove(ws) + get_logger().info("Removed client socket: remote=%s; id=%d; active=%d", + ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access + await ws.close() + except Exception: + pass diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py new file mode 100644 index 00000000..bfb77d28 --- /dev/null +++ b/kvmd/apps/kvmd/streamer.py @@ -0,0 +1,175 @@ +import os +import asyncio +import asyncio.subprocess + +from typing import List +from typing import Dict +from typing import Optional + +from ...logging import get_logger + +from ... import gpio + + +# ===== +class Streamer: # pylint: disable=too-many-instance-attributes + def __init__( + self, + cap_power: int, + conv_power: int, + sync_delay: float, + init_delay: float, + init_restart_after: float, + quality: int, + soft_fps: int, + cmd: List[str], + loop: asyncio.AbstractEventLoop, + ) -> None: + + self.__cap_power = (gpio.set_output(cap_power) if cap_power > 0 else cap_power) + self.__conv_power = (gpio.set_output(conv_power) if conv_power > 0 else conv_power) + self.__sync_delay = sync_delay + self.__init_delay = init_delay + self.__init_restart_after = init_restart_after + self.__quality = quality + self.__soft_fps = soft_fps + self.__cmd = cmd + + self.__loop = loop + + self.__proc_task: Optional[asyncio.Task] = None + + async def start(self, quality: int, soft_fps: int, no_init_restart: bool=False) -> None: + logger = get_logger() + logger.info("Starting streamer ...") + + assert 1 <= quality <= 100 + self.__quality = quality + + assert 1 <= soft_fps <= 30 + self.__soft_fps = soft_fps + + await self.__inner_start() + if self.__init_restart_after > 0.0 and not no_init_restart: + logger.info("Stopping streamer to restart ...") + await self.__inner_stop() + logger.info("Starting again ...") + await self.__inner_start() + + async def stop(self) -> None: + get_logger().info("Stopping streamer ...") + await self.__inner_stop() + + def is_running(self) -> bool: + return bool(self.__proc_task) + + def get_current_quality(self) -> int: + return self.__quality + + def get_current_soft_fps(self) -> int: + return self.__soft_fps + + def get_state(self) -> Dict: + return { + "is_running": self.is_running(), + "quality": self.__quality, + "soft_fps": self.__soft_fps, + } + + def get_app(self) -> str: + return os.path.basename(self.__cmd[0]) + + async def get_version(self) -> str: + proc = await asyncio.create_subprocess_exec( + *[self.__cmd[0], "--version"], + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + ) + (stdout, _) = await proc.communicate() + return stdout.decode(errors="ignore").strip() + + async def cleanup(self) -> None: + if self.is_running(): + await self.stop() + + async def __inner_start(self) -> None: + assert not self.__proc_task + await self.__set_hw_enabled(True) + self.__proc_task = self.__loop.create_task(self.__process()) + + async def __inner_stop(self) -> None: + assert self.__proc_task + self.__proc_task.cancel() + await asyncio.gather(self.__proc_task, return_exceptions=True) + await self.__set_hw_enabled(False) + self.__proc_task = None + + async def __set_hw_enabled(self, enabled: bool) -> None: + # XXX: This sequence is very important to enable converter and cap board + if self.__cap_power > 0: + gpio.write(self.__cap_power, enabled) + if self.__conv_power > 0: + if enabled: + await asyncio.sleep(self.__sync_delay) + gpio.write(self.__conv_power, enabled) + if enabled: + await asyncio.sleep(self.__init_delay) + + async def __process(self) -> None: # pylint: disable=too-many-branches + logger = get_logger(0) + + while True: # pylint: disable=too-many-nested-blocks + proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member + try: + cmd = [part.format(quality=self.__quality, soft_fps=self.__soft_fps) for part in self.__cmd] + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + logger.info("Started streamer pid=%d: %s", proc.pid, cmd) + + empty = 0 + async for line_bytes in proc.stdout: # type: ignore + line = line_bytes.decode(errors="ignore").strip() + if line: + logger.info("Streamer: %s", line) + empty = 0 + else: + empty += 1 + if empty == 100: # asyncio bug + raise RuntimeError("Streamer/asyncio: too many empty lines") + + raise RuntimeError("Streamer unexpectedly died") + + except asyncio.CancelledError: + break + + except Exception as err: + if proc: + logger.exception("Unexpected streamer error: pid=%d", proc.pid) + else: + logger.exception("Can't start streamer: %s", err) + await asyncio.sleep(1) + + finally: + if proc and proc.returncode is None: + await self.__kill(proc) + + async def __kill(self, proc: asyncio.subprocess.Process) -> None: # pylint: disable=no-member + try: + proc.terminate() + await asyncio.sleep(1) + if proc.returncode is None: + try: + proc.kill() + except Exception: + if proc.returncode is not None: + raise + await proc.wait() + get_logger().info("Streamer killed: pid=%d; retcode=%d", proc.pid, proc.returncode) + except Exception: + if proc.returncode is None: + get_logger().exception("Can't kill streamer pid=%d", proc.pid) + else: + get_logger().info("Streamer killed: pid=%d; retcode=%d", proc.pid, proc.returncode) diff --git a/kvmd/apps/wscli/__init__.py b/kvmd/apps/wscli/__init__.py new file mode 100644 index 00000000..19d3181b --- /dev/null +++ b/kvmd/apps/wscli/__init__.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 + + +import sys +import signal +import asyncio +import argparse +import time + +import aiohttp + + +# ===== +async def _run_client(loop: asyncio.AbstractEventLoop, url: str) -> None: + def stdin_callback() -> None: + line = sys.stdin.buffer.readline().decode() + if line: + asyncio.ensure_future(ws.send_str(line), loop=loop) + else: + loop.stop() + + loop.add_reader(sys.stdin.fileno(), stdin_callback) + + async def dispatch() -> None: + while True: + msg = await ws.receive() + if msg.type == aiohttp.WSMsgType.TEXT: + print("[%.5f] Received: %s" % (time.time(), msg.data.strip())) + else: + if msg.type == aiohttp.WSMsgType.CLOSE: + await ws.close() + elif msg.type == aiohttp.WSMsgType.ERROR: + print("[%.5f] Error during receive: %s" % (time.time(), ws.exception())) + elif msg.type == aiohttp.WSMsgType.CLOSED: + pass + break + + async with aiohttp.ClientSession().ws_connect(url) as ws: + await dispatch() + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument("-u", "--url", default="http://127.0.0.1:8081/ws") + options = parser.parse_args() + + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, loop.stop) + loop.create_task(_run_client(loop, options.url)) + loop.run_forever() diff --git a/kvmd/apps/wscli/__main__.py b/kvmd/apps/wscli/__main__.py new file mode 100644 index 00000000..031df43e --- /dev/null +++ b/kvmd/apps/wscli/__main__.py @@ -0,0 +1,2 @@ +from . import main +main() |