diff options
author | Maxim Devaev <[email protected]> | 2024-09-20 01:11:22 +0300 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2024-09-20 01:11:22 +0300 |
commit | 842ddc91a19b828db701c4b0b44e201a948fd627 (patch) | |
tree | b285492971ab9147feb3248a3fd11096c7fb4d5d /kvmd/apps | |
parent | 7a53f1445619fc471c2823e7081de8b6039b938e (diff) |
refactoring
Diffstat (limited to 'kvmd/apps')
-rw-r--r-- | kvmd/apps/edidconf/__init__.py | 257 |
1 files changed, 10 insertions, 247 deletions
diff --git a/kvmd/apps/edidconf/__init__.py b/kvmd/apps/edidconf/__init__.py index d213a871..e21f797b 100644 --- a/kvmd/apps/edidconf/__init__.py +++ b/kvmd/apps/edidconf/__init__.py @@ -22,259 +22,22 @@ import sys import os -import re -import dataclasses -import contextlib import subprocess import argparse import time -from typing import IO -from typing import Generator from typing import Callable from ...validators.basic import valid_bool from ...validators.basic import valid_int_f0 +from ...edid import EdidNoBlockError +from ...edid import Edid + # from .. import init # ===== -class NoBlockError(Exception): - pass - - -def _smart_open(path: str, mode: str) -> Generator[IO, None, None]: - fd = (0 if "r" in mode else 1) - with (os.fdopen(fd, mode, closefd=False) if path == "-" else open(path, mode)) as file: - yield file - if "w" in mode: - file.flush() - - [email protected](frozen=True) -class _CeaBlock: - tag: int - data: bytes - - def __post_init__(self) -> None: - assert 0 < self.tag <= 0b111 - assert 0 < len(self.data) <= 0b11111 - - @property - def size(self) -> int: - return len(self.data) + 1 - - def pack(self) -> bytes: - header = (self.tag << 5) | len(self.data) - return header.to_bytes() + self.data - - @classmethod - def first_from_raw(cls, raw: (bytes | list[int])) -> "_CeaBlock": - assert 0 < raw[0] <= 0xFF - tag = (raw[0] & 0b11100000) >> 5 - data_size = (raw[0] & 0b00011111) - data = bytes(raw[1:data_size + 1]) - return _CeaBlock(tag, data) - - -_CEA = 128 -_CEA_AUDIO = 1 -_CEA_SPEAKERS = 4 - - -class _Edid: - # https://en.wikipedia.org/wiki/Extended_Display_Identification_Data - - def __init__(self, path: str) -> None: - with _smart_open(path, "rb") as file: - data = file.read() - if data.startswith(b"\x00\xFF\xFF\xFF\xFF\xFF\xFF\x00"): - self.__data = list(data) - else: - text = re.sub(r"\s", "", data.decode()) - self.__data = [ - int(text[index:index + 2], 16) - for index in range(0, len(text), 2) - ] - assert len(self.__data) == 256, f"Invalid EDID length: {len(self.__data)}, should be 256 bytes" - assert self.__data[126] == 1, "Zero extensions number" - assert (self.__data[_CEA + 0], self.__data[_CEA + 1]) == (0x02, 0x03), "Can't find CEA extension" - - def write_hex(self, path: str) -> None: - self.__update_checksums() - text = "\n".join( - "".join( - f"{item:0{2}X}" - for item in self.__data[index:index + 16] - ) - for index in range(0, len(self.__data), 16) - ) + "\n" - with _smart_open(path, "w") as file: - file.write(text) - - def write_bin(self, path: str) -> None: - self.__update_checksums() - with _smart_open(path, "wb") as file: - file.write(bytes(self.__data)) - - def __update_checksums(self) -> None: - self.__data[127] = 256 - (sum(self.__data[:127]) % 256) - self.__data[255] = 256 - (sum(self.__data[128:255]) % 256) - - # ===== - - def get_mfc_id(self) -> str: - raw = self.__data[8] << 8 | self.__data[9] - return bytes([ - ((raw >> 10) & 0b11111) + 0x40, - ((raw >> 5) & 0b11111) + 0x40, - (raw & 0b11111) + 0x40, - ]).decode("ascii") - - def set_mfc_id(self, mfc_id: str) -> None: - assert len(mfc_id) == 3, "Mfc ID must be 3 characters long" - data = mfc_id.upper().encode("ascii") - for ch in data: - assert 0x41 <= ch <= 0x5A, "Mfc ID must contain only A-Z characters" - raw = ( - (data[2] - 0x40) - | ((data[1] - 0x40) << 5) - | ((data[0] - 0x40) << 10) - ) - self.__data[8] = (raw >> 8) & 0xFF - self.__data[9] = raw & 0xFF - - # ===== - - def get_product_id(self) -> int: - return (self.__data[10] | self.__data[11] << 8) - - def set_product_id(self, product_id: int) -> None: - assert 0 <= product_id <= 0xFFFF, f"Product ID should be from 0 to {0xFFFF}" - self.__data[10] = product_id & 0xFF - self.__data[11] = (product_id >> 8) & 0xFF - - # ===== - - def get_serial(self) -> int: - return ( - self.__data[12] - | self.__data[13] << 8 - | self.__data[14] << 16 - | self.__data[15] << 24 - ) - - def set_serial(self, serial: int) -> None: - assert 0 <= serial <= 0xFFFFFFFF, f"Serial should be from 0 to {0xFFFFFFFF}" - self.__data[12] = serial & 0xFF - self.__data[13] = (serial >> 8) & 0xFF - self.__data[14] = (serial >> 16) & 0xFF - self.__data[15] = (serial >> 24) & 0xFF - - # ===== - - def get_monitor_name(self) -> str: - return self.__get_dtd_text(0xFC, "Monitor Name") - - def set_monitor_name(self, text: str) -> None: - self.__set_dtd_text(0xFC, "Monitor Name", text) - - def get_monitor_serial(self) -> str: - return self.__get_dtd_text(0xFF, "Monitor Serial") - - def set_monitor_serial(self, text: str) -> None: - self.__set_dtd_text(0xFF, "Monitor Serial", text) - - def __get_dtd_text(self, d_type: int, name: str) -> str: - index = self.__find_dtd_text(d_type, name) - return bytes(self.__data[index:index + 13]).decode("cp437").strip() - - def __set_dtd_text(self, d_type: int, name: str, text: str) -> None: - index = self.__find_dtd_text(d_type, name) - encoded = (text[:13] + "\n" + " " * 12)[:13].encode("cp437") - for (offset, ch) in enumerate(encoded): - self.__data[index + offset] = ch - - def __find_dtd_text(self, d_type: int, name: str) -> int: - for index in [54, 72, 90, 108]: - if self.__data[index + 3] == d_type: - return index + 5 - raise NoBlockError(f"Can't find DTD {name}") - - # ===== CEA ===== - - def get_audio(self) -> bool: - (cbs, _) = self.__parse_cea() - audio = False - speakers = False - for cb in cbs: - if cb.tag == _CEA_AUDIO: - audio = True - elif cb.tag == _CEA_SPEAKERS: - speakers = True - return (audio and speakers and self.__get_basic_audio()) - - def set_audio(self, enabled: bool) -> None: - (cbs, dtds) = self.__parse_cea() - cbs = [cb for cb in cbs if cb.tag not in [_CEA_AUDIO, _CEA_SPEAKERS]] - if enabled: - cbs.append(_CeaBlock(_CEA_AUDIO, b"\x09\x7f\x07")) - cbs.append(_CeaBlock(_CEA_SPEAKERS, b"\x01\x00\x00")) - self.__replace_cea(cbs, dtds) - self.__set_basic_audio(enabled) - - def __get_basic_audio(self) -> bool: - return bool(self.__data[_CEA + 3] & 0b01000000) - - def __set_basic_audio(self, enabled: bool) -> None: - if enabled: - self.__data[_CEA + 3] |= 0b01000000 - else: - self.__data[_CEA + 3] &= (0xFF - 0b01000000) # ~X - - def __parse_cea(self) -> tuple[list[_CeaBlock], bytes]: - cea = self.__data[_CEA:] - dtd_begin = cea[2] - if dtd_begin == 0: - return ([], b"") - - cbs: list[_CeaBlock] = [] - if dtd_begin > 4: - raw = cea[4:dtd_begin] - while len(raw) != 0: - cb = _CeaBlock.first_from_raw(raw) - cbs.append(cb) - raw = raw[cb.size:] - - dtds = b"" - assert dtd_begin >= 4 - raw = cea[dtd_begin:] - while len(raw) > (18 + 1) and raw[0] != 0: - dtds += bytes(raw[:18]) - raw = raw[18:] - - return (cbs, dtds) - - def __replace_cea(self, cbs: list[_CeaBlock], dtds: bytes) -> None: - cbs_packed = b"" - for cb in cbs: - cbs_packed += cb.pack() - - raw = cbs_packed + dtds - assert len(raw) <= (128 - 4 - 1), "Too many CEA blocks or DTDs" - - self.__data[_CEA + 2] = (0 if len(raw) == 0 else (len(cbs_packed) + 4)) - - for index in range(4, 127): - try: - ch = raw[index - 4] - except IndexError: - ch = 0 - self.__data[_CEA + index] = ch - - def _format_bool(value: bool) -> str: return ("yes" if value else "no") @@ -283,7 +46,7 @@ def _make_format_hex(size: int) -> Callable[[int], str]: return (lambda value: ("0x{:0%dX} ({})" % (size * 2)).format(value, value)) -def _print_edid(edid: _Edid) -> None: +def _print_edid(edid: Edid) -> None: for (key, get, fmt) in [ ("Manufacturer ID:", edid.get_mfc_id, str), ("Product ID: ", edid.get_product_id, _make_format_hex(2)), @@ -294,7 +57,7 @@ def _print_edid(edid: _Edid) -> None: ]: try: print(key, fmt(get()), file=sys.stderr) # type: ignore - except NoBlockError: + except EdidNoBlockError: pass @@ -348,12 +111,12 @@ def main(argv: (list[str] | None)=None) -> None: # pylint: disable=too-many-bra help="Presets directory", metavar="<dir>") options = parser.parse_args(argv[1:]) - base: (_Edid | None) = None + base: (Edid | None) = None if options.import_preset: imp = options.import_preset if "." in imp: (base_name, imp) = imp.split(".", 1) # v3.1080p-by-default - base = _Edid(os.path.join(options.presets_path, f"{base_name}.hex")) + base = Edid.from_file(os.path.join(options.presets_path, f"{base_name}.hex")) imp = f"_{imp}" options.imp = os.path.join(options.presets_path, f"{imp}.hex") @@ -362,16 +125,16 @@ def main(argv: (list[str] | None)=None) -> None: # pylint: disable=too-many-bra options.export_hex = options.edid_path options.edid_path = options.imp - edid = _Edid(options.edid_path) + edid = Edid.from_file(options.edid_path) changed = False - for cmd in dir(_Edid): + for cmd in dir(Edid): if cmd.startswith("set_"): value = getattr(options, cmd) if value is None and base is not None: try: value = getattr(base, cmd.replace("set_", "get_"))() - except NoBlockError: + except EdidNoBlockError: pass if value is not None: getattr(edid, cmd)(value) |