diff options
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/apps/kvmd/ugpio.py | 10 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/__init__.py | 2 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/gpio.py | 5 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/hidrelay.py | 69 |
4 files changed, 49 insertions, 37 deletions
diff --git a/kvmd/apps/kvmd/ugpio.py b/kvmd/apps/kvmd/ugpio.py index c51b4c81..2cce6a51 100644 --- a/kvmd/apps/kvmd/ugpio.py +++ b/kvmd/apps/kvmd/ugpio.py @@ -117,7 +117,6 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes self.__channel = channel self.__pin: int = config.pin self.__inverted: bool = config.inverted - self.__initial: Optional[bool] = config.initial self.__switch: bool = config.switch @@ -160,13 +159,6 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes "busy": busy, } - def cleanup(self) -> None: - if self.__initial is not None: - try: - self.__driver.write(self.__pin, (self.__initial ^ self.__inverted)) - except Exception: - get_logger().exception("Can't cleanup %s", self) - async def switch(self, state: bool) -> bool: if not self.__switch: raise GpioSwitchNotSupported() @@ -278,8 +270,6 @@ class UserGpio: ]) async def cleanup(self) -> None: - for gout in self.__outputs.values(): - gout.cleanup() for driver in self.__drivers.values(): try: driver.cleanup() diff --git a/kvmd/plugins/ugpio/__init__.py b/kvmd/plugins/ugpio/__init__.py index 96195d30..1e7725e8 100644 --- a/kvmd/plugins/ugpio/__init__.py +++ b/kvmd/plugins/ugpio/__init__.py @@ -74,7 +74,7 @@ class BaseUserGpioDriver(BasePlugin): raise NotImplementedError def cleanup(self) -> None: - pass + raise NotImplementedError def read(self, pin: int) -> bool: raise NotImplementedError diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py index 96c3eee7..90426e13 100644 --- a/kvmd/plugins/ugpio/gpio.py +++ b/kvmd/plugins/ugpio/gpio.py @@ -83,6 +83,11 @@ class Plugin(BaseUserGpioDriver): assert self.__reader await self.__reader.poll() + def cleanup(self) -> None: + for (pin, initial) in self.__output_pins.items(): + if initial is not None: + gpio.write(pin, initial) + def read(self, pin: int) -> bool: return gpio.read(pin) diff --git a/kvmd/plugins/ugpio/hidrelay.py b/kvmd/plugins/ugpio/hidrelay.py index c024087c..e9182303 100644 --- a/kvmd/plugins/ugpio/hidrelay.py +++ b/kvmd/plugins/ugpio/hidrelay.py @@ -81,19 +81,19 @@ class Plugin(BaseUserGpioDriver): def prepare(self) -> None: logger = get_logger(0) - logger.info("Initializing %s ...", self) + logger.info("Probing driver %s ...", self) try: - for (pid, state) in self.__initials.items(): - if state is not None: - self.write(pid, state) - except Exception: - logger.exception("Can't perform first initialization of %s", self) + with self.__ensure_device("probing"): + pass + except Exception as err: + logger.error("Can't probe %s: %s: %s", self, type(err).__name__, err) + self.__reset_pins() async def run(self) -> None: prev_raw = -1 while True: try: - raw = self.__read_raw() + raw = self.__inner_read_raw() except Exception: raw = -1 if raw != prev_raw: @@ -102,40 +102,57 @@ class Plugin(BaseUserGpioDriver): await asyncio.sleep(self.__state_poll) def cleanup(self) -> None: + self.__reset_pins() self.__close_device() self.__stop = True def read(self, pin: int) -> bool: - if self.__check_pin(pin): - try: - return bool(self.__read_raw() & (1 << pin)) - except Exception: - raise GpioDriverOfflineError(self) from None - return False + try: + return self.__inner_read(pin) + except Exception: + raise GpioDriverOfflineError(self) def write(self, pin: int, state: bool) -> None: - if self.__check_pin(pin): - try: - with self.__ensure_device("writing") as device: - report = [(0xFF if state else 0xFD), pin + 1] # Pin numeration starts from 0 - result = device.send_feature_report(report) - if result < 0: - raise RuntimeError(f"Retval of send_feature_report() < 0: {result}") - except Exception: - raise GpioDriverOfflineError(self) from None + try: + return self.__inner_write(pin, state) + except Exception: + raise GpioDriverOfflineError(self) # ===== + def __reset_pins(self) -> None: + logger = get_logger(0) + for (pin, state) in self.__initials.items(): + if state is not None: + logger.info("Resetting pin=%d to state=%d of %s: ...", pin, state, self) + try: + self.__inner_write(pin, state) + except Exception as err: + logger.error("Can't reset pin=%d of %s: %s: %s", pin, self, type(err).__name__, err) + + def __inner_read(self, pin: int) -> bool: + if self.__check_pin(pin): + return bool(self.__inner_read_raw() & (1 << pin)) + return False + + def __inner_read_raw(self) -> int: + with self.__ensure_device("reading") as device: + return device.get_feature_report(1, 8)[7] + + def __inner_write(self, pin: int, state: bool) -> None: + if self.__check_pin(pin): + with self.__ensure_device("writing") as device: + report = [(0xFF if state else 0xFD), pin + 1] # Pin numeration starts from 0 + result = device.send_feature_report(report) + if result < 0: + raise RuntimeError(f"Retval of send_feature_report() < 0: {result}") + def __check_pin(self, pin: int) -> bool: ok = (0 <= pin <= 7) if not ok: get_logger(0).warning("Unsupported pin for %s: %d", self, pin) return ok - def __read_raw(self) -> int: - with self.__ensure_device("reading") as device: - return device.get_feature_report(1, 8)[7] - @contextlib.contextmanager def __ensure_device(self, context: str) -> hid.device: assert not self.__stop |