diff options
author | Devaev Maxim <[email protected]> | 2020-09-09 12:52:45 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2020-09-09 12:52:45 +0300 |
commit | 170ed92bd446a7dde3fedc2cb143d2a328f8a4cb (patch) | |
tree | a9b50ab83bffaa61066d657a7fc58f28df61cfda /kvmd/plugins | |
parent | 04284584fecb279c5a1872d64697a3b26d04318c (diff) |
cleanup on driver side
Diffstat (limited to 'kvmd/plugins')
-rw-r--r-- | kvmd/plugins/ugpio/__init__.py | 2 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/gpio.py | 5 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/hidrelay.py | 69 |
3 files changed, 49 insertions, 27 deletions
diff --git a/kvmd/plugins/ugpio/__init__.py b/kvmd/plugins/ugpio/__init__.py index 96195d30..1e7725e8 100644 --- a/kvmd/plugins/ugpio/__init__.py +++ b/kvmd/plugins/ugpio/__init__.py @@ -74,7 +74,7 @@ class BaseUserGpioDriver(BasePlugin): raise NotImplementedError def cleanup(self) -> None: - pass + raise NotImplementedError def read(self, pin: int) -> bool: raise NotImplementedError diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py index 96c3eee7..90426e13 100644 --- a/kvmd/plugins/ugpio/gpio.py +++ b/kvmd/plugins/ugpio/gpio.py @@ -83,6 +83,11 @@ class Plugin(BaseUserGpioDriver): assert self.__reader await self.__reader.poll() + def cleanup(self) -> None: + for (pin, initial) in self.__output_pins.items(): + if initial is not None: + gpio.write(pin, initial) + def read(self, pin: int) -> bool: return gpio.read(pin) diff --git a/kvmd/plugins/ugpio/hidrelay.py b/kvmd/plugins/ugpio/hidrelay.py index c024087c..e9182303 100644 --- a/kvmd/plugins/ugpio/hidrelay.py +++ b/kvmd/plugins/ugpio/hidrelay.py @@ -81,19 +81,19 @@ class Plugin(BaseUserGpioDriver): def prepare(self) -> None: logger = get_logger(0) - logger.info("Initializing %s ...", self) + logger.info("Probing driver %s ...", self) try: - for (pid, state) in self.__initials.items(): - if state is not None: - self.write(pid, state) - except Exception: - logger.exception("Can't perform first initialization of %s", self) + with self.__ensure_device("probing"): + pass + except Exception as err: + logger.error("Can't probe %s: %s: %s", self, type(err).__name__, err) + self.__reset_pins() async def run(self) -> None: prev_raw = -1 while True: try: - raw = self.__read_raw() + raw = self.__inner_read_raw() except Exception: raw = -1 if raw != prev_raw: @@ -102,40 +102,57 @@ class Plugin(BaseUserGpioDriver): await asyncio.sleep(self.__state_poll) def cleanup(self) -> None: + self.__reset_pins() self.__close_device() self.__stop = True def read(self, pin: int) -> bool: - if self.__check_pin(pin): - try: - return bool(self.__read_raw() & (1 << pin)) - except Exception: - raise GpioDriverOfflineError(self) from None - return False + try: + return self.__inner_read(pin) + except Exception: + raise GpioDriverOfflineError(self) def write(self, pin: int, state: bool) -> None: - if self.__check_pin(pin): - try: - with self.__ensure_device("writing") as device: - report = [(0xFF if state else 0xFD), pin + 1] # Pin numeration starts from 0 - result = device.send_feature_report(report) - if result < 0: - raise RuntimeError(f"Retval of send_feature_report() < 0: {result}") - except Exception: - raise GpioDriverOfflineError(self) from None + try: + return self.__inner_write(pin, state) + except Exception: + raise GpioDriverOfflineError(self) # ===== + def __reset_pins(self) -> None: + logger = get_logger(0) + for (pin, state) in self.__initials.items(): + if state is not None: + logger.info("Resetting pin=%d to state=%d of %s: ...", pin, state, self) + try: + self.__inner_write(pin, state) + except Exception as err: + logger.error("Can't reset pin=%d of %s: %s: %s", pin, self, type(err).__name__, err) + + def __inner_read(self, pin: int) -> bool: + if self.__check_pin(pin): + return bool(self.__inner_read_raw() & (1 << pin)) + return False + + def __inner_read_raw(self) -> int: + with self.__ensure_device("reading") as device: + return device.get_feature_report(1, 8)[7] + + def __inner_write(self, pin: int, state: bool) -> None: + if self.__check_pin(pin): + with self.__ensure_device("writing") as device: + report = [(0xFF if state else 0xFD), pin + 1] # Pin numeration starts from 0 + result = device.send_feature_report(report) + if result < 0: + raise RuntimeError(f"Retval of send_feature_report() < 0: {result}") + def __check_pin(self, pin: int) -> bool: ok = (0 <= pin <= 7) if not ok: get_logger(0).warning("Unsupported pin for %s: %d", self, pin) return ok - def __read_raw(self) -> int: - with self.__ensure_device("reading") as device: - return device.get_feature_report(1, 8)[7] - @contextlib.contextmanager def __ensure_device(self, context: str) -> hid.device: assert not self.__stop |