From ab7a16a4f7afb2393f69a7ac9b3f5cea7c3ed4d6 Mon Sep 17 00:00:00 2001 From: Devaev Maxim Date: Thu, 12 Sep 2019 01:53:25 +0300 Subject: msd plugins --- configs/kvmd/main/v0-hdmi.yaml | 2 +- configs/kvmd/main/v0-vga.yaml | 2 +- kvmd/apps/__init__.py | 16 +- kvmd/apps/cleanup/__init__.py | 23 ++- kvmd/apps/kvmd/__init__.py | 4 +- kvmd/apps/kvmd/msd.py | 425 ----------------------------------------- kvmd/apps/kvmd/server.py | 10 +- kvmd/plugins/msd/__init__.py | 113 +++++++++++ kvmd/plugins/msd/none.py | 86 +++++++++ kvmd/plugins/msd/relay.py | 404 +++++++++++++++++++++++++++++++++++++++ 10 files changed, 630 insertions(+), 455 deletions(-) delete mode 100644 kvmd/apps/kvmd/msd.py create mode 100644 kvmd/plugins/msd/__init__.py create mode 100644 kvmd/plugins/msd/none.py create mode 100644 kvmd/plugins/msd/relay.py diff --git a/configs/kvmd/main/v0-hdmi.yaml b/configs/kvmd/main/v0-hdmi.yaml index 2271c757..44469158 100644 --- a/configs/kvmd/main/v0-hdmi.yaml +++ b/configs/kvmd/main/v0-hdmi.yaml @@ -25,7 +25,7 @@ kvmd: reset_switch_pin: 27 msd: - enabled: false + type: none streamer: desired_fps: 30 diff --git a/configs/kvmd/main/v0-vga.yaml b/configs/kvmd/main/v0-vga.yaml index 9f2d895c..0d510743 100644 --- a/configs/kvmd/main/v0-vga.yaml +++ b/configs/kvmd/main/v0-vga.yaml @@ -25,7 +25,7 @@ kvmd: reset_switch_pin: 27 msd: - enabled: false + type: none streamer: unix: /run/kvmd/ustreamer.sock diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index fd4ddec7..60df70fc 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -39,6 +39,7 @@ from ..plugins import UnknownPluginError from ..plugins.auth import get_auth_service_class from ..plugins.hid import get_hid_class from ..plugins.atx import get_atx_class +from ..plugins.msd import get_msd_class from ..yamlconf import ConfigError from ..yamlconf import make_config @@ -50,7 +51,6 @@ from ..yamlconf.loader import load_yaml_file from ..validators.basic import valid_bool from ..validators.basic import valid_number -from ..validators.basic import valid_int_f1 from ..validators.basic import valid_float_f01 from ..validators.auth import valid_users_list @@ -66,7 +66,6 @@ from ..validators.net import valid_port from ..validators.kvm import valid_stream_quality from ..validators.kvm import valid_stream_fps -from ..validators.hw import valid_gpio_pin from ..validators.hw import valid_gpio_pin_optional @@ -119,6 +118,7 @@ def _init_config(config_path: str, sections: List[str], override_options: List[s scheme["kvmd"]["hid"].update(get_hid_class(config.kvmd.hid.type).get_plugin_options()) scheme["kvmd"]["atx"].update(get_atx_class(config.kvmd.atx.type).get_plugin_options()) + scheme["kvmd"]["msd"].update(get_msd_class(config.kvmd.msd.type).get_plugin_options()) config = make_config(raw_config, scheme) @@ -189,17 +189,7 @@ def _get_config_scheme(sections: List[str]) -> Dict: }, "msd": { - "enabled": Option(True, type=valid_bool), - - "target_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"), - "reset_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"), - - "device": Option("", type=valid_abs_path, only_if="enabled", unpack_as="device_path"), - "init_delay": Option(1.0, type=valid_float_f01), - "init_retries": Option(5, type=valid_int_f1), - "reset_delay": Option(1.0, type=valid_float_f01), - "write_meta": Option(True, type=valid_bool), - "chunk_size": Option(65536, type=(lambda arg: valid_number(arg, min=1024))), + "type": Option("relay"), }, "streamer": { diff --git a/kvmd/apps/cleanup/__init__.py b/kvmd/apps/cleanup/__init__.py index 680f080b..d7790b67 100644 --- a/kvmd/apps/cleanup/__init__.py +++ b/kvmd/apps/cleanup/__init__.py @@ -47,20 +47,25 @@ def main(argv: Optional[List[str]]=None) -> None: logger.info("Cleaning up ...") with gpio.bcm(): - for (name, pin, enabled) in [ + for (name, pin) in [ *([ - ("hid_reset_pin", config.hid.reset_pin, True), + ("tty_hid_reset_pin", config.hid.reset_pin), ] if config.hid.type == "tty" else []), + *([ - ("atx_power_switch_pin", config.atx.power_switch_pin, True), - ("atx_reset_switch_pin", config.atx.reset_switch_pin, True), + ("gpio_atx_power_switch_pin", config.atx.power_switch_pin), + ("gpio_atx_reset_switch_pin", config.atx.reset_switch_pin), ] if config.atx.type == "gpio" else []), - ("msd_target_pin", config.msd.target_pin, config.msd.enabled), - ("msd_reset_pin", config.msd.reset_pin, config.msd.enabled), - ("streamer_cap_pin", config.streamer.cap_pin, True), - ("streamer_conv_pin", config.streamer.conv_pin, True), + + *([ + ("relay_msd_target_pin", config.msd.target_pin), + ("relay_msd_reset_pin", config.msd.reset_pin), + ] if config.msd.type == "relay" else []), + + ("streamer_cap_pin", config.streamer.cap_pin), + ("streamer_conv_pin", config.streamer.conv_pin), ]: - if enabled and pin >= 0: + if pin >= 0: logger.info("Writing value=0 to GPIO pin=%d (%s)", pin, name) try: gpio.set_output(pin, initial=False) diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py index bc26e442..f44dc669 100644 --- a/kvmd/apps/kvmd/__init__.py +++ b/kvmd/apps/kvmd/__init__.py @@ -29,13 +29,13 @@ from ... import gpio from ...plugins.hid import get_hid_class from ...plugins.atx import get_atx_class +from ...plugins.msd import get_msd_class from .. import init from .auth import AuthManager from .info import InfoManager from .logreader import LogReader -from .msd import MassStorageDevice from .streamer import Streamer from .server import Server @@ -64,7 +64,7 @@ def main(argv: Optional[List[str]]=None) -> None: hid=get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type"])), atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])), - msd=MassStorageDevice(**config.msd._unpack()), + msd=get_msd_class(config.msd.type)(**config.msd._unpack(ignore=["type"])), streamer=Streamer(**config.streamer._unpack()), ).run(**config.server._unpack()) diff --git a/kvmd/apps/kvmd/msd.py b/kvmd/apps/kvmd/msd.py deleted file mode 100644 index cb26e520..00000000 --- a/kvmd/apps/kvmd/msd.py +++ /dev/null @@ -1,425 +0,0 @@ -# ========================================================================== # -# # -# KVMD - The main Pi-KVM daemon. # -# # -# Copyright (C) 2018 Maxim Devaev # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see . # -# # -# ========================================================================== # - - -import os -import stat -import fcntl -import struct -import asyncio -import asyncio.queues -import dataclasses -import types - -from typing import Dict -from typing import IO -from typing import Callable -from typing import Type -from typing import AsyncGenerator -from typing import Optional -from typing import Any - -import aiofiles -import aiofiles.base - -from ...logging import get_logger - -from ... import aiotools -from ... import aioregion -from ... import gpio - - -# ===== -class MsdError(Exception): - pass - - -class MsdOperationError(MsdError): - pass - - -class MsdDisabledError(MsdOperationError): - def __init__(self) -> None: - super().__init__("MSD is disabled") - - -class MsdOfflineError(MsdOperationError): - def __init__(self) -> None: - super().__init__("MSD is not found") - - -class MsdAlreadyOnServerError(MsdOperationError): - def __init__(self) -> None: - super().__init__("MSD is already connected to Server") - - -class MsdAlreadyOnKvmError(MsdOperationError): - def __init__(self) -> None: - super().__init__("MSD is already connected to KVM") - - -class MsdNotOnKvmError(MsdOperationError): - def __init__(self) -> None: - super().__init__("MSD is not connected to KVM") - - -class MsdIsBusyError(MsdOperationError, aioregion.RegionIsBusyError): - pass - - -# ===== -@dataclasses.dataclass(frozen=True) -class _ImageInfo: - name: str - size: int - complete: bool - - -@dataclasses.dataclass(frozen=True) -class _MassStorageDeviceInfo: - path: str - real: str - size: int - image: Optional[_ImageInfo] - - -_IMAGE_INFO_SIZE = 4096 -_IMAGE_INFO_MAGIC_SIZE = 16 -_IMAGE_INFO_IMAGE_NAME_SIZE = 256 -_IMAGE_INFO_PADS_SIZE = _IMAGE_INFO_SIZE - _IMAGE_INFO_IMAGE_NAME_SIZE - 1 - 8 - _IMAGE_INFO_MAGIC_SIZE * 8 -_IMAGE_INFO_FORMAT = ">%dL%dc?Q%dx%dL" % ( - _IMAGE_INFO_MAGIC_SIZE, - _IMAGE_INFO_IMAGE_NAME_SIZE, - _IMAGE_INFO_PADS_SIZE, - _IMAGE_INFO_MAGIC_SIZE, -) -_IMAGE_INFO_MAGIC = [0x1ACE1ACE] * _IMAGE_INFO_MAGIC_SIZE - - -def _make_image_info_bytes(name: str, size: int, complete: bool) -> bytes: - return struct.pack( - _IMAGE_INFO_FORMAT, - *_IMAGE_INFO_MAGIC, - *memoryview(( # type: ignore - name.encode("utf-8") - + b"\x00" * _IMAGE_INFO_IMAGE_NAME_SIZE - )[:_IMAGE_INFO_IMAGE_NAME_SIZE]).cast("c"), - complete, - size, - *_IMAGE_INFO_MAGIC, - ) - - -def _parse_image_info_bytes(data: bytes) -> Optional[_ImageInfo]: - try: - parsed = list(struct.unpack(_IMAGE_INFO_FORMAT, data)) - except struct.error: - pass - else: - magic_begin = parsed[:_IMAGE_INFO_MAGIC_SIZE] - magic_end = parsed[-_IMAGE_INFO_MAGIC_SIZE:] - if magic_begin == magic_end == _IMAGE_INFO_MAGIC: - image_name_bytes = b"".join(parsed[_IMAGE_INFO_MAGIC_SIZE:_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE]) - return _ImageInfo( - name=image_name_bytes.decode("utf-8", errors="ignore").strip("\x00").strip(), - size=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE + 1], - complete=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE], - ) - return None - - -def _ioctl_uint32(device_file: IO, request: int) -> int: - buf = b"\0" * 4 - buf = fcntl.ioctl(device_file.fileno(), request, buf) - result = struct.unpack("I", buf)[0] - assert result > 0, (device_file, request, buf) - return result - - -def _explore_device(device_path: str) -> _MassStorageDeviceInfo: - if not stat.S_ISBLK(os.stat(device_path).st_mode): - raise RuntimeError(f"Not a block device: {device_path}") - - with open(device_path, "rb") as device_file: - # size = BLKGETSIZE * BLKSSZGET - size = _ioctl_uint32(device_file, 0x1260) * _ioctl_uint32(device_file, 0x1268) - device_file.seek(size - _IMAGE_INFO_SIZE) - image_info = _parse_image_info_bytes(device_file.read()) - - return _MassStorageDeviceInfo( - path=device_path, - real=os.path.realpath(device_path), - size=size, - image=image_info, - ) - - -def _msd_working(method: Callable) -> Callable: - async def wrapper(self: "MassStorageDevice", *args: Any, **kwargs: Any) -> Any: - if not self._enabled: # pylint: disable=protected-access - raise MsdDisabledError() - if not self._device_info: # pylint: disable=protected-access - raise MsdOfflineError() - return (await method(self, *args, **kwargs)) - return wrapper - - -class MassStorageDevice: # pylint: disable=too-many-instance-attributes - def __init__( - self, - enabled: bool, - - target_pin: int, - reset_pin: int, - - device_path: str, - init_delay: float, - init_retries: int, - reset_delay: float, - write_meta: bool, - chunk_size: int, - ) -> None: - - self._enabled = enabled - - if self._enabled: - self.__target_pin = gpio.set_output(target_pin) - self.__reset_pin = gpio.set_output(reset_pin) - assert bool(device_path) - else: - self.__target_pin = -1 - self.__reset_pin = -1 - - self.__device_path = device_path - self.__init_delay = init_delay - self.__init_retries = init_retries - self.__reset_delay = reset_delay - self.__write_meta = write_meta - self.chunk_size = chunk_size - - self.__region = aioregion.AioExclusiveRegion(MsdIsBusyError) - - self._device_info: Optional[_MassStorageDeviceInfo] = None - self.__device_file: Optional[aiofiles.base.AiofilesContextManager] = None - self.__written = 0 - self.__on_kvm = True - - self.__state_queue: asyncio.queues.Queue = asyncio.Queue() - - logger = get_logger(0) - if self._enabled: - logger.info("Using %r as MSD", self.__device_path) - try: - aiotools.run_sync(self.__load_device_info()) - if self.__write_meta: - logger.info("Enabled image metadata writing") - except Exception as err: - log = (logger.error if isinstance(err, MsdError) else logger.exception) - log("MSD is offline: %s", err) - else: - logger.info("MSD is disabled") - - def get_state(self) -> Dict: - online = (self._enabled and bool(self._device_info)) - return { - "enabled": self._enabled, - "online": online, - "busy": self.__region.is_busy(), - "uploading": bool(self.__device_file), - "written": self.__written, - "info": (dataclasses.asdict(self._device_info) if online else None), - "connected_to": (("kvm" if self.__on_kvm else "server") if online else None), - } - - async def poll_state(self) -> AsyncGenerator[Dict, None]: - while True: - if self._enabled: - yield (await self.__state_queue.get()) - else: - await asyncio.sleep(60) - - @aiotools.atomic - async def cleanup(self) -> None: - if self._enabled: - await self.__close_device_file() - gpio.write(self.__target_pin, False) - gpio.write(self.__reset_pin, False) - - @_msd_working - @aiotools.atomic - async def connect_to_kvm(self) -> Dict: - notify = False - state: Dict = {} - try: - with self.__region: - if self.__on_kvm: - raise MsdAlreadyOnKvmError() - notify = True - - gpio.write(self.__target_pin, False) - try: - await self.__load_device_info() - except Exception: - if not self.__on_kvm: - gpio.write(self.__target_pin, True) - raise - self.__on_kvm = True - get_logger().info("MSD switched to KVM: %s", self._device_info) - - state = self.get_state() - return state - finally: - if notify: - await self.__state_queue.put(state or self.get_state()) - - @_msd_working - @aiotools.atomic - async def connect_to_server(self) -> Dict: - notify = False - state: Dict = {} - try: - with self.__region: - if not self.__on_kvm: - raise MsdAlreadyOnServerError() - notify = True - - gpio.write(self.__target_pin, True) - self.__on_kvm = False - get_logger().info("MSD switched to Server") - - state = self.get_state() - return state - finally: - if notify: - await self.__state_queue.put(state or self.get_state()) - - @aiotools.atomic - async def reset(self) -> None: - if not self._enabled: - raise MsdDisabledError() - with aiotools.unregion_only_on_exception(self.__region): - await self.__inner_reset() - - @aiotools.tasked - @aiotools.muted("Can't reset MSD or operation was not completed") - async def __inner_reset(self) -> None: - try: - gpio.write(self.__reset_pin, True) - await asyncio.sleep(self.__reset_delay) - gpio.write(self.__reset_pin, False) - - gpio.write(self.__target_pin, False) - self.__on_kvm = True - - await self.__load_device_info() - get_logger(0).info("MSD reset has been successful") - finally: - try: - gpio.write(self.__reset_pin, False) - finally: - try: - await self.__state_queue.put(self.get_state()) - finally: - self.__region.exit() - - @_msd_working - @aiotools.atomic - async def __aenter__(self) -> "MassStorageDevice": - assert self._device_info - self.__region.enter() - try: - if not self.__on_kvm: - raise MsdNotOnKvmError() - self.__device_file = await aiofiles.open(self._device_info.path, mode="w+b", buffering=0) - self.__written = 0 - return self - except Exception: - self.__region.exit() - raise - finally: - await self.__state_queue.put(self.get_state()) - - @aiotools.atomic - async def write_image_info(self, name: str, complete: bool) -> None: - assert self.__device_file - assert self._device_info - if self.__write_meta: - if self._device_info.size - self.__written > _IMAGE_INFO_SIZE: - await self.__device_file.seek(self._device_info.size - _IMAGE_INFO_SIZE) - await self.__write_to_device_file(_make_image_info_bytes(name, self.__written, complete)) - await self.__device_file.seek(0) - else: - get_logger().error("Can't write image info because device is full") - - @aiotools.atomic - async def write_image_chunk(self, chunk: bytes) -> int: - await self.__write_to_device_file(chunk) - self.__written += len(chunk) - return self.__written - - @aiotools.atomic - async def __aexit__( - self, - _exc_type: Type[BaseException], - _exc: BaseException, - _tb: types.TracebackType, - ) -> None: - try: - await self.__close_device_file() - await self.__load_device_info() - finally: - self.__region.exit() - await self.__state_queue.put(self.get_state()) - - async def __write_to_device_file(self, data: bytes) -> None: - assert self.__device_file - await self.__device_file.write(data) - await self.__device_file.flush() - await aiotools.run_async(os.fsync, self.__device_file.fileno()) - - async def __close_device_file(self) -> None: - try: - if self.__device_file: - get_logger().info("Closing device file ...") - await self.__device_file.close() - except asyncio.CancelledError: # pylint: disable=try-except-raise - raise - except Exception: - get_logger().exception("Can't close device file") - finally: - self.__device_file = None - self.__written = 0 - - async def __load_device_info(self) -> None: - retries = self.__init_retries - while True: - await asyncio.sleep(self.__init_delay) - try: - self._device_info = await aiotools.run_async(_explore_device, self.__device_path) - break - except asyncio.CancelledError: # pylint: disable=try-except-raise - raise - except Exception: - if retries == 0: - self._device_info = None - raise MsdError("Can't load device info") - get_logger().exception("Can't load device info; retries=%d", retries) - retries -= 1 diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 21e30f0f..a0c14224 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -49,6 +49,9 @@ from ...plugins.hid import BaseHid from ...plugins.atx import AtxOperationError from ...plugins.atx import BaseAtx +from ...plugins.msd import MsdOperationError +from ...plugins.msd import BaseMsd + from ...validators import ValidatorError from ...validators.basic import valid_bool @@ -75,8 +78,6 @@ from ... import __version__ from .auth import AuthManager from .info import InfoManager from .logreader import LogReader -from .msd import MsdOperationError -from .msd import MassStorageDevice from .streamer import Streamer @@ -234,7 +235,7 @@ class Server: # pylint: disable=too-many-instance-attributes hid: BaseHid, atx: BaseAtx, - msd: MassStorageDevice, + msd: BaseMsd, streamer: Streamer, ) -> None: @@ -486,8 +487,9 @@ class Server: # pylint: disable=too-many-instance-attributes logger.info("Writing image %r to MSD ...", image_name) await self.__msd.write_image_info(image_name, False) + chunk_size = self.__msd.get_chunk_size() while True: - chunk = await data_field.read_chunk(self.__msd.chunk_size) + chunk = await data_field.read_chunk(chunk_size) if not chunk: break written = await self.__msd.write_image_chunk(chunk) diff --git a/kvmd/plugins/msd/__init__.py b/kvmd/plugins/msd/__init__.py new file mode 100644 index 00000000..64d339eb --- /dev/null +++ b/kvmd/plugins/msd/__init__.py @@ -0,0 +1,113 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import types + +from typing import Dict +from typing import Type +from typing import AsyncGenerator + +from ... import aioregion + +from .. import BasePlugin +from .. import get_plugin_class + + +# ===== +class MsdError(Exception): + pass + + +class MsdOperationError(MsdError): + pass + + +class MsdOfflineError(MsdOperationError): + def __init__(self) -> None: + super().__init__("MSD is not found") + + +class MsdAlreadyOnServerError(MsdOperationError): + def __init__(self) -> None: + super().__init__("MSD is already connected to Server") + + +class MsdAlreadyOnKvmError(MsdOperationError): + def __init__(self) -> None: + super().__init__("MSD is already connected to KVM") + + +class MsdNotOnKvmError(MsdOperationError): + def __init__(self) -> None: + super().__init__("MSD is not connected to KVM") + + +class MsdIsBusyError(MsdOperationError, aioregion.RegionIsBusyError): + pass + + +# ===== +class BaseMsd(BasePlugin): + def get_state(self) -> Dict: + raise NotImplementedError + + async def poll_state(self) -> AsyncGenerator[Dict, None]: + yield {} + raise NotImplementedError + + async def cleanup(self) -> None: + pass + + async def connect_to_kvm(self) -> Dict: + raise NotImplementedError + + async def connect_to_server(self) -> Dict: + raise NotImplementedError + + async def reset(self) -> None: + raise NotImplementedError + + async def __aenter__(self) -> "BaseMsd": + raise NotImplementedError + + def get_chunk_size(self) -> int: + raise NotImplementedError + + async def write_image_info(self, name: str, complete: bool) -> None: + raise NotImplementedError + + async def write_image_chunk(self, chunk: bytes) -> int: + raise NotImplementedError + + async def __aexit__( + self, + _exc_type: Type[BaseException], + _exc: BaseException, + _tb: types.TracebackType, + ) -> None: + + raise NotImplementedError + + +# ===== +def get_msd_class(name: str) -> Type[BaseMsd]: + return get_plugin_class("msd", (name or "none")) # type: ignore diff --git a/kvmd/plugins/msd/none.py b/kvmd/plugins/msd/none.py new file mode 100644 index 00000000..dab0f406 --- /dev/null +++ b/kvmd/plugins/msd/none.py @@ -0,0 +1,86 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import asyncio +import types + +from typing import Dict +from typing import Type +from typing import AsyncGenerator + +from . import MsdOperationError +from . import BaseMsd + + +# ===== +class MsdDisabledError(MsdOperationError): + def __init__(self) -> None: + super().__init__("MSD is disabled") + + +# ===== +class Plugin(BaseMsd): + def get_state(self) -> Dict: + return { + "enabled": False, + "online": False, + "busy": False, + "uploading": False, + "written": False, + "info": None, + "connected_to": None, + } + + async def poll_state(self) -> AsyncGenerator[Dict, None]: + while True: + yield self.get_state() + await asyncio.sleep(60) + + async def connect_to_kvm(self) -> Dict: + raise MsdDisabledError() + + async def connect_to_server(self) -> Dict: + raise MsdDisabledError() + + async def reset(self) -> None: + raise MsdDisabledError() + + async def __aenter__(self) -> BaseMsd: + raise MsdDisabledError() + + def get_chunk_size(self) -> int: + raise MsdDisabledError() + + async def write_image_info(self, name: str, complete: bool) -> None: + raise MsdDisabledError() + + async def write_image_chunk(self, chunk: bytes) -> int: + raise MsdDisabledError() + + async def __aexit__( + self, + _exc_type: Type[BaseException], + _exc: BaseException, + _tb: types.TracebackType, + ) -> None: + + raise MsdDisabledError() diff --git a/kvmd/plugins/msd/relay.py b/kvmd/plugins/msd/relay.py new file mode 100644 index 00000000..8c9285de --- /dev/null +++ b/kvmd/plugins/msd/relay.py @@ -0,0 +1,404 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import os +import stat +import fcntl +import struct +import asyncio +import asyncio.queues +import dataclasses +import types + +from typing import Dict +from typing import IO +from typing import Callable +from typing import Type +from typing import AsyncGenerator +from typing import Optional +from typing import Any + +import aiofiles +import aiofiles.base + +from ...logging import get_logger + +from ... import aiotools +from ... import aioregion +from ... import gpio + +from ...yamlconf import Option + +from ...validators.basic import valid_bool +from ...validators.basic import valid_number +from ...validators.basic import valid_int_f1 +from ...validators.basic import valid_float_f01 + +from ...validators.os import valid_abs_path + +from ...validators.hw import valid_gpio_pin + +from . import MsdError +from . import MsdOfflineError +from . import MsdAlreadyOnServerError +from . import MsdAlreadyOnKvmError +from . import MsdNotOnKvmError +from . import MsdIsBusyError +from . import BaseMsd + + +# ===== +@dataclasses.dataclass(frozen=True) +class _ImageInfo: + name: str + size: int + complete: bool + + +@dataclasses.dataclass(frozen=True) +class _DeviceInfo: + path: str + real: str + size: int + image: Optional[_ImageInfo] + + +_IMAGE_INFO_SIZE = 4096 +_IMAGE_INFO_MAGIC_SIZE = 16 +_IMAGE_INFO_IMAGE_NAME_SIZE = 256 +_IMAGE_INFO_PADS_SIZE = _IMAGE_INFO_SIZE - _IMAGE_INFO_IMAGE_NAME_SIZE - 1 - 8 - _IMAGE_INFO_MAGIC_SIZE * 8 +_IMAGE_INFO_FORMAT = ">%dL%dc?Q%dx%dL" % ( + _IMAGE_INFO_MAGIC_SIZE, + _IMAGE_INFO_IMAGE_NAME_SIZE, + _IMAGE_INFO_PADS_SIZE, + _IMAGE_INFO_MAGIC_SIZE, +) +_IMAGE_INFO_MAGIC = [0x1ACE1ACE] * _IMAGE_INFO_MAGIC_SIZE + + +def _make_image_info_bytes(name: str, size: int, complete: bool) -> bytes: + return struct.pack( + _IMAGE_INFO_FORMAT, + *_IMAGE_INFO_MAGIC, + *memoryview(( # type: ignore + name.encode("utf-8") + + b"\x00" * _IMAGE_INFO_IMAGE_NAME_SIZE + )[:_IMAGE_INFO_IMAGE_NAME_SIZE]).cast("c"), + complete, + size, + *_IMAGE_INFO_MAGIC, + ) + + +def _parse_image_info_bytes(data: bytes) -> Optional[_ImageInfo]: + try: + parsed = list(struct.unpack(_IMAGE_INFO_FORMAT, data)) + except struct.error: + pass + else: + magic_begin = parsed[:_IMAGE_INFO_MAGIC_SIZE] + magic_end = parsed[-_IMAGE_INFO_MAGIC_SIZE:] + if magic_begin == magic_end == _IMAGE_INFO_MAGIC: + image_name_bytes = b"".join(parsed[_IMAGE_INFO_MAGIC_SIZE:_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE]) + return _ImageInfo( + name=image_name_bytes.decode("utf-8", errors="ignore").strip("\x00").strip(), + size=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE + 1], + complete=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE], + ) + return None + + +def _ioctl_uint32(device_file: IO, request: int) -> int: + buf = b"\0" * 4 + buf = fcntl.ioctl(device_file.fileno(), request, buf) + result = struct.unpack("I", buf)[0] + assert result > 0, (device_file, request, buf) + return result + + +def _explore_device(device_path: str) -> _DeviceInfo: + if not stat.S_ISBLK(os.stat(device_path).st_mode): + raise RuntimeError(f"Not a block device: {device_path}") + + with open(device_path, "rb") as device_file: + # size = BLKGETSIZE * BLKSSZGET + size = _ioctl_uint32(device_file, 0x1260) * _ioctl_uint32(device_file, 0x1268) + device_file.seek(size - _IMAGE_INFO_SIZE) + image_info = _parse_image_info_bytes(device_file.read()) + + return _DeviceInfo( + path=device_path, + real=os.path.realpath(device_path), + size=size, + image=image_info, + ) + + +def _msd_working(method: Callable) -> Callable: + async def wrapper(self: "Plugin", *args: Any, **kwargs: Any) -> Any: + if not self._device_info: # pylint: disable=protected-access + raise MsdOfflineError() + return (await method(self, *args, **kwargs)) + return wrapper + + +class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes + def __init__( # pylint: disable=super-init-not-called + self, + target_pin: int, + reset_pin: int, + + device_path: str, + init_delay: float, + init_retries: int, + reset_delay: float, + write_meta: bool, + chunk_size: int, + ) -> None: + + self.__target_pin = gpio.set_output(target_pin) + self.__reset_pin = gpio.set_output(reset_pin) + + self.__device_path = device_path + self.__init_delay = init_delay + self.__init_retries = init_retries + self.__reset_delay = reset_delay + self.__write_meta = write_meta + self.__chunk_size = chunk_size + + self.__region = aioregion.AioExclusiveRegion(MsdIsBusyError) + + self._device_info: Optional[_DeviceInfo] = None + self.__device_file: Optional[aiofiles.base.AiofilesContextManager] = None + self.__written = 0 + self.__on_kvm = True + + self.__state_queue: asyncio.queues.Queue = asyncio.Queue() + + logger = get_logger(0) + logger.info("Using %r as MSD", self.__device_path) + try: + aiotools.run_sync(self.__load_device_info()) + if self.__write_meta: + logger.info("Enabled image metadata writing") + except Exception as err: + log = (logger.error if isinstance(err, MsdError) else logger.exception) + log("MSD is offline: %s", err) + + @classmethod + def get_plugin_options(cls) -> Dict[str, Option]: + return { + "target_pin": Option(-1, type=valid_gpio_pin), + "reset_pin": Option(-1, type=valid_gpio_pin), + + "device": Option("", type=valid_abs_path, unpack_as="device_path"), + "init_delay": Option(1.0, type=valid_float_f01), + "init_retries": Option(5, type=valid_int_f1), + "reset_delay": Option(1.0, type=valid_float_f01), + "write_meta": Option(True, type=valid_bool), + "chunk_size": Option(65536, type=(lambda arg: valid_number(arg, min=1024))), + } + + def get_state(self) -> Dict: + online = bool(self._device_info) + return { + "enabled": True, + "online": online, + "busy": self.__region.is_busy(), + "uploading": bool(self.__device_file), + "written": self.__written, + "info": (dataclasses.asdict(self._device_info) if online else None), + "connected_to": (("kvm" if self.__on_kvm else "server") if online else None), + } + + async def poll_state(self) -> AsyncGenerator[Dict, None]: + while True: + yield (await self.__state_queue.get()) + + @aiotools.atomic + async def cleanup(self) -> None: + await self.__close_device_file() + gpio.write(self.__target_pin, False) + gpio.write(self.__reset_pin, False) + + @_msd_working + @aiotools.atomic + async def connect_to_kvm(self) -> Dict: + notify = False + state: Dict = {} + try: + with self.__region: + if self.__on_kvm: + raise MsdAlreadyOnKvmError() + notify = True + + gpio.write(self.__target_pin, False) + try: + await self.__load_device_info() + except Exception: + if not self.__on_kvm: + gpio.write(self.__target_pin, True) + raise + self.__on_kvm = True + get_logger().info("MSD switched to KVM: %s", self._device_info) + + state = self.get_state() + return state + finally: + if notify: + await self.__state_queue.put(state or self.get_state()) + + @_msd_working + @aiotools.atomic + async def connect_to_server(self) -> Dict: + notify = False + state: Dict = {} + try: + with self.__region: + if not self.__on_kvm: + raise MsdAlreadyOnServerError() + notify = True + + gpio.write(self.__target_pin, True) + self.__on_kvm = False + get_logger().info("MSD switched to Server") + + state = self.get_state() + return state + finally: + if notify: + await self.__state_queue.put(state or self.get_state()) + + @aiotools.atomic + async def reset(self) -> None: + with aiotools.unregion_only_on_exception(self.__region): + await self.__inner_reset() + + @aiotools.tasked + @aiotools.muted("Can't reset MSD or operation was not completed") + async def __inner_reset(self) -> None: + try: + gpio.write(self.__reset_pin, True) + await asyncio.sleep(self.__reset_delay) + gpio.write(self.__reset_pin, False) + + gpio.write(self.__target_pin, False) + self.__on_kvm = True + + await self.__load_device_info() + get_logger(0).info("MSD reset has been successful") + finally: + try: + gpio.write(self.__reset_pin, False) + finally: + try: + await self.__state_queue.put(self.get_state()) + finally: + self.__region.exit() + + @_msd_working + @aiotools.atomic + async def __aenter__(self) -> "Plugin": + assert self._device_info + self.__region.enter() + try: + if not self.__on_kvm: + raise MsdNotOnKvmError() + self.__device_file = await aiofiles.open(self._device_info.path, mode="w+b", buffering=0) + self.__written = 0 + return self + except Exception: + self.__region.exit() + raise + finally: + await self.__state_queue.put(self.get_state()) + + def get_chunk_size(self) -> int: + return self.__chunk_size + + @aiotools.atomic + async def write_image_info(self, name: str, complete: bool) -> None: + assert self.__device_file + assert self._device_info + if self.__write_meta: + if self._device_info.size - self.__written > _IMAGE_INFO_SIZE: + await self.__device_file.seek(self._device_info.size - _IMAGE_INFO_SIZE) + await self.__write_to_device_file(_make_image_info_bytes(name, self.__written, complete)) + await self.__device_file.seek(0) + else: + get_logger().error("Can't write image info because device is full") + + @aiotools.atomic + async def write_image_chunk(self, chunk: bytes) -> int: + await self.__write_to_device_file(chunk) + self.__written += len(chunk) + return self.__written + + @aiotools.atomic + async def __aexit__( + self, + _exc_type: Type[BaseException], + _exc: BaseException, + _tb: types.TracebackType, + ) -> None: + + try: + await self.__close_device_file() + await self.__load_device_info() + finally: + self.__region.exit() + await self.__state_queue.put(self.get_state()) + + async def __write_to_device_file(self, data: bytes) -> None: + assert self.__device_file + await self.__device_file.write(data) + await self.__device_file.flush() + await aiotools.run_async(os.fsync, self.__device_file.fileno()) + + async def __close_device_file(self) -> None: + try: + if self.__device_file: + get_logger().info("Closing device file ...") + await self.__device_file.close() + except asyncio.CancelledError: # pylint: disable=try-except-raise + raise + except Exception: + get_logger().exception("Can't close device file") + finally: + self.__device_file = None + self.__written = 0 + + async def __load_device_info(self) -> None: + retries = self.__init_retries + while True: + await asyncio.sleep(self.__init_delay) + try: + self._device_info = await aiotools.run_async(_explore_device, self.__device_path) + break + except asyncio.CancelledError: # pylint: disable=try-except-raise + raise + except Exception: + if retries == 0: + self._device_info = None + raise MsdError("Can't load device info") + get_logger().exception("Can't load device info; retries=%d", retries) + retries -= 1 -- cgit v1.2.3