diff options
-rw-r--r-- | PKGBUILD | 1 | ||||
-rw-r--r-- | kvmd/plugins/hid/spi.py | 154 | ||||
-rw-r--r-- | testenv/linters/vulture-wl.py | 2 | ||||
-rw-r--r-- | testenv/requirements.txt | 1 |
4 files changed, 158 insertions, 0 deletions
@@ -40,6 +40,7 @@ depends=( python-aiofiles python-passlib python-pyserial + python-spidev python-setproctitle python-psutil python-systemd diff --git a/kvmd/plugins/hid/spi.py b/kvmd/plugins/hid/spi.py new file mode 100644 index 00000000..77d2f34c --- /dev/null +++ b/kvmd/plugins/hid/spi.py @@ -0,0 +1,154 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import os +import contextlib +import time + +from typing import List +from typing import Dict +from typing import Generator +from typing import Any + +import spidev + +from ...yamlconf import Option + +from ...validators.basic import valid_int_f0 +from ...validators.basic import valid_int_f1 +from ...validators.basic import valid_float_f0 +from ...validators.basic import valid_float_f01 + +from ._mcu import BasePhyConnection +from ._mcu import BasePhy +from ._mcu import BaseMcuHid + + +# ===== +class SpiPhyError(Exception): + pass + + +class _SpiPhyConnection(BasePhyConnection): + def __init__( + self, + spi: spidev.SpiDev, + read_timeout: float, + read_delay: float, + ) -> None: + + self.__spi = spi + self.__read_timeout = read_timeout + self.__read_delay = read_delay + + def send(self, request: bytes, receive: int) -> bytes: + assert 0 < receive <= len(request) + + dummy = b"\x00" * len(request) + deadline_ts = time.time() + self.__read_timeout + while time.time() < deadline_ts: + garbage = bytes(self.__spi.xfer(dummy)) + if garbage == dummy: + break + else: + raise SpiPhyError("Timeout reached while reading a garbage") + + self.__spi.xfer(request) + + response: List[int] = [] + dummy = b"\x00" * receive + deadline_ts = time.time() + self.__read_timeout + found = False + while time.time() < deadline_ts: + if not found: + time.sleep(self.__read_delay) + for byte in self.__spi.xfer(dummy): + if not found: + if byte == 0: + continue + found = True + response.append(byte) + if len(response) >= receive: + break + if len(response) >= receive: + break + else: + raise SpiPhyError("Timeout reached while responce waiting") + + assert len(response) == receive + return bytes(response) + + +class _SpiPhy(BasePhy): + def __init__( + self, + bus: int, + chip: int, + max_freq: int, + read_timeout: float, + read_delay: float, + ) -> None: + + self.__bus = bus + self.__chip = chip + self.__max_freq = max_freq + self.__read_timeout = read_timeout + self.__read_delay = read_delay + + def has_device(self) -> bool: + return os.path.exists(f"/dev/spidev{self.__bus}.{self.__chip}") + + @contextlib.contextmanager + def connected(self) -> Generator[_SpiPhyConnection, None, None]: # type: ignore + with contextlib.closing(spidev.SpiDev(self.__bus, self.__chip)) as spi: + spi.mode = 0 + spi.max_speed_hz = self.__max_freq + yield _SpiPhyConnection(spi, self.__read_timeout, self.__read_delay) + + +# ===== +class Plugin(BaseMcuHid): + def __init__( + self, + bus: int, + chip: int, + max_freq: int, + read_timeout: float, + read_delay: float, + **kwargs: Any, + ) -> None: + + super().__init__( + phy=_SpiPhy(bus, chip, max_freq, read_timeout, read_delay), + **kwargs, + ) + + @classmethod + def get_plugin_options(cls) -> Dict: + return { + "bus": Option(0, type=valid_int_f0), + "chip": Option(0, type=valid_int_f0), + "max_freq": Option(1000000, type=valid_int_f1), + "read_timeout": Option(2.0, type=valid_float_f01), + "read_delay": Option(0.001, type=valid_float_f0), + **BaseMcuHid.get_plugin_options(), + } diff --git a/testenv/linters/vulture-wl.py b/testenv/linters/vulture-wl.py index 8d353b08..75240082 100644 --- a/testenv/linters/vulture-wl.py +++ b/testenv/linters/vulture-wl.py @@ -18,6 +18,8 @@ InotifyMask.UNMOUNT IpmiServer.handle_raw_request +SpiDev.max_speed_hz + _AtxApiPart.switch_power _KeyMapping.web_name diff --git a/testenv/requirements.txt b/testenv/requirements.txt index bde383b7..b157fea5 100644 --- a/testenv/requirements.txt +++ b/testenv/requirements.txt @@ -1 +1,2 @@ pyghmi +spidev |