diff options
-rw-r--r-- | kvmd/plugins/hid/_mcu/__init__.py | 37 | ||||
-rw-r--r-- | kvmd/plugins/hid/_mcu/gpio.py | 39 | ||||
-rw-r--r-- | kvmd/plugins/hid/serial.py | 3 | ||||
-rw-r--r-- | kvmd/plugins/hid/spi.py | 3 |
4 files changed, 61 insertions, 21 deletions
diff --git a/kvmd/plugins/hid/_mcu/__init__.py b/kvmd/plugins/hid/_mcu/__init__.py index 646e689b..6001b6de 100644 --- a/kvmd/plugins/hid/_mcu/__init__.py +++ b/kvmd/plugins/hid/_mcu/__init__.py @@ -28,6 +28,7 @@ import time from typing import Iterable from typing import Generator from typing import AsyncGenerator +from typing import Any from ....logging import get_logger @@ -103,18 +104,13 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- def __init__( # pylint: disable=too-many-arguments,super-init-not-called self, phy: BasePhy, - - gpio_device_path: str, - reset_pin: int, - reset_inverted: bool, - reset_delay: float, reset_self: bool, - read_retries: int, common_retries: int, retries_delay: float, errors_threshold: int, noop: bool, + **gpio_kwargs: Any, ) -> None: multiprocessing.Process.__init__(self, daemon=True) @@ -126,7 +122,8 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- self.__noop = noop self.__phy = phy - self.__gpio = Gpio(gpio_device_path, reset_pin, reset_inverted, reset_delay) + gpio_device_path = gpio_kwargs.pop("gpio_device_path") + self.__gpio = Gpio(device_path=gpio_device_path, **gpio_kwargs) self.__reset_self = reset_self self.__reset_required_event = multiprocessing.Event() @@ -144,11 +141,15 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- @classmethod def get_plugin_options(cls) -> dict: return { - "gpio_device": Option("/dev/gpiochip0", type=valid_abs_path, unpack_as="gpio_device_path"), - "reset_pin": Option(4, type=valid_gpio_pin_optional), - "reset_inverted": Option(False, type=valid_bool), - "reset_delay": Option(0.1, type=valid_float_f01), - "reset_self": Option(False, type=valid_bool), + # <gpio_kwargs> + "gpio_device": Option("/dev/gpiochip0", type=valid_abs_path, unpack_as="gpio_device_path"), + "power_detect_pin": Option(-1, type=valid_gpio_pin_optional), + "power_detect_pull_down": Option(False, type=valid_bool), + "reset_pin": Option(4, type=valid_gpio_pin_optional), + "reset_inverted": Option(False, type=valid_bool), + "reset_delay": Option(0.1, type=valid_float_f01), + # </gpio_kwargs> + "reset_self": Option(False, type=valid_bool), "read_retries": Option(5, type=valid_int_f1), "common_retries": Option(5, type=valid_int_f1), @@ -329,18 +330,18 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- def __hid_loop_wait_device(self) -> bool: logger = get_logger(0) - logger.info("Initial HID reset and wait ...") + logger.info("Initial HID reset and wait for %s ...", self.__phy) self.__gpio.reset() # На самом деле SPI и Serial-девайсы не пропадают, просто резет и ожидание # логичнее всего делать именно здесь. Ну и на будущее, да for _ in range(10): if self.__phy.has_device(): - logger.info("HID found") + logger.info("Physical HID interface found: %s", self.__phy) return True if self.__stop_event.is_set(): break time.sleep(1) - logger.error("Missing HID") + logger.error("Missing physical HID interface: %s", self.__phy) return False def __process_request(self, conn: BasePhyConnection, request: bytes) -> bool: # pylint: disable=too-many-branches @@ -352,7 +353,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- read_retries = self.__read_retries error_retval = False - while common_retries and read_retries: + while self.__gpio.is_powered() and common_retries and read_retries: response = (RESPONSE_LEGACY_OK if self.__noop else conn.send(request)) try: if len(response) < 4: @@ -402,6 +403,10 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- if common_retries and read_retries: time.sleep(self.__retries_delay) + if not self.__gpio.is_powered(): + self.__set_state_online(False) + return True + for msg in error_messages: logger.error(msg) if not (common_retries and read_retries): diff --git a/kvmd/plugins/hid/_mcu/gpio.py b/kvmd/plugins/hid/_mcu/gpio.py index f79820f1..07b0b598 100644 --- a/kvmd/plugins/hid/_mcu/gpio.py +++ b/kvmd/plugins/hid/_mcu/gpio.py @@ -29,30 +29,48 @@ from ....logging import get_logger # ===== -class Gpio: +class Gpio: # pylint: disable=too-many-instance-attributes def __init__( self, device_path: str, + power_detect_pin: int, + power_detect_pull_down: bool, reset_pin: int, reset_inverted: bool, reset_delay: float, ) -> None: self.__device_path = device_path + self.__power_detect_pin = power_detect_pin + self.__power_detect_pull_down = power_detect_pull_down self.__reset_pin = reset_pin self.__reset_inverted = reset_inverted self.__reset_delay = reset_delay self.__chip: (gpiod.Chip | None) = None + self.__power_detect_line: (gpiod.Line | None) = None self.__reset_line: (gpiod.Line | None) = None + self.__last_power: (bool | None) = None + def __enter__(self) -> None: - if self.__reset_pin >= 0: + if self.__power_detect_pin >= 0 or self.__reset_pin >= 0: assert self.__chip is None - assert self.__reset_line is None self.__chip = gpiod.Chip(self.__device_path) - self.__reset_line = self.__chip.get_line(self.__reset_pin) - self.__reset_line.request("kvmd::hid::reset", gpiod.LINE_REQ_DIR_OUT, default_vals=[int(self.__reset_inverted)]) + if self.__power_detect_pin >= 0: + assert self.__power_detect_line is None + self.__power_detect_line = self.__chip.get_line(self.__power_detect_pin) + self.__power_detect_line.request( + "kvmd::hid::power_detect", gpiod.LINE_REQ_DIR_IN, + flags=(gpiod.LINE_REQ_FLAG_BIAS_PULL_DOWN if self.__power_detect_pull_down else 0), + ) + if self.__reset_pin >= 0: + assert self.__reset_line is None + self.__reset_line = self.__chip.get_line(self.__reset_pin) + self.__reset_line.request( + "kvmd::hid::reset", gpiod.LINE_REQ_DIR_OUT, + default_vals=[int(self.__reset_inverted)], + ) def __exit__( self, @@ -66,9 +84,20 @@ class Gpio: self.__chip.close() except Exception: pass + self.__last_power = None + self.__power_detect_line = None self.__reset_line = None self.__chip = None + def is_powered(self) -> bool: + if self.__power_detect_line is not None: + power = bool(self.__power_detect_line.get_value()) + if power != self.__last_power: + get_logger(0).info("HID power state changed: %s -> %s", self.__last_power, power) + self.__last_power = power + return power + return True + def reset(self) -> None: if self.__reset_pin >= 0: assert self.__reset_line diff --git a/kvmd/plugins/hid/serial.py b/kvmd/plugins/hid/serial.py index d936e02b..593c50ae 100644 --- a/kvmd/plugins/hid/serial.py +++ b/kvmd/plugins/hid/serial.py @@ -80,6 +80,9 @@ class _SerialPhy(BasePhy): with serial.Serial(self.__device_path, self.__speed, timeout=self.__read_timeout) as tty: yield _SerialPhyConnection(tty) + def __str__(self) -> str: + return f"Serial(path={self.__device_path})" + # ===== class Plugin(BaseMcuHid): diff --git a/kvmd/plugins/hid/spi.py b/kvmd/plugins/hid/spi.py index f43f344d..6b093f38 100644 --- a/kvmd/plugins/hid/spi.py +++ b/kvmd/plugins/hid/spi.py @@ -162,6 +162,9 @@ class _SpiPhy(BasePhy): # pylint: disable=too-many-instance-attributes else: yield None + def __str__(self) -> str: + return f"SPI(bus={self.__bus}, chip={self.__chip})" + # ===== class Plugin(BaseMcuHid): |