diff options
author | Devaev Maxim <[email protected]> | 2020-10-29 07:03:59 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2020-11-11 22:24:25 +0300 |
commit | aaef672ac2ce5dd2cee17ad571e6cab35f284ee4 (patch) | |
tree | 51d78a5b01301185953525494105044c066bfe51 /kvmd | |
parent | e54449fd8ef0ea98aea881d25c9bb134a3bc2e11 (diff) |
kvmd spi driver
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/plugins/hid/spi.py | 154 |
1 files changed, 154 insertions, 0 deletions
diff --git a/kvmd/plugins/hid/spi.py b/kvmd/plugins/hid/spi.py new file mode 100644 index 00000000..77d2f34c --- /dev/null +++ b/kvmd/plugins/hid/spi.py @@ -0,0 +1,154 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import os +import contextlib +import time + +from typing import List +from typing import Dict +from typing import Generator +from typing import Any + +import spidev + +from ...yamlconf import Option + +from ...validators.basic import valid_int_f0 +from ...validators.basic import valid_int_f1 +from ...validators.basic import valid_float_f0 +from ...validators.basic import valid_float_f01 + +from ._mcu import BasePhyConnection +from ._mcu import BasePhy +from ._mcu import BaseMcuHid + + +# ===== +class SpiPhyError(Exception): + pass + + +class _SpiPhyConnection(BasePhyConnection): + def __init__( + self, + spi: spidev.SpiDev, + read_timeout: float, + read_delay: float, + ) -> None: + + self.__spi = spi + self.__read_timeout = read_timeout + self.__read_delay = read_delay + + def send(self, request: bytes, receive: int) -> bytes: + assert 0 < receive <= len(request) + + dummy = b"\x00" * len(request) + deadline_ts = time.time() + self.__read_timeout + while time.time() < deadline_ts: + garbage = bytes(self.__spi.xfer(dummy)) + if garbage == dummy: + break + else: + raise SpiPhyError("Timeout reached while reading a garbage") + + self.__spi.xfer(request) + + response: List[int] = [] + dummy = b"\x00" * receive + deadline_ts = time.time() + self.__read_timeout + found = False + while time.time() < deadline_ts: + if not found: + time.sleep(self.__read_delay) + for byte in self.__spi.xfer(dummy): + if not found: + if byte == 0: + continue + found = True + response.append(byte) + if len(response) >= receive: + break + if len(response) >= receive: + break + else: + raise SpiPhyError("Timeout reached while responce waiting") + + assert len(response) == receive + return bytes(response) + + +class _SpiPhy(BasePhy): + def __init__( + self, + bus: int, + chip: int, + max_freq: int, + read_timeout: float, + read_delay: float, + ) -> None: + + self.__bus = bus + self.__chip = chip + self.__max_freq = max_freq + self.__read_timeout = read_timeout + self.__read_delay = read_delay + + def has_device(self) -> bool: + return os.path.exists(f"/dev/spidev{self.__bus}.{self.__chip}") + + @contextlib.contextmanager + def connected(self) -> Generator[_SpiPhyConnection, None, None]: # type: ignore + with contextlib.closing(spidev.SpiDev(self.__bus, self.__chip)) as spi: + spi.mode = 0 + spi.max_speed_hz = self.__max_freq + yield _SpiPhyConnection(spi, self.__read_timeout, self.__read_delay) + + +# ===== +class Plugin(BaseMcuHid): + def __init__( + self, + bus: int, + chip: int, + max_freq: int, + read_timeout: float, + read_delay: float, + **kwargs: Any, + ) -> None: + + super().__init__( + phy=_SpiPhy(bus, chip, max_freq, read_timeout, read_delay), + **kwargs, + ) + + @classmethod + def get_plugin_options(cls) -> Dict: + return { + "bus": Option(0, type=valid_int_f0), + "chip": Option(0, type=valid_int_f0), + "max_freq": Option(1000000, type=valid_int_f1), + "read_timeout": Option(2.0, type=valid_float_f01), + "read_delay": Option(0.001, type=valid_float_f0), + **BaseMcuHid.get_plugin_options(), + } |