diff options
Diffstat (limited to 'kvmd/plugins')
-rw-r--r-- | kvmd/plugins/ugpio/otgconf.py (renamed from kvmd/plugins/ugpio/otgbind.py) | 51 |
1 files changed, 42 insertions, 9 deletions
diff --git a/kvmd/plugins/ugpio/otgbind.py b/kvmd/plugins/ugpio/otgconf.py index 4be96aeb..8b888c8e 100644 --- a/kvmd/plugins/ugpio/otgbind.py +++ b/kvmd/plugins/ugpio/otgconf.py @@ -34,6 +34,10 @@ from ...inotify import Inotify from ... import aiotools from ... import usb +from ...yamlconf import Section + +from ...validators.basic import valid_stripped_string_not_empty + from . import BaseUserGpioDriver @@ -44,19 +48,24 @@ class Plugin(BaseUserGpioDriver): instance_name: str, notifier: aiotools.AioNotifier, - udc: str, # XXX: Not from options, see /kvmd/apps/kvmd/__init__.py for details - gadget: str, # ditto + otg_config: Section, # XXX: Not from options, see /kvmd/apps/kvmd/__init__.py for details ) -> None: super().__init__(instance_name, notifier) - self.__udc = udc + self.__udc: str = otg_config.udc + self.__init_delay: float = otg_config.init_delay + gadget: str = otg_config.gadget self.__udc_path = usb.get_gadget_path(gadget, usb.G_UDC) + self.__functions_path = usb.get_gadget_path(gadget, usb.G_FUNCTIONS) + self.__profile_path = usb.get_gadget_path(gadget, usb.G_PROFILE) + + self.__lock = asyncio.Lock() @classmethod def get_pin_validator(cls) -> Callable[[Any], Any]: - return str + return valid_stripped_string_not_empty def prepare(self) -> None: self.__udc = usb.find_udc(self.__udc) @@ -74,6 +83,7 @@ class Plugin(BaseUserGpioDriver): with Inotify() as inotify: inotify.watch(os.path.dirname(self.__udc_path), InotifyMask.ALL_MODIFY_EVENTS) + inotify.watch(self.__profile_path, InotifyMask.ALL_MODIFY_EVENTS) await self._notifier.notify() while True: need_restart = False @@ -90,16 +100,39 @@ class Plugin(BaseUserGpioDriver): await self._notifier.notify() except Exception: logger.exception("Unexpected OTG-bind watcher error") + await asyncio.sleep(1) async def read(self, pin: str) -> bool: - _ = pin - with open(self.__udc_path) as udc_file: - return bool(udc_file.read().strip()) + if pin == "udc": + return self.__is_udc_enabled() + return os.path.exists(os.path.join(self.__profile_path, pin)) async def write(self, pin: str, state: bool) -> None: - _ = pin + async with self.__lock: + if pin == "udc": + self.__set_udc_enabled(state) + else: + if self.__is_udc_enabled(): + self.__set_udc_enabled(False) + try: + if state: + os.symlink( + os.path.join(self.__functions_path, pin), + os.path.join(self.__profile_path, pin), + ) + else: + os.unlink(os.path.join(self.__profile_path, pin)) + finally: + await asyncio.sleep(self.__init_delay) + self.__set_udc_enabled(True) + + def __set_udc_enabled(self, enabled: bool) -> None: with open(self.__udc_path, "w") as udc_file: - udc_file.write(self.__udc if state else "\n") + udc_file.write(self.__udc if enabled else "\n") + + def __is_udc_enabled(self) -> bool: + with open(self.__udc_path) as udc_file: + return bool(udc_file.read().strip()) def __str__(self) -> str: return f"GPIO({self._instance_name})" |