summaryrefslogtreecommitdiff
path: root/kvmd/plugins/msd/relay/drive.py
diff options
context:
space:
mode:
Diffstat (limited to 'kvmd/plugins/msd/relay/drive.py')
-rw-r--r--kvmd/plugins/msd/relay/drive.py142
1 files changed, 0 insertions, 142 deletions
diff --git a/kvmd/plugins/msd/relay/drive.py b/kvmd/plugins/msd/relay/drive.py
deleted file mode 100644
index 51a85392..00000000
--- a/kvmd/plugins/msd/relay/drive.py
+++ /dev/null
@@ -1,142 +0,0 @@
-# ========================================================================== #
-# #
-# KVMD - The main PiKVM daemon. #
-# #
-# Copyright (C) 2018-2022 Maxim Devaev <[email protected]> #
-# #
-# This program is free software: you can redistribute it and/or modify #
-# it under the terms of the GNU General Public License as published by #
-# the Free Software Foundation, either version 3 of the License, or #
-# (at your option) any later version. #
-# #
-# This program is distributed in the hope that it will be useful, #
-# but WITHOUT ANY WARRANTY; without even the implied warranty of #
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
-# GNU General Public License for more details. #
-# #
-# You should have received a copy of the GNU General Public License #
-# along with this program. If not, see <https://www.gnu.org/licenses/>. #
-# #
-# ========================================================================== #
-
-
-import os
-import stat
-import fcntl
-import struct
-import dataclasses
-
-from typing import IO
-
-from .... import aiotools
-from .... import aiofs
-
-from .. import MsdFileWriter
-
-
-# =====
-_IMAGE_INFO_SIZE = 4096
-_IMAGE_INFO_MAGIC_SIZE = 16
-_IMAGE_INFO_NAME_SIZE = 256
-_IMAGE_INFO_PADS_SIZE = _IMAGE_INFO_SIZE - _IMAGE_INFO_NAME_SIZE - 1 - 8 - _IMAGE_INFO_MAGIC_SIZE * 8
-_IMAGE_INFO_FORMAT = ">%dL%dc?Q%dx%dL" % (
- _IMAGE_INFO_MAGIC_SIZE,
- _IMAGE_INFO_NAME_SIZE,
- _IMAGE_INFO_PADS_SIZE,
- _IMAGE_INFO_MAGIC_SIZE,
-)
-_IMAGE_INFO_MAGIC = [0x1ACE1ACE] * _IMAGE_INFO_MAGIC_SIZE
-
-
-# =====
[email protected](frozen=True)
-class ImageInfo:
- name: str
- size: int
- complete: bool
-
- @classmethod
- def from_bytes(cls, data: bytes) -> ("ImageInfo" | None):
- try:
- parsed = list(struct.unpack(_IMAGE_INFO_FORMAT, data))
- except struct.error:
- pass
- else:
- magic_begin = parsed[:_IMAGE_INFO_MAGIC_SIZE]
- magic_end = parsed[-_IMAGE_INFO_MAGIC_SIZE:]
- if magic_begin == magic_end == _IMAGE_INFO_MAGIC:
- image_name_bytes = b"".join(parsed[
- _IMAGE_INFO_MAGIC_SIZE # noqa: E203
- :
- _IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE
- ])
- return ImageInfo(
- name=image_name_bytes.decode("utf-8", errors="ignore").strip("\x00").strip(),
- size=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE + 1],
- complete=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE],
- )
- return None
-
- def to_bytes(self) -> bytes:
- return struct.pack(
- _IMAGE_INFO_FORMAT,
- *_IMAGE_INFO_MAGIC,
- *memoryview(( # type: ignore
- self.name.encode("utf-8")
- + b"\x00" * _IMAGE_INFO_NAME_SIZE
- )[:_IMAGE_INFO_NAME_SIZE]).cast("c"),
- self.complete,
- self.size,
- *_IMAGE_INFO_MAGIC,
- )
-
-
[email protected](frozen=True)
-class DeviceInfo:
- path: str
- size: int
- free: int
- image: (ImageInfo | None)
-
- @classmethod
- async def read(cls, device_path: str) -> "DeviceInfo":
- return (await aiotools.run_async(cls.__inner_read, device_path))
-
- @classmethod
- def __inner_read(cls, device_path: str) -> "DeviceInfo":
- if not stat.S_ISBLK(os.stat(device_path).st_mode):
- raise RuntimeError(f"Not a block device: {device_path}")
-
- with open(device_path, "rb") as device_file:
- # size = BLKGETSIZE * BLKSSZGET
- size = _ioctl_uint32(device_file, 0x1260) * _ioctl_uint32(device_file, 0x1268)
- device_file.seek(size - _IMAGE_INFO_SIZE)
- image_info = ImageInfo.from_bytes(device_file.read())
-
- return DeviceInfo(
- path=device_path,
- size=size,
- free=(size - image_info.size if image_info else size),
- image=image_info,
- )
-
- async def write_image_info(self, device_writer: MsdFileWriter, complete: bool) -> bool:
- device_file = device_writer.get_file()
- state = device_writer.get_state()
- image_info = ImageInfo(state["name"], state["written"], complete)
-
- if self.size - image_info.size > _IMAGE_INFO_SIZE:
- await device_file.seek(self.size - _IMAGE_INFO_SIZE) # type: ignore
- await device_file.write(image_info.to_bytes()) # type: ignore
- await aiofs.afile_sync(device_file)
- await device_file.seek(0) # type: ignore
- return True
- return False # Device is full
-
-
-def _ioctl_uint32(device_file: IO, request: int) -> int:
- buf = b"\0" * 4
- buf = fcntl.ioctl(device_file.fileno(), request, buf) # type: ignore
- result = struct.unpack("I", buf)[0]
- assert result > 0, (device_file, request, buf)
- return result