summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kvmd/apps/otgconf/__init__.py92
1 files changed, 49 insertions, 43 deletions
diff --git a/kvmd/apps/otgconf/__init__.py b/kvmd/apps/otgconf/__init__.py
index 96aa83f0..2ea40fca 100644
--- a/kvmd/apps/otgconf/__init__.py
+++ b/kvmd/apps/otgconf/__init__.py
@@ -24,6 +24,7 @@ import os
import json
import contextlib
import argparse
+import time
from typing import List
from typing import Generator
@@ -37,47 +38,51 @@ from .. import init
# =====
-def _udc_stopped(gadget: str, udc: str) -> Generator[None, None, None]:
- udc = usb.find_udc(udc)
- udc_path = usb.get_gadget_path(gadget, usb.G_UDC)
- with open(udc_path) as udc_file:
- enabled = bool(udc_file.read().strip())
- if enabled:
- with open(udc_path, "w") as udc_file:
- udc_file.write("\n")
- try:
- yield
- finally:
+class _GadgetControl:
+ def __init__(self, meta_path: str, gadget: str, udc: str, init_delay: float) -> None:
+ self.__meta_path = meta_path
+ self.__gadget = gadget
+ self.__udc = udc
+ self.__init_delay = init_delay
+
+ @contextlib.contextmanager
+ def __udc_stopped(self) -> Generator[None, None, None]:
+ udc = usb.find_udc(self.__udc)
+ udc_path = usb.get_gadget_path(self.__gadget, usb.G_UDC)
+ with open(udc_path) as udc_file:
+ enabled = bool(udc_file.read().strip())
if enabled:
with open(udc_path, "w") as udc_file:
- udc_file.write(udc)
-
-
-def _enable_function(gadget: str, udc: str, func: str) -> None:
- with _udc_stopped(gadget, udc):
- os.symlink(
- usb.get_gadget_path(gadget, usb.G_FUNCTIONS, func),
- usb.get_gadget_path(gadget, usb.G_PROFILE, func),
- )
-
-
-def _disable_function(gadget: str, udc: str, func: str) -> None:
- with _udc_stopped(gadget, udc):
- os.unlink(usb.get_gadget_path(gadget, usb.G_PROFILE, func))
-
-
-def _list_functions(gadget: str, meta_path: str) -> None:
- for meta_name in sorted(os.listdir(meta_path)):
- with open(os.path.join(meta_path, meta_name)) as meta_file:
- meta = json.loads(meta_file.read())
- enabled = os.path.exists(usb.get_gadget_path(gadget, usb.G_PROFILE, meta["func"]))
- print(f"{'+' if enabled else '-'} {meta['func']} # {meta['name']}")
-
-
-def _reset_gadget(gadget: str, udc: str) -> None:
- with _udc_stopped(gadget, udc):
- pass
+ udc_file.write("\n")
+ try:
+ yield
+ finally:
+ if enabled:
+ time.sleep(self.__init_delay)
+ with open(udc_path, "w") as udc_file:
+ udc_file.write(udc)
+
+ def enable_function(self, func: str) -> None:
+ with self.__udc_stopped():
+ os.symlink(
+ usb.get_gadget_path(self.__gadget, usb.G_FUNCTIONS, func),
+ usb.get_gadget_path(self.__gadget, usb.G_PROFILE, func),
+ )
+
+ def disable_function(self, func: str) -> None:
+ with self.__udc_stopped():
+ os.unlink(usb.get_gadget_path(self.__gadget, usb.G_PROFILE, func))
+
+ def list_functions(self) -> None:
+ for meta_name in sorted(os.listdir(self.__meta_path)):
+ with open(os.path.join(self.__meta_path, meta_name)) as meta_file:
+ meta = json.loads(meta_file.read())
+ enabled = os.path.exists(usb.get_gadget_path(self.__gadget, usb.G_PROFILE, meta["func"]))
+ print(f"{'+' if enabled else '-'} {meta['func']} # {meta['name']}")
+
+ def reset(self) -> None:
+ with self.__udc_stopped():
+ pass
# =====
@@ -99,11 +104,12 @@ def main(argv: Optional[List[str]]=None) -> None:
parser.add_argument("-r", "--reset-gadget", action="store_true", help="Reset gadget")
options = parser.parse_args(argv[1:])
+ gc = _GadgetControl(config.otg.meta, config.otg.gadget, config.otg.udc, config.otg.init_delay)
if options.reset_gadget:
- _reset_gadget(config.otg.gadget, config.otg.udc)
+ gc.reset()
return
elif options.enable_function:
- _enable_function(config.otg.gadget, config.otg.udc, options.enable_function)
+ gc.enable_function(options.enable_function)
elif options.disable_function:
- _disable_function(config.otg.gadget, config.otg.udc, options.disable_function)
- _list_functions(config.otg.gadget, config.otg.meta)
+ gc.disable_function(options.disable_function)
+ gc.list_functions()