summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kvmd/apps/otg/__init__.py187
1 files changed, 122 insertions, 65 deletions
diff --git a/kvmd/apps/otg/__init__.py b/kvmd/apps/otg/__init__.py
index 2d5350ea..ed7666b4 100644
--- a/kvmd/apps/otg/__init__.py
+++ b/kvmd/apps/otg/__init__.py
@@ -24,16 +24,15 @@ import re
import time
import argparse
-from os import listdir
-from os import mkdir
-from os import symlink
-from os import rmdir
-from os import unlink
+import os
+
from os.path import join
from typing import List
from typing import Optional
+from ...logging import get_logger
+
from ...yamlconf import Section
from ...validators import ValidatorError
@@ -42,19 +41,47 @@ from .. import init
# =====
+def _mkdir(path: str) -> None:
+ get_logger().info("MKDIR --- %s", path)
+ os.mkdir(path)
+
+
+def _symlink(src: str, dest: str) -> None:
+ get_logger().info("SYMLINK - %s --> %s", dest, src)
+ os.symlink(src, dest)
+
+
+def _rmdir(path: str) -> None:
+ get_logger().info("RMDIR --- %s", path)
+ os.rmdir(path)
+
+
+def _unlink(path: str) -> None:
+ get_logger().info("RM ------ %s", path)
+ os.unlink(path)
+
+
def _write(path: str, text: str) -> None:
+ get_logger().info("WRITE --- %s", path)
with open(path, "w") as param_file:
param_file.write(text)
+def _write_bytes(path: str, data: bytes) -> None:
+ get_logger().info("WRITE --- %s", path)
+ with open(path, "wb") as param_file:
+ param_file.write(data)
+
+
def _find_udc(udc: str) -> str:
- udcs = sorted(listdir("/sys/class/udc"))
+ udcs = sorted(os.listdir("/sys/class/udc"))
if not udc:
if len(udcs) == 0:
raise RuntimeError("Can't find any UDC")
udc = udcs[0]
elif udc not in udcs:
raise RuntimeError(f"Can't find selected UDC: {udc}")
+ get_logger().info("Using UDC %s", udc)
return udc
@@ -67,16 +94,76 @@ def _check_config(config: Section) -> None:
raise RuntimeError("Nothing to do")
-def _cmd_start(config: Section) -> None: # pylint: disable=too-many-statements
+# =====
+def _create_acm(gadget_path: str, config_path: str) -> None:
+ func_path = join(gadget_path, "functions/acm.usb0")
+ _mkdir(func_path)
+ _symlink(func_path, join(config_path, "acm.usb0"))
+
+
+def _create_keyboard(gadget_path: str, config_path: str) -> None:
+ func_path = join(gadget_path, "functions/hid.usb0") # Keyboard
+ _mkdir(func_path)
+ _write(join(func_path, "protocol"), "1")
+ _write(join(func_path, "subclass"), "1")
+ _write(join(func_path, "report_length"), "8")
+ _write_bytes(
+ join(func_path, "report_desc"),
+ b"\x05\x01\x09\x06\xa1\x01\x05\x07\x19\xe0\x29\xe7\x15\x00"
+ b"\x25\x01\x75\x01\x95\x08\x81\x02\x95\x01\x75\x08\x81\x03"
+ b"\x95\x05\x75\x01\x05\x08\x19\x01\x29\x05\x91\x02\x95\x01"
+ b"\x75\x03\x91\x03\x95\x06\x75\x08\x15\x00\x25\x65\x05\x07"
+ b"\x19\x00\x29\x65\x81\x00\xc0"
+ )
+ _symlink(func_path, join(config_path, "hid.usb0"))
+
+
+def _create_mouse(gadget_path: str, config_path: str) -> None:
+ # https://github.com/NicoHood/HID/blob/0835e6a/src/SingleReport/SingleAbsoluteMouse.cpp
+ # Репорт взят отсюда ^^^, но изменен диапазон значений координат перемещений.
+ # Автор предлагает использовать -32768...32767, но семерка почему-то не хочет работать
+ # с отрицательными значениями координат, как не хочет хавать 65536 и 32768.
+ # Так что мы ей скармливаем диапазон 0...32767, и передаем рукожопам из микрософта привет,
+ # потому что линуксы прекрасно работают с любыми двухбайтовыми диапазонами.
+ func_path = join(gadget_path, "functions/hid.usb1") # Mouse
+ _mkdir(func_path)
+ _write(join(func_path, "protocol"), "0")
+ _write(join(func_path, "subclass"), "0")
+ _write(join(func_path, "report_length"), "6")
+ _write_bytes(
+ join(func_path, "report_desc"),
+ b"\x05\x01\x09\x02\xA1\x01\x05\x09\x19\x01\x29\x08\x15\x00"
+ b"\x25\x01\x95\x08\x75\x01\x81\x02\x05\x01\x09\x30\x09\x31"
+ b"\x16\x00\x00\x26\xFF\x7F\x75\x10\x95\x02\x81\x02\x09\x38"
+ b"\x15\x81\x25\x7f\x75\x08\x95\x01\x81\x06\xc0"
+ )
+ _symlink(func_path, join(config_path, "hid.usb1"))
+
+
+def _create_msd(gadget_path: str, config_path: str) -> None:
+ func_path = join(gadget_path, "functions/mass_storage.usb0")
+ _mkdir(func_path)
+ _write(join(func_path, "stall"), "0")
+ _write(join(func_path, "lun.0/cdrom"), "1")
+ _write(join(func_path, "lun.0/ro"), "1")
+ _write(join(func_path, "lun.0/removable"), "1")
+ _write(join(func_path, "lun.0/nofua"), "0")
+ _symlink(func_path, join(config_path, "mass_storage.usb0"))
+
+
+def _cmd_start(config: Section) -> None:
# https://www.kernel.org/doc/Documentation/usb/gadget_configfs.txt
# https://www.isticktoit.net/?p=1383
+ logger = get_logger()
+
_check_config(config)
udc = _find_udc(config.otg.udc)
+ logger.info("Creating gadget %r ...", config.otg.gadget)
gadget_path = join("/sys/kernel/config/usb_gadget", config.otg.gadget)
- mkdir(gadget_path)
+ _mkdir(gadget_path)
_write(join(gadget_path, "idVendor"), f"0x{config.otg.vendor_id:X}")
_write(join(gadget_path, "idProduct"), f"0x{config.otg.product_id:X}")
@@ -84,96 +171,66 @@ def _cmd_start(config: Section) -> None: # pylint: disable=too-many-statements
_write(join(gadget_path, "bcdUSB"), "0x0200")
lang_path = join(gadget_path, "strings/0x409")
- mkdir(lang_path)
+ _mkdir(lang_path)
_write(join(lang_path, "manufacturer"), config.otg.manufacturer)
_write(join(lang_path, "product"), config.otg.product)
_write(join(lang_path, "serialnumber"), config.otg.serial_number)
config_path = join(gadget_path, "configs/c.1")
- mkdir(config_path)
- mkdir(join(config_path, "strings/0x409"))
+ _mkdir(config_path)
+ _mkdir(join(config_path, "strings/0x409"))
_write(join(config_path, "strings/0x409/configuration"), "Config 1: ECM network")
_write(join(config_path, "MaxPower"), "250")
if config.otg.acm.enabled:
- func_path = join(gadget_path, "functions/acm.usb0")
- mkdir(func_path)
- symlink(func_path, join(config_path, "acm.usb0"))
+ logger.info("Required ACM")
+ _create_acm(gadget_path, config_path)
if config.kvmd.hid.type == "otg":
- func_path = join(gadget_path, "functions/hid.usb0") # Keyboard
- mkdir(func_path)
- _write(join(func_path, "protocol"), "1")
- _write(join(func_path, "subclass"), "1")
- _write(join(func_path, "report_length"), "8")
- with open(join(func_path, "report_desc"), "wb") as report_file:
- report_file.write(
- b"\x05\x01\x09\x06\xa1\x01\x05\x07\x19\xe0\x29\xe7\x15\x00"
- b"\x25\x01\x75\x01\x95\x08\x81\x02\x95\x01\x75\x08\x81\x03"
- b"\x95\x05\x75\x01\x05\x08\x19\x01\x29\x05\x91\x02\x95\x01"
- b"\x75\x03\x91\x03\x95\x06\x75\x08\x15\x00\x25\x65\x05\x07"
- b"\x19\x00\x29\x65\x81\x00\xc0"
- )
- symlink(func_path, join(config_path, "hid.usb0"))
-
- # https://github.com/NicoHood/HID/blob/0835e6a/src/SingleReport/SingleAbsoluteMouse.cpp
- # Репорт взят отсюда ^^^, но изменен диапазон значений координат перемещений.
- # Автор предлагает использовать -32768...32767, но семерка почему-то не хочет работать
- # с отрицательными значениями координат, как не хочет хавать 65536 и 32768.
- # Так что мы ей скармливаем диапазон 0...32767, и передаем рукожопам из микрософта привет,
- # потому что линуксы прекрасно работают с любыми двухбайтовыми диапазонами.
- func_path = join(gadget_path, "functions/hid.usb1") # Mouse
- mkdir(func_path)
- _write(join(func_path, "protocol"), "0")
- _write(join(func_path, "subclass"), "0")
- _write(join(func_path, "report_length"), "6")
- with open(join(func_path, "report_desc"), "wb") as report_file:
- report_file.write(
- b"\x05\x01\x09\x02\xA1\x01\x05\x09\x19\x01\x29\x08"
- b"\x15\x00\x25\x01\x95\x08\x75\x01\x81\x02\x05\x01\x09\x30"
- b"\x09\x31\x16\x00\x00\x26\xFF\x7F\x75\x10\x95\x02\x81\x02"
- b"\x09\x38\x15\x81\x25\x7f\x75\x08\x95\x01\x81\x06\xc0"
- )
- symlink(func_path, join(config_path, "hid.usb1"))
+ logger.info("Required HID")
+ _create_keyboard(gadget_path, config_path)
+ _create_mouse(gadget_path, config_path)
if config.kvmd.msd.type == "otg":
- func_path = join(gadget_path, "functions/mass_storage.usb0")
- mkdir(func_path)
- _write(join(func_path, "stall"), "0")
- _write(join(func_path, "lun.0/cdrom"), "1")
- _write(join(func_path, "lun.0/ro"), "1")
- _write(join(func_path, "lun.0/removable"), "1")
- _write(join(func_path, "lun.0/nofua"), "0")
- symlink(func_path, join(config_path, "mass_storage.usb0"))
+ logger.info("Required MSD")
+ _create_msd(gadget_path, config_path)
+ logger.info("Enabling the gadget ...")
_write(join(gadget_path, "UDC"), udc)
-
time.sleep(config.otg.init_delay)
+ logger.info("Ready to work")
+
+# =====
def _cmd_stop(config: Section) -> None:
# https://www.kernel.org/doc/Documentation/usb/gadget_configfs.txt
+ logger = get_logger()
+
_check_config(config)
gadget_path = join("/sys/kernel/config/usb_gadget", config.otg.gadget)
+ logger.info("Disabling gadget %r ...", config.otg.gadget)
_write(join(gadget_path, "UDC"), "")
config_path = join(gadget_path, "configs/c.1")
- for func in listdir(config_path):
+ for func in os.listdir(config_path):
if re.search(r"\.usb\d+$", func):
- unlink(join(config_path, func))
- rmdir(join(config_path, "strings/0x409"))
- rmdir(config_path)
+ _unlink(join(config_path, func))
+ _rmdir(join(config_path, "strings/0x409"))
+ _rmdir(config_path)
funcs_path = join(gadget_path, "functions")
- for func in listdir(funcs_path):
+ for func in os.listdir(funcs_path):
if re.search(r"\.usb\d+$", func):
- rmdir(join(funcs_path, func))
+ _rmdir(join(funcs_path, func))
+
+ _rmdir(join(gadget_path, "strings/0x409"))
+ _rmdir(gadget_path)
- rmdir(join(gadget_path, "strings/0x409"))
- rmdir(gadget_path)
+ logger.info("Bye-bye")
# =====