summaryrefslogtreecommitdiff
path: root/kvmd/plugins/msd/otg
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2019-10-29 02:16:12 +0300
committerDevaev Maxim <[email protected]>2019-11-07 01:27:07 +0300
commit10f8c2b3352c951549cc1249d6b24789fb94d688 (patch)
treecbf38bcf2716896db748f64bcae66375b7135287 /kvmd/plugins/msd/otg
parentf6214191af093560d5697cd7b1ea6f245ee95b98 (diff)
otg msd and big refactoring
Diffstat (limited to 'kvmd/plugins/msd/otg')
-rw-r--r--kvmd/plugins/msd/otg/__init__.py531
-rw-r--r--kvmd/plugins/msd/otg/drive.py80
-rw-r--r--kvmd/plugins/msd/otg/helpers.py79
3 files changed, 690 insertions, 0 deletions
diff --git a/kvmd/plugins/msd/otg/__init__.py b/kvmd/plugins/msd/otg/__init__.py
new file mode 100644
index 00000000..76ef7d07
--- /dev/null
+++ b/kvmd/plugins/msd/otg/__init__.py
@@ -0,0 +1,531 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import os
+import asyncio
+import contextlib
+import dataclasses
+
+from typing import List
+from typing import Dict
+from typing import AsyncGenerator
+from typing import Optional
+
+import aiofiles
+import aiofiles.base
+
+from ....logging import get_logger
+
+from ....inotify import InotifyMask
+from ....inotify import Inotify
+
+from ....yamlconf import Option
+
+from ....validators.os import valid_abs_dir
+from ....validators.os import valid_command
+
+from .... import aiotools
+from .... import aioregion
+
+from .. import MsdError
+from .. import MsdOfflineError
+from .. import MsdConnectedError
+from .. import MsdDisconnectedError
+from .. import MsdImageNotSelected
+from .. import MsdUnknownImageError
+from .. import MsdImageExistsError
+from .. import MsdIsBusyError
+from .. import BaseMsd
+
+from .drive import Drive
+
+from .helpers import remount_storage
+from .helpers import unlock_drive
+
+
+# =====
[email protected](frozen=True)
+class _DriveImage:
+ name: str
+ path: str
+ size: int
+ complete: bool
+ in_storage: bool
+
+
[email protected](frozen=True)
+class _DriveState:
+ image: Optional[_DriveImage]
+ cdrom: bool
+ rw: bool
+
+
[email protected](frozen=True)
+class _StorageState:
+ size: int
+ free: int
+ images: Dict[str, _DriveImage]
+
+
+# =====
+class _VirtualDriveState:
+ image: Optional[_DriveImage]
+ connected: bool
+ cdrom: bool
+
+ @classmethod
+ def from_drive_state(cls, state: _DriveState) -> "_VirtualDriveState":
+ return _VirtualDriveState(
+ image=state.image,
+ connected=bool(state.image),
+ cdrom=state.cdrom,
+ )
+
+
+class _State:
+ def __init__(self, changes_queue: asyncio.queues.Queue) -> None:
+ self.__changes_queue = changes_queue
+
+ self.storage: Optional[_StorageState] = None
+ self.vd: Optional[_VirtualDriveState] = None
+
+ self._lock = asyncio.Lock()
+ self._region = aioregion.AioExclusiveRegion(MsdIsBusyError)
+
+ @contextlib.asynccontextmanager
+ async def busy(self, check_online: bool=True) -> AsyncGenerator[None, None]:
+ with self._region:
+ async with self._lock:
+ await self.__changes_queue.put(None)
+ if check_online:
+ if self.vd is None:
+ raise MsdOfflineError()
+ assert self.storage
+ yield
+ await self.__changes_queue.put(None)
+
+ def is_busy(self) -> bool:
+ return self._region.is_busy()
+
+
+# =====
+class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
+ def __init__( # pylint: disable=super-init-not-called
+ self,
+ storage_path: str,
+
+ remount_cmd: List[str],
+ unlock_cmd: List[str],
+
+ gadget: str, # XXX: Not from options, see /kvmd/apps/kvmd/__init__.py for details
+ ) -> None:
+
+ self.__storage_path = os.path.normpath(storage_path)
+ self.__images_path = os.path.join(self.__storage_path, "images")
+ self.__meta_path = os.path.join(self.__storage_path, "meta")
+
+ self.__remount_cmd = remount_cmd
+ self.__unlock_cmd = unlock_cmd
+
+ self.__drive = Drive(gadget, instance=0, lun=0)
+
+ self.__new_file: Optional[aiofiles.base.AiofilesContextManager] = None
+ self.__new_file_written = 0
+
+ self.__changes_queue: asyncio.queues.Queue = asyncio.Queue()
+
+ self.__state = _State(self.__changes_queue)
+
+ logger = get_logger(0)
+ logger.info("Using OTG gadget %r as MSD", gadget)
+ aiotools.run_sync(self.__reload_state())
+
+ @classmethod
+ def get_plugin_options(cls) -> Dict:
+ sudo = ["/usr/bin/sudo", "--non-interactive"]
+ return {
+ "storage": Option("/var/lib/kvmd/msd", type=valid_abs_dir, unpack_as="storage_path"),
+ "remount_cmd": Option([*sudo, "/usr/bin/kvmd-helper-otgmsd-remount", "{mode}"], type=valid_command),
+ "unlock_cmd": Option([*sudo, "/usr/bin/kvmd-helper-otgmsd-unlock", "unlock"], type=valid_command),
+ }
+
+ async def get_state(self) -> Dict:
+ async with self.__state._lock: # pylint: disable=protected-access
+ storage: Optional[Dict] = None
+ if self.__state.storage:
+ storage = dataclasses.asdict(self.__state.storage)
+ for name in list(storage["images"]):
+ del storage["images"][name]["path"]
+ del storage["images"][name]["in_storage"]
+ storage["uploading"] = bool(self.__new_file)
+
+ vd: Optional[Dict] = None
+ if self.__state.vd:
+ vd = dataclasses.asdict(self.__state.vd)
+ if vd["image"]:
+ del vd["image"]["path"]
+
+ return {
+ "enabled": False, # FIXME
+ "online": bool(self.__state.vd),
+ "busy": self.__state.is_busy(),
+ "storage": storage,
+ "drive": vd,
+ "features": {
+ "multi": True,
+ "cdrom": True,
+ },
+ }
+
+ async def poll_state(self) -> AsyncGenerator[Dict, None]:
+ inotify_task = asyncio.create_task(self.__watch_inotify())
+ prev_state: Dict = {}
+ try:
+ while True:
+ if inotify_task.cancelled():
+ break
+ if inotify_task.done():
+ RuntimeError("Inotify task is dead")
+
+ try:
+ await asyncio.wait_for(self.__changes_queue.get(), timeout=0.1)
+ except asyncio.TimeoutError:
+ continue
+
+ state = await self.get_state()
+ if state != prev_state:
+ yield state
+ prev_state = state
+ finally:
+ if not inotify_task.done():
+ inotify_task.cancel()
+ await inotify_task
+
+ @aiotools.atomic
+ async def reset(self) -> None:
+ async with self.__state.busy(check_online=False):
+ try:
+ await self.__unlock_drive()
+ self.__drive.set_image_path("")
+ self.__drive.set_rw_flag(False)
+ self.__drive.set_cdrom_flag(False)
+ except asyncio.CancelledError:
+ raise
+ except Exception:
+ get_logger(0).exception("Can't reset MSD")
+
+ @aiotools.atomic
+ async def cleanup(self) -> None:
+ await self.__close_new_file()
+
+ # =====
+
+ @aiotools.atomic
+ async def set_params(self, name: Optional[str]=None, cdrom: Optional[bool]=None) -> None:
+ async with self.__state.busy():
+ assert self.__state.storage
+ assert self.__state.vd
+
+ if self.__state.vd.connected or self.__drive.get_image_path():
+ raise MsdConnectedError()
+
+ if name is not None:
+ if name:
+ image = self.__state.storage.images.get(name)
+ if image is None or not os.path.exists(image.path):
+ raise MsdUnknownImageError()
+ assert image.in_storage
+ self.__state.vd.image = image
+ else:
+ self.__state.vd.image = None
+
+ if cdrom is not None:
+ self.__state.vd.cdrom = cdrom
+
+ @aiotools.atomic
+ async def connect(self) -> None:
+ async with self.__state.busy():
+ assert self.__state.vd
+
+ if self.__state.vd.connected or self.__drive.get_image_path():
+ raise MsdConnectedError()
+ if self.__state.vd.image is None:
+ raise MsdImageNotSelected()
+
+ assert self.__state.vd.image.in_storage
+
+ if not os.path.exists(self.__state.vd.image.path):
+ raise MsdUnknownImageError()
+
+ await self.__unlock_drive()
+ self.__drive.set_cdrom_flag(self.__state.vd.cdrom)
+ self.__drive.set_image_path(self.__state.vd.image.path)
+ self.__state.vd.connected = True
+
+ @aiotools.atomic
+ async def disconnect(self) -> None:
+ async with self.__state.busy():
+ assert self.__state.vd
+
+ if not (self.__state.vd.connected or self.__drive.get_image_path()):
+ raise MsdDisconnectedError()
+
+ await self.__unlock_drive()
+ self.__drive.set_image_path("")
+ self.__state.vd.connected = False
+
+ @contextlib.asynccontextmanager
+ async def write_image(self, name: str) -> AsyncGenerator[None, None]:
+ try:
+ with self.__state._region: # pylint: disable=protected-access
+ try:
+ async with self.__state._lock: # pylint: disable=protected-access
+ await self.__changes_queue.put(None)
+ assert self.__state.storage
+ assert self.__state.vd
+
+ if self.__state.vd.connected or self.__drive.get_image_path():
+ raise MsdConnectedError()
+
+ path = os.path.join(self.__images_path, name)
+ if name in self.__state.storage.images or os.path.exists(path):
+ raise MsdImageExistsError()
+
+ await self.__remount_storage(rw=True)
+ self.__set_image_complete(name, False)
+ self.__new_file_written = 0
+ self.__new_file = await aiofiles.open(path, mode="w+b", buffering=0)
+
+ await self.__changes_queue.put(None)
+ yield
+ self.__set_image_complete(name, True)
+
+ finally:
+ await self.__close_new_file()
+ try:
+ await self.__remount_storage(rw=False)
+ except asyncio.CancelledError: # pylint: disable=try-except-raise
+ raise
+ except Exception:
+ pass
+ finally:
+ await self.__changes_queue.put(None)
+
+ @aiotools.atomic
+ async def write_image_chunk(self, chunk: bytes) -> int:
+ assert self.__new_file
+ await aiotools.afile_write_now(self.__new_file, chunk)
+ self.__new_file_written += len(chunk)
+ return self.__new_file_written
+
+ async def remove(self, name: str) -> None:
+ async with self.__state.busy():
+ assert self.__state.storage
+ assert self.__state.vd
+
+ if self.__state.vd.connected or self.__drive.get_image_path():
+ raise MsdConnectedError()
+
+ image = self.__state.storage.images.get(name)
+ if image is None or not os.path.exists(image.path):
+ raise MsdUnknownImageError()
+ assert image.in_storage
+
+ if self.__state.vd.image == image:
+ self.__state.vd.image = None
+ del self.__state.storage.images[name]
+
+ await self.__remount_storage(rw=True)
+ os.remove(image.path)
+ self.__set_image_complete(name, False)
+ await self.__remount_storage(rw=False)
+
+ # =====
+
+ async def __close_new_file(self) -> None:
+ try:
+ if self.__new_file:
+ get_logger().info("Closing new image file ...")
+ await self.__new_file.close()
+ except asyncio.CancelledError: # pylint: disable=try-except-raise
+ raise
+ except Exception:
+ get_logger().exception("Can't close device file")
+ finally:
+ self.__new_file = None
+ self.__new_file_written = 0
+
+ # =====
+
+ async def __watch_inotify(self) -> None:
+ logger = get_logger(0)
+ while True:
+ try:
+ while True:
+ # Активно ждем, пока не будут на месте все каталоги.
+ await self.__reload_state()
+ await self.__changes_queue.put(None)
+ if self.__state.vd:
+ break
+ await asyncio.sleep(5)
+
+ with Inotify() as inotify:
+ inotify.watch(self.__images_path, InotifyMask.ALL_MODIFY_EVENTS)
+ inotify.watch(self.__meta_path, InotifyMask.ALL_MODIFY_EVENTS)
+ inotify.watch(self.__drive.get_sysfs_path(), InotifyMask.ALL_MODIFY_EVENTS)
+
+ # После установки вотчеров еще раз проверяем стейт, чтобы ничего не потерять
+ await self.__reload_state()
+ await self.__changes_queue.put(None)
+
+ while self.__state.vd: # Если живы после предыдущей проверки
+ need_restart = False
+ need_reload_state = False
+ for event in (await inotify.get_series(timeout=1)):
+ need_reload_state = True
+ if event.mask & (InotifyMask.DELETE_SELF | InotifyMask.MOVE_SELF | InotifyMask.UNMOUNT):
+ # Если выгрузили OTG, что-то отмонтировали или делают еще какую-то странную фигню
+ logger.warning("Got fatal inotify event: %s; reinitializing MSD ...", event)
+ need_restart = True
+ break
+ if need_restart:
+ break
+ if need_reload_state:
+ await self.__reload_state()
+ await self.__changes_queue.put(None)
+ except asyncio.CancelledError: # pylint: disable=try-except-raise
+ raise
+ except Exception:
+ logger.exception("Unexpected MSD watcher error")
+
+ async def __reload_state(self) -> None:
+ logger = get_logger(0)
+ async with self.__state._lock: # pylint: disable=protected-access
+ try:
+ drive_state = self.__get_drive_state()
+ if drive_state.rw:
+ # Внештатное использование MSD, ломаемся
+ raise MsdError("MSD has been switched to RW-mode manually")
+
+ if self.__state.vd is None and drive_state.image is None:
+ # Если только что включились и образ не подключен - попробовать
+ # перемонтировать хранилище (и создать images и meta).
+ logger.info("Probing to remount storage ...")
+ await self.__remount_storage(rw=True)
+ await self.__remount_storage(rw=False)
+
+ storage_state = self.__get_storage_state()
+ except asyncio.CancelledError: # pylint: disable=try-except-raise
+ raise
+ except Exception:
+ logger.exception("Error while reloading MSD state; switching to offline")
+ self.__state.storage = None
+ self.__state.vd = None
+ else:
+ self.__state.storage = storage_state
+ if drive_state.image:
+ # При подключенном образе виртуальный стейт заменяется реальным
+ self.__state.vd = _VirtualDriveState.from_drive_state(drive_state)
+ else:
+ if self.__state.vd is None:
+ # Если раньше MSD был отключен
+ self.__state.vd = _VirtualDriveState.from_drive_state(drive_state)
+
+ if (
+ self.__state.vd.image
+ and (not self.__state.vd.image.in_storage or not os.path.exists(self.__state.vd.image.path))
+ ):
+ # Если только что отключили ручной образ вне хранилища или ранее выбранный образ был удален
+ self.__state.vd.image = None
+
+ self.__state.vd.connected = False
+
+ # =====
+
+ def __get_storage_state(self) -> _StorageState:
+ images: Dict[str, _DriveImage] = {}
+ for name in os.listdir(self.__images_path):
+ path = os.path.join(self.__images_path, name)
+ if os.path.exists(path):
+ size = self.__get_file_size(path)
+ if size >= 0:
+ images[name] = _DriveImage(
+ name=name,
+ path=path,
+ size=size,
+ complete=self.__is_image_complete(name),
+ in_storage=True,
+ )
+ st = os.statvfs(self.__storage_path)
+ return _StorageState(
+ size=(st.f_blocks * st.f_frsize),
+ free=(st.f_bavail * st.f_frsize),
+ images=images,
+ )
+
+ def __get_drive_state(self) -> _DriveState:
+ image: Optional[_DriveImage] = None
+ path = self.__drive.get_image_path()
+ if path:
+ name = os.path.basename(path)
+ in_storage = (os.path.dirname(path) == self.__images_path)
+ image = _DriveImage(
+ name=name,
+ path=path,
+ size=max(self.__get_file_size(path), 0),
+ complete=(self.__is_image_complete(name) if in_storage else True),
+ in_storage=in_storage,
+ )
+ return _DriveState(
+ image=image,
+ cdrom=self.__drive.get_cdrom_flag(),
+ rw=self.__drive.get_rw_flag(),
+ )
+
+ # =====
+
+ def __get_file_size(self, path: str) -> int:
+ try:
+ return os.path.getsize(path)
+ except Exception as err:
+ get_logger().warning("Can't get size of file %s: %s", path, err)
+ return -1
+
+ def __is_image_complete(self, name: str) -> bool:
+ return os.path.exists(os.path.join(self.__meta_path, name + ".complete"))
+
+ def __set_image_complete(self, name: str, flag: bool) -> None:
+ path = os.path.join(self.__meta_path, name + ".complete")
+ if flag:
+ open(path, "w").close()
+ else:
+ if os.path.exists(path):
+ os.remove(path)
+
+ # =====
+
+ async def __remount_storage(self, rw: bool) -> None:
+ await remount_storage(self.__remount_cmd, rw)
+
+ async def __unlock_drive(self) -> None:
+ await unlock_drive(self.__unlock_cmd)
diff --git a/kvmd/plugins/msd/otg/drive.py b/kvmd/plugins/msd/otg/drive.py
new file mode 100644
index 00000000..e6457527
--- /dev/null
+++ b/kvmd/plugins/msd/otg/drive.py
@@ -0,0 +1,80 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import os
+import errno
+
+from .. import MsdOperationError
+
+
+# =====
+class MsdDriveLockedError(MsdOperationError):
+ def __init__(self) -> None:
+ super().__init__("MSD drive is locked on IO operation")
+
+
+# =====
+class Drive:
+ def __init__(self, gadget: str, instance: int, lun: int) -> None:
+ self.__path = os.path.join(
+ "/sys/kernel/config/usb_gadget",
+ gadget,
+ f"functions/mass_storage.usb{instance}/lun.{lun}",
+ )
+
+ def get_sysfs_path(self) -> str:
+ return self.__path
+
+ # =====
+
+ def set_image_path(self, path: str) -> None:
+ self.__set_param("file", path)
+
+ def get_image_path(self) -> str:
+ return self.__get_param("file")
+
+ def set_cdrom_flag(self, flag: bool) -> None:
+ self.__set_param("cdrom", str(int(flag)))
+
+ def get_cdrom_flag(self) -> bool:
+ return bool(int(self.__get_param("cdrom")))
+
+ def set_rw_flag(self, flag: bool) -> None:
+ self.__set_param("ro", str(int(not flag)))
+
+ def get_rw_flag(self) -> bool:
+ return (not int(self.__get_param("ro")))
+
+ # =====
+
+ def __get_param(self, param: str) -> str:
+ with open(os.path.join(self.__path, param)) as param_file:
+ return param_file.read().strip()
+
+ def __set_param(self, param: str, value: str) -> None:
+ try:
+ with open(os.path.join(self.__path, param), "w") as param_file:
+ param_file.write(value + "\n")
+ except OSError as err:
+ if err.errno == errno.EBUSY:
+ raise MsdDriveLockedError()
+ raise
diff --git a/kvmd/plugins/msd/otg/helpers.py b/kvmd/plugins/msd/otg/helpers.py
new file mode 100644
index 00000000..ee759ef0
--- /dev/null
+++ b/kvmd/plugins/msd/otg/helpers.py
@@ -0,0 +1,79 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import signal
+import asyncio
+import asyncio.subprocess
+
+from typing import List
+
+from ....logging import get_logger
+
+from .. import MsdError
+
+
+# =====
+async def remount_storage(base_cmd: List[str], rw: bool) -> None:
+ logger = get_logger(0)
+ mode = ("rw" if rw else "ro")
+ cmd = [
+ part.format(mode=mode)
+ for part in base_cmd
+ ]
+ logger.info("Remounting internal storage to %s ...", mode.upper())
+ try:
+ await _run_helper(cmd)
+ except Exception:
+ logger.error("Can't remount internal storage")
+ raise
+
+
+async def unlock_drive(base_cmd: List[str]) -> None:
+ logger = get_logger(0)
+ logger.info("Unlocking the drive ...")
+ try:
+ await _run_helper(base_cmd)
+ except Exception:
+ logger.error("Can't unlock the drive")
+ raise
+
+
+# =====
+async def _run_helper(cmd: List[str]) -> None:
+ logger = get_logger(0)
+ logger.info("Executing helper %s ...", cmd)
+
+ proc = await asyncio.create_subprocess_exec(
+ *cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.STDOUT,
+ preexec_fn=(lambda: signal.signal(signal.SIGINT, signal.SIG_IGN)),
+ )
+
+ stdout = (await proc.communicate())[0].decode(errors="ignore").strip()
+ if stdout:
+ log = (logger.info if proc.returncode == 0 else logger.error)
+ for line in stdout.split("\n"):
+ log("Console: %s", line)
+
+ if proc.returncode != 0:
+ raise MsdError(f"Error while helper execution: pid={proc.pid}; retcode={proc.returncode}")