summaryrefslogtreecommitdiff
path: root/kvmd/apps/otg
diff options
context:
space:
mode:
authorMaxim Devaev <[email protected]>2022-03-31 08:27:55 +0300
committerMaxim Devaev <[email protected]>2022-03-31 08:27:55 +0300
commit686d6f7c2c201f8ae8d05bfd7d3ff5716bb48f5c (patch)
treed70ef9cc35934ef20020eaeca4a665c2c689d103 /kvmd/apps/otg
parent17412be3fead0e24290f5f704e964d3787e6a889 (diff)
refactoring
Diffstat (limited to 'kvmd/apps/otg')
-rw-r--r--kvmd/apps/otg/__init__.py151
1 files changed, 72 insertions, 79 deletions
diff --git a/kvmd/apps/otg/__init__.py b/kvmd/apps/otg/__init__.py
index 428f20ab..e5c04bdc 100644
--- a/kvmd/apps/otg/__init__.py
+++ b/kvmd/apps/otg/__init__.py
@@ -104,78 +104,69 @@ def _check_config(config: Section) -> None:
# =====
-def _create_serial(gadget_path: str, config_path: str) -> None:
- func_path = join(gadget_path, "functions/acm.usb0")
- _mkdir(func_path)
- _symlink(func_path, join(config_path, "acm.usb0"))
-
-
-def _create_ethernet(gadget_path: str, config_path: str, driver: str, host_mac: str, kvm_mac: str) -> None:
- if host_mac and kvm_mac and host_mac == kvm_mac:
- raise RuntimeError("Ethernet host_mac should not be equal to kvm_mac")
- real_driver = driver
- if driver == "rndis5":
- real_driver = "rndis"
- func_path = join(gadget_path, f"functions/{real_driver}.usb0")
- _mkdir(func_path)
- if host_mac:
- _write(join(func_path, "host_addr"), host_mac)
- if kvm_mac:
- _write(join(func_path, "dev_addr"), kvm_mac)
- if driver in ["ncm", "rndis"]:
- _write(join(gadget_path, "os_desc/use"), "1")
- _write(join(gadget_path, "os_desc/b_vendor_code"), "0xCD")
- _write(join(gadget_path, "os_desc/qw_sign"), "MSFT100")
- if driver == "ncm":
- _write(join(func_path, "os_desc/interface.ncm/compatible_id"), "WINNCM")
- elif driver == "rndis":
- # On Windows 7 and later, the RNDIS 5.1 driver would be used by default,
- # but it does not work very well. The RNDIS 6.0 driver works better.
- # In order to get this driver to load automatically, we have to use
- # a Microsoft-specific extension of USB.
- _write(join(func_path, "os_desc/interface.rndis/compatible_id"), "RNDIS")
- _write(join(func_path, "os_desc/interface.rndis/sub_compatible_id"), "5162001")
- _symlink(config_path, join(gadget_path, "os_desc/c.1"))
- _symlink(func_path, join(config_path, f"{real_driver}.usb0"))
-
-
-def _create_hid(gadget_path: str, config_path: str, instance: int, remote_wakeup: bool, hid: Hid) -> None:
- func_path = join(gadget_path, f"functions/hid.usb{instance}")
- _mkdir(func_path)
- _write(join(func_path, "no_out_endpoint"), "1", optional=True)
- if remote_wakeup:
- _write(join(func_path, "wakeup_on_write"), "1", optional=True)
- _write(join(func_path, "protocol"), str(hid.protocol))
- _write(join(func_path, "subclass"), str(hid.subclass))
- _write(join(func_path, "report_length"), str(hid.report_length))
- _write_bytes(join(func_path, "report_desc"), hid.report_descriptor)
- _symlink(func_path, join(config_path, f"hid.usb{instance}"))
-
-
-def _create_msd(
- gadget_path: str,
- config_path: str,
- instance: int,
- user: str,
- stall: bool,
- cdrom: bool,
- rw: bool,
- removable: bool,
- fua: bool,
-) -> None:
-
- func_path = join(gadget_path, f"functions/mass_storage.usb{instance}")
- _mkdir(func_path)
- _write(join(func_path, "stall"), str(int(stall)))
- _write(join(func_path, "lun.0/cdrom"), str(int(cdrom)))
- _write(join(func_path, "lun.0/ro"), str(int(not rw)))
- _write(join(func_path, "lun.0/removable"), str(int(removable)))
- _write(join(func_path, "lun.0/nofua"), str(int(not fua)))
- if user != "root":
- _chown(join(func_path, "lun.0/cdrom"), user)
- _chown(join(func_path, "lun.0/ro"), user)
- _chown(join(func_path, "lun.0/file"), user)
- _symlink(func_path, join(config_path, f"mass_storage.usb{instance}"))
+class _GadgetConfig:
+ def __init__(self, gadget_path: str, config_path: str) -> None:
+ self.__gadget_path = gadget_path
+ self.__config_path = config_path
+
+ def create_serial(self) -> None:
+ func_path = join(self.__gadget_path, "functions/acm.usb0")
+ _mkdir(func_path)
+ _symlink(func_path, join(self.__config_path, "acm.usb0"))
+
+ def create_ethernet(self, driver: str, host_mac: str, kvm_mac: str) -> None:
+ if host_mac and kvm_mac and host_mac == kvm_mac:
+ raise RuntimeError("Ethernet host_mac should not be equal to kvm_mac")
+ real_driver = driver
+ if driver == "rndis5":
+ real_driver = "rndis"
+ func_path = join(self.__gadget_path, f"functions/{real_driver}.usb0")
+ _mkdir(func_path)
+ if host_mac:
+ _write(join(func_path, "host_addr"), host_mac)
+ if kvm_mac:
+ _write(join(func_path, "dev_addr"), kvm_mac)
+ if driver in ["ncm", "rndis"]:
+ _write(join(self.__gadget_path, "os_desc/use"), "1")
+ _write(join(self.__gadget_path, "os_desc/b_vendor_code"), "0xCD")
+ _write(join(self.__gadget_path, "os_desc/qw_sign"), "MSFT100")
+ if driver == "ncm":
+ _write(join(func_path, "os_desc/interface.ncm/compatible_id"), "WINNCM")
+ elif driver == "rndis":
+ # On Windows 7 and later, the RNDIS 5.1 driver would be used by default,
+ # but it does not work very well. The RNDIS 6.0 driver works better.
+ # In order to get this driver to load automatically, we have to use
+ # a Microsoft-specific extension of USB.
+ _write(join(func_path, "os_desc/interface.rndis/compatible_id"), "RNDIS")
+ _write(join(func_path, "os_desc/interface.rndis/sub_compatible_id"), "5162001")
+ _symlink(self.__config_path, join(self.__gadget_path, "os_desc/c.1"))
+ _symlink(func_path, join(self.__config_path, f"{real_driver}.usb0"))
+
+ def create_hid(self, instance: int, remote_wakeup: bool, hid: Hid) -> None:
+ func_path = join(self.__gadget_path, f"functions/hid.usb{instance}")
+ _mkdir(func_path)
+ _write(join(func_path, "no_out_endpoint"), "1", optional=True)
+ if remote_wakeup:
+ _write(join(func_path, "wakeup_on_write"), "1", optional=True)
+ _write(join(func_path, "protocol"), str(hid.protocol))
+ _write(join(func_path, "subclass"), str(hid.subclass))
+ _write(join(func_path, "report_length"), str(hid.report_length))
+ _write_bytes(join(func_path, "report_desc"), hid.report_descriptor)
+ _symlink(func_path, join(self.__config_path, f"hid.usb{instance}"))
+
+ def create_msd(self, instance: int, user: str, stall: bool, cdrom: bool, rw: bool, removable: bool, fua: bool) -> None:
+ func_path = join(self.__gadget_path, f"functions/mass_storage.usb{instance}")
+ _mkdir(func_path)
+ _write(join(func_path, "stall"), str(int(stall)))
+ _write(join(func_path, "lun.0/cdrom"), str(int(cdrom)))
+ _write(join(func_path, "lun.0/ro"), str(int(not rw)))
+ _write(join(func_path, "lun.0/removable"), str(int(removable)))
+ _write(join(func_path, "lun.0/nofua"), str(int(not fua)))
+ if user != "root":
+ _chown(join(func_path, "lun.0/cdrom"), user)
+ _chown(join(func_path, "lun.0/ro"), user)
+ _chown(join(func_path, "lun.0/file"), user)
+ _symlink(func_path, join(self.__config_path, f"mass_storage.usb{instance}"))
def _cmd_start(config: Section) -> None: # pylint: disable=too-many-statements
@@ -221,36 +212,38 @@ def _cmd_start(config: Section) -> None: # pylint: disable=too-many-statements
# XXX: Should we use MaxPower=100 with Remote Wakeup?
_write(join(config_path, "bmAttributes"), "0xA0")
+ gc = _GadgetConfig(gadget_path, config_path)
+
if config.otg.devices.serial.enabled:
logger.info("===== Serial =====")
- _create_serial(gadget_path, config_path)
+ gc.create_serial()
if config.otg.devices.ethernet.enabled:
logger.info("===== Ethernet =====")
- _create_ethernet(gadget_path, config_path, **config.otg.devices.ethernet._unpack(ignore=["enabled"]))
+ gc.create_ethernet(**config.otg.devices.ethernet._unpack(ignore=["enabled"]))
if config.kvmd.hid.type == "otg":
logger.info("===== HID-Keyboard =====")
- _create_hid(gadget_path, config_path, 0, config.otg.remote_wakeup, make_keyboard_hid())
+ gc.create_hid(0, config.otg.remote_wakeup, make_keyboard_hid())
logger.info("===== HID-Mouse =====")
- _create_hid(gadget_path, config_path, 1, config.otg.remote_wakeup, make_mouse_hid(
+ gc.create_hid(1, config.otg.remote_wakeup, make_mouse_hid(
absolute=config.kvmd.hid.mouse.absolute,
horizontal_wheel=config.kvmd.hid.mouse.horizontal_wheel,
))
if config.kvmd.hid.mouse_alt.device:
logger.info("===== HID-Mouse-Alt =====")
- _create_hid(gadget_path, config_path, 2, config.otg.remote_wakeup, make_mouse_hid(
+ gc.create_hid(2, config.otg.remote_wakeup, make_mouse_hid(
absolute=(not config.kvmd.hid.mouse.absolute),
horizontal_wheel=config.kvmd.hid.mouse_alt.horizontal_wheel,
))
if config.kvmd.msd.type == "otg":
logger.info("===== MSD =====")
- _create_msd(gadget_path, config_path, 0, config.otg.user, **config.otg.devices.msd.default._unpack())
+ gc.create_msd(0, config.otg.user, **config.otg.devices.msd.default._unpack())
if config.otg.devices.drives.enabled:
for instance in range(config.otg.devices.drives.count):
logger.info("===== MSD Extra: %d =====", config.otg.devices.drives.count)
- _create_msd(gadget_path, config_path, instance + 1, "root", **config.otg.devices.drives.default._unpack())
+ gc.create_msd(instance + 1, "root", **config.otg.devices.drives.default._unpack())
logger.info("===== Preparing complete =====")