# ========================================================================== # # # # KVMD - The main Pi-KVM daemon. # # # # Copyright (C) 2018 Maxim Devaev # # # # This program is free software: you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation, either version 3 of the License, or # # (at your option) any later version. # # # # This program is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # # GNU General Public License for more details. # # # # You should have received a copy of the GNU General Public License # # along with this program. If not, see . # # # # ========================================================================== # import os import contextlib import time from typing import List from typing import Dict from typing import Generator from typing import Any import spidev from ...yamlconf import Option from ...validators.basic import valid_int_f0 from ...validators.basic import valid_int_f1 from ...validators.basic import valid_float_f0 from ...validators.basic import valid_float_f01 from ._mcu import BasePhyConnection from ._mcu import BasePhy from ._mcu import BaseMcuHid # ===== class SpiPhyError(Exception): pass class _SpiPhyConnection(BasePhyConnection): def __init__( self, spi: spidev.SpiDev, read_timeout: float, read_delay: float, ) -> None: self.__spi = spi self.__read_timeout = read_timeout self.__read_delay = read_delay def send(self, request: bytes, receive: int) -> bytes: assert 0 < receive <= len(request) dummy = b"\x00" * len(request) deadline_ts = time.time() + self.__read_timeout while time.time() < deadline_ts: garbage = bytes(self.__spi.xfer(dummy)) if garbage == dummy: break else: raise SpiPhyError("Timeout reached while reading a garbage") self.__spi.xfer(request) response: List[int] = [] dummy = b"\x00" * receive deadline_ts = time.time() + self.__read_timeout found = False while time.time() < deadline_ts: if not found: time.sleep(self.__read_delay) for byte in self.__spi.xfer(dummy): if not found: if byte == 0: continue found = True response.append(byte) if len(response) >= receive: break if len(response) >= receive: break else: raise SpiPhyError("Timeout reached while responce waiting") assert len(response) == receive return bytes(response) class _SpiPhy(BasePhy): def __init__( self, bus: int, chip: int, max_freq: int, read_timeout: float, read_delay: float, ) -> None: self.__bus = bus self.__chip = chip self.__max_freq = max_freq self.__read_timeout = read_timeout self.__read_delay = read_delay def has_device(self) -> bool: return os.path.exists(f"/dev/spidev{self.__bus}.{self.__chip}") @contextlib.contextmanager def connected(self) -> Generator[_SpiPhyConnection, None, None]: # type: ignore with contextlib.closing(spidev.SpiDev(self.__bus, self.__chip)) as spi: spi.mode = 0 spi.max_speed_hz = self.__max_freq yield _SpiPhyConnection(spi, self.__read_timeout, self.__read_delay) # ===== class Plugin(BaseMcuHid): def __init__( self, bus: int, chip: int, max_freq: int, read_timeout: float, read_delay: float, **kwargs: Any, ) -> None: super().__init__( phy=_SpiPhy(bus, chip, max_freq, read_timeout, read_delay), **kwargs, ) @classmethod def get_plugin_options(cls) -> Dict: return { "bus": Option(0, type=valid_int_f0), "chip": Option(0, type=valid_int_f0), "max_freq": Option(1000000, type=valid_int_f1), "read_timeout": Option(2.0, type=valid_float_f01), "read_delay": Option(0.001, type=valid_float_f0), **BaseMcuHid.get_plugin_options(), }