summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2019-10-29 02:16:12 +0300
committerDevaev Maxim <[email protected]>2019-11-07 01:27:07 +0300
commit10f8c2b3352c951549cc1249d6b24789fb94d688 (patch)
treecbf38bcf2716896db748f64bcae66375b7135287 /kvmd
parentf6214191af093560d5697cd7b1ea6f245ee95b98 (diff)
otg msd and big refactoring
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/aiotools.py11
-rw-r--r--kvmd/apps/kvmd/server.py56
-rw-r--r--kvmd/inotify.py335
-rw-r--r--kvmd/plugins/msd/__init__.py80
-rw-r--r--kvmd/plugins/msd/disabled.py47
-rw-r--r--kvmd/plugins/msd/otg.py108
-rw-r--r--kvmd/plugins/msd/otg/__init__.py531
-rw-r--r--kvmd/plugins/msd/otg/drive.py80
-rw-r--r--kvmd/plugins/msd/otg/helpers.py79
-rw-r--r--kvmd/plugins/msd/relay.py260
10 files changed, 1256 insertions, 331 deletions
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py
index 574c8bc7..98a2a524 100644
--- a/kvmd/aiotools.py
+++ b/kvmd/aiotools.py
@@ -20,6 +20,7 @@
# ========================================================================== #
+import os
import asyncio
import functools
import contextlib
@@ -34,6 +35,9 @@ from typing import AsyncGenerator
from typing import TypeVar
from typing import Any
+import aiofiles
+import aiofiles.base
+
from . import aioregion
from .logging import get_logger
@@ -118,3 +122,10 @@ async def unlock_only_on_exception(lock: asyncio.Lock) -> AsyncGenerator[None, N
except: # noqa: E722
lock.release()
raise
+
+
+# =====
+async def afile_write_now(afile: aiofiles.base.AiofilesContextManager, data: bytes) -> None:
+ await afile.write(data)
+ await afile.flush()
+ await run_async(os.fsync, afile.fileno())
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
index 04030545..b4e1ad22 100644
--- a/kvmd/apps/kvmd/server.py
+++ b/kvmd/apps/kvmd/server.py
@@ -368,7 +368,7 @@ class Server: # pylint: disable=too-many-instance-attributes
self.__broadcast_event(_Events.INFO_STATE, (await self.__make_info())),
self.__broadcast_event(_Events.HID_STATE, self.__hid.get_state()),
self.__broadcast_event(_Events.ATX_STATE, self.__atx.get_state()),
- self.__broadcast_event(_Events.MSD_STATE, self.__msd.get_state()),
+ self.__broadcast_event(_Events.MSD_STATE, (await self.__msd.get_state())),
self.__broadcast_event(_Events.STREAMER_STATE, (await self.__streamer.get_state())),
])
async for msg in ws:
@@ -469,52 +469,60 @@ class Server: # pylint: disable=too-many-instance-attributes
@_exposed("GET", "/msd")
async def __msd_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- return _json(self.__msd.get_state())
+ return _json(await self.__msd.get_state())
+
+ @_exposed("POST", "/msd/set_params")
+ async def __msd_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
+ params = {
+ key: validator(request.query.get(param))
+ for (param, key, validator) in [
+ ("image", "name", (lambda arg: str(arg).strip() and valid_msd_image_name(arg))),
+ ("cdrom", "cdrom", valid_bool),
+ ]
+ if request.query.get(param) is not None
+ }
+ await self.__msd.set_params(**params) # type: ignore
+ return _json()
@_exposed("POST", "/msd/connect")
async def __msd_connect_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- return _json(await self.__msd.connect())
+ await self.__msd.connect()
+ return _json()
@_exposed("POST", "/msd/disconnect")
async def __msd_disconnect_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- return _json(await self.__msd.disconnect())
-
- @_exposed("POST", "/msd/select")
- async def __msd_select_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
- image_name = valid_msd_image_name(request.query.get("image_name"))
- cdrom = valid_bool(request.query.get("cdrom", "true"))
- return _json(await self.__msd.select(image_name, cdrom))
-
- @_exposed("POST", "/msd/remove")
- async def __msd_remove_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
- return _json(await self.__msd.remove(valid_msd_image_name(request.query.get("image_name"))))
+ await self.__msd.disconnect()
+ return _json()
@_exposed("POST", "/msd/write")
async def __msd_write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
assert self.__sync_chunk_size is not None
logger = get_logger(0)
reader = await request.multipart()
- image_name = ""
+ name = ""
written = 0
try:
- async with self.__msd:
- name_field = await _get_multipart_field(reader, "image_name")
- image_name = valid_msd_image_name((await name_field.read()).decode("utf-8"))
+ name_field = await _get_multipart_field(reader, "image")
+ name = valid_msd_image_name((await name_field.read()).decode("utf-8"))
- data_field = await _get_multipart_field(reader, "image_data")
+ data_field = await _get_multipart_field(reader, "data")
- logger.info("Writing image %r to MSD ...", image_name)
- await self.__msd.write_image_info(image_name, False)
+ async with self.__msd.write_image(name):
+ logger.info("Writing image %r to MSD ...", name)
while True:
chunk = await data_field.read_chunk(self.__sync_chunk_size)
if not chunk:
break
written = await self.__msd.write_image_chunk(chunk)
- await self.__msd.write_image_info(image_name, True)
finally:
if written != 0:
- logger.info("Written image %r with size=%d bytes to MSD", image_name, written)
- return _json({"image": {"name": image_name, "size": written}})
+ logger.info("Written image %r with size=%d bytes to MSD", name, written)
+ return _json({"image": {"name": name, "size": written}})
+
+ @_exposed("POST", "/msd/remove")
+ async def __msd_remove_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
+ await self.__msd.remove(valid_msd_image_name(request.query.get("image")))
+ return _json()
@_exposed("POST", "/msd/reset")
async def __msd_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
diff --git a/kvmd/inotify.py b/kvmd/inotify.py
new file mode 100644
index 00000000..cdded981
--- /dev/null
+++ b/kvmd/inotify.py
@@ -0,0 +1,335 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This source file is partially based on python-watchdog module. #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import sys
+import os
+import asyncio
+import asyncio.queues
+import ctypes
+import ctypes.util
+import struct
+import dataclasses
+import types
+import errno
+
+from ctypes import c_int
+from ctypes import c_char_p
+from ctypes import c_uint32
+
+from typing import Tuple
+from typing import List
+from typing import Dict
+from typing import Type
+from typing import Generator
+from typing import Optional
+
+from .logging import get_logger
+
+
+# =====
+def _load_libc() -> ctypes.CDLL:
+ try:
+ path = ctypes.util.find_library("c")
+ except (OSError, IOError, RuntimeError):
+ pass
+ else:
+ if path:
+ return ctypes.CDLL(path)
+
+ names = ["libc.so", "libc.so.6", "libc.so.0"]
+ for (index, name) in enumerate(names):
+ try:
+ return ctypes.CDLL(name)
+ except (OSError, IOError):
+ if index == len(names) - 1:
+ raise
+
+ raise RuntimeError("Where is libc?")
+
+
+_libc = _load_libc()
+
+
+def _get_libc_func(name: str, restype, argtypes=None): # type: ignore
+ return ctypes.CFUNCTYPE(restype, *(argtypes or []), use_errno=True)((name, _libc))
+
+
+_inotify_init = _get_libc_func("inotify_init", c_int)
+_inotify_add_watch = _get_libc_func("inotify_add_watch", c_int, [c_int, c_char_p, c_uint32])
+_inotify_rm_watch = _get_libc_func("inotify_rm_watch", c_int, [c_int, c_uint32])
+
+
+# =====
+_EVENT_HEAD_FMT = "iIII"
+_EVENT_HEAD_SIZE = struct.calcsize(_EVENT_HEAD_FMT)
+_EVENTS_BUFFER_LENGTH = 4096 * (_EVENT_HEAD_SIZE + 256) # count * (head + max_file_name_size + null_character)
+
+_FS_FALLBACK_ENCODING = "utf-8"
+_FS_ENCODING = (sys.getfilesystemencoding() or _FS_FALLBACK_ENCODING)
+
+
+# =====
+def _inotify_parsed_buffer(data: bytes) -> Generator[Tuple[int, int, int, bytes], None, None]:
+ offset = 0
+ while offset + _EVENT_HEAD_SIZE <= len(data):
+ (wd, mask, cookie, length) = struct.unpack_from("iIII", data, offset)
+ name = data[
+ offset + _EVENT_HEAD_SIZE # noqa: E203
+ :
+ offset + _EVENT_HEAD_SIZE + length
+ ].rstrip(b"\0")
+ offset += _EVENT_HEAD_SIZE + length
+ if wd >= 0:
+ yield (wd, mask, cookie, name)
+
+
+def _inotify_check(retval: int) -> int:
+ if retval < 0:
+ c_errno = ctypes.get_errno()
+ if c_errno == errno.ENOSPC: # pylint: disable=no-else-raise
+ raise OSError(c_errno, "Inotify watch limit reached")
+ elif c_errno == errno.EMFILE:
+ raise OSError(c_errno, "Inotify instance limit reached")
+ else:
+ raise OSError(c_errno, os.strerror(c_errno))
+ return retval
+
+
+def _fs_encode(path: str) -> bytes:
+ try:
+ return path.encode(_FS_ENCODING, "strict")
+ except UnicodeEncodeError:
+ return path.encode(_FS_FALLBACK_ENCODING, "strict")
+
+
+def _fs_decode(path: bytes) -> str:
+ try:
+ return path.decode(_FS_ENCODING, "strict")
+ except UnicodeDecodeError:
+ return path.decode(_FS_FALLBACK_ENCODING, "strict")
+
+
+# =====
+class InotifyMask:
+ # Userspace events
+ ACCESS = 0x00000001 # File was accessed
+ ATTRIB = 0x00000004 # Meta-data changed
+ CLOSE_WRITE = 0x00000008 # Writable file was closed
+ CLOSE_NOWRITE = 0x00000010 # Unwritable file closed
+ CREATE = 0x00000100 # Subfile was created
+ DELETE = 0x00000200 # Subfile was deleted
+ DELETE_SELF = 0x00000400 # Self was deleted
+ MODIFY = 0x00000002 # File was modified
+ MOVE_SELF = 0x00000800 # Self was moved
+ MOVED_FROM = 0x00000040 # File was moved from X
+ MOVED_TO = 0x00000080 # File was moved to Y
+ OPEN = 0x00000020 # File was opened
+
+ # Events sent by the kernel to a watch
+ IGNORED = 0x00008000 # File was ignored
+ ISDIR = 0x40000000 # Event occurred against directory
+ Q_OVERFLOW = 0x00004000 # Event queued overflowed
+ UNMOUNT = 0x00002000 # Backing file system was unmounted
+
+ # Helper userspace events
+# CLOSE = CLOSE_WRITE | CLOSE_NOWRITE # Close
+# MOVE = MOVED_FROM | MOVED_TO # Moves
+
+ # Helper for userspace events
+# ALL_EVENTS = (
+# ACCESS
+# | ATTRIB
+# | CLOSE_WRITE
+# | CLOSE_NOWRITE
+# | CREATE
+# | DELETE
+# | DELETE_SELF
+# | MODIFY
+# | MOVE_SELF
+# | MOVED_FROM
+# | MOVED_TO
+# | OPEN
+# )
+
+ # Helper for all modify events
+ ALL_MODIFY_EVENTS = (
+ CLOSE_WRITE
+ | CREATE
+ | DELETE
+ | DELETE_SELF
+ | MODIFY
+ | MOVE_SELF
+ | MOVED_FROM
+ | MOVED_TO
+ )
+
+ # Special flags for watch()
+# DONT_FOLLOW = 0x02000000 # Don't follow a symbolic link
+# EXCL_UNLINK = 0x04000000 # Exclude events on unlinked objects
+# MASK_CREATE = 0x10000000 # Don't overwrite existent watchers (since 4.18)
+# MASK_ADD = 0x20000000 # Add to the mask of an existing watch
+# ONESHOT = 0x80000000 # Only send event once
+# ONLYDIR = 0x01000000 # Only watch the path if it's a directory
+
+ @classmethod
+ def to_string(cls, mask: int) -> str:
+ flags: List[str] = []
+ for name in dir(cls):
+ if (
+ name[0].isupper()
+ and not name.startswith("ALL_")
+ and name not in ["CLOSE", "MOVE"]
+ and mask & getattr(cls, name)
+ ):
+ flags.append(name)
+ return "|".join(flags)
+
+
[email protected](frozen=True, repr=False)
+class InotifyEvent:
+ wd: int
+ mask: int
+ cookie: int
+ name: str
+ path: str
+
+ def __repr__(self) -> str:
+ return (
+ f"<InotifyEvent: wd={self.wd}, mask={InotifyMask.to_string(self.mask)},"
+ f" cookie={self.cookie}, name={self.name}, path={self.path}>"
+ )
+
+
+class Inotify:
+ def __init__(self) -> None:
+ self.__fd = -1
+
+ self.__wd_by_path: Dict[str, int] = {}
+ self.__path_by_wd: Dict[int, str] = {}
+
+ self.__moved: Dict[int, str] = {}
+
+ self.__events_queue: asyncio.queues.Queue = asyncio.Queue()
+
+ def watch(self, path: str, mask: int) -> None:
+ path = os.path.normpath(path)
+ assert path not in self.__wd_by_path, path
+ get_logger().info("Watching for %s: %s", path, InotifyMask.to_string(mask))
+ wd = _inotify_check(_inotify_add_watch(self.__fd, _fs_encode(path), mask))
+ self.__wd_by_path[path] = wd
+ self.__path_by_wd[wd] = path
+
+# def unwatch(self, path: str) -> None:
+# path = os.path.normpath(path)
+# assert path in self.__wd_by_path, path
+# get_logger().info("Unwatching %s", path)
+# wd = self.__wd_by_path[path]
+# _inotify_check(_inotify_rm_watch(self.__fd, wd))
+# del self.__wd_by_path[path]
+# del self.__path_by_wd[wd]
+
+# def has_events(self) -> bool:
+# return (not self.__events_queue.empty())
+
+ async def get_event(self, timeout: float) -> Optional[InotifyEvent]:
+ assert timeout > 0
+ try:
+ return (await asyncio.wait_for(self.__events_queue.get(), timeout=timeout))
+ except asyncio.TimeoutError:
+ return None
+
+ async def get_series(self, timeout: float) -> List[InotifyEvent]:
+ series: List[InotifyEvent] = []
+ event = await self.get_event(timeout)
+ if event:
+ series.append(event)
+ while event:
+ event = await self.get_event(timeout)
+ if event:
+ series.append(event)
+ return series
+
+ def __read_and_queue_events(self) -> None:
+ logger = get_logger()
+ for event in self.__read_parsed_events():
+ # XXX: Ни в коем случае не приводить self.__read_parsed_events() к списку.
+ # Он использует self.__wd_by_path и self.__path_by_wd, содержимое которых
+ # корректируется кодом ниже. В противном случае все сломается.
+
+ if event.mask & InotifyMask.MOVED_FROM:
+ self.__moved[event.cookie] = event.path # Save moved_from_path
+ elif event.mask & InotifyMask.MOVED_TO:
+ moved_from_path = self.__moved.pop(event.cookie, None)
+ if moved_from_path is not None:
+ wd = self.__wd_by_path.pop(moved_from_path, None)
+ if wd is not None:
+ self.__wd_by_path[event.path] = wd
+ self.__path_by_wd[wd] = event.path
+
+ if event.mask & InotifyMask.IGNORED:
+ ignored_path = self.__path_by_wd[event.wd]
+ if self.__wd_by_path[ignored_path] == event.wd:
+ logger.info("Unwatching %s because IGNORED was received", ignored_path)
+ del self.__wd_by_path[ignored_path]
+ continue
+
+ self.__events_queue.put_nowait(event)
+
+ def __read_parsed_events(self) -> Generator[InotifyEvent, None, None]:
+ for (wd, mask, cookie, name_bytes) in _inotify_parsed_buffer(self.__read_buffer()):
+ wd_path = self.__path_by_wd.get(wd, None)
+ if wd_path is not None:
+ name = _fs_decode(name_bytes)
+ path = (os.path.join(wd_path, name) if name else wd_path) # Avoid trailing slash
+ yield InotifyEvent(wd, mask, cookie, name, path)
+
+ def __read_buffer(self) -> bytes:
+ while True:
+ try:
+ return os.read(self.__fd, _EVENTS_BUFFER_LENGTH)
+ except OSError as err:
+ if err.errno == errno.EINTR:
+ pass
+
+ def __enter__(self) -> "Inotify":
+ assert self.__fd < 0
+ self.__fd = _inotify_check(_inotify_init())
+ asyncio.get_event_loop().add_reader(self.__fd, self.__read_and_queue_events)
+ return self
+
+ def __exit__(
+ self,
+ _exc_type: Type[BaseException],
+ _exc: BaseException,
+ _tb: types.TracebackType,
+ ) -> None:
+
+ if self.__fd >= 0:
+ asyncio.get_event_loop().remove_reader(self.__fd)
+ for wd in list(self.__wd_by_path.values()):
+ _inotify_rm_watch(self.__fd, wd)
+ try:
+ os.close(self.__fd)
+ except Exception:
+ pass
diff --git a/kvmd/plugins/msd/__init__.py b/kvmd/plugins/msd/__init__.py
index 899b6f47..f43a40c2 100644
--- a/kvmd/plugins/msd/__init__.py
+++ b/kvmd/plugins/msd/__init__.py
@@ -20,11 +20,12 @@
# ========================================================================== #
-import types
+import contextlib
from typing import Dict
from typing import Type
from typing import AsyncGenerator
+from typing import Optional
from .. import BasePlugin
from .. import get_plugin_class
@@ -44,19 +45,29 @@ class MsdOfflineError(MsdOperationError):
super().__init__("MSD is not found")
-class MsdAlreadyConnectedError(MsdOperationError):
+class MsdConnectedError(MsdOperationError):
def __init__(self) -> None:
- super().__init__("MSD is already connected to Server")
+ super().__init__("MSD is connected to Server, but shouldn't for this operation")
-class MsdAlreadyDisconnectedError(MsdOperationError):
+class MsdDisconnectedError(MsdOperationError):
def __init__(self) -> None:
- super().__init__("MSD is already disconnected from Server")
+ super().__init__("MSD is disconnected from Server, but should be for this operation")
-class MsdConnectedError(MsdOperationError):
+class MsdImageNotSelected(MsdOperationError):
+ def __init__(self) -> None:
+ super().__init__("The image is not selected")
+
+
+class MsdUnknownImageError(MsdOperationError):
+ def __init__(self) -> None:
+ super().__init__("The image is not found in the storage")
+
+
+class MsdImageExistsError(MsdOperationError):
def __init__(self) -> None:
- super().__init__("MSD connected to Server, but should not")
+ super().__init__("This image is already exists")
class MsdIsBusyError(MsdOperationError):
@@ -69,52 +80,51 @@ class MsdMultiNotSupported(MsdOperationError):
super().__init__("This MSD does not support storing multiple images")
+class MsdCdromNotSupported(MsdOperationError):
+ def __init__(self) -> None:
+ super().__init__("This MSD does not support CD-ROM emulation")
+
+
# =====
class BaseMsd(BasePlugin):
- def get_state(self) -> Dict:
- raise NotImplementedError
+ async def get_state(self) -> Dict:
+ raise NotImplementedError()
async def poll_state(self) -> AsyncGenerator[Dict, None]:
- yield {}
- raise NotImplementedError
+ if True: # pylint: disable=using-constant-test
+ # XXX: Vulture hack
+ raise NotImplementedError()
+ yield
async def reset(self) -> None:
- raise NotImplementedError
+ raise NotImplementedError()
async def cleanup(self) -> None:
pass
# =====
- async def connect(self) -> Dict:
- raise NotImplementedError
+ async def set_params(self, name: Optional[str]=None, cdrom: Optional[bool]=None) -> None:
+ raise NotImplementedError()
- async def disconnect(self) -> Dict:
- raise NotImplementedError
+ async def connect(self) -> None:
+ raise NotImplementedError()
- async def select(self, name: str, cdrom: bool) -> Dict:
- raise NotImplementedError
+ async def disconnect(self) -> None:
+ raise NotImplementedError()
- async def remove(self, name: str) -> Dict:
- raise NotImplementedError
-
- async def __aenter__(self) -> "BaseMsd":
- raise NotImplementedError
-
- async def write_image_info(self, name: str, complete: bool) -> None:
- raise NotImplementedError
+ @contextlib.asynccontextmanager
+ async def write_image(self, name: str) -> AsyncGenerator[None, None]:
+ if True: # pylint: disable=using-constant-test
+ # XXX: Vulture hack
+ raise NotImplementedError()
+ yield
async def write_image_chunk(self, chunk: bytes) -> int:
- raise NotImplementedError
-
- async def __aexit__(
- self,
- _exc_type: Type[BaseException],
- _exc: BaseException,
- _tb: types.TracebackType,
- ) -> None:
+ raise NotImplementedError()
- raise NotImplementedError
+ async def remove(self, name: str) -> None:
+ raise NotImplementedError()
# =====
diff --git a/kvmd/plugins/msd/disabled.py b/kvmd/plugins/msd/disabled.py
index eafbdd65..97835513 100644
--- a/kvmd/plugins/msd/disabled.py
+++ b/kvmd/plugins/msd/disabled.py
@@ -21,11 +21,11 @@
import asyncio
-import types
+import contextlib
from typing import Dict
-from typing import Type
from typing import AsyncGenerator
+from typing import Optional
from . import MsdOperationError
from . import BaseMsd
@@ -39,23 +39,22 @@ class MsdDisabledError(MsdOperationError):
# =====
class Plugin(BaseMsd):
- def get_state(self) -> Dict:
+ async def get_state(self) -> Dict:
return {
"enabled": False,
- "multi": False,
"online": False,
"busy": False,
- "uploading": False,
- "written": 0,
- "current": None,
"storage": None,
- "cdrom": None,
- "connected": False,
+ "drive": None,
+ "features": {
+ "multi": False,
+ "cdrom": False,
+ },
}
async def poll_state(self) -> AsyncGenerator[Dict, None]:
while True:
- yield self.get_state()
+ yield (await self.get_state())
await asyncio.sleep(60)
async def reset(self) -> None:
@@ -63,32 +62,24 @@ class Plugin(BaseMsd):
# =====
- async def connect(self) -> Dict:
+ async def set_params(self, name: Optional[str]=None, cdrom: Optional[bool]=None) -> None:
raise MsdDisabledError()
- async def disconnect(self) -> Dict:
+ async def connect(self) -> None:
raise MsdDisabledError()
- async def select(self, name: str, cdrom: bool) -> Dict:
+ async def disconnect(self) -> None:
raise MsdDisabledError()
- async def remove(self, name: str) -> Dict:
- raise MsdDisabledError()
-
- async def __aenter__(self) -> BaseMsd:
- raise MsdDisabledError()
-
- async def write_image_info(self, name: str, complete: bool) -> None:
- raise MsdDisabledError()
+ @contextlib.asynccontextmanager
+ async def write_image(self, name: str) -> AsyncGenerator[None, None]:
+ if True: # pylint: disable=using-constant-test
+ # XXX: Vulture hack
+ raise MsdDisabledError()
+ yield
async def write_image_chunk(self, chunk: bytes) -> int:
raise MsdDisabledError()
- async def __aexit__(
- self,
- _exc_type: Type[BaseException],
- _exc: BaseException,
- _tb: types.TracebackType,
- ) -> None:
-
+ async def remove(self, name: str) -> None:
raise MsdDisabledError()
diff --git a/kvmd/plugins/msd/otg.py b/kvmd/plugins/msd/otg.py
deleted file mode 100644
index bf636bcf..00000000
--- a/kvmd/plugins/msd/otg.py
+++ /dev/null
@@ -1,108 +0,0 @@
-# ========================================================================== #
-# #
-# KVMD - The main Pi-KVM daemon. #
-# #
-# Copyright (C) 2018 Maxim Devaev <[email protected]> #
-# #
-# This program is free software: you can redistribute it and/or modify #
-# it under the terms of the GNU General Public License as published by #
-# the Free Software Foundation, either version 3 of the License, or #
-# (at your option) any later version. #
-# #
-# This program is distributed in the hope that it will be useful, #
-# but WITHOUT ANY WARRANTY; without even the implied warranty of #
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
-# GNU General Public License for more details. #
-# #
-# You should have received a copy of the GNU General Public License #
-# along with this program. If not, see <https://www.gnu.org/licenses/>. #
-# #
-# ========================================================================== #
-
-
-import asyncio
-import types
-
-from typing import Dict
-from typing import Type
-from typing import AsyncGenerator
-
-from ...yamlconf import Option
-
-from ...validators.os import valid_abs_dir
-from ...validators.os import valid_command
-
-from . import MsdOperationError
-from . import BaseMsd
-
-
-# =====
-class MsdCliOnlyError(MsdOperationError):
- def __init__(self) -> None:
- super().__init__("Only CLI")
-
-
-# =====
-class Plugin(BaseMsd):
- @classmethod
- def get_plugin_options(cls) -> Dict:
- sudo = ["/usr/bin/sudo", "--non-interactive"]
- return {
- "storage": Option("/var/lib/kvmd/msd", type=valid_abs_dir, unpack_as="storage_path"),
- "remount_cmd": Option([*sudo, "/usr/bin/kvmd-helper-otgmsd-remount", "{mode}"], type=valid_command),
- "unlock_cmd": Option([*sudo, "/usr/bin/kvmd-helper-otgmsd-unlock", "unlock"], type=valid_command),
- }
-
- def get_state(self) -> Dict:
- return {
- "enabled": False,
- "multi": False,
- "online": False,
- "busy": False,
- "uploading": False,
- "written": 0,
- "current": None,
- "storage": None,
- "cdrom": None,
- "connected": False,
- }
-
- async def poll_state(self) -> AsyncGenerator[Dict, None]:
- while True:
- yield self.get_state()
- await asyncio.sleep(60)
-
- async def reset(self) -> None:
- raise MsdCliOnlyError()
-
- # =====
-
- async def connect(self) -> Dict:
- raise MsdCliOnlyError()
-
- async def disconnect(self) -> Dict:
- raise MsdCliOnlyError()
-
- async def select(self, name: str, cdrom: bool) -> Dict:
- raise MsdCliOnlyError()
-
- async def remove(self, name: str) -> Dict:
- raise MsdCliOnlyError()
-
- async def __aenter__(self) -> BaseMsd:
- raise MsdCliOnlyError()
-
- async def write_image_info(self, name: str, complete: bool) -> None:
- raise MsdCliOnlyError()
-
- async def write_image_chunk(self, chunk: bytes) -> int:
- raise MsdCliOnlyError()
-
- async def __aexit__(
- self,
- _exc_type: Type[BaseException],
- _exc: BaseException,
- _tb: types.TracebackType,
- ) -> None:
-
- raise MsdCliOnlyError()
diff --git a/kvmd/plugins/msd/otg/__init__.py b/kvmd/plugins/msd/otg/__init__.py
new file mode 100644
index 00000000..76ef7d07
--- /dev/null
+++ b/kvmd/plugins/msd/otg/__init__.py
@@ -0,0 +1,531 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import os
+import asyncio
+import contextlib
+import dataclasses
+
+from typing import List
+from typing import Dict
+from typing import AsyncGenerator
+from typing import Optional
+
+import aiofiles
+import aiofiles.base
+
+from ....logging import get_logger
+
+from ....inotify import InotifyMask
+from ....inotify import Inotify
+
+from ....yamlconf import Option
+
+from ....validators.os import valid_abs_dir
+from ....validators.os import valid_command
+
+from .... import aiotools
+from .... import aioregion
+
+from .. import MsdError
+from .. import MsdOfflineError
+from .. import MsdConnectedError
+from .. import MsdDisconnectedError
+from .. import MsdImageNotSelected
+from .. import MsdUnknownImageError
+from .. import MsdImageExistsError
+from .. import MsdIsBusyError
+from .. import BaseMsd
+
+from .drive import Drive
+
+from .helpers import remount_storage
+from .helpers import unlock_drive
+
+
+# =====
[email protected](frozen=True)
+class _DriveImage:
+ name: str
+ path: str
+ size: int
+ complete: bool
+ in_storage: bool
+
+
[email protected](frozen=True)
+class _DriveState:
+ image: Optional[_DriveImage]
+ cdrom: bool
+ rw: bool
+
+
[email protected](frozen=True)
+class _StorageState:
+ size: int
+ free: int
+ images: Dict[str, _DriveImage]
+
+
+# =====
+class _VirtualDriveState:
+ image: Optional[_DriveImage]
+ connected: bool
+ cdrom: bool
+
+ @classmethod
+ def from_drive_state(cls, state: _DriveState) -> "_VirtualDriveState":
+ return _VirtualDriveState(
+ image=state.image,
+ connected=bool(state.image),
+ cdrom=state.cdrom,
+ )
+
+
+class _State:
+ def __init__(self, changes_queue: asyncio.queues.Queue) -> None:
+ self.__changes_queue = changes_queue
+
+ self.storage: Optional[_StorageState] = None
+ self.vd: Optional[_VirtualDriveState] = None
+
+ self._lock = asyncio.Lock()
+ self._region = aioregion.AioExclusiveRegion(MsdIsBusyError)
+
+ @contextlib.asynccontextmanager
+ async def busy(self, check_online: bool=True) -> AsyncGenerator[None, None]:
+ with self._region:
+ async with self._lock:
+ await self.__changes_queue.put(None)
+ if check_online:
+ if self.vd is None:
+ raise MsdOfflineError()
+ assert self.storage
+ yield
+ await self.__changes_queue.put(None)
+
+ def is_busy(self) -> bool:
+ return self._region.is_busy()
+
+
+# =====
+class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
+ def __init__( # pylint: disable=super-init-not-called
+ self,
+ storage_path: str,
+
+ remount_cmd: List[str],
+ unlock_cmd: List[str],
+
+ gadget: str, # XXX: Not from options, see /kvmd/apps/kvmd/__init__.py for details
+ ) -> None:
+
+ self.__storage_path = os.path.normpath(storage_path)
+ self.__images_path = os.path.join(self.__storage_path, "images")
+ self.__meta_path = os.path.join(self.__storage_path, "meta")
+
+ self.__remount_cmd = remount_cmd
+ self.__unlock_cmd = unlock_cmd
+
+ self.__drive = Drive(gadget, instance=0, lun=0)
+
+ self.__new_file: Optional[aiofiles.base.AiofilesContextManager] = None
+ self.__new_file_written = 0
+
+ self.__changes_queue: asyncio.queues.Queue = asyncio.Queue()
+
+ self.__state = _State(self.__changes_queue)
+
+ logger = get_logger(0)
+ logger.info("Using OTG gadget %r as MSD", gadget)
+ aiotools.run_sync(self.__reload_state())
+
+ @classmethod
+ def get_plugin_options(cls) -> Dict:
+ sudo = ["/usr/bin/sudo", "--non-interactive"]
+ return {
+ "storage": Option("/var/lib/kvmd/msd", type=valid_abs_dir, unpack_as="storage_path"),
+ "remount_cmd": Option([*sudo, "/usr/bin/kvmd-helper-otgmsd-remount", "{mode}"], type=valid_command),
+ "unlock_cmd": Option([*sudo, "/usr/bin/kvmd-helper-otgmsd-unlock", "unlock"], type=valid_command),
+ }
+
+ async def get_state(self) -> Dict:
+ async with self.__state._lock: # pylint: disable=protected-access
+ storage: Optional[Dict] = None
+ if self.__state.storage:
+ storage = dataclasses.asdict(self.__state.storage)
+ for name in list(storage["images"]):
+ del storage["images"][name]["path"]
+ del storage["images"][name]["in_storage"]
+ storage["uploading"] = bool(self.__new_file)
+
+ vd: Optional[Dict] = None
+ if self.__state.vd:
+ vd = dataclasses.asdict(self.__state.vd)
+ if vd["image"]:
+ del vd["image"]["path"]
+
+ return {
+ "enabled": False, # FIXME
+ "online": bool(self.__state.vd),
+ "busy": self.__state.is_busy(),
+ "storage": storage,
+ "drive": vd,
+ "features": {
+ "multi": True,
+ "cdrom": True,
+ },
+ }
+
+ async def poll_state(self) -> AsyncGenerator[Dict, None]:
+ inotify_task = asyncio.create_task(self.__watch_inotify())
+ prev_state: Dict = {}
+ try:
+ while True:
+ if inotify_task.cancelled():
+ break
+ if inotify_task.done():
+ RuntimeError("Inotify task is dead")
+
+ try:
+ await asyncio.wait_for(self.__changes_queue.get(), timeout=0.1)
+ except asyncio.TimeoutError:
+ continue
+
+ state = await self.get_state()
+ if state != prev_state:
+ yield state
+ prev_state = state
+ finally:
+ if not inotify_task.done():
+ inotify_task.cancel()
+ await inotify_task
+
+ @aiotools.atomic
+ async def reset(self) -> None:
+ async with self.__state.busy(check_online=False):
+ try:
+ await self.__unlock_drive()
+ self.__drive.set_image_path("")
+ self.__drive.set_rw_flag(False)
+ self.__drive.set_cdrom_flag(False)
+ except asyncio.CancelledError:
+ raise
+ except Exception:
+ get_logger(0).exception("Can't reset MSD")
+
+ @aiotools.atomic
+ async def cleanup(self) -> None:
+ await self.__close_new_file()
+
+ # =====
+
+ @aiotools.atomic
+ async def set_params(self, name: Optional[str]=None, cdrom: Optional[bool]=None) -> None:
+ async with self.__state.busy():
+ assert self.__state.storage
+ assert self.__state.vd
+
+ if self.__state.vd.connected or self.__drive.get_image_path():
+ raise MsdConnectedError()
+
+ if name is not None:
+ if name:
+ image = self.__state.storage.images.get(name)
+ if image is None or not os.path.exists(image.path):
+ raise MsdUnknownImageError()
+ assert image.in_storage
+ self.__state.vd.image = image
+ else:
+ self.__state.vd.image = None
+
+ if cdrom is not None:
+ self.__state.vd.cdrom = cdrom
+
+ @aiotools.atomic
+ async def connect(self) -> None:
+ async with self.__state.busy():
+ assert self.__state.vd
+
+ if self.__state.vd.connected or self.__drive.get_image_path():
+ raise MsdConnectedError()
+ if self.__state.vd.image is None:
+ raise MsdImageNotSelected()
+
+ assert self.__state.vd.image.in_storage
+
+ if not os.path.exists(self.__state.vd.image.path):
+ raise MsdUnknownImageError()
+
+ await self.__unlock_drive()
+ self.__drive.set_cdrom_flag(self.__state.vd.cdrom)
+ self.__drive.set_image_path(self.__state.vd.image.path)
+ self.__state.vd.connected = True
+
+ @aiotools.atomic
+ async def disconnect(self) -> None:
+ async with self.__state.busy():
+ assert self.__state.vd
+
+ if not (self.__state.vd.connected or self.__drive.get_image_path()):
+ raise MsdDisconnectedError()
+
+ await self.__unlock_drive()
+ self.__drive.set_image_path("")
+ self.__state.vd.connected = False
+
+ @contextlib.asynccontextmanager
+ async def write_image(self, name: str) -> AsyncGenerator[None, None]:
+ try:
+ with self.__state._region: # pylint: disable=protected-access
+ try:
+ async with self.__state._lock: # pylint: disable=protected-access
+ await self.__changes_queue.put(None)
+ assert self.__state.storage
+ assert self.__state.vd
+
+ if self.__state.vd.connected or self.__drive.get_image_path():
+ raise MsdConnectedError()
+
+ path = os.path.join(self.__images_path, name)
+ if name in self.__state.storage.images or os.path.exists(path):
+ raise MsdImageExistsError()
+
+ await self.__remount_storage(rw=True)
+ self.__set_image_complete(name, False)
+ self.__new_file_written = 0
+ self.__new_file = await aiofiles.open(path, mode="w+b", buffering=0)
+
+ await self.__changes_queue.put(None)
+ yield
+ self.__set_image_complete(name, True)
+
+ finally:
+ await self.__close_new_file()
+ try:
+ await self.__remount_storage(rw=False)
+ except asyncio.CancelledError: # pylint: disable=try-except-raise
+ raise
+ except Exception:
+ pass
+ finally:
+ await self.__changes_queue.put(None)
+
+ @aiotools.atomic
+ async def write_image_chunk(self, chunk: bytes) -> int:
+ assert self.__new_file
+ await aiotools.afile_write_now(self.__new_file, chunk)
+ self.__new_file_written += len(chunk)
+ return self.__new_file_written
+
+ async def remove(self, name: str) -> None:
+ async with self.__state.busy():
+ assert self.__state.storage
+ assert self.__state.vd
+
+ if self.__state.vd.connected or self.__drive.get_image_path():
+ raise MsdConnectedError()
+
+ image = self.__state.storage.images.get(name)
+ if image is None or not os.path.exists(image.path):
+ raise MsdUnknownImageError()
+ assert image.in_storage
+
+ if self.__state.vd.image == image:
+ self.__state.vd.image = None
+ del self.__state.storage.images[name]
+
+ await self.__remount_storage(rw=True)
+ os.remove(image.path)
+ self.__set_image_complete(name, False)
+ await self.__remount_storage(rw=False)
+
+ # =====
+
+ async def __close_new_file(self) -> None:
+ try:
+ if self.__new_file:
+ get_logger().info("Closing new image file ...")
+ await self.__new_file.close()
+ except asyncio.CancelledError: # pylint: disable=try-except-raise
+ raise
+ except Exception:
+ get_logger().exception("Can't close device file")
+ finally:
+ self.__new_file = None
+ self.__new_file_written = 0
+
+ # =====
+
+ async def __watch_inotify(self) -> None:
+ logger = get_logger(0)
+ while True:
+ try:
+ while True:
+ # Активно ждем, пока не будут на месте все каталоги.
+ await self.__reload_state()
+ await self.__changes_queue.put(None)
+ if self.__state.vd:
+ break
+ await asyncio.sleep(5)
+
+ with Inotify() as inotify:
+ inotify.watch(self.__images_path, InotifyMask.ALL_MODIFY_EVENTS)
+ inotify.watch(self.__meta_path, InotifyMask.ALL_MODIFY_EVENTS)
+ inotify.watch(self.__drive.get_sysfs_path(), InotifyMask.ALL_MODIFY_EVENTS)
+
+ # После установки вотчеров еще раз проверяем стейт, чтобы ничего не потерять
+ await self.__reload_state()
+ await self.__changes_queue.put(None)
+
+ while self.__state.vd: # Если живы после предыдущей проверки
+ need_restart = False
+ need_reload_state = False
+ for event in (await inotify.get_series(timeout=1)):
+ need_reload_state = True
+ if event.mask & (InotifyMask.DELETE_SELF | InotifyMask.MOVE_SELF | InotifyMask.UNMOUNT):
+ # Если выгрузили OTG, что-то отмонтировали или делают еще какую-то странную фигню
+ logger.warning("Got fatal inotify event: %s; reinitializing MSD ...", event)
+ need_restart = True
+ break
+ if need_restart:
+ break
+ if need_reload_state:
+ await self.__reload_state()
+ await self.__changes_queue.put(None)
+ except asyncio.CancelledError: # pylint: disable=try-except-raise
+ raise
+ except Exception:
+ logger.exception("Unexpected MSD watcher error")
+
+ async def __reload_state(self) -> None:
+ logger = get_logger(0)
+ async with self.__state._lock: # pylint: disable=protected-access
+ try:
+ drive_state = self.__get_drive_state()
+ if drive_state.rw:
+ # Внештатное использование MSD, ломаемся
+ raise MsdError("MSD has been switched to RW-mode manually")
+
+ if self.__state.vd is None and drive_state.image is None:
+ # Если только что включились и образ не подключен - попробовать
+ # перемонтировать хранилище (и создать images и meta).
+ logger.info("Probing to remount storage ...")
+ await self.__remount_storage(rw=True)
+ await self.__remount_storage(rw=False)
+
+ storage_state = self.__get_storage_state()
+ except asyncio.CancelledError: # pylint: disable=try-except-raise
+ raise
+ except Exception:
+ logger.exception("Error while reloading MSD state; switching to offline")
+ self.__state.storage = None
+ self.__state.vd = None
+ else:
+ self.__state.storage = storage_state
+ if drive_state.image:
+ # При подключенном образе виртуальный стейт заменяется реальным
+ self.__state.vd = _VirtualDriveState.from_drive_state(drive_state)
+ else:
+ if self.__state.vd is None:
+ # Если раньше MSD был отключен
+ self.__state.vd = _VirtualDriveState.from_drive_state(drive_state)
+
+ if (
+ self.__state.vd.image
+ and (not self.__state.vd.image.in_storage or not os.path.exists(self.__state.vd.image.path))
+ ):
+ # Если только что отключили ручной образ вне хранилища или ранее выбранный образ был удален
+ self.__state.vd.image = None
+
+ self.__state.vd.connected = False
+
+ # =====
+
+ def __get_storage_state(self) -> _StorageState:
+ images: Dict[str, _DriveImage] = {}
+ for name in os.listdir(self.__images_path):
+ path = os.path.join(self.__images_path, name)
+ if os.path.exists(path):
+ size = self.__get_file_size(path)
+ if size >= 0:
+ images[name] = _DriveImage(
+ name=name,
+ path=path,
+ size=size,
+ complete=self.__is_image_complete(name),
+ in_storage=True,
+ )
+ st = os.statvfs(self.__storage_path)
+ return _StorageState(
+ size=(st.f_blocks * st.f_frsize),
+ free=(st.f_bavail * st.f_frsize),
+ images=images,
+ )
+
+ def __get_drive_state(self) -> _DriveState:
+ image: Optional[_DriveImage] = None
+ path = self.__drive.get_image_path()
+ if path:
+ name = os.path.basename(path)
+ in_storage = (os.path.dirname(path) == self.__images_path)
+ image = _DriveImage(
+ name=name,
+ path=path,
+ size=max(self.__get_file_size(path), 0),
+ complete=(self.__is_image_complete(name) if in_storage else True),
+ in_storage=in_storage,
+ )
+ return _DriveState(
+ image=image,
+ cdrom=self.__drive.get_cdrom_flag(),
+ rw=self.__drive.get_rw_flag(),
+ )
+
+ # =====
+
+ def __get_file_size(self, path: str) -> int:
+ try:
+ return os.path.getsize(path)
+ except Exception as err:
+ get_logger().warning("Can't get size of file %s: %s", path, err)
+ return -1
+
+ def __is_image_complete(self, name: str) -> bool:
+ return os.path.exists(os.path.join(self.__meta_path, name + ".complete"))
+
+ def __set_image_complete(self, name: str, flag: bool) -> None:
+ path = os.path.join(self.__meta_path, name + ".complete")
+ if flag:
+ open(path, "w").close()
+ else:
+ if os.path.exists(path):
+ os.remove(path)
+
+ # =====
+
+ async def __remount_storage(self, rw: bool) -> None:
+ await remount_storage(self.__remount_cmd, rw)
+
+ async def __unlock_drive(self) -> None:
+ await unlock_drive(self.__unlock_cmd)
diff --git a/kvmd/plugins/msd/otg/drive.py b/kvmd/plugins/msd/otg/drive.py
new file mode 100644
index 00000000..e6457527
--- /dev/null
+++ b/kvmd/plugins/msd/otg/drive.py
@@ -0,0 +1,80 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import os
+import errno
+
+from .. import MsdOperationError
+
+
+# =====
+class MsdDriveLockedError(MsdOperationError):
+ def __init__(self) -> None:
+ super().__init__("MSD drive is locked on IO operation")
+
+
+# =====
+class Drive:
+ def __init__(self, gadget: str, instance: int, lun: int) -> None:
+ self.__path = os.path.join(
+ "/sys/kernel/config/usb_gadget",
+ gadget,
+ f"functions/mass_storage.usb{instance}/lun.{lun}",
+ )
+
+ def get_sysfs_path(self) -> str:
+ return self.__path
+
+ # =====
+
+ def set_image_path(self, path: str) -> None:
+ self.__set_param("file", path)
+
+ def get_image_path(self) -> str:
+ return self.__get_param("file")
+
+ def set_cdrom_flag(self, flag: bool) -> None:
+ self.__set_param("cdrom", str(int(flag)))
+
+ def get_cdrom_flag(self) -> bool:
+ return bool(int(self.__get_param("cdrom")))
+
+ def set_rw_flag(self, flag: bool) -> None:
+ self.__set_param("ro", str(int(not flag)))
+
+ def get_rw_flag(self) -> bool:
+ return (not int(self.__get_param("ro")))
+
+ # =====
+
+ def __get_param(self, param: str) -> str:
+ with open(os.path.join(self.__path, param)) as param_file:
+ return param_file.read().strip()
+
+ def __set_param(self, param: str, value: str) -> None:
+ try:
+ with open(os.path.join(self.__path, param), "w") as param_file:
+ param_file.write(value + "\n")
+ except OSError as err:
+ if err.errno == errno.EBUSY:
+ raise MsdDriveLockedError()
+ raise
diff --git a/kvmd/plugins/msd/otg/helpers.py b/kvmd/plugins/msd/otg/helpers.py
new file mode 100644
index 00000000..ee759ef0
--- /dev/null
+++ b/kvmd/plugins/msd/otg/helpers.py
@@ -0,0 +1,79 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import signal
+import asyncio
+import asyncio.subprocess
+
+from typing import List
+
+from ....logging import get_logger
+
+from .. import MsdError
+
+
+# =====
+async def remount_storage(base_cmd: List[str], rw: bool) -> None:
+ logger = get_logger(0)
+ mode = ("rw" if rw else "ro")
+ cmd = [
+ part.format(mode=mode)
+ for part in base_cmd
+ ]
+ logger.info("Remounting internal storage to %s ...", mode.upper())
+ try:
+ await _run_helper(cmd)
+ except Exception:
+ logger.error("Can't remount internal storage")
+ raise
+
+
+async def unlock_drive(base_cmd: List[str]) -> None:
+ logger = get_logger(0)
+ logger.info("Unlocking the drive ...")
+ try:
+ await _run_helper(base_cmd)
+ except Exception:
+ logger.error("Can't unlock the drive")
+ raise
+
+
+# =====
+async def _run_helper(cmd: List[str]) -> None:
+ logger = get_logger(0)
+ logger.info("Executing helper %s ...", cmd)
+
+ proc = await asyncio.create_subprocess_exec(
+ *cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.STDOUT,
+ preexec_fn=(lambda: signal.signal(signal.SIGINT, signal.SIG_IGN)),
+ )
+
+ stdout = (await proc.communicate())[0].decode(errors="ignore").strip()
+ if stdout:
+ log = (logger.info if proc.returncode == 0 else logger.error)
+ for line in stdout.split("\n"):
+ log("Console: %s", line)
+
+ if proc.returncode != 0:
+ raise MsdError(f"Error while helper execution: pid={proc.pid}; retcode={proc.returncode}")
diff --git a/kvmd/plugins/msd/relay.py b/kvmd/plugins/msd/relay.py
index eff22883..46b5a371 100644
--- a/kvmd/plugins/msd/relay.py
+++ b/kvmd/plugins/msd/relay.py
@@ -26,16 +26,13 @@ import fcntl
import struct
import asyncio
import asyncio.queues
+import contextlib
import dataclasses
-import types
from typing import Dict
from typing import IO
-from typing import Callable
-from typing import Type
from typing import AsyncGenerator
from typing import Optional
-from typing import Any
import aiofiles
import aiofiles.base
@@ -57,11 +54,11 @@ from ...validators.hw import valid_gpio_pin
from . import MsdError
from . import MsdOfflineError
-from . import MsdAlreadyConnectedError
-from . import MsdAlreadyDisconnectedError
from . import MsdConnectedError
+from . import MsdDisconnectedError
from . import MsdIsBusyError
from . import MsdMultiNotSupported
+from . import MsdCdromNotSupported
from . import BaseMsd
@@ -83,11 +80,11 @@ class _DeviceInfo:
_IMAGE_INFO_SIZE = 4096
_IMAGE_INFO_MAGIC_SIZE = 16
-_IMAGE_INFO_IMAGE_NAME_SIZE = 256
-_IMAGE_INFO_PADS_SIZE = _IMAGE_INFO_SIZE - _IMAGE_INFO_IMAGE_NAME_SIZE - 1 - 8 - _IMAGE_INFO_MAGIC_SIZE * 8
+_IMAGE_INFO_NAME_SIZE = 256
+_IMAGE_INFO_PADS_SIZE = _IMAGE_INFO_SIZE - _IMAGE_INFO_NAME_SIZE - 1 - 8 - _IMAGE_INFO_MAGIC_SIZE * 8
_IMAGE_INFO_FORMAT = ">%dL%dc?Q%dx%dL" % (
_IMAGE_INFO_MAGIC_SIZE,
- _IMAGE_INFO_IMAGE_NAME_SIZE,
+ _IMAGE_INFO_NAME_SIZE,
_IMAGE_INFO_PADS_SIZE,
_IMAGE_INFO_MAGIC_SIZE,
)
@@ -100,8 +97,8 @@ def _make_image_info_bytes(name: str, size: int, complete: bool) -> bytes:
*_IMAGE_INFO_MAGIC,
*memoryview(( # type: ignore
name.encode("utf-8")
- + b"\x00" * _IMAGE_INFO_IMAGE_NAME_SIZE
- )[:_IMAGE_INFO_IMAGE_NAME_SIZE]).cast("c"),
+ + b"\x00" * _IMAGE_INFO_NAME_SIZE
+ )[:_IMAGE_INFO_NAME_SIZE]).cast("c"),
complete,
size,
*_IMAGE_INFO_MAGIC,
@@ -117,11 +114,15 @@ def _parse_image_info_bytes(data: bytes) -> Optional[_ImageInfo]:
magic_begin = parsed[:_IMAGE_INFO_MAGIC_SIZE]
magic_end = parsed[-_IMAGE_INFO_MAGIC_SIZE:]
if magic_begin == magic_end == _IMAGE_INFO_MAGIC:
- image_name_bytes = b"".join(parsed[_IMAGE_INFO_MAGIC_SIZE:_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE])
+ image_name_bytes = b"".join(parsed[
+ _IMAGE_INFO_MAGIC_SIZE # noqa: E203
+ :
+ _IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE
+ ])
return _ImageInfo(
name=image_name_bytes.decode("utf-8", errors="ignore").strip("\x00").strip(),
- size=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE + 1],
- complete=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE],
+ size=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE + 1],
+ complete=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE],
)
return None
@@ -152,14 +153,7 @@ def _explore_device(device_path: str) -> _DeviceInfo:
)
-def _msd_working(method: Callable) -> Callable:
- async def wrapper(self: "Plugin", *args: Any, **kwargs: Any) -> Any:
- if not self._device_info: # pylint: disable=protected-access
- raise MsdOfflineError()
- return (await method(self, *args, **kwargs))
- return wrapper
-
-
+# =====
class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=super-init-not-called
self,
@@ -182,10 +176,11 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
self.__region = aioregion.AioExclusiveRegion(MsdIsBusyError)
- self._device_info: Optional[_DeviceInfo] = None
+ self.__device_info: Optional[_DeviceInfo] = None
+ self.__connected = False
+
self.__device_file: Optional[aiofiles.base.AiofilesContextManager] = None
self.__written = 0
- self.__on_kvm = True
self.__state_queue: asyncio.queues.Queue = asyncio.Queue()
@@ -209,27 +204,29 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
"reset_delay": Option(1.0, type=valid_float_f01),
}
- def get_state(self) -> Dict:
- current: Optional[Dict] = None
+ async def get_state(self) -> Dict:
storage: Optional[Dict] = None
- if self._device_info:
+ drive: Optional[Dict] = None
+ if self.__device_info:
storage = {
- "size": self._device_info.size,
- "free": self._device_info.free,
+ "size": self.__device_info.size,
+ "free": self.__device_info.free,
+ "uploading": bool(self.__device_file)
+ }
+ drive = {
+ "image": (self.__device_info.image and dataclasses.asdict(self.__device_info.image)),
+ "connected": self.__connected,
}
- if self._device_info.image:
- current = dataclasses.asdict(self._device_info.image)
return {
"enabled": True,
- "multi": False,
- "online": bool(self._device_info),
+ "online": bool(self.__device_info),
"busy": self.__region.is_busy(),
- "uploading": bool(self.__device_file),
- "written": self.__written,
- "current": current,
"storage": storage,
- "cdrom": None,
- "connected": (not self.__on_kvm),
+ "drive": drive,
+ "features": {
+ "multi": False,
+ "cdrom": False,
+ },
}
async def poll_state(self) -> AsyncGenerator[Dict, None]:
@@ -250,7 +247,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
gpio.write(self.__reset_pin, False)
gpio.write(self.__target_pin, False)
- self.__on_kvm = True
+ self.__connected = False
await self.__load_device_info()
get_logger(0).info("MSD reset has been successful")
@@ -259,7 +256,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
gpio.write(self.__reset_pin, False)
finally:
self.__region.exit()
- await self.__state_queue.put(self.get_state())
+ await self.__state_queue.put(await self.get_state())
@aiotools.atomic
async def cleanup(self) -> None:
@@ -269,116 +266,107 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
# =====
- @_msd_working
+ async def set_params(self, name: Optional[str]=None, cdrom: Optional[bool]=None) -> None:
+ async with self.__working():
+ if name is not None:
+ raise MsdMultiNotSupported()
+ if cdrom is not None:
+ raise MsdCdromNotSupported()
+
@aiotools.atomic
- async def connect(self) -> Dict:
- notify = False
- state: Dict = {}
- try:
- with self.__region:
- if not self.__on_kvm:
- raise MsdAlreadyConnectedError()
- notify = True
+ async def connect(self) -> None:
+ async with self.__working():
+ notify = False
+ try:
+ with self.__region:
+ if self.__connected:
+ raise MsdConnectedError()
+ notify = True
+
+ gpio.write(self.__target_pin, True)
+ self.__connected = True
+ get_logger(0).info("MSD switched to Server")
+ finally:
+ if notify:
+ await self.__state_queue.put(await self.get_state())
- gpio.write(self.__target_pin, True)
- self.__on_kvm = False
- get_logger(0).info("MSD switched to Server")
+ @aiotools.atomic
+ async def disconnect(self) -> None:
+ async with self.__working():
+ notify = False
+ try:
+ with self.__region:
+ if not self.__connected:
+ raise MsdDisconnectedError()
+ notify = True
+
+ gpio.write(self.__target_pin, False)
+ try:
+ await self.__load_device_info()
+ except Exception:
+ if self.__connected:
+ gpio.write(self.__target_pin, True)
+ raise
+ self.__connected = False
+ get_logger(0).info("MSD switched to KVM: %s", self.__device_info)
+ finally:
+ if notify:
+ await self.__state_queue.put(await self.get_state())
- state = self.get_state()
- return state
- finally:
- if notify:
- await self.__state_queue.put(state or self.get_state())
+ @contextlib.asynccontextmanager
+ async def write_image(self, name: str) -> AsyncGenerator[None, None]:
+ async with self.__working():
+ self.__region.enter()
+ try:
+ assert self.__device_info
+ if self.__connected:
+ raise MsdConnectedError()
- @_msd_working
- @aiotools.atomic
- async def disconnect(self) -> Dict:
- notify = False
- state: Dict = {}
- try:
- with self.__region:
- if self.__on_kvm:
- raise MsdAlreadyDisconnectedError()
- notify = True
+ self.__device_file = await aiofiles.open(self.__device_info.path, mode="w+b", buffering=0)
+ self.__written = 0
- gpio.write(self.__target_pin, False)
+ await self.__write_image_info(name, complete=False)
+ await self.__state_queue.put(await self.get_state())
+ yield
+ await self.__write_image_info(name, complete=True)
+ finally:
try:
+ await self.__close_device_file()
await self.__load_device_info()
- except Exception:
- if not self.__on_kvm:
- gpio.write(self.__target_pin, True)
- raise
- self.__on_kvm = True
- get_logger(0).info("MSD switched to KVM: %s", self._device_info)
-
- state = self.get_state()
- return state
- finally:
- if notify:
- await self.__state_queue.put(state or self.get_state())
-
- @_msd_working
- async def select(self, name: str, cdrom: bool) -> Dict:
- raise MsdMultiNotSupported()
-
- @_msd_working
- async def remove(self, name: str) -> Dict:
- raise MsdMultiNotSupported()
-
- @_msd_working
- @aiotools.atomic
- async def __aenter__(self) -> "Plugin":
- assert self._device_info
- self.__region.enter()
- try:
- if not self.__on_kvm:
- raise MsdConnectedError()
- self.__device_file = await aiofiles.open(self._device_info.path, mode="w+b", buffering=0)
- self.__written = 0
- return self
- except Exception:
- self.__region.exit()
- raise
- finally:
- await self.__state_queue.put(self.get_state())
-
- @aiotools.atomic
- async def write_image_info(self, name: str, complete: bool) -> None:
- assert self.__device_file
- assert self._device_info
- if self._device_info.size - self.__written > _IMAGE_INFO_SIZE:
- await self.__device_file.seek(self._device_info.size - _IMAGE_INFO_SIZE)
- await self.__write_to_device_file(_make_image_info_bytes(name, self.__written, complete))
- await self.__device_file.seek(0)
- else:
- get_logger().error("Can't write image info because device is full")
+ finally:
+ self.__region.exit()
+ await self.__state_queue.put(await self.get_state())
@aiotools.atomic
async def write_image_chunk(self, chunk: bytes) -> int:
- await self.__write_to_device_file(chunk)
+ assert self.__device_file
+ await aiotools.afile_write_now(self.__device_file, chunk)
self.__written += len(chunk)
return self.__written
- @aiotools.atomic
- async def __aexit__(
- self,
- _exc_type: Type[BaseException],
- _exc: BaseException,
- _tb: types.TracebackType,
- ) -> None:
+ async def remove(self, name: str) -> None:
+ async with self.__working():
+ raise MsdMultiNotSupported()
- try:
- await self.__close_device_file()
- await self.__load_device_info()
- finally:
- self.__region.exit()
- await self.__state_queue.put(self.get_state())
+ # =====
+
+ @contextlib.asynccontextmanager
+ async def __working(self) -> AsyncGenerator[None, None]:
+ if not self.__device_info:
+ raise MsdOfflineError()
+ yield
+
+ # =====
- async def __write_to_device_file(self, data: bytes) -> None:
+ async def __write_image_info(self, name: str, complete: bool) -> None:
assert self.__device_file
- await self.__device_file.write(data)
- await self.__device_file.flush()
- await aiotools.run_async(os.fsync, self.__device_file.fileno())
+ assert self.__device_info
+ if self.__device_info.size - self.__written > _IMAGE_INFO_SIZE:
+ await self.__device_file.seek(self.__device_info.size - _IMAGE_INFO_SIZE)
+ await aiotools.afile_write_now(self.__device_file, _make_image_info_bytes(name, self.__written, complete))
+ await self.__device_file.seek(0)
+ else:
+ get_logger().error("Can't write image info because device is full")
async def __close_device_file(self) -> None:
try:
@@ -398,13 +386,13 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
while True:
await asyncio.sleep(self.__init_delay)
try:
- self._device_info = await aiotools.run_async(_explore_device, self.__device_path)
+ self.__device_info = await aiotools.run_async(_explore_device, self.__device_path)
break
except asyncio.CancelledError: # pylint: disable=try-except-raise
raise
except Exception:
if retries == 0:
- self._device_info = None
+ self.__device_info = None
raise MsdError("Can't load device info")
get_logger().exception("Can't load device info; retries=%d", retries)
retries -= 1