diff options
author | Maxim Devaev <[email protected]> | 2022-04-02 10:19:29 +0300 |
---|---|---|
committer | Maxim Devaev <[email protected]> | 2022-04-02 10:39:50 +0300 |
commit | bd8984dd0633d953f09a9f09e0d6bf9999e9c4db (patch) | |
tree | a4afcc37b6118671ee04992ab5f3ab3750d8c2ef /kvmd | |
parent | f1e9f33c137ef7f46563b69e892959435425b498 (diff) |
otgconf gpio plugin
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/apps/kvmd/__init__.py | 2 | ||||
-rw-r--r-- | kvmd/apps/kvmd/ugpio.py | 4 | ||||
-rw-r--r-- | kvmd/apps/otg/__init__.py | 3 | ||||
-rw-r--r-- | kvmd/apps/otgconf/__init__.py | 60 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/otgconf.py (renamed from kvmd/plugins/ugpio/otgbind.py) | 51 | ||||
-rw-r--r-- | kvmd/validators/ugpio.py | 2 |
6 files changed, 97 insertions, 25 deletions
diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py index 633e3346..3de764fd 100644 --- a/kvmd/apps/kvmd/__init__.py +++ b/kvmd/apps/kvmd/__init__.py @@ -86,7 +86,7 @@ def main(argv: Optional[List[str]]=None) -> None: ), info_manager=InfoManager(global_config), log_reader=LogReader(), - user_gpio=UserGpio(config.gpio, global_config.otg.udc, global_config.otg.gadget), + user_gpio=UserGpio(config.gpio, global_config.otg), ocr=TesseractOcr(**config.ocr._unpack()), hid=hid, diff --git a/kvmd/apps/kvmd/ugpio.py b/kvmd/apps/kvmd/ugpio.py index 04d72e28..458710dc 100644 --- a/kvmd/apps/kvmd/ugpio.py +++ b/kvmd/apps/kvmd/ugpio.py @@ -231,7 +231,7 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes # ===== class UserGpio: - def __init__(self, config: Section, udc: str, gadget: str) -> None: + def __init__(self, config: Section, otg_config: Section) -> None: self.__view = config.view self.__notifier = aiotools.AioNotifier() @@ -241,7 +241,7 @@ class UserGpio: instance_name=driver, notifier=self.__notifier, **drv_config._unpack(ignore=["instance_name", "notifier", "type"]), - **({"udc": udc, "gadget": gadget} if drv_config.type == "otgbind" else {}), # Hack + **({"otg_config": otg_config} if drv_config.type == "otgconf" else {}), # Hack ) for (driver, drv_config) in tools.sorted_kvs(config.drivers) } diff --git a/kvmd/apps/otg/__init__.py b/kvmd/apps/otg/__init__.py index c073ed91..1d33b98e 100644 --- a/kvmd/apps/otg/__init__.py +++ b/kvmd/apps/otg/__init__.py @@ -270,8 +270,9 @@ def _cmd_start(config: Section) -> None: # pylint: disable=too-many-statements _write(join(gadget_path, "UDC"), udc) time.sleep(config.otg.init_delay) - logger.info("Setting UDC permissions ...") + logger.info("Setting up permissions ...") _chown(join(gadget_path, "UDC"), config.otg.user) + _chown(profile_path, config.otg.user) logger.info("Ready to work") diff --git a/kvmd/apps/otgconf/__init__.py b/kvmd/apps/otgconf/__init__.py index 2ea40fca..61701ccf 100644 --- a/kvmd/apps/otgconf/__init__.py +++ b/kvmd/apps/otgconf/__init__.py @@ -27,9 +27,12 @@ import argparse import time from typing import List +from typing import Dict from typing import Generator from typing import Optional +import yaml + from ...validators.basic import valid_stripped_string_not_empty from ... import usb @@ -57,10 +60,14 @@ class _GadgetControl: try: yield finally: - if enabled: - time.sleep(self.__init_delay) - with open(udc_path, "w") as udc_file: - udc_file.write(udc) + time.sleep(self.__init_delay) + with open(udc_path, "w") as udc_file: + udc_file.write(udc) + + def __read_metas(self) -> Generator[Dict, None, None]: + for meta_name in sorted(os.listdir(self.__meta_path)): + with open(os.path.join(self.__meta_path, meta_name)) as meta_file: + yield json.loads(meta_file.read()) def enable_function(self, func: str) -> None: with self.__udc_stopped(): @@ -74,12 +81,30 @@ class _GadgetControl: os.unlink(usb.get_gadget_path(self.__gadget, usb.G_PROFILE, func)) def list_functions(self) -> None: - for meta_name in sorted(os.listdir(self.__meta_path)): - with open(os.path.join(self.__meta_path, meta_name)) as meta_file: - meta = json.loads(meta_file.read()) + for meta in self.__read_metas(): enabled = os.path.exists(usb.get_gadget_path(self.__gadget, usb.G_PROFILE, meta["func"])) print(f"{'+' if enabled else '-'} {meta['func']} # {meta['name']}") + def make_gpio_config(self) -> None: + config = { + "drivers": {"otgconf": {"type": "otgconf"}}, + "scheme": {}, + "view": {"table": []}, + } + for meta in self.__read_metas(): + config["scheme"][meta["func"]] = { # type: ignore + "driver": "otgconf", + "pin": meta["func"], + "mode": "output", + "pulse": {"delay": 0}, + } + config["view"]["table"].append([ # type: ignore + "#" + meta["name"], + "#" + meta["func"], + meta["func"], + ]) + print(yaml.dump({"kvmd": {"gpio": config}}, indent=4)) + def reset(self) -> None: with self.__udc_stopped(): pass @@ -102,14 +127,27 @@ def main(argv: Optional[List[str]]=None) -> None: parser.add_argument("-d", "--disable-function", type=valid_stripped_string_not_empty, metavar="<name>", help="Disable function") parser.add_argument("-r", "--reset-gadget", action="store_true", help="Reset gadget") + parser.add_argument("--make-gpio-config", action="store_true") options = parser.parse_args(argv[1:]) gc = _GadgetControl(config.otg.meta, config.otg.gadget, config.otg.udc, config.otg.init_delay) - if options.reset_gadget: - gc.reset() - return + + if options.list_functions: + gc.list_functions() + elif options.enable_function: gc.enable_function(options.enable_function) + gc.list_functions() + elif options.disable_function: gc.disable_function(options.disable_function) - gc.list_functions() + gc.list_functions() + + elif options.reset_gadget: + gc.reset() + + elif options.make_gpio_config: + gc.make_gpio_config() + + else: + gc.list_functions() diff --git a/kvmd/plugins/ugpio/otgbind.py b/kvmd/plugins/ugpio/otgconf.py index 4be96aeb..8b888c8e 100644 --- a/kvmd/plugins/ugpio/otgbind.py +++ b/kvmd/plugins/ugpio/otgconf.py @@ -34,6 +34,10 @@ from ...inotify import Inotify from ... import aiotools from ... import usb +from ...yamlconf import Section + +from ...validators.basic import valid_stripped_string_not_empty + from . import BaseUserGpioDriver @@ -44,19 +48,24 @@ class Plugin(BaseUserGpioDriver): instance_name: str, notifier: aiotools.AioNotifier, - udc: str, # XXX: Not from options, see /kvmd/apps/kvmd/__init__.py for details - gadget: str, # ditto + otg_config: Section, # XXX: Not from options, see /kvmd/apps/kvmd/__init__.py for details ) -> None: super().__init__(instance_name, notifier) - self.__udc = udc + self.__udc: str = otg_config.udc + self.__init_delay: float = otg_config.init_delay + gadget: str = otg_config.gadget self.__udc_path = usb.get_gadget_path(gadget, usb.G_UDC) + self.__functions_path = usb.get_gadget_path(gadget, usb.G_FUNCTIONS) + self.__profile_path = usb.get_gadget_path(gadget, usb.G_PROFILE) + + self.__lock = asyncio.Lock() @classmethod def get_pin_validator(cls) -> Callable[[Any], Any]: - return str + return valid_stripped_string_not_empty def prepare(self) -> None: self.__udc = usb.find_udc(self.__udc) @@ -74,6 +83,7 @@ class Plugin(BaseUserGpioDriver): with Inotify() as inotify: inotify.watch(os.path.dirname(self.__udc_path), InotifyMask.ALL_MODIFY_EVENTS) + inotify.watch(self.__profile_path, InotifyMask.ALL_MODIFY_EVENTS) await self._notifier.notify() while True: need_restart = False @@ -90,16 +100,39 @@ class Plugin(BaseUserGpioDriver): await self._notifier.notify() except Exception: logger.exception("Unexpected OTG-bind watcher error") + await asyncio.sleep(1) async def read(self, pin: str) -> bool: - _ = pin - with open(self.__udc_path) as udc_file: - return bool(udc_file.read().strip()) + if pin == "udc": + return self.__is_udc_enabled() + return os.path.exists(os.path.join(self.__profile_path, pin)) async def write(self, pin: str, state: bool) -> None: - _ = pin + async with self.__lock: + if pin == "udc": + self.__set_udc_enabled(state) + else: + if self.__is_udc_enabled(): + self.__set_udc_enabled(False) + try: + if state: + os.symlink( + os.path.join(self.__functions_path, pin), + os.path.join(self.__profile_path, pin), + ) + else: + os.unlink(os.path.join(self.__profile_path, pin)) + finally: + await asyncio.sleep(self.__init_delay) + self.__set_udc_enabled(True) + + def __set_udc_enabled(self, enabled: bool) -> None: with open(self.__udc_path, "w") as udc_file: - udc_file.write(self.__udc if state else "\n") + udc_file.write(self.__udc if enabled else "\n") + + def __is_udc_enabled(self) -> bool: + with open(self.__udc_path) as udc_file: + return bool(udc_file.read().strip()) def __str__(self) -> str: return f"GPIO({self._instance_name})" diff --git a/kvmd/validators/ugpio.py b/kvmd/validators/ugpio.py index e70e2590..6f32a26f 100644 --- a/kvmd/validators/ugpio.py +++ b/kvmd/validators/ugpio.py @@ -42,7 +42,7 @@ def valid_ugpio_driver(arg: Any, variants: Optional[Set[str]]=None) -> str: def valid_ugpio_channel(arg: Any) -> str: name = "GPIO channel" - return check_len(check_re_match(arg, name, r"^[a-zA-Z_][a-zA-Z0-9_-]*$"), name, 255) + return check_len(check_re_match(arg, name, r"^[a-zA-Z_][a-zA-Z0-9_.-]*$"), name, 255) def valid_ugpio_mode(arg: Any, variants: Set[str]) -> str: |