# ========================================================================== # # # # KVMD - The main PiKVM daemon. # # # # Copyright (C) 2018-2022 Maxim Devaev # # # # This program is free software: you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation, either version 3 of the License, or # # (at your option) any later version. # # # # This program is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # # GNU General Public License for more details. # # # # You should have received a copy of the GNU General Public License # # along with this program. If not, see . # # # # ========================================================================== # import os import contextlib from typing import Dict from typing import Type from typing import AsyncGenerator from typing import Optional import aiofiles import aiofiles.base from ...logging import get_logger from ...errors import OperationError from ...errors import IsBusyError from ... import aiofs from .. import BasePlugin from .. import get_plugin_class # ===== class MsdError(Exception): pass class MsdOperationError(OperationError, MsdError): pass class MsdIsBusyError(IsBusyError, MsdError): def __init__(self) -> None: super().__init__("Performing another MSD operation, please try again later") class MsdOfflineError(MsdOperationError): def __init__(self) -> None: super().__init__("MSD is not found") class MsdConnectedError(MsdOperationError): def __init__(self) -> None: super().__init__("MSD is connected to Server, but shouldn't for this operation") class MsdDisconnectedError(MsdOperationError): def __init__(self) -> None: super().__init__("MSD is disconnected from Server, but should be for this operation") class MsdImageNotSelected(MsdOperationError): def __init__(self) -> None: super().__init__("The image is not selected") class MsdUnknownImageError(MsdOperationError): def __init__(self) -> None: super().__init__("The image is not found in the storage") class MsdImageExistsError(MsdOperationError): def __init__(self) -> None: super().__init__("This image is already exists") class MsdMultiNotSupported(MsdOperationError): def __init__(self) -> None: super().__init__("This MSD does not support storing multiple images") class MsdCdromNotSupported(MsdOperationError): def __init__(self) -> None: super().__init__("This MSD does not support CD-ROM switching") class MsdRwNotSupported(MsdOperationError): def __init__(self) -> None: super().__init__("This MSD does not support RW switching") # ===== class BaseMsd(BasePlugin): async def get_state(self) -> Dict: raise NotImplementedError() async def poll_state(self) -> AsyncGenerator[Dict, None]: if self is not None: # XXX: Vulture and pylint hack raise NotImplementedError() yield async def reset(self) -> None: raise NotImplementedError() async def cleanup(self) -> None: pass # ===== async def set_params( self, name: Optional[str]=None, cdrom: Optional[bool]=None, rw: Optional[bool]=None, ) -> None: raise NotImplementedError() async def set_connected(self, connected: bool) -> None: raise NotImplementedError() @contextlib.asynccontextmanager async def read_image(self, name: str) -> AsyncGenerator[int, None]: # pylint: disable=unused-argument if self is not None: # XXX: Vulture and pylint hack raise NotImplementedError() yield 1 async def read_image_chunk(self) -> bytes: raise NotImplementedError() @contextlib.asynccontextmanager async def write_image(self, name: str, size: int) -> AsyncGenerator[int, None]: # pylint: disable=unused-argument if self is not None: # XXX: Vulture and pylint hack raise NotImplementedError() yield 1 async def write_image_chunk(self, chunk: bytes) -> int: raise NotImplementedError() async def remove(self, name: str) -> None: raise NotImplementedError() class MsdImageReader: def __init__(self, path: str, chunk_size: int) -> None: self.__name = os.path.basename(path) self.__path = path self.__chunk_size = chunk_size self.__file: Optional[aiofiles.base.AiofilesContextManager] = None self.__file_size: int = 0 async def open(self) -> "MsdImageReader": assert self.__file is None get_logger(1).info("Reading %r image from MSD ...", self.__name) self.__file_size = os.stat(self.__path).st_size self.__file = await aiofiles.open(self.__path, mode="rb") # type: ignore return self def get_size(self) -> int: assert self.__file is not None return self.__file_size async def read(self) -> bytes: assert self.__file is not None return (await self.__file.read(self.__chunk_size)) # type: ignore async def close(self) -> None: assert self.__file is not None logger = get_logger() logger.info("Closed image reader ...") try: await self.__file.close() # type: ignore except Exception: logger.exception("Can't close image reader") class MsdImageWriter: def __init__(self, path: str, size: int, sync: int) -> None: self.__name = os.path.basename(path) self.__path = path self.__size = size self.__sync = sync self.__file: Optional[aiofiles.base.AiofilesContextManager] = None self.__written = 0 self.__unsynced = 0 def get_file(self) -> aiofiles.base.AiofilesContextManager: assert self.__file is not None return self.__file def get_state(self) -> Dict: return { "name": self.__name, "size": self.__size, "written": self.__written, } async def open(self) -> "MsdImageWriter": assert self.__file is None get_logger(1).info("Writing %r image (%d bytes) to MSD ...", self.__name, self.__size) self.__file = await aiofiles.open(self.__path, mode="w+b", buffering=0) # type: ignore return self async def write(self, chunk: bytes) -> int: assert self.__file is not None await self.__file.write(chunk) # type: ignore self.__written += len(chunk) self.__unsynced += len(chunk) if self.__unsynced >= self.__sync: await aiofs.afile_sync(self.__file) self.__unsynced = 0 return self.__written async def close(self) -> None: assert self.__file is not None logger = get_logger() logger.info("Closing image writer ...") try: if self.__written == self.__size: (log, result) = (logger.info, "OK") elif self.__written < self.__size: (log, result) = (logger.error, "INCOMPLETE") else: # written > size (log, result) = (logger.warning, "OVERFLOW") log("Written %d of %d bytes to MSD image %r: %s", self.__written, self.__size, self.__name, result) try: await aiofs.afile_sync(self.__file) finally: await self.__file.close() # type: ignore except Exception: logger.exception("Can't close image writer") # ===== def get_msd_class(name: str) -> Type[BaseMsd]: return get_plugin_class("msd", name) # type: ignore