diff options
Diffstat (limited to 'kvmd/apps/otgconf')
-rw-r--r-- | kvmd/apps/otgconf/__init__.py | 92 |
1 files changed, 49 insertions, 43 deletions
diff --git a/kvmd/apps/otgconf/__init__.py b/kvmd/apps/otgconf/__init__.py index 96aa83f0..2ea40fca 100644 --- a/kvmd/apps/otgconf/__init__.py +++ b/kvmd/apps/otgconf/__init__.py @@ -24,6 +24,7 @@ import os import json import contextlib import argparse +import time from typing import List from typing import Generator @@ -37,47 +38,51 @@ from .. import init # ===== -def _udc_stopped(gadget: str, udc: str) -> Generator[None, None, None]: - udc = usb.find_udc(udc) - udc_path = usb.get_gadget_path(gadget, usb.G_UDC) - with open(udc_path) as udc_file: - enabled = bool(udc_file.read().strip()) - if enabled: - with open(udc_path, "w") as udc_file: - udc_file.write("\n") - try: - yield - finally: +class _GadgetControl: + def __init__(self, meta_path: str, gadget: str, udc: str, init_delay: float) -> None: + self.__meta_path = meta_path + self.__gadget = gadget + self.__udc = udc + self.__init_delay = init_delay + + @contextlib.contextmanager + def __udc_stopped(self) -> Generator[None, None, None]: + udc = usb.find_udc(self.__udc) + udc_path = usb.get_gadget_path(self.__gadget, usb.G_UDC) + with open(udc_path) as udc_file: + enabled = bool(udc_file.read().strip()) if enabled: with open(udc_path, "w") as udc_file: - udc_file.write(udc) - - -def _enable_function(gadget: str, udc: str, func: str) -> None: - with _udc_stopped(gadget, udc): - os.symlink( - usb.get_gadget_path(gadget, usb.G_FUNCTIONS, func), - usb.get_gadget_path(gadget, usb.G_PROFILE, func), - ) - - -def _disable_function(gadget: str, udc: str, func: str) -> None: - with _udc_stopped(gadget, udc): - os.unlink(usb.get_gadget_path(gadget, usb.G_PROFILE, func)) - - -def _list_functions(gadget: str, meta_path: str) -> None: - for meta_name in sorted(os.listdir(meta_path)): - with open(os.path.join(meta_path, meta_name)) as meta_file: - meta = json.loads(meta_file.read()) - enabled = os.path.exists(usb.get_gadget_path(gadget, usb.G_PROFILE, meta["func"])) - print(f"{'+' if enabled else '-'} {meta['func']} # {meta['name']}") - - -def _reset_gadget(gadget: str, udc: str) -> None: - with _udc_stopped(gadget, udc): - pass + udc_file.write("\n") + try: + yield + finally: + if enabled: + time.sleep(self.__init_delay) + with open(udc_path, "w") as udc_file: + udc_file.write(udc) + + def enable_function(self, func: str) -> None: + with self.__udc_stopped(): + os.symlink( + usb.get_gadget_path(self.__gadget, usb.G_FUNCTIONS, func), + usb.get_gadget_path(self.__gadget, usb.G_PROFILE, func), + ) + + def disable_function(self, func: str) -> None: + with self.__udc_stopped(): + os.unlink(usb.get_gadget_path(self.__gadget, usb.G_PROFILE, func)) + + def list_functions(self) -> None: + for meta_name in sorted(os.listdir(self.__meta_path)): + with open(os.path.join(self.__meta_path, meta_name)) as meta_file: + meta = json.loads(meta_file.read()) + enabled = os.path.exists(usb.get_gadget_path(self.__gadget, usb.G_PROFILE, meta["func"])) + print(f"{'+' if enabled else '-'} {meta['func']} # {meta['name']}") + + def reset(self) -> None: + with self.__udc_stopped(): + pass # ===== @@ -99,11 +104,12 @@ def main(argv: Optional[List[str]]=None) -> None: parser.add_argument("-r", "--reset-gadget", action="store_true", help="Reset gadget") options = parser.parse_args(argv[1:]) + gc = _GadgetControl(config.otg.meta, config.otg.gadget, config.otg.udc, config.otg.init_delay) if options.reset_gadget: - _reset_gadget(config.otg.gadget, config.otg.udc) + gc.reset() return elif options.enable_function: - _enable_function(config.otg.gadget, config.otg.udc, options.enable_function) + gc.enable_function(options.enable_function) elif options.disable_function: - _disable_function(config.otg.gadget, config.otg.udc, options.disable_function) - _list_functions(config.otg.gadget, config.otg.meta) + gc.disable_function(options.disable_function) + gc.list_functions() |