diff options
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/aiogp.py | 6 | ||||
-rw-r--r-- | kvmd/plugins/hid/_mcu/__init__.py | 8 | ||||
-rw-r--r-- | kvmd/plugins/hid/_mcu/gpio.py | 13 | ||||
-rw-r--r-- | kvmd/plugins/hid/spi.py | 54 |
4 files changed, 44 insertions, 37 deletions
diff --git a/kvmd/aiogp.py b/kvmd/aiogp.py index ba406e8d..46252914 100644 --- a/kvmd/aiogp.py +++ b/kvmd/aiogp.py @@ -34,12 +34,12 @@ from . import aiotools # ===== -async def pulse(line: gpiod.Line, delay: float, final: float) -> None: +async def pulse(line: gpiod.Line, delay: float, final: float, inverted: bool=False) -> None: try: - line.set_value(1) + line.set_value(int(not inverted)) await asyncio.sleep(delay) finally: - line.set_value(0) + line.set_value(int(inverted)) await asyncio.sleep(final) diff --git a/kvmd/plugins/hid/_mcu/__init__.py b/kvmd/plugins/hid/_mcu/__init__.py index f762567b..3f925cd8 100644 --- a/kvmd/plugins/hid/_mcu/__init__.py +++ b/kvmd/plugins/hid/_mcu/__init__.py @@ -172,6 +172,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- phy: BasePhy, reset_pin: int, + reset_inverted: bool, reset_delay: float, read_retries: int, @@ -190,7 +191,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- self.__noop = noop self.__phy = phy - self.__gpio = Gpio(reset_pin, reset_delay) + self.__gpio = Gpio(reset_pin, reset_inverted, reset_delay) self.__events_queue: "multiprocessing.Queue[_BaseEvent]" = multiprocessing.Queue() @@ -207,8 +208,9 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many- @classmethod def get_plugin_options(cls) -> Dict: return { - "reset_pin": Option(-1, type=valid_gpio_pin_optional), - "reset_delay": Option(0.1, type=valid_float_f01), + "reset_pin": Option(-1, type=valid_gpio_pin_optional), + "reset_inverted": Option(False, type=valid_bool), + "reset_delay": Option(0.1, type=valid_float_f01), "read_retries": Option(10, type=valid_int_f1), "common_retries": Option(100, type=valid_int_f1), diff --git a/kvmd/plugins/hid/_mcu/gpio.py b/kvmd/plugins/hid/_mcu/gpio.py index 830819f0..87f4b547 100644 --- a/kvmd/plugins/hid/_mcu/gpio.py +++ b/kvmd/plugins/hid/_mcu/gpio.py @@ -33,8 +33,15 @@ from .... import aiogp # ===== class Gpio: - def __init__(self, reset_pin: int, reset_delay: float) -> None: + def __init__( + self, + reset_pin: int, + reset_inverted: bool, + reset_delay: float, + ) -> None: + self.__reset_pin = reset_pin + self.__reset_inverted = reset_inverted self.__reset_delay = reset_delay self.__chip: Optional[gpiod.Chip] = None @@ -47,7 +54,7 @@ class Gpio: assert self.__reset_line is None self.__chip = gpiod.Chip(env.GPIO_DEVICE_PATH) self.__reset_line = self.__chip.get_line(self.__reset_pin) - self.__reset_line.request("kvmd::hid-mcu::reset", gpiod.LINE_REQ_DIR_OUT, default_vals=[0]) + self.__reset_line.request("kvmd::hid-mcu::reset", gpiod.LINE_REQ_DIR_OUT, default_vals=[int(self.__reset_inverted)]) def close(self) -> None: if self.__chip: @@ -63,7 +70,7 @@ class Gpio: if not self.__reset_wip: self.__reset_wip = True try: - await aiogp.pulse(self.__reset_line, self.__reset_delay, 1) + await aiogp.pulse(self.__reset_line, self.__reset_delay, 1, self.__reset_inverted) finally: self.__reset_wip = False get_logger(0).info("Reset HID performed") diff --git a/kvmd/plugins/hid/spi.py b/kvmd/plugins/hid/spi.py index 6020e04e..d71a4094 100644 --- a/kvmd/plugins/hid/spi.py +++ b/kvmd/plugins/hid/spi.py @@ -27,6 +27,7 @@ import time from typing import List from typing import Dict from typing import Generator +from typing import Callable from typing import Any import spidev @@ -49,31 +50,18 @@ from ._mcu import BaseMcuHid class _SpiPhyConnection(BasePhyConnection): def __init__( self, - spi: spidev.SpiDev, + xfer: Callable[[bytes], bytes], read_timeout: float, read_delay: float, ) -> None: - self.__spi = spi + self.__xfer = xfer self.__read_timeout = read_timeout self.__read_delay = read_delay - self.__empty8 = b"\x00" * 8 - self.__empty4 = b"\x00" * 4 - def send(self, request: bytes) -> bytes: assert len(request) == 8 - - deadline_ts = time.time() + self.__read_timeout - while time.time() < deadline_ts: - garbage = bytes(self.__spi.xfer(self.__empty8)) - if garbage == self.__empty8: - break - else: - get_logger(0).error("SPI timeout reached while reading the a garbage") - return b"" - - self.__spi.xfer(request) + self.__xfer(request) response: List[int] = [] deadline_ts = time.time() + self.__read_timeout @@ -81,21 +69,19 @@ class _SpiPhyConnection(BasePhyConnection): while time.time() < deadline_ts: if not found: time.sleep(self.__read_delay) - for byte in self.__spi.xfer(self.__empty4): + for byte in self.__xfer(b"\x00" * (4 - len(response))): if not found: if byte == 0: continue found = True response.append(byte) - if len(response) >= 4: + if len(response) == 4: break - if len(response) >= 4: + if len(response) == 4: break else: get_logger(0).error("SPI timeout reached while responce waiting") return b"" - - assert len(response) == 4 return bytes(response) @@ -105,6 +91,7 @@ class _SpiPhy(BasePhy): bus: int, chip: int, max_freq: int, + block_usec: int, read_timeout: float, read_delay: float, ) -> None: @@ -112,6 +99,7 @@ class _SpiPhy(BasePhy): self.__bus = bus self.__chip = chip self.__max_freq = max_freq + self.__block_usec = block_usec self.__read_timeout = read_timeout self.__read_delay = read_delay @@ -123,7 +111,15 @@ class _SpiPhy(BasePhy): with contextlib.closing(spidev.SpiDev(self.__bus, self.__chip)) as spi: spi.mode = 0 spi.max_speed_hz = self.__max_freq - yield _SpiPhyConnection(spi, self.__read_timeout, self.__read_delay) + + def xfer(data: bytes) -> bytes: + return spi.xfer(data, self.__max_freq, self.__block_usec) + + yield _SpiPhyConnection( + xfer=xfer, + read_timeout=self.__read_timeout, + read_delay=self.__read_delay, + ) # ===== @@ -133,23 +129,25 @@ class Plugin(BaseMcuHid): bus: int, chip: int, max_freq: int, + block_usec: int, read_timeout: float, read_delay: float, **kwargs: Any, ) -> None: super().__init__( - phy=_SpiPhy(bus, chip, max_freq, read_timeout, read_delay), + phy=_SpiPhy(bus, chip, max_freq, block_usec, read_timeout, read_delay), **kwargs, ) @classmethod def get_plugin_options(cls) -> Dict: return { - "bus": Option(0, type=valid_int_f0), - "chip": Option(0, type=valid_int_f0), - "max_freq": Option(1000000, type=valid_int_f1), - "read_timeout": Option(2.0, type=valid_float_f01), - "read_delay": Option(0.001, type=valid_float_f0), + "bus": Option(0, type=valid_int_f0), + "chip": Option(0, type=valid_int_f0), + "max_freq": Option(400000, type=valid_int_f1), + "block_usec": Option(1, type=valid_int_f0), + "read_timeout": Option(2.0, type=valid_float_f01), + "read_delay": Option(0.001, type=valid_float_f0), **BaseMcuHid.get_plugin_options(), } |