diff options
author | Devaev Maxim <[email protected]> | 2020-11-22 15:36:45 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2020-11-22 15:36:45 +0300 |
commit | 16ad64db886296c58adc3ec8b0f266e57f75fb1b (patch) | |
tree | 1cd6c7e09ae91511032f53802114f91a47c4005b /kvmd/plugins/msd/relay/drive.py | |
parent | b7e0ee3300ddca0b4c7049ee9bfb172996b56ff2 (diff) |
refactoring
Diffstat (limited to 'kvmd/plugins/msd/relay/drive.py')
-rw-r--r-- | kvmd/plugins/msd/relay/drive.py | 143 |
1 files changed, 143 insertions, 0 deletions
diff --git a/kvmd/plugins/msd/relay/drive.py b/kvmd/plugins/msd/relay/drive.py new file mode 100644 index 00000000..a04a6e9f --- /dev/null +++ b/kvmd/plugins/msd/relay/drive.py @@ -0,0 +1,143 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import os +import stat +import fcntl +import struct +import dataclasses + +from typing import IO +from typing import Optional + +import aiofiles.base + +from .... import aiotools +from .... import aiofs + + +# ===== +_IMAGE_INFO_SIZE = 4096 +_IMAGE_INFO_MAGIC_SIZE = 16 +_IMAGE_INFO_NAME_SIZE = 256 +_IMAGE_INFO_PADS_SIZE = _IMAGE_INFO_SIZE - _IMAGE_INFO_NAME_SIZE - 1 - 8 - _IMAGE_INFO_MAGIC_SIZE * 8 +_IMAGE_INFO_FORMAT = ">%dL%dc?Q%dx%dL" % ( + _IMAGE_INFO_MAGIC_SIZE, + _IMAGE_INFO_NAME_SIZE, + _IMAGE_INFO_PADS_SIZE, + _IMAGE_INFO_MAGIC_SIZE, +) +_IMAGE_INFO_MAGIC = [0x1ACE1ACE] * _IMAGE_INFO_MAGIC_SIZE + + +# ===== [email protected](frozen=True) +class ImageInfo: + name: str + size: int + complete: bool + + @classmethod + def from_bytes(cls, data: bytes) -> Optional["ImageInfo"]: + try: + parsed = list(struct.unpack(_IMAGE_INFO_FORMAT, data)) + except struct.error: + pass + else: + magic_begin = parsed[:_IMAGE_INFO_MAGIC_SIZE] + magic_end = parsed[-_IMAGE_INFO_MAGIC_SIZE:] + if magic_begin == magic_end == _IMAGE_INFO_MAGIC: + image_name_bytes = b"".join(parsed[ + _IMAGE_INFO_MAGIC_SIZE # noqa: E203 + : + _IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE + ]) + return ImageInfo( + name=image_name_bytes.decode("utf-8", errors="ignore").strip("\x00").strip(), + size=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE + 1], + complete=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE], + ) + return None + + def to_bytes(self) -> bytes: + return struct.pack( + _IMAGE_INFO_FORMAT, + *_IMAGE_INFO_MAGIC, + *memoryview(( # type: ignore + self.name.encode("utf-8") + + b"\x00" * _IMAGE_INFO_NAME_SIZE + )[:_IMAGE_INFO_NAME_SIZE]).cast("c"), + self.complete, + self.size, + *_IMAGE_INFO_MAGIC, + ) + + [email protected](frozen=True) +class DeviceInfo: + path: str + size: int + free: int + image: Optional[ImageInfo] + + @classmethod + async def read(cls, device_path: str) -> "DeviceInfo": + return (await aiotools.run_async(cls.__inner_read, device_path)) + + @classmethod + def __inner_read(cls, device_path: str) -> "DeviceInfo": + if not stat.S_ISBLK(os.stat(device_path).st_mode): + raise RuntimeError(f"Not a block device: {device_path}") + + with open(device_path, "rb") as device_file: + # size = BLKGETSIZE * BLKSSZGET + size = _ioctl_uint32(device_file, 0x1260) * _ioctl_uint32(device_file, 0x1268) + device_file.seek(size - _IMAGE_INFO_SIZE) + image_info = ImageInfo.from_bytes(device_file.read()) + + return DeviceInfo( + path=device_path, + size=size, + free=(size - image_info.size if image_info else size), + image=image_info, + ) + + async def write_image_info( + self, + device_file: aiofiles.base.AiofilesContextManager, + image_info: ImageInfo, + ) -> bool: + + if self.size - image_info.size > _IMAGE_INFO_SIZE: + await device_file.seek(self.size - _IMAGE_INFO_SIZE) # type: ignore + await aiofs.afile_write_now(device_file, image_info.to_bytes()) + await device_file.seek(0) # type: ignore + return True + return False # Device is full + + +def _ioctl_uint32(device_file: IO, request: int) -> int: + buf = b"\0" * 4 + buf = fcntl.ioctl(device_file.fileno(), request, buf) # type: ignore + result = struct.unpack("I", buf)[0] + assert result > 0, (device_file, request, buf) + return result |