diff options
-rw-r--r-- | kvmd/apps/otg/__init__.py | 187 |
1 files changed, 122 insertions, 65 deletions
diff --git a/kvmd/apps/otg/__init__.py b/kvmd/apps/otg/__init__.py index 2d5350ea..ed7666b4 100644 --- a/kvmd/apps/otg/__init__.py +++ b/kvmd/apps/otg/__init__.py @@ -24,16 +24,15 @@ import re import time import argparse -from os import listdir -from os import mkdir -from os import symlink -from os import rmdir -from os import unlink +import os + from os.path import join from typing import List from typing import Optional +from ...logging import get_logger + from ...yamlconf import Section from ...validators import ValidatorError @@ -42,19 +41,47 @@ from .. import init # ===== +def _mkdir(path: str) -> None: + get_logger().info("MKDIR --- %s", path) + os.mkdir(path) + + +def _symlink(src: str, dest: str) -> None: + get_logger().info("SYMLINK - %s --> %s", dest, src) + os.symlink(src, dest) + + +def _rmdir(path: str) -> None: + get_logger().info("RMDIR --- %s", path) + os.rmdir(path) + + +def _unlink(path: str) -> None: + get_logger().info("RM ------ %s", path) + os.unlink(path) + + def _write(path: str, text: str) -> None: + get_logger().info("WRITE --- %s", path) with open(path, "w") as param_file: param_file.write(text) +def _write_bytes(path: str, data: bytes) -> None: + get_logger().info("WRITE --- %s", path) + with open(path, "wb") as param_file: + param_file.write(data) + + def _find_udc(udc: str) -> str: - udcs = sorted(listdir("/sys/class/udc")) + udcs = sorted(os.listdir("/sys/class/udc")) if not udc: if len(udcs) == 0: raise RuntimeError("Can't find any UDC") udc = udcs[0] elif udc not in udcs: raise RuntimeError(f"Can't find selected UDC: {udc}") + get_logger().info("Using UDC %s", udc) return udc @@ -67,16 +94,76 @@ def _check_config(config: Section) -> None: raise RuntimeError("Nothing to do") -def _cmd_start(config: Section) -> None: # pylint: disable=too-many-statements +# ===== +def _create_acm(gadget_path: str, config_path: str) -> None: + func_path = join(gadget_path, "functions/acm.usb0") + _mkdir(func_path) + _symlink(func_path, join(config_path, "acm.usb0")) + + +def _create_keyboard(gadget_path: str, config_path: str) -> None: + func_path = join(gadget_path, "functions/hid.usb0") # Keyboard + _mkdir(func_path) + _write(join(func_path, "protocol"), "1") + _write(join(func_path, "subclass"), "1") + _write(join(func_path, "report_length"), "8") + _write_bytes( + join(func_path, "report_desc"), + b"\x05\x01\x09\x06\xa1\x01\x05\x07\x19\xe0\x29\xe7\x15\x00" + b"\x25\x01\x75\x01\x95\x08\x81\x02\x95\x01\x75\x08\x81\x03" + b"\x95\x05\x75\x01\x05\x08\x19\x01\x29\x05\x91\x02\x95\x01" + b"\x75\x03\x91\x03\x95\x06\x75\x08\x15\x00\x25\x65\x05\x07" + b"\x19\x00\x29\x65\x81\x00\xc0" + ) + _symlink(func_path, join(config_path, "hid.usb0")) + + +def _create_mouse(gadget_path: str, config_path: str) -> None: + # https://github.com/NicoHood/HID/blob/0835e6a/src/SingleReport/SingleAbsoluteMouse.cpp + # Репорт взят отсюда ^^^, но изменен диапазон значений координат перемещений. + # Автор предлагает использовать -32768...32767, но семерка почему-то не хочет работать + # с отрицательными значениями координат, как не хочет хавать 65536 и 32768. + # Так что мы ей скармливаем диапазон 0...32767, и передаем рукожопам из микрософта привет, + # потому что линуксы прекрасно работают с любыми двухбайтовыми диапазонами. + func_path = join(gadget_path, "functions/hid.usb1") # Mouse + _mkdir(func_path) + _write(join(func_path, "protocol"), "0") + _write(join(func_path, "subclass"), "0") + _write(join(func_path, "report_length"), "6") + _write_bytes( + join(func_path, "report_desc"), + b"\x05\x01\x09\x02\xA1\x01\x05\x09\x19\x01\x29\x08\x15\x00" + b"\x25\x01\x95\x08\x75\x01\x81\x02\x05\x01\x09\x30\x09\x31" + b"\x16\x00\x00\x26\xFF\x7F\x75\x10\x95\x02\x81\x02\x09\x38" + b"\x15\x81\x25\x7f\x75\x08\x95\x01\x81\x06\xc0" + ) + _symlink(func_path, join(config_path, "hid.usb1")) + + +def _create_msd(gadget_path: str, config_path: str) -> None: + func_path = join(gadget_path, "functions/mass_storage.usb0") + _mkdir(func_path) + _write(join(func_path, "stall"), "0") + _write(join(func_path, "lun.0/cdrom"), "1") + _write(join(func_path, "lun.0/ro"), "1") + _write(join(func_path, "lun.0/removable"), "1") + _write(join(func_path, "lun.0/nofua"), "0") + _symlink(func_path, join(config_path, "mass_storage.usb0")) + + +def _cmd_start(config: Section) -> None: # https://www.kernel.org/doc/Documentation/usb/gadget_configfs.txt # https://www.isticktoit.net/?p=1383 + logger = get_logger() + _check_config(config) udc = _find_udc(config.otg.udc) + logger.info("Creating gadget %r ...", config.otg.gadget) gadget_path = join("/sys/kernel/config/usb_gadget", config.otg.gadget) - mkdir(gadget_path) + _mkdir(gadget_path) _write(join(gadget_path, "idVendor"), f"0x{config.otg.vendor_id:X}") _write(join(gadget_path, "idProduct"), f"0x{config.otg.product_id:X}") @@ -84,96 +171,66 @@ def _cmd_start(config: Section) -> None: # pylint: disable=too-many-statements _write(join(gadget_path, "bcdUSB"), "0x0200") lang_path = join(gadget_path, "strings/0x409") - mkdir(lang_path) + _mkdir(lang_path) _write(join(lang_path, "manufacturer"), config.otg.manufacturer) _write(join(lang_path, "product"), config.otg.product) _write(join(lang_path, "serialnumber"), config.otg.serial_number) config_path = join(gadget_path, "configs/c.1") - mkdir(config_path) - mkdir(join(config_path, "strings/0x409")) + _mkdir(config_path) + _mkdir(join(config_path, "strings/0x409")) _write(join(config_path, "strings/0x409/configuration"), "Config 1: ECM network") _write(join(config_path, "MaxPower"), "250") if config.otg.acm.enabled: - func_path = join(gadget_path, "functions/acm.usb0") - mkdir(func_path) - symlink(func_path, join(config_path, "acm.usb0")) + logger.info("Required ACM") + _create_acm(gadget_path, config_path) if config.kvmd.hid.type == "otg": - func_path = join(gadget_path, "functions/hid.usb0") # Keyboard - mkdir(func_path) - _write(join(func_path, "protocol"), "1") - _write(join(func_path, "subclass"), "1") - _write(join(func_path, "report_length"), "8") - with open(join(func_path, "report_desc"), "wb") as report_file: - report_file.write( - b"\x05\x01\x09\x06\xa1\x01\x05\x07\x19\xe0\x29\xe7\x15\x00" - b"\x25\x01\x75\x01\x95\x08\x81\x02\x95\x01\x75\x08\x81\x03" - b"\x95\x05\x75\x01\x05\x08\x19\x01\x29\x05\x91\x02\x95\x01" - b"\x75\x03\x91\x03\x95\x06\x75\x08\x15\x00\x25\x65\x05\x07" - b"\x19\x00\x29\x65\x81\x00\xc0" - ) - symlink(func_path, join(config_path, "hid.usb0")) - - # https://github.com/NicoHood/HID/blob/0835e6a/src/SingleReport/SingleAbsoluteMouse.cpp - # Репорт взят отсюда ^^^, но изменен диапазон значений координат перемещений. - # Автор предлагает использовать -32768...32767, но семерка почему-то не хочет работать - # с отрицательными значениями координат, как не хочет хавать 65536 и 32768. - # Так что мы ей скармливаем диапазон 0...32767, и передаем рукожопам из микрософта привет, - # потому что линуксы прекрасно работают с любыми двухбайтовыми диапазонами. - func_path = join(gadget_path, "functions/hid.usb1") # Mouse - mkdir(func_path) - _write(join(func_path, "protocol"), "0") - _write(join(func_path, "subclass"), "0") - _write(join(func_path, "report_length"), "6") - with open(join(func_path, "report_desc"), "wb") as report_file: - report_file.write( - b"\x05\x01\x09\x02\xA1\x01\x05\x09\x19\x01\x29\x08" - b"\x15\x00\x25\x01\x95\x08\x75\x01\x81\x02\x05\x01\x09\x30" - b"\x09\x31\x16\x00\x00\x26\xFF\x7F\x75\x10\x95\x02\x81\x02" - b"\x09\x38\x15\x81\x25\x7f\x75\x08\x95\x01\x81\x06\xc0" - ) - symlink(func_path, join(config_path, "hid.usb1")) + logger.info("Required HID") + _create_keyboard(gadget_path, config_path) + _create_mouse(gadget_path, config_path) if config.kvmd.msd.type == "otg": - func_path = join(gadget_path, "functions/mass_storage.usb0") - mkdir(func_path) - _write(join(func_path, "stall"), "0") - _write(join(func_path, "lun.0/cdrom"), "1") - _write(join(func_path, "lun.0/ro"), "1") - _write(join(func_path, "lun.0/removable"), "1") - _write(join(func_path, "lun.0/nofua"), "0") - symlink(func_path, join(config_path, "mass_storage.usb0")) + logger.info("Required MSD") + _create_msd(gadget_path, config_path) + logger.info("Enabling the gadget ...") _write(join(gadget_path, "UDC"), udc) - time.sleep(config.otg.init_delay) + logger.info("Ready to work") + +# ===== def _cmd_stop(config: Section) -> None: # https://www.kernel.org/doc/Documentation/usb/gadget_configfs.txt + logger = get_logger() + _check_config(config) gadget_path = join("/sys/kernel/config/usb_gadget", config.otg.gadget) + logger.info("Disabling gadget %r ...", config.otg.gadget) _write(join(gadget_path, "UDC"), "") config_path = join(gadget_path, "configs/c.1") - for func in listdir(config_path): + for func in os.listdir(config_path): if re.search(r"\.usb\d+$", func): - unlink(join(config_path, func)) - rmdir(join(config_path, "strings/0x409")) - rmdir(config_path) + _unlink(join(config_path, func)) + _rmdir(join(config_path, "strings/0x409")) + _rmdir(config_path) funcs_path = join(gadget_path, "functions") - for func in listdir(funcs_path): + for func in os.listdir(funcs_path): if re.search(r"\.usb\d+$", func): - rmdir(join(funcs_path, func)) + _rmdir(join(funcs_path, func)) + + _rmdir(join(gadget_path, "strings/0x409")) + _rmdir(gadget_path) - rmdir(join(gadget_path, "strings/0x409")) - rmdir(gadget_path) + logger.info("Bye-bye") # ===== |