summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMaxim Devaev <[email protected]>2023-08-06 03:36:54 +0300
committerMaxim Devaev <[email protected]>2023-08-06 03:36:54 +0300
commit92c3620a86338061d6ccb476b56fc90f7541b621 (patch)
tree0ee72ef91a1429805625ef7ed55f980d0aff622a
parent472605734e3363b3f2f4e577a358eecfc3a7855a (diff)
mcu hid: optional power detecting on the hid device
-rw-r--r--kvmd/plugins/hid/_mcu/__init__.py37
-rw-r--r--kvmd/plugins/hid/_mcu/gpio.py39
-rw-r--r--kvmd/plugins/hid/serial.py3
-rw-r--r--kvmd/plugins/hid/spi.py3
4 files changed, 61 insertions, 21 deletions
diff --git a/kvmd/plugins/hid/_mcu/__init__.py b/kvmd/plugins/hid/_mcu/__init__.py
index 646e689b..6001b6de 100644
--- a/kvmd/plugins/hid/_mcu/__init__.py
+++ b/kvmd/plugins/hid/_mcu/__init__.py
@@ -28,6 +28,7 @@ import time
from typing import Iterable
from typing import Generator
from typing import AsyncGenerator
+from typing import Any
from ....logging import get_logger
@@ -103,18 +104,13 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
def __init__( # pylint: disable=too-many-arguments,super-init-not-called
self,
phy: BasePhy,
-
- gpio_device_path: str,
- reset_pin: int,
- reset_inverted: bool,
- reset_delay: float,
reset_self: bool,
-
read_retries: int,
common_retries: int,
retries_delay: float,
errors_threshold: int,
noop: bool,
+ **gpio_kwargs: Any,
) -> None:
multiprocessing.Process.__init__(self, daemon=True)
@@ -126,7 +122,8 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
self.__noop = noop
self.__phy = phy
- self.__gpio = Gpio(gpio_device_path, reset_pin, reset_inverted, reset_delay)
+ gpio_device_path = gpio_kwargs.pop("gpio_device_path")
+ self.__gpio = Gpio(device_path=gpio_device_path, **gpio_kwargs)
self.__reset_self = reset_self
self.__reset_required_event = multiprocessing.Event()
@@ -144,11 +141,15 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
@classmethod
def get_plugin_options(cls) -> dict:
return {
- "gpio_device": Option("/dev/gpiochip0", type=valid_abs_path, unpack_as="gpio_device_path"),
- "reset_pin": Option(4, type=valid_gpio_pin_optional),
- "reset_inverted": Option(False, type=valid_bool),
- "reset_delay": Option(0.1, type=valid_float_f01),
- "reset_self": Option(False, type=valid_bool),
+ # <gpio_kwargs>
+ "gpio_device": Option("/dev/gpiochip0", type=valid_abs_path, unpack_as="gpio_device_path"),
+ "power_detect_pin": Option(-1, type=valid_gpio_pin_optional),
+ "power_detect_pull_down": Option(False, type=valid_bool),
+ "reset_pin": Option(4, type=valid_gpio_pin_optional),
+ "reset_inverted": Option(False, type=valid_bool),
+ "reset_delay": Option(0.1, type=valid_float_f01),
+ # </gpio_kwargs>
+ "reset_self": Option(False, type=valid_bool),
"read_retries": Option(5, type=valid_int_f1),
"common_retries": Option(5, type=valid_int_f1),
@@ -329,18 +330,18 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
def __hid_loop_wait_device(self) -> bool:
logger = get_logger(0)
- logger.info("Initial HID reset and wait ...")
+ logger.info("Initial HID reset and wait for %s ...", self.__phy)
self.__gpio.reset()
# На самом деле SPI и Serial-девайсы не пропадают, просто резет и ожидание
# логичнее всего делать именно здесь. Ну и на будущее, да
for _ in range(10):
if self.__phy.has_device():
- logger.info("HID found")
+ logger.info("Physical HID interface found: %s", self.__phy)
return True
if self.__stop_event.is_set():
break
time.sleep(1)
- logger.error("Missing HID")
+ logger.error("Missing physical HID interface: %s", self.__phy)
return False
def __process_request(self, conn: BasePhyConnection, request: bytes) -> bool: # pylint: disable=too-many-branches
@@ -352,7 +353,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
read_retries = self.__read_retries
error_retval = False
- while common_retries and read_retries:
+ while self.__gpio.is_powered() and common_retries and read_retries:
response = (RESPONSE_LEGACY_OK if self.__noop else conn.send(request))
try:
if len(response) < 4:
@@ -402,6 +403,10 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
if common_retries and read_retries:
time.sleep(self.__retries_delay)
+ if not self.__gpio.is_powered():
+ self.__set_state_online(False)
+ return True
+
for msg in error_messages:
logger.error(msg)
if not (common_retries and read_retries):
diff --git a/kvmd/plugins/hid/_mcu/gpio.py b/kvmd/plugins/hid/_mcu/gpio.py
index f79820f1..07b0b598 100644
--- a/kvmd/plugins/hid/_mcu/gpio.py
+++ b/kvmd/plugins/hid/_mcu/gpio.py
@@ -29,30 +29,48 @@ from ....logging import get_logger
# =====
-class Gpio:
+class Gpio: # pylint: disable=too-many-instance-attributes
def __init__(
self,
device_path: str,
+ power_detect_pin: int,
+ power_detect_pull_down: bool,
reset_pin: int,
reset_inverted: bool,
reset_delay: float,
) -> None:
self.__device_path = device_path
+ self.__power_detect_pin = power_detect_pin
+ self.__power_detect_pull_down = power_detect_pull_down
self.__reset_pin = reset_pin
self.__reset_inverted = reset_inverted
self.__reset_delay = reset_delay
self.__chip: (gpiod.Chip | None) = None
+ self.__power_detect_line: (gpiod.Line | None) = None
self.__reset_line: (gpiod.Line | None) = None
+ self.__last_power: (bool | None) = None
+
def __enter__(self) -> None:
- if self.__reset_pin >= 0:
+ if self.__power_detect_pin >= 0 or self.__reset_pin >= 0:
assert self.__chip is None
- assert self.__reset_line is None
self.__chip = gpiod.Chip(self.__device_path)
- self.__reset_line = self.__chip.get_line(self.__reset_pin)
- self.__reset_line.request("kvmd::hid::reset", gpiod.LINE_REQ_DIR_OUT, default_vals=[int(self.__reset_inverted)])
+ if self.__power_detect_pin >= 0:
+ assert self.__power_detect_line is None
+ self.__power_detect_line = self.__chip.get_line(self.__power_detect_pin)
+ self.__power_detect_line.request(
+ "kvmd::hid::power_detect", gpiod.LINE_REQ_DIR_IN,
+ flags=(gpiod.LINE_REQ_FLAG_BIAS_PULL_DOWN if self.__power_detect_pull_down else 0),
+ )
+ if self.__reset_pin >= 0:
+ assert self.__reset_line is None
+ self.__reset_line = self.__chip.get_line(self.__reset_pin)
+ self.__reset_line.request(
+ "kvmd::hid::reset", gpiod.LINE_REQ_DIR_OUT,
+ default_vals=[int(self.__reset_inverted)],
+ )
def __exit__(
self,
@@ -66,9 +84,20 @@ class Gpio:
self.__chip.close()
except Exception:
pass
+ self.__last_power = None
+ self.__power_detect_line = None
self.__reset_line = None
self.__chip = None
+ def is_powered(self) -> bool:
+ if self.__power_detect_line is not None:
+ power = bool(self.__power_detect_line.get_value())
+ if power != self.__last_power:
+ get_logger(0).info("HID power state changed: %s -> %s", self.__last_power, power)
+ self.__last_power = power
+ return power
+ return True
+
def reset(self) -> None:
if self.__reset_pin >= 0:
assert self.__reset_line
diff --git a/kvmd/plugins/hid/serial.py b/kvmd/plugins/hid/serial.py
index d936e02b..593c50ae 100644
--- a/kvmd/plugins/hid/serial.py
+++ b/kvmd/plugins/hid/serial.py
@@ -80,6 +80,9 @@ class _SerialPhy(BasePhy):
with serial.Serial(self.__device_path, self.__speed, timeout=self.__read_timeout) as tty:
yield _SerialPhyConnection(tty)
+ def __str__(self) -> str:
+ return f"Serial(path={self.__device_path})"
+
# =====
class Plugin(BaseMcuHid):
diff --git a/kvmd/plugins/hid/spi.py b/kvmd/plugins/hid/spi.py
index f43f344d..6b093f38 100644
--- a/kvmd/plugins/hid/spi.py
+++ b/kvmd/plugins/hid/spi.py
@@ -162,6 +162,9 @@ class _SpiPhy(BasePhy): # pylint: disable=too-many-instance-attributes
else:
yield None
+ def __str__(self) -> str:
+ return f"SPI(bus={self.__bus}, chip={self.__chip})"
+
# =====
class Plugin(BaseMcuHid):