From 10f8c2b3352c951549cc1249d6b24789fb94d688 Mon Sep 17 00:00:00 2001 From: Devaev Maxim Date: Tue, 29 Oct 2019 02:16:12 +0300 Subject: otg msd and big refactoring --- kvmd/aiotools.py | 11 + kvmd/apps/kvmd/server.py | 56 +++-- kvmd/inotify.py | 335 ++++++++++++++++++++++++ kvmd/plugins/msd/__init__.py | 80 +++--- kvmd/plugins/msd/disabled.py | 47 ++-- kvmd/plugins/msd/otg.py | 108 -------- kvmd/plugins/msd/otg/__init__.py | 531 +++++++++++++++++++++++++++++++++++++++ kvmd/plugins/msd/otg/drive.py | 80 ++++++ kvmd/plugins/msd/otg/helpers.py | 79 ++++++ kvmd/plugins/msd/relay.py | 260 +++++++++---------- 10 files changed, 1256 insertions(+), 331 deletions(-) create mode 100644 kvmd/inotify.py delete mode 100644 kvmd/plugins/msd/otg.py create mode 100644 kvmd/plugins/msd/otg/__init__.py create mode 100644 kvmd/plugins/msd/otg/drive.py create mode 100644 kvmd/plugins/msd/otg/helpers.py (limited to 'kvmd') diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py index 574c8bc7..98a2a524 100644 --- a/kvmd/aiotools.py +++ b/kvmd/aiotools.py @@ -20,6 +20,7 @@ # ========================================================================== # +import os import asyncio import functools import contextlib @@ -34,6 +35,9 @@ from typing import AsyncGenerator from typing import TypeVar from typing import Any +import aiofiles +import aiofiles.base + from . import aioregion from .logging import get_logger @@ -118,3 +122,10 @@ async def unlock_only_on_exception(lock: asyncio.Lock) -> AsyncGenerator[None, N except: # noqa: E722 lock.release() raise + + +# ===== +async def afile_write_now(afile: aiofiles.base.AiofilesContextManager, data: bytes) -> None: + await afile.write(data) + await afile.flush() + await run_async(os.fsync, afile.fileno()) diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 04030545..b4e1ad22 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -368,7 +368,7 @@ class Server: # pylint: disable=too-many-instance-attributes self.__broadcast_event(_Events.INFO_STATE, (await self.__make_info())), self.__broadcast_event(_Events.HID_STATE, self.__hid.get_state()), self.__broadcast_event(_Events.ATX_STATE, self.__atx.get_state()), - self.__broadcast_event(_Events.MSD_STATE, self.__msd.get_state()), + self.__broadcast_event(_Events.MSD_STATE, (await self.__msd.get_state())), self.__broadcast_event(_Events.STREAMER_STATE, (await self.__streamer.get_state())), ]) async for msg in ws: @@ -469,52 +469,60 @@ class Server: # pylint: disable=too-many-instance-attributes @_exposed("GET", "/msd") async def __msd_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - return _json(self.__msd.get_state()) + return _json(await self.__msd.get_state()) + + @_exposed("POST", "/msd/set_params") + async def __msd_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: + params = { + key: validator(request.query.get(param)) + for (param, key, validator) in [ + ("image", "name", (lambda arg: str(arg).strip() and valid_msd_image_name(arg))), + ("cdrom", "cdrom", valid_bool), + ] + if request.query.get(param) is not None + } + await self.__msd.set_params(**params) # type: ignore + return _json() @_exposed("POST", "/msd/connect") async def __msd_connect_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - return _json(await self.__msd.connect()) + await self.__msd.connect() + return _json() @_exposed("POST", "/msd/disconnect") async def __msd_disconnect_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - return _json(await self.__msd.disconnect()) - - @_exposed("POST", "/msd/select") - async def __msd_select_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: - image_name = valid_msd_image_name(request.query.get("image_name")) - cdrom = valid_bool(request.query.get("cdrom", "true")) - return _json(await self.__msd.select(image_name, cdrom)) - - @_exposed("POST", "/msd/remove") - async def __msd_remove_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: - return _json(await self.__msd.remove(valid_msd_image_name(request.query.get("image_name")))) + await self.__msd.disconnect() + return _json() @_exposed("POST", "/msd/write") async def __msd_write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: assert self.__sync_chunk_size is not None logger = get_logger(0) reader = await request.multipart() - image_name = "" + name = "" written = 0 try: - async with self.__msd: - name_field = await _get_multipart_field(reader, "image_name") - image_name = valid_msd_image_name((await name_field.read()).decode("utf-8")) + name_field = await _get_multipart_field(reader, "image") + name = valid_msd_image_name((await name_field.read()).decode("utf-8")) - data_field = await _get_multipart_field(reader, "image_data") + data_field = await _get_multipart_field(reader, "data") - logger.info("Writing image %r to MSD ...", image_name) - await self.__msd.write_image_info(image_name, False) + async with self.__msd.write_image(name): + logger.info("Writing image %r to MSD ...", name) while True: chunk = await data_field.read_chunk(self.__sync_chunk_size) if not chunk: break written = await self.__msd.write_image_chunk(chunk) - await self.__msd.write_image_info(image_name, True) finally: if written != 0: - logger.info("Written image %r with size=%d bytes to MSD", image_name, written) - return _json({"image": {"name": image_name, "size": written}}) + logger.info("Written image %r with size=%d bytes to MSD", name, written) + return _json({"image": {"name": name, "size": written}}) + + @_exposed("POST", "/msd/remove") + async def __msd_remove_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: + await self.__msd.remove(valid_msd_image_name(request.query.get("image"))) + return _json() @_exposed("POST", "/msd/reset") async def __msd_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: diff --git a/kvmd/inotify.py b/kvmd/inotify.py new file mode 100644 index 00000000..cdded981 --- /dev/null +++ b/kvmd/inotify.py @@ -0,0 +1,335 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev # +# # +# This source file is partially based on python-watchdog module. # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import sys +import os +import asyncio +import asyncio.queues +import ctypes +import ctypes.util +import struct +import dataclasses +import types +import errno + +from ctypes import c_int +from ctypes import c_char_p +from ctypes import c_uint32 + +from typing import Tuple +from typing import List +from typing import Dict +from typing import Type +from typing import Generator +from typing import Optional + +from .logging import get_logger + + +# ===== +def _load_libc() -> ctypes.CDLL: + try: + path = ctypes.util.find_library("c") + except (OSError, IOError, RuntimeError): + pass + else: + if path: + return ctypes.CDLL(path) + + names = ["libc.so", "libc.so.6", "libc.so.0"] + for (index, name) in enumerate(names): + try: + return ctypes.CDLL(name) + except (OSError, IOError): + if index == len(names) - 1: + raise + + raise RuntimeError("Where is libc?") + + +_libc = _load_libc() + + +def _get_libc_func(name: str, restype, argtypes=None): # type: ignore + return ctypes.CFUNCTYPE(restype, *(argtypes or []), use_errno=True)((name, _libc)) + + +_inotify_init = _get_libc_func("inotify_init", c_int) +_inotify_add_watch = _get_libc_func("inotify_add_watch", c_int, [c_int, c_char_p, c_uint32]) +_inotify_rm_watch = _get_libc_func("inotify_rm_watch", c_int, [c_int, c_uint32]) + + +# ===== +_EVENT_HEAD_FMT = "iIII" +_EVENT_HEAD_SIZE = struct.calcsize(_EVENT_HEAD_FMT) +_EVENTS_BUFFER_LENGTH = 4096 * (_EVENT_HEAD_SIZE + 256) # count * (head + max_file_name_size + null_character) + +_FS_FALLBACK_ENCODING = "utf-8" +_FS_ENCODING = (sys.getfilesystemencoding() or _FS_FALLBACK_ENCODING) + + +# ===== +def _inotify_parsed_buffer(data: bytes) -> Generator[Tuple[int, int, int, bytes], None, None]: + offset = 0 + while offset + _EVENT_HEAD_SIZE <= len(data): + (wd, mask, cookie, length) = struct.unpack_from("iIII", data, offset) + name = data[ + offset + _EVENT_HEAD_SIZE # noqa: E203 + : + offset + _EVENT_HEAD_SIZE + length + ].rstrip(b"\0") + offset += _EVENT_HEAD_SIZE + length + if wd >= 0: + yield (wd, mask, cookie, name) + + +def _inotify_check(retval: int) -> int: + if retval < 0: + c_errno = ctypes.get_errno() + if c_errno == errno.ENOSPC: # pylint: disable=no-else-raise + raise OSError(c_errno, "Inotify watch limit reached") + elif c_errno == errno.EMFILE: + raise OSError(c_errno, "Inotify instance limit reached") + else: + raise OSError(c_errno, os.strerror(c_errno)) + return retval + + +def _fs_encode(path: str) -> bytes: + try: + return path.encode(_FS_ENCODING, "strict") + except UnicodeEncodeError: + return path.encode(_FS_FALLBACK_ENCODING, "strict") + + +def _fs_decode(path: bytes) -> str: + try: + return path.decode(_FS_ENCODING, "strict") + except UnicodeDecodeError: + return path.decode(_FS_FALLBACK_ENCODING, "strict") + + +# ===== +class InotifyMask: + # Userspace events + ACCESS = 0x00000001 # File was accessed + ATTRIB = 0x00000004 # Meta-data changed + CLOSE_WRITE = 0x00000008 # Writable file was closed + CLOSE_NOWRITE = 0x00000010 # Unwritable file closed + CREATE = 0x00000100 # Subfile was created + DELETE = 0x00000200 # Subfile was deleted + DELETE_SELF = 0x00000400 # Self was deleted + MODIFY = 0x00000002 # File was modified + MOVE_SELF = 0x00000800 # Self was moved + MOVED_FROM = 0x00000040 # File was moved from X + MOVED_TO = 0x00000080 # File was moved to Y + OPEN = 0x00000020 # File was opened + + # Events sent by the kernel to a watch + IGNORED = 0x00008000 # File was ignored + ISDIR = 0x40000000 # Event occurred against directory + Q_OVERFLOW = 0x00004000 # Event queued overflowed + UNMOUNT = 0x00002000 # Backing file system was unmounted + + # Helper userspace events +# CLOSE = CLOSE_WRITE | CLOSE_NOWRITE # Close +# MOVE = MOVED_FROM | MOVED_TO # Moves + + # Helper for userspace events +# ALL_EVENTS = ( +# ACCESS +# | ATTRIB +# | CLOSE_WRITE +# | CLOSE_NOWRITE +# | CREATE +# | DELETE +# | DELETE_SELF +# | MODIFY +# | MOVE_SELF +# | MOVED_FROM +# | MOVED_TO +# | OPEN +# ) + + # Helper for all modify events + ALL_MODIFY_EVENTS = ( + CLOSE_WRITE + | CREATE + | DELETE + | DELETE_SELF + | MODIFY + | MOVE_SELF + | MOVED_FROM + | MOVED_TO + ) + + # Special flags for watch() +# DONT_FOLLOW = 0x02000000 # Don't follow a symbolic link +# EXCL_UNLINK = 0x04000000 # Exclude events on unlinked objects +# MASK_CREATE = 0x10000000 # Don't overwrite existent watchers (since 4.18) +# MASK_ADD = 0x20000000 # Add to the mask of an existing watch +# ONESHOT = 0x80000000 # Only send event once +# ONLYDIR = 0x01000000 # Only watch the path if it's a directory + + @classmethod + def to_string(cls, mask: int) -> str: + flags: List[str] = [] + for name in dir(cls): + if ( + name[0].isupper() + and not name.startswith("ALL_") + and name not in ["CLOSE", "MOVE"] + and mask & getattr(cls, name) + ): + flags.append(name) + return "|".join(flags) + + +@dataclasses.dataclass(frozen=True, repr=False) +class InotifyEvent: + wd: int + mask: int + cookie: int + name: str + path: str + + def __repr__(self) -> str: + return ( + f"" + ) + + +class Inotify: + def __init__(self) -> None: + self.__fd = -1 + + self.__wd_by_path: Dict[str, int] = {} + self.__path_by_wd: Dict[int, str] = {} + + self.__moved: Dict[int, str] = {} + + self.__events_queue: asyncio.queues.Queue = asyncio.Queue() + + def watch(self, path: str, mask: int) -> None: + path = os.path.normpath(path) + assert path not in self.__wd_by_path, path + get_logger().info("Watching for %s: %s", path, InotifyMask.to_string(mask)) + wd = _inotify_check(_inotify_add_watch(self.__fd, _fs_encode(path), mask)) + self.__wd_by_path[path] = wd + self.__path_by_wd[wd] = path + +# def unwatch(self, path: str) -> None: +# path = os.path.normpath(path) +# assert path in self.__wd_by_path, path +# get_logger().info("Unwatching %s", path) +# wd = self.__wd_by_path[path] +# _inotify_check(_inotify_rm_watch(self.__fd, wd)) +# del self.__wd_by_path[path] +# del self.__path_by_wd[wd] + +# def has_events(self) -> bool: +# return (not self.__events_queue.empty()) + + async def get_event(self, timeout: float) -> Optional[InotifyEvent]: + assert timeout > 0 + try: + return (await asyncio.wait_for(self.__events_queue.get(), timeout=timeout)) + except asyncio.TimeoutError: + return None + + async def get_series(self, timeout: float) -> List[InotifyEvent]: + series: List[InotifyEvent] = [] + event = await self.get_event(timeout) + if event: + series.append(event) + while event: + event = await self.get_event(timeout) + if event: + series.append(event) + return series + + def __read_and_queue_events(self) -> None: + logger = get_logger() + for event in self.__read_parsed_events(): + # XXX: Ни в коем случае не приводить self.__read_parsed_events() к списку. + # Он использует self.__wd_by_path и self.__path_by_wd, содержимое которых + # корректируется кодом ниже. В противном случае все сломается. + + if event.mask & InotifyMask.MOVED_FROM: + self.__moved[event.cookie] = event.path # Save moved_from_path + elif event.mask & InotifyMask.MOVED_TO: + moved_from_path = self.__moved.pop(event.cookie, None) + if moved_from_path is not None: + wd = self.__wd_by_path.pop(moved_from_path, None) + if wd is not None: + self.__wd_by_path[event.path] = wd + self.__path_by_wd[wd] = event.path + + if event.mask & InotifyMask.IGNORED: + ignored_path = self.__path_by_wd[event.wd] + if self.__wd_by_path[ignored_path] == event.wd: + logger.info("Unwatching %s because IGNORED was received", ignored_path) + del self.__wd_by_path[ignored_path] + continue + + self.__events_queue.put_nowait(event) + + def __read_parsed_events(self) -> Generator[InotifyEvent, None, None]: + for (wd, mask, cookie, name_bytes) in _inotify_parsed_buffer(self.__read_buffer()): + wd_path = self.__path_by_wd.get(wd, None) + if wd_path is not None: + name = _fs_decode(name_bytes) + path = (os.path.join(wd_path, name) if name else wd_path) # Avoid trailing slash + yield InotifyEvent(wd, mask, cookie, name, path) + + def __read_buffer(self) -> bytes: + while True: + try: + return os.read(self.__fd, _EVENTS_BUFFER_LENGTH) + except OSError as err: + if err.errno == errno.EINTR: + pass + + def __enter__(self) -> "Inotify": + assert self.__fd < 0 + self.__fd = _inotify_check(_inotify_init()) + asyncio.get_event_loop().add_reader(self.__fd, self.__read_and_queue_events) + return self + + def __exit__( + self, + _exc_type: Type[BaseException], + _exc: BaseException, + _tb: types.TracebackType, + ) -> None: + + if self.__fd >= 0: + asyncio.get_event_loop().remove_reader(self.__fd) + for wd in list(self.__wd_by_path.values()): + _inotify_rm_watch(self.__fd, wd) + try: + os.close(self.__fd) + except Exception: + pass diff --git a/kvmd/plugins/msd/__init__.py b/kvmd/plugins/msd/__init__.py index 899b6f47..f43a40c2 100644 --- a/kvmd/plugins/msd/__init__.py +++ b/kvmd/plugins/msd/__init__.py @@ -20,11 +20,12 @@ # ========================================================================== # -import types +import contextlib from typing import Dict from typing import Type from typing import AsyncGenerator +from typing import Optional from .. import BasePlugin from .. import get_plugin_class @@ -44,19 +45,29 @@ class MsdOfflineError(MsdOperationError): super().__init__("MSD is not found") -class MsdAlreadyConnectedError(MsdOperationError): +class MsdConnectedError(MsdOperationError): def __init__(self) -> None: - super().__init__("MSD is already connected to Server") + super().__init__("MSD is connected to Server, but shouldn't for this operation") -class MsdAlreadyDisconnectedError(MsdOperationError): +class MsdDisconnectedError(MsdOperationError): def __init__(self) -> None: - super().__init__("MSD is already disconnected from Server") + super().__init__("MSD is disconnected from Server, but should be for this operation") -class MsdConnectedError(MsdOperationError): +class MsdImageNotSelected(MsdOperationError): + def __init__(self) -> None: + super().__init__("The image is not selected") + + +class MsdUnknownImageError(MsdOperationError): + def __init__(self) -> None: + super().__init__("The image is not found in the storage") + + +class MsdImageExistsError(MsdOperationError): def __init__(self) -> None: - super().__init__("MSD connected to Server, but should not") + super().__init__("This image is already exists") class MsdIsBusyError(MsdOperationError): @@ -69,52 +80,51 @@ class MsdMultiNotSupported(MsdOperationError): super().__init__("This MSD does not support storing multiple images") +class MsdCdromNotSupported(MsdOperationError): + def __init__(self) -> None: + super().__init__("This MSD does not support CD-ROM emulation") + + # ===== class BaseMsd(BasePlugin): - def get_state(self) -> Dict: - raise NotImplementedError + async def get_state(self) -> Dict: + raise NotImplementedError() async def poll_state(self) -> AsyncGenerator[Dict, None]: - yield {} - raise NotImplementedError + if True: # pylint: disable=using-constant-test + # XXX: Vulture hack + raise NotImplementedError() + yield async def reset(self) -> None: - raise NotImplementedError + raise NotImplementedError() async def cleanup(self) -> None: pass # ===== - async def connect(self) -> Dict: - raise NotImplementedError + async def set_params(self, name: Optional[str]=None, cdrom: Optional[bool]=None) -> None: + raise NotImplementedError() - async def disconnect(self) -> Dict: - raise NotImplementedError + async def connect(self) -> None: + raise NotImplementedError() - async def select(self, name: str, cdrom: bool) -> Dict: - raise NotImplementedError + async def disconnect(self) -> None: + raise NotImplementedError() - async def remove(self, name: str) -> Dict: - raise NotImplementedError - - async def __aenter__(self) -> "BaseMsd": - raise NotImplementedError - - async def write_image_info(self, name: str, complete: bool) -> None: - raise NotImplementedError + @contextlib.asynccontextmanager + async def write_image(self, name: str) -> AsyncGenerator[None, None]: + if True: # pylint: disable=using-constant-test + # XXX: Vulture hack + raise NotImplementedError() + yield async def write_image_chunk(self, chunk: bytes) -> int: - raise NotImplementedError - - async def __aexit__( - self, - _exc_type: Type[BaseException], - _exc: BaseException, - _tb: types.TracebackType, - ) -> None: + raise NotImplementedError() - raise NotImplementedError + async def remove(self, name: str) -> None: + raise NotImplementedError() # ===== diff --git a/kvmd/plugins/msd/disabled.py b/kvmd/plugins/msd/disabled.py index eafbdd65..97835513 100644 --- a/kvmd/plugins/msd/disabled.py +++ b/kvmd/plugins/msd/disabled.py @@ -21,11 +21,11 @@ import asyncio -import types +import contextlib from typing import Dict -from typing import Type from typing import AsyncGenerator +from typing import Optional from . import MsdOperationError from . import BaseMsd @@ -39,23 +39,22 @@ class MsdDisabledError(MsdOperationError): # ===== class Plugin(BaseMsd): - def get_state(self) -> Dict: + async def get_state(self) -> Dict: return { "enabled": False, - "multi": False, "online": False, "busy": False, - "uploading": False, - "written": 0, - "current": None, "storage": None, - "cdrom": None, - "connected": False, + "drive": None, + "features": { + "multi": False, + "cdrom": False, + }, } async def poll_state(self) -> AsyncGenerator[Dict, None]: while True: - yield self.get_state() + yield (await self.get_state()) await asyncio.sleep(60) async def reset(self) -> None: @@ -63,32 +62,24 @@ class Plugin(BaseMsd): # ===== - async def connect(self) -> Dict: + async def set_params(self, name: Optional[str]=None, cdrom: Optional[bool]=None) -> None: raise MsdDisabledError() - async def disconnect(self) -> Dict: + async def connect(self) -> None: raise MsdDisabledError() - async def select(self, name: str, cdrom: bool) -> Dict: + async def disconnect(self) -> None: raise MsdDisabledError() - async def remove(self, name: str) -> Dict: - raise MsdDisabledError() - - async def __aenter__(self) -> BaseMsd: - raise MsdDisabledError() - - async def write_image_info(self, name: str, complete: bool) -> None: - raise MsdDisabledError() + @contextlib.asynccontextmanager + async def write_image(self, name: str) -> AsyncGenerator[None, None]: + if True: # pylint: disable=using-constant-test + # XXX: Vulture hack + raise MsdDisabledError() + yield async def write_image_chunk(self, chunk: bytes) -> int: raise MsdDisabledError() - async def __aexit__( - self, - _exc_type: Type[BaseException], - _exc: BaseException, - _tb: types.TracebackType, - ) -> None: - + async def remove(self, name: str) -> None: raise MsdDisabledError() diff --git a/kvmd/plugins/msd/otg.py b/kvmd/plugins/msd/otg.py deleted file mode 100644 index bf636bcf..00000000 --- a/kvmd/plugins/msd/otg.py +++ /dev/null @@ -1,108 +0,0 @@ -# ========================================================================== # -# # -# KVMD - The main Pi-KVM daemon. # -# # -# Copyright (C) 2018 Maxim Devaev # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see . # -# # -# ========================================================================== # - - -import asyncio -import types - -from typing import Dict -from typing import Type -from typing import AsyncGenerator - -from ...yamlconf import Option - -from ...validators.os import valid_abs_dir -from ...validators.os import valid_command - -from . import MsdOperationError -from . import BaseMsd - - -# ===== -class MsdCliOnlyError(MsdOperationError): - def __init__(self) -> None: - super().__init__("Only CLI") - - -# ===== -class Plugin(BaseMsd): - @classmethod - def get_plugin_options(cls) -> Dict: - sudo = ["/usr/bin/sudo", "--non-interactive"] - return { - "storage": Option("/var/lib/kvmd/msd", type=valid_abs_dir, unpack_as="storage_path"), - "remount_cmd": Option([*sudo, "/usr/bin/kvmd-helper-otgmsd-remount", "{mode}"], type=valid_command), - "unlock_cmd": Option([*sudo, "/usr/bin/kvmd-helper-otgmsd-unlock", "unlock"], type=valid_command), - } - - def get_state(self) -> Dict: - return { - "enabled": False, - "multi": False, - "online": False, - "busy": False, - "uploading": False, - "written": 0, - "current": None, - "storage": None, - "cdrom": None, - "connected": False, - } - - async def poll_state(self) -> AsyncGenerator[Dict, None]: - while True: - yield self.get_state() - await asyncio.sleep(60) - - async def reset(self) -> None: - raise MsdCliOnlyError() - - # ===== - - async def connect(self) -> Dict: - raise MsdCliOnlyError() - - async def disconnect(self) -> Dict: - raise MsdCliOnlyError() - - async def select(self, name: str, cdrom: bool) -> Dict: - raise MsdCliOnlyError() - - async def remove(self, name: str) -> Dict: - raise MsdCliOnlyError() - - async def __aenter__(self) -> BaseMsd: - raise MsdCliOnlyError() - - async def write_image_info(self, name: str, complete: bool) -> None: - raise MsdCliOnlyError() - - async def write_image_chunk(self, chunk: bytes) -> int: - raise MsdCliOnlyError() - - async def __aexit__( - self, - _exc_type: Type[BaseException], - _exc: BaseException, - _tb: types.TracebackType, - ) -> None: - - raise MsdCliOnlyError() diff --git a/kvmd/plugins/msd/otg/__init__.py b/kvmd/plugins/msd/otg/__init__.py new file mode 100644 index 00000000..76ef7d07 --- /dev/null +++ b/kvmd/plugins/msd/otg/__init__.py @@ -0,0 +1,531 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import os +import asyncio +import contextlib +import dataclasses + +from typing import List +from typing import Dict +from typing import AsyncGenerator +from typing import Optional + +import aiofiles +import aiofiles.base + +from ....logging import get_logger + +from ....inotify import InotifyMask +from ....inotify import Inotify + +from ....yamlconf import Option + +from ....validators.os import valid_abs_dir +from ....validators.os import valid_command + +from .... import aiotools +from .... import aioregion + +from .. import MsdError +from .. import MsdOfflineError +from .. import MsdConnectedError +from .. import MsdDisconnectedError +from .. import MsdImageNotSelected +from .. import MsdUnknownImageError +from .. import MsdImageExistsError +from .. import MsdIsBusyError +from .. import BaseMsd + +from .drive import Drive + +from .helpers import remount_storage +from .helpers import unlock_drive + + +# ===== +@dataclasses.dataclass(frozen=True) +class _DriveImage: + name: str + path: str + size: int + complete: bool + in_storage: bool + + +@dataclasses.dataclass(frozen=True) +class _DriveState: + image: Optional[_DriveImage] + cdrom: bool + rw: bool + + +@dataclasses.dataclass(frozen=True) +class _StorageState: + size: int + free: int + images: Dict[str, _DriveImage] + + +# ===== +@dataclasses.dataclass +class _VirtualDriveState: + image: Optional[_DriveImage] + connected: bool + cdrom: bool + + @classmethod + def from_drive_state(cls, state: _DriveState) -> "_VirtualDriveState": + return _VirtualDriveState( + image=state.image, + connected=bool(state.image), + cdrom=state.cdrom, + ) + + +class _State: + def __init__(self, changes_queue: asyncio.queues.Queue) -> None: + self.__changes_queue = changes_queue + + self.storage: Optional[_StorageState] = None + self.vd: Optional[_VirtualDriveState] = None + + self._lock = asyncio.Lock() + self._region = aioregion.AioExclusiveRegion(MsdIsBusyError) + + @contextlib.asynccontextmanager + async def busy(self, check_online: bool=True) -> AsyncGenerator[None, None]: + with self._region: + async with self._lock: + await self.__changes_queue.put(None) + if check_online: + if self.vd is None: + raise MsdOfflineError() + assert self.storage + yield + await self.__changes_queue.put(None) + + def is_busy(self) -> bool: + return self._region.is_busy() + + +# ===== +class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes + def __init__( # pylint: disable=super-init-not-called + self, + storage_path: str, + + remount_cmd: List[str], + unlock_cmd: List[str], + + gadget: str, # XXX: Not from options, see /kvmd/apps/kvmd/__init__.py for details + ) -> None: + + self.__storage_path = os.path.normpath(storage_path) + self.__images_path = os.path.join(self.__storage_path, "images") + self.__meta_path = os.path.join(self.__storage_path, "meta") + + self.__remount_cmd = remount_cmd + self.__unlock_cmd = unlock_cmd + + self.__drive = Drive(gadget, instance=0, lun=0) + + self.__new_file: Optional[aiofiles.base.AiofilesContextManager] = None + self.__new_file_written = 0 + + self.__changes_queue: asyncio.queues.Queue = asyncio.Queue() + + self.__state = _State(self.__changes_queue) + + logger = get_logger(0) + logger.info("Using OTG gadget %r as MSD", gadget) + aiotools.run_sync(self.__reload_state()) + + @classmethod + def get_plugin_options(cls) -> Dict: + sudo = ["/usr/bin/sudo", "--non-interactive"] + return { + "storage": Option("/var/lib/kvmd/msd", type=valid_abs_dir, unpack_as="storage_path"), + "remount_cmd": Option([*sudo, "/usr/bin/kvmd-helper-otgmsd-remount", "{mode}"], type=valid_command), + "unlock_cmd": Option([*sudo, "/usr/bin/kvmd-helper-otgmsd-unlock", "unlock"], type=valid_command), + } + + async def get_state(self) -> Dict: + async with self.__state._lock: # pylint: disable=protected-access + storage: Optional[Dict] = None + if self.__state.storage: + storage = dataclasses.asdict(self.__state.storage) + for name in list(storage["images"]): + del storage["images"][name]["path"] + del storage["images"][name]["in_storage"] + storage["uploading"] = bool(self.__new_file) + + vd: Optional[Dict] = None + if self.__state.vd: + vd = dataclasses.asdict(self.__state.vd) + if vd["image"]: + del vd["image"]["path"] + + return { + "enabled": False, # FIXME + "online": bool(self.__state.vd), + "busy": self.__state.is_busy(), + "storage": storage, + "drive": vd, + "features": { + "multi": True, + "cdrom": True, + }, + } + + async def poll_state(self) -> AsyncGenerator[Dict, None]: + inotify_task = asyncio.create_task(self.__watch_inotify()) + prev_state: Dict = {} + try: + while True: + if inotify_task.cancelled(): + break + if inotify_task.done(): + RuntimeError("Inotify task is dead") + + try: + await asyncio.wait_for(self.__changes_queue.get(), timeout=0.1) + except asyncio.TimeoutError: + continue + + state = await self.get_state() + if state != prev_state: + yield state + prev_state = state + finally: + if not inotify_task.done(): + inotify_task.cancel() + await inotify_task + + @aiotools.atomic + async def reset(self) -> None: + async with self.__state.busy(check_online=False): + try: + await self.__unlock_drive() + self.__drive.set_image_path("") + self.__drive.set_rw_flag(False) + self.__drive.set_cdrom_flag(False) + except asyncio.CancelledError: + raise + except Exception: + get_logger(0).exception("Can't reset MSD") + + @aiotools.atomic + async def cleanup(self) -> None: + await self.__close_new_file() + + # ===== + + @aiotools.atomic + async def set_params(self, name: Optional[str]=None, cdrom: Optional[bool]=None) -> None: + async with self.__state.busy(): + assert self.__state.storage + assert self.__state.vd + + if self.__state.vd.connected or self.__drive.get_image_path(): + raise MsdConnectedError() + + if name is not None: + if name: + image = self.__state.storage.images.get(name) + if image is None or not os.path.exists(image.path): + raise MsdUnknownImageError() + assert image.in_storage + self.__state.vd.image = image + else: + self.__state.vd.image = None + + if cdrom is not None: + self.__state.vd.cdrom = cdrom + + @aiotools.atomic + async def connect(self) -> None: + async with self.__state.busy(): + assert self.__state.vd + + if self.__state.vd.connected or self.__drive.get_image_path(): + raise MsdConnectedError() + if self.__state.vd.image is None: + raise MsdImageNotSelected() + + assert self.__state.vd.image.in_storage + + if not os.path.exists(self.__state.vd.image.path): + raise MsdUnknownImageError() + + await self.__unlock_drive() + self.__drive.set_cdrom_flag(self.__state.vd.cdrom) + self.__drive.set_image_path(self.__state.vd.image.path) + self.__state.vd.connected = True + + @aiotools.atomic + async def disconnect(self) -> None: + async with self.__state.busy(): + assert self.__state.vd + + if not (self.__state.vd.connected or self.__drive.get_image_path()): + raise MsdDisconnectedError() + + await self.__unlock_drive() + self.__drive.set_image_path("") + self.__state.vd.connected = False + + @contextlib.asynccontextmanager + async def write_image(self, name: str) -> AsyncGenerator[None, None]: + try: + with self.__state._region: # pylint: disable=protected-access + try: + async with self.__state._lock: # pylint: disable=protected-access + await self.__changes_queue.put(None) + assert self.__state.storage + assert self.__state.vd + + if self.__state.vd.connected or self.__drive.get_image_path(): + raise MsdConnectedError() + + path = os.path.join(self.__images_path, name) + if name in self.__state.storage.images or os.path.exists(path): + raise MsdImageExistsError() + + await self.__remount_storage(rw=True) + self.__set_image_complete(name, False) + self.__new_file_written = 0 + self.__new_file = await aiofiles.open(path, mode="w+b", buffering=0) + + await self.__changes_queue.put(None) + yield + self.__set_image_complete(name, True) + + finally: + await self.__close_new_file() + try: + await self.__remount_storage(rw=False) + except asyncio.CancelledError: # pylint: disable=try-except-raise + raise + except Exception: + pass + finally: + await self.__changes_queue.put(None) + + @aiotools.atomic + async def write_image_chunk(self, chunk: bytes) -> int: + assert self.__new_file + await aiotools.afile_write_now(self.__new_file, chunk) + self.__new_file_written += len(chunk) + return self.__new_file_written + + async def remove(self, name: str) -> None: + async with self.__state.busy(): + assert self.__state.storage + assert self.__state.vd + + if self.__state.vd.connected or self.__drive.get_image_path(): + raise MsdConnectedError() + + image = self.__state.storage.images.get(name) + if image is None or not os.path.exists(image.path): + raise MsdUnknownImageError() + assert image.in_storage + + if self.__state.vd.image == image: + self.__state.vd.image = None + del self.__state.storage.images[name] + + await self.__remount_storage(rw=True) + os.remove(image.path) + self.__set_image_complete(name, False) + await self.__remount_storage(rw=False) + + # ===== + + async def __close_new_file(self) -> None: + try: + if self.__new_file: + get_logger().info("Closing new image file ...") + await self.__new_file.close() + except asyncio.CancelledError: # pylint: disable=try-except-raise + raise + except Exception: + get_logger().exception("Can't close device file") + finally: + self.__new_file = None + self.__new_file_written = 0 + + # ===== + + async def __watch_inotify(self) -> None: + logger = get_logger(0) + while True: + try: + while True: + # Активно ждем, пока не будут на месте все каталоги. + await self.__reload_state() + await self.__changes_queue.put(None) + if self.__state.vd: + break + await asyncio.sleep(5) + + with Inotify() as inotify: + inotify.watch(self.__images_path, InotifyMask.ALL_MODIFY_EVENTS) + inotify.watch(self.__meta_path, InotifyMask.ALL_MODIFY_EVENTS) + inotify.watch(self.__drive.get_sysfs_path(), InotifyMask.ALL_MODIFY_EVENTS) + + # После установки вотчеров еще раз проверяем стейт, чтобы ничего не потерять + await self.__reload_state() + await self.__changes_queue.put(None) + + while self.__state.vd: # Если живы после предыдущей проверки + need_restart = False + need_reload_state = False + for event in (await inotify.get_series(timeout=1)): + need_reload_state = True + if event.mask & (InotifyMask.DELETE_SELF | InotifyMask.MOVE_SELF | InotifyMask.UNMOUNT): + # Если выгрузили OTG, что-то отмонтировали или делают еще какую-то странную фигню + logger.warning("Got fatal inotify event: %s; reinitializing MSD ...", event) + need_restart = True + break + if need_restart: + break + if need_reload_state: + await self.__reload_state() + await self.__changes_queue.put(None) + except asyncio.CancelledError: # pylint: disable=try-except-raise + raise + except Exception: + logger.exception("Unexpected MSD watcher error") + + async def __reload_state(self) -> None: + logger = get_logger(0) + async with self.__state._lock: # pylint: disable=protected-access + try: + drive_state = self.__get_drive_state() + if drive_state.rw: + # Внештатное использование MSD, ломаемся + raise MsdError("MSD has been switched to RW-mode manually") + + if self.__state.vd is None and drive_state.image is None: + # Если только что включились и образ не подключен - попробовать + # перемонтировать хранилище (и создать images и meta). + logger.info("Probing to remount storage ...") + await self.__remount_storage(rw=True) + await self.__remount_storage(rw=False) + + storage_state = self.__get_storage_state() + except asyncio.CancelledError: # pylint: disable=try-except-raise + raise + except Exception: + logger.exception("Error while reloading MSD state; switching to offline") + self.__state.storage = None + self.__state.vd = None + else: + self.__state.storage = storage_state + if drive_state.image: + # При подключенном образе виртуальный стейт заменяется реальным + self.__state.vd = _VirtualDriveState.from_drive_state(drive_state) + else: + if self.__state.vd is None: + # Если раньше MSD был отключен + self.__state.vd = _VirtualDriveState.from_drive_state(drive_state) + + if ( + self.__state.vd.image + and (not self.__state.vd.image.in_storage or not os.path.exists(self.__state.vd.image.path)) + ): + # Если только что отключили ручной образ вне хранилища или ранее выбранный образ был удален + self.__state.vd.image = None + + self.__state.vd.connected = False + + # ===== + + def __get_storage_state(self) -> _StorageState: + images: Dict[str, _DriveImage] = {} + for name in os.listdir(self.__images_path): + path = os.path.join(self.__images_path, name) + if os.path.exists(path): + size = self.__get_file_size(path) + if size >= 0: + images[name] = _DriveImage( + name=name, + path=path, + size=size, + complete=self.__is_image_complete(name), + in_storage=True, + ) + st = os.statvfs(self.__storage_path) + return _StorageState( + size=(st.f_blocks * st.f_frsize), + free=(st.f_bavail * st.f_frsize), + images=images, + ) + + def __get_drive_state(self) -> _DriveState: + image: Optional[_DriveImage] = None + path = self.__drive.get_image_path() + if path: + name = os.path.basename(path) + in_storage = (os.path.dirname(path) == self.__images_path) + image = _DriveImage( + name=name, + path=path, + size=max(self.__get_file_size(path), 0), + complete=(self.__is_image_complete(name) if in_storage else True), + in_storage=in_storage, + ) + return _DriveState( + image=image, + cdrom=self.__drive.get_cdrom_flag(), + rw=self.__drive.get_rw_flag(), + ) + + # ===== + + def __get_file_size(self, path: str) -> int: + try: + return os.path.getsize(path) + except Exception as err: + get_logger().warning("Can't get size of file %s: %s", path, err) + return -1 + + def __is_image_complete(self, name: str) -> bool: + return os.path.exists(os.path.join(self.__meta_path, name + ".complete")) + + def __set_image_complete(self, name: str, flag: bool) -> None: + path = os.path.join(self.__meta_path, name + ".complete") + if flag: + open(path, "w").close() + else: + if os.path.exists(path): + os.remove(path) + + # ===== + + async def __remount_storage(self, rw: bool) -> None: + await remount_storage(self.__remount_cmd, rw) + + async def __unlock_drive(self) -> None: + await unlock_drive(self.__unlock_cmd) diff --git a/kvmd/plugins/msd/otg/drive.py b/kvmd/plugins/msd/otg/drive.py new file mode 100644 index 00000000..e6457527 --- /dev/null +++ b/kvmd/plugins/msd/otg/drive.py @@ -0,0 +1,80 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import os +import errno + +from .. import MsdOperationError + + +# ===== +class MsdDriveLockedError(MsdOperationError): + def __init__(self) -> None: + super().__init__("MSD drive is locked on IO operation") + + +# ===== +class Drive: + def __init__(self, gadget: str, instance: int, lun: int) -> None: + self.__path = os.path.join( + "/sys/kernel/config/usb_gadget", + gadget, + f"functions/mass_storage.usb{instance}/lun.{lun}", + ) + + def get_sysfs_path(self) -> str: + return self.__path + + # ===== + + def set_image_path(self, path: str) -> None: + self.__set_param("file", path) + + def get_image_path(self) -> str: + return self.__get_param("file") + + def set_cdrom_flag(self, flag: bool) -> None: + self.__set_param("cdrom", str(int(flag))) + + def get_cdrom_flag(self) -> bool: + return bool(int(self.__get_param("cdrom"))) + + def set_rw_flag(self, flag: bool) -> None: + self.__set_param("ro", str(int(not flag))) + + def get_rw_flag(self) -> bool: + return (not int(self.__get_param("ro"))) + + # ===== + + def __get_param(self, param: str) -> str: + with open(os.path.join(self.__path, param)) as param_file: + return param_file.read().strip() + + def __set_param(self, param: str, value: str) -> None: + try: + with open(os.path.join(self.__path, param), "w") as param_file: + param_file.write(value + "\n") + except OSError as err: + if err.errno == errno.EBUSY: + raise MsdDriveLockedError() + raise diff --git a/kvmd/plugins/msd/otg/helpers.py b/kvmd/plugins/msd/otg/helpers.py new file mode 100644 index 00000000..ee759ef0 --- /dev/null +++ b/kvmd/plugins/msd/otg/helpers.py @@ -0,0 +1,79 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import signal +import asyncio +import asyncio.subprocess + +from typing import List + +from ....logging import get_logger + +from .. import MsdError + + +# ===== +async def remount_storage(base_cmd: List[str], rw: bool) -> None: + logger = get_logger(0) + mode = ("rw" if rw else "ro") + cmd = [ + part.format(mode=mode) + for part in base_cmd + ] + logger.info("Remounting internal storage to %s ...", mode.upper()) + try: + await _run_helper(cmd) + except Exception: + logger.error("Can't remount internal storage") + raise + + +async def unlock_drive(base_cmd: List[str]) -> None: + logger = get_logger(0) + logger.info("Unlocking the drive ...") + try: + await _run_helper(base_cmd) + except Exception: + logger.error("Can't unlock the drive") + raise + + +# ===== +async def _run_helper(cmd: List[str]) -> None: + logger = get_logger(0) + logger.info("Executing helper %s ...", cmd) + + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + preexec_fn=(lambda: signal.signal(signal.SIGINT, signal.SIG_IGN)), + ) + + stdout = (await proc.communicate())[0].decode(errors="ignore").strip() + if stdout: + log = (logger.info if proc.returncode == 0 else logger.error) + for line in stdout.split("\n"): + log("Console: %s", line) + + if proc.returncode != 0: + raise MsdError(f"Error while helper execution: pid={proc.pid}; retcode={proc.returncode}") diff --git a/kvmd/plugins/msd/relay.py b/kvmd/plugins/msd/relay.py index eff22883..46b5a371 100644 --- a/kvmd/plugins/msd/relay.py +++ b/kvmd/plugins/msd/relay.py @@ -26,16 +26,13 @@ import fcntl import struct import asyncio import asyncio.queues +import contextlib import dataclasses -import types from typing import Dict from typing import IO -from typing import Callable -from typing import Type from typing import AsyncGenerator from typing import Optional -from typing import Any import aiofiles import aiofiles.base @@ -57,11 +54,11 @@ from ...validators.hw import valid_gpio_pin from . import MsdError from . import MsdOfflineError -from . import MsdAlreadyConnectedError -from . import MsdAlreadyDisconnectedError from . import MsdConnectedError +from . import MsdDisconnectedError from . import MsdIsBusyError from . import MsdMultiNotSupported +from . import MsdCdromNotSupported from . import BaseMsd @@ -83,11 +80,11 @@ class _DeviceInfo: _IMAGE_INFO_SIZE = 4096 _IMAGE_INFO_MAGIC_SIZE = 16 -_IMAGE_INFO_IMAGE_NAME_SIZE = 256 -_IMAGE_INFO_PADS_SIZE = _IMAGE_INFO_SIZE - _IMAGE_INFO_IMAGE_NAME_SIZE - 1 - 8 - _IMAGE_INFO_MAGIC_SIZE * 8 +_IMAGE_INFO_NAME_SIZE = 256 +_IMAGE_INFO_PADS_SIZE = _IMAGE_INFO_SIZE - _IMAGE_INFO_NAME_SIZE - 1 - 8 - _IMAGE_INFO_MAGIC_SIZE * 8 _IMAGE_INFO_FORMAT = ">%dL%dc?Q%dx%dL" % ( _IMAGE_INFO_MAGIC_SIZE, - _IMAGE_INFO_IMAGE_NAME_SIZE, + _IMAGE_INFO_NAME_SIZE, _IMAGE_INFO_PADS_SIZE, _IMAGE_INFO_MAGIC_SIZE, ) @@ -100,8 +97,8 @@ def _make_image_info_bytes(name: str, size: int, complete: bool) -> bytes: *_IMAGE_INFO_MAGIC, *memoryview(( # type: ignore name.encode("utf-8") - + b"\x00" * _IMAGE_INFO_IMAGE_NAME_SIZE - )[:_IMAGE_INFO_IMAGE_NAME_SIZE]).cast("c"), + + b"\x00" * _IMAGE_INFO_NAME_SIZE + )[:_IMAGE_INFO_NAME_SIZE]).cast("c"), complete, size, *_IMAGE_INFO_MAGIC, @@ -117,11 +114,15 @@ def _parse_image_info_bytes(data: bytes) -> Optional[_ImageInfo]: magic_begin = parsed[:_IMAGE_INFO_MAGIC_SIZE] magic_end = parsed[-_IMAGE_INFO_MAGIC_SIZE:] if magic_begin == magic_end == _IMAGE_INFO_MAGIC: - image_name_bytes = b"".join(parsed[_IMAGE_INFO_MAGIC_SIZE:_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE]) + image_name_bytes = b"".join(parsed[ + _IMAGE_INFO_MAGIC_SIZE # noqa: E203 + : + _IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE + ]) return _ImageInfo( name=image_name_bytes.decode("utf-8", errors="ignore").strip("\x00").strip(), - size=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE + 1], - complete=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE], + size=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE + 1], + complete=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE], ) return None @@ -152,14 +153,7 @@ def _explore_device(device_path: str) -> _DeviceInfo: ) -def _msd_working(method: Callable) -> Callable: - async def wrapper(self: "Plugin", *args: Any, **kwargs: Any) -> Any: - if not self._device_info: # pylint: disable=protected-access - raise MsdOfflineError() - return (await method(self, *args, **kwargs)) - return wrapper - - +# ===== class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=super-init-not-called self, @@ -182,10 +176,11 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes self.__region = aioregion.AioExclusiveRegion(MsdIsBusyError) - self._device_info: Optional[_DeviceInfo] = None + self.__device_info: Optional[_DeviceInfo] = None + self.__connected = False + self.__device_file: Optional[aiofiles.base.AiofilesContextManager] = None self.__written = 0 - self.__on_kvm = True self.__state_queue: asyncio.queues.Queue = asyncio.Queue() @@ -209,27 +204,29 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes "reset_delay": Option(1.0, type=valid_float_f01), } - def get_state(self) -> Dict: - current: Optional[Dict] = None + async def get_state(self) -> Dict: storage: Optional[Dict] = None - if self._device_info: + drive: Optional[Dict] = None + if self.__device_info: storage = { - "size": self._device_info.size, - "free": self._device_info.free, + "size": self.__device_info.size, + "free": self.__device_info.free, + "uploading": bool(self.__device_file) + } + drive = { + "image": (self.__device_info.image and dataclasses.asdict(self.__device_info.image)), + "connected": self.__connected, } - if self._device_info.image: - current = dataclasses.asdict(self._device_info.image) return { "enabled": True, - "multi": False, - "online": bool(self._device_info), + "online": bool(self.__device_info), "busy": self.__region.is_busy(), - "uploading": bool(self.__device_file), - "written": self.__written, - "current": current, "storage": storage, - "cdrom": None, - "connected": (not self.__on_kvm), + "drive": drive, + "features": { + "multi": False, + "cdrom": False, + }, } async def poll_state(self) -> AsyncGenerator[Dict, None]: @@ -250,7 +247,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes gpio.write(self.__reset_pin, False) gpio.write(self.__target_pin, False) - self.__on_kvm = True + self.__connected = False await self.__load_device_info() get_logger(0).info("MSD reset has been successful") @@ -259,7 +256,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes gpio.write(self.__reset_pin, False) finally: self.__region.exit() - await self.__state_queue.put(self.get_state()) + await self.__state_queue.put(await self.get_state()) @aiotools.atomic async def cleanup(self) -> None: @@ -269,116 +266,107 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes # ===== - @_msd_working + async def set_params(self, name: Optional[str]=None, cdrom: Optional[bool]=None) -> None: + async with self.__working(): + if name is not None: + raise MsdMultiNotSupported() + if cdrom is not None: + raise MsdCdromNotSupported() + @aiotools.atomic - async def connect(self) -> Dict: - notify = False - state: Dict = {} - try: - with self.__region: - if not self.__on_kvm: - raise MsdAlreadyConnectedError() - notify = True + async def connect(self) -> None: + async with self.__working(): + notify = False + try: + with self.__region: + if self.__connected: + raise MsdConnectedError() + notify = True + + gpio.write(self.__target_pin, True) + self.__connected = True + get_logger(0).info("MSD switched to Server") + finally: + if notify: + await self.__state_queue.put(await self.get_state()) - gpio.write(self.__target_pin, True) - self.__on_kvm = False - get_logger(0).info("MSD switched to Server") + @aiotools.atomic + async def disconnect(self) -> None: + async with self.__working(): + notify = False + try: + with self.__region: + if not self.__connected: + raise MsdDisconnectedError() + notify = True + + gpio.write(self.__target_pin, False) + try: + await self.__load_device_info() + except Exception: + if self.__connected: + gpio.write(self.__target_pin, True) + raise + self.__connected = False + get_logger(0).info("MSD switched to KVM: %s", self.__device_info) + finally: + if notify: + await self.__state_queue.put(await self.get_state()) - state = self.get_state() - return state - finally: - if notify: - await self.__state_queue.put(state or self.get_state()) + @contextlib.asynccontextmanager + async def write_image(self, name: str) -> AsyncGenerator[None, None]: + async with self.__working(): + self.__region.enter() + try: + assert self.__device_info + if self.__connected: + raise MsdConnectedError() - @_msd_working - @aiotools.atomic - async def disconnect(self) -> Dict: - notify = False - state: Dict = {} - try: - with self.__region: - if self.__on_kvm: - raise MsdAlreadyDisconnectedError() - notify = True + self.__device_file = await aiofiles.open(self.__device_info.path, mode="w+b", buffering=0) + self.__written = 0 - gpio.write(self.__target_pin, False) + await self.__write_image_info(name, complete=False) + await self.__state_queue.put(await self.get_state()) + yield + await self.__write_image_info(name, complete=True) + finally: try: + await self.__close_device_file() await self.__load_device_info() - except Exception: - if not self.__on_kvm: - gpio.write(self.__target_pin, True) - raise - self.__on_kvm = True - get_logger(0).info("MSD switched to KVM: %s", self._device_info) - - state = self.get_state() - return state - finally: - if notify: - await self.__state_queue.put(state or self.get_state()) - - @_msd_working - async def select(self, name: str, cdrom: bool) -> Dict: - raise MsdMultiNotSupported() - - @_msd_working - async def remove(self, name: str) -> Dict: - raise MsdMultiNotSupported() - - @_msd_working - @aiotools.atomic - async def __aenter__(self) -> "Plugin": - assert self._device_info - self.__region.enter() - try: - if not self.__on_kvm: - raise MsdConnectedError() - self.__device_file = await aiofiles.open(self._device_info.path, mode="w+b", buffering=0) - self.__written = 0 - return self - except Exception: - self.__region.exit() - raise - finally: - await self.__state_queue.put(self.get_state()) - - @aiotools.atomic - async def write_image_info(self, name: str, complete: bool) -> None: - assert self.__device_file - assert self._device_info - if self._device_info.size - self.__written > _IMAGE_INFO_SIZE: - await self.__device_file.seek(self._device_info.size - _IMAGE_INFO_SIZE) - await self.__write_to_device_file(_make_image_info_bytes(name, self.__written, complete)) - await self.__device_file.seek(0) - else: - get_logger().error("Can't write image info because device is full") + finally: + self.__region.exit() + await self.__state_queue.put(await self.get_state()) @aiotools.atomic async def write_image_chunk(self, chunk: bytes) -> int: - await self.__write_to_device_file(chunk) + assert self.__device_file + await aiotools.afile_write_now(self.__device_file, chunk) self.__written += len(chunk) return self.__written - @aiotools.atomic - async def __aexit__( - self, - _exc_type: Type[BaseException], - _exc: BaseException, - _tb: types.TracebackType, - ) -> None: + async def remove(self, name: str) -> None: + async with self.__working(): + raise MsdMultiNotSupported() - try: - await self.__close_device_file() - await self.__load_device_info() - finally: - self.__region.exit() - await self.__state_queue.put(self.get_state()) + # ===== + + @contextlib.asynccontextmanager + async def __working(self) -> AsyncGenerator[None, None]: + if not self.__device_info: + raise MsdOfflineError() + yield + + # ===== - async def __write_to_device_file(self, data: bytes) -> None: + async def __write_image_info(self, name: str, complete: bool) -> None: assert self.__device_file - await self.__device_file.write(data) - await self.__device_file.flush() - await aiotools.run_async(os.fsync, self.__device_file.fileno()) + assert self.__device_info + if self.__device_info.size - self.__written > _IMAGE_INFO_SIZE: + await self.__device_file.seek(self.__device_info.size - _IMAGE_INFO_SIZE) + await aiotools.afile_write_now(self.__device_file, _make_image_info_bytes(name, self.__written, complete)) + await self.__device_file.seek(0) + else: + get_logger().error("Can't write image info because device is full") async def __close_device_file(self) -> None: try: @@ -398,13 +386,13 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes while True: await asyncio.sleep(self.__init_delay) try: - self._device_info = await aiotools.run_async(_explore_device, self.__device_path) + self.__device_info = await aiotools.run_async(_explore_device, self.__device_path) break except asyncio.CancelledError: # pylint: disable=try-except-raise raise except Exception: if retries == 0: - self._device_info = None + self.__device_info = None raise MsdError("Can't load device info") get_logger().exception("Can't load device info; retries=%d", retries) retries -= 1 -- cgit v1.2.3