# ========================================================================== # # # # KVMD - The main PiKVM daemon. # # # # Copyright (C) 2018-2021 Maxim Devaev # # # # This program is free software: you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation, either version 3 of the License, or # # (at your option) any later version. # # # # This program is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # # GNU General Public License for more details. # # # # You should have received a copy of the GNU General Public License # # along with this program. If not, see . # # # # ========================================================================== # import os import contextlib import time from typing import List from typing import Dict from typing import Generator from typing import Callable from typing import Optional from typing import Any import spidev import gpiod from ...logging import get_logger from ...yamlconf import Option from ...validators.basic import valid_bool from ...validators.basic import valid_int_f0 from ...validators.basic import valid_int_f1 from ...validators.basic import valid_float_f01 from ...validators.hw import valid_gpio_pin_optional from ._mcu import BasePhyConnection from ._mcu import BasePhy from ._mcu import BaseMcuHid # ===== class _SpiPhyConnection(BasePhyConnection): def __init__( self, xfer: Callable[[bytes], bytes], read_timeout: float, ) -> None: self.__xfer = xfer self.__read_timeout = read_timeout def send(self, request: bytes) -> bytes: assert len(request) == 8 assert request[0] == 0x33 deadline_ts = time.monotonic() + self.__read_timeout dummy = b"\x00" * 10 while time.monotonic() < deadline_ts: if bytes(self.__xfer(dummy)) == dummy: break else: get_logger(0).error("SPI timeout reached while garbage reading") return b"" self.__xfer(request) response: List[int] = [] deadline_ts = time.monotonic() + self.__read_timeout found = False while time.monotonic() < deadline_ts: for byte in self.__xfer(b"\x00" * (9 - len(response))): if not found: if byte == 0: continue found = True response.append(byte) if len(response) == 8: break if len(response) == 8: break else: get_logger(0).error("SPI timeout reached while responce waiting") return b"" return bytes(response) class _SpiPhy(BasePhy): # pylint: disable=too-many-instance-attributes def __init__( self, gpio_device_path: str, bus: int, chip: int, hw_cs: bool, sw_cs_pin: int, max_freq: int, block_usec: int, read_timeout: float, ) -> None: self.__gpio_device_path = gpio_device_path self.__bus = bus self.__chip = chip self.__hw_cs = hw_cs self.__sw_cs_pin = sw_cs_pin self.__max_freq = max_freq self.__block_usec = block_usec self.__read_timeout = read_timeout def has_device(self) -> bool: return os.path.exists(f"/dev/spidev{self.__bus}.{self.__chip}") @contextlib.contextmanager def connected(self) -> Generator[_SpiPhyConnection, None, None]: # type: ignore with self.__sw_cs_connected() as sw_cs_line: with contextlib.closing(spidev.SpiDev(self.__bus, self.__chip)) as spi: spi.mode = 0 spi.no_cs = (not self.__hw_cs) spi.max_speed_hz = self.__max_freq def xfer(data: bytes) -> bytes: try: if sw_cs_line is not None: sw_cs_line.set_value(0) return spi.xfer(data, self.__max_freq, self.__block_usec) finally: if sw_cs_line is not None: sw_cs_line.set_value(1) yield _SpiPhyConnection( xfer=xfer, read_timeout=self.__read_timeout, ) @contextlib.contextmanager def __sw_cs_connected(self) -> Generator[Optional[gpiod.Line], None, None]: if self.__sw_cs_pin > 0: with contextlib.closing(gpiod.Chip(self.__gpio_device_path)) as chip: line = chip.get_line(self.__sw_cs_pin) line.request("kvmd::hid::sw_cs", gpiod.LINE_REQ_DIR_OUT, default_vals=[1]) yield line else: yield None # ===== class Plugin(BaseMcuHid): def __init__(self, **kwargs: Any) -> None: phy_kwargs: Dict = {key: kwargs.pop(key) for key in self.__get_phy_options()} phy_kwargs["gpio_device_path"] = kwargs["gpio_device_path"] super().__init__(phy=_SpiPhy(**phy_kwargs), **kwargs) @classmethod def get_plugin_options(cls) -> Dict: return { **cls.__get_phy_options(), **BaseMcuHid.get_plugin_options(), } @classmethod def __get_phy_options(cls) -> Dict: return { "bus": Option(-1, type=valid_int_f0), "chip": Option(-1, type=valid_int_f0), "hw_cs": Option(False, type=valid_bool), "sw_cs_pin": Option(-1, type=valid_gpio_pin_optional), "max_freq": Option(100000, type=valid_int_f1), "block_usec": Option(1, type=valid_int_f0), "read_timeout": Option(0.5, type=valid_float_f01), }