diff options
-rw-r--r-- | PKGBUILD | 1 | ||||
-rw-r--r-- | kvmd/apps/kvmd/__init__.py | 79 | ||||
-rw-r--r-- | kvmd/gpio.py | 101 | ||||
-rw-r--r-- | testenv/linters/vulture-wl.py | 2 | ||||
-rw-r--r-- | testenv/requirements.txt | 2 | ||||
-rw-r--r-- | testenv/tests/__init__.py | 39 | ||||
-rw-r--r-- | testenv/tests/test_gpio.py | 58 |
7 files changed, 38 insertions, 244 deletions
@@ -39,7 +39,6 @@ depends=( python-aiohttp python-aiofiles python-passlib - python-raspberry-gpio python-pyserial python-setproctitle python-psutil diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py index e257145d..bd08e157 100644 --- a/kvmd/apps/kvmd/__init__.py +++ b/kvmd/apps/kvmd/__init__.py @@ -25,8 +25,6 @@ from typing import Optional from ...logging import get_logger -from ... import gpio - from ...plugins.hid import get_hid_class from ...plugins.atx import get_atx_class from ...plugins.msd import get_msd_class @@ -45,6 +43,8 @@ from .server import KvmdServer # ===== def main(argv: Optional[List[str]]=None) -> None: + # pylint: disable=protected-access + config = init( prog="kvmd", description="The main Pi-KVM daemon", @@ -56,48 +56,45 @@ def main(argv: Optional[List[str]]=None) -> None: load_gpio=True, )[2] - with gpio.bcm(): - # pylint: disable=protected-access - - msd_kwargs = config.kvmd.msd._unpack(ignore=["type"]) - if config.kvmd.msd.type == "otg": - msd_kwargs["gadget"] = config.otg.gadget # XXX: Small crutch to pass gadget name to plugin - - global_config = config - config = config.kvmd - - hid = get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type", "keymap"])) - streamer = Streamer(**config.streamer._unpack()) - - KvmdServer( - auth_manager=AuthManager( - internal_type=config.auth.internal.type, - internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]), - external_type=config.auth.external.type, - external_kwargs=(config.auth.external._unpack(ignore=["type"]) if config.auth.external.type else {}), - force_internal_users=config.auth.internal.force_users, - enabled=config.auth.enabled, - ), - info_manager=InfoManager(global_config), - log_reader=LogReader(), - wol=WakeOnLan(**config.wol._unpack()), - user_gpio=UserGpio(config.gpio), - + msd_kwargs = config.kvmd.msd._unpack(ignore=["type"]) + if config.kvmd.msd.type == "otg": + msd_kwargs["gadget"] = config.otg.gadget # XXX: Small crutch to pass gadget name to plugin + + global_config = config + config = config.kvmd + + hid = get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type", "keymap"])) + streamer = Streamer(**config.streamer._unpack()) + + KvmdServer( + auth_manager=AuthManager( + internal_type=config.auth.internal.type, + internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]), + external_type=config.auth.external.type, + external_kwargs=(config.auth.external._unpack(ignore=["type"]) if config.auth.external.type else {}), + force_internal_users=config.auth.internal.force_users, + enabled=config.auth.enabled, + ), + info_manager=InfoManager(global_config), + log_reader=LogReader(), + wol=WakeOnLan(**config.wol._unpack()), + user_gpio=UserGpio(config.gpio), + + hid=hid, + atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])), + msd=get_msd_class(config.msd.type)(**msd_kwargs), + streamer=streamer, + + snapshoter=Snapshoter( hid=hid, - atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])), - msd=get_msd_class(config.msd.type)(**msd_kwargs), streamer=streamer, + **config.snapshot._unpack(), + ), - snapshoter=Snapshoter( - hid=hid, - streamer=streamer, - **config.snapshot._unpack(), - ), - - heartbeat=config.server.heartbeat, - sync_chunk_size=config.server.sync_chunk_size, + heartbeat=config.server.heartbeat, + sync_chunk_size=config.server.sync_chunk_size, - keymap_path=config.hid.keymap, - ).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"])) + keymap_path=config.hid.keymap, + ).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"])) get_logger(0).info("Bye-bye") diff --git a/kvmd/gpio.py b/kvmd/gpio.py deleted file mode 100644 index 8dce12f6..00000000 --- a/kvmd/gpio.py +++ /dev/null @@ -1,101 +0,0 @@ -# ========================================================================== # -# # -# KVMD - The main Pi-KVM daemon. # -# # -# Copyright (C) 2018 Maxim Devaev <[email protected]> # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see <https://www.gnu.org/licenses/>. # -# # -# ========================================================================== # - - -import asyncio -import contextlib - -from typing import Tuple -from typing import Set -from typing import Generator -from typing import Optional - -from RPi import GPIO - -from .logging import get_logger - -from . import aiotools - - -# ===== -def bcm() -> Generator[None, None, None]: - logger = get_logger(2) - GPIO.setmode(GPIO.BCM) - logger.info("Configured GPIO mode as BCM") - try: - yield - finally: - GPIO.cleanup() - logger.info("GPIO cleaned") - - -def set_output(pin: int, initial: Optional[bool]) -> int: - assert pin >= 0, pin - GPIO.setup(pin, GPIO.OUT, initial=initial) - return pin - - -def set_input(pin: int) -> int: - assert pin >= 0, pin - GPIO.setup(pin, GPIO.IN) - return pin - - -def read(pin: int) -> bool: - assert pin >= 0, pin - return bool(GPIO.input(pin)) - - -def write(pin: int, state: bool) -> None: - assert pin >= 0, pin - GPIO.output(pin, state) - - -class BatchReader: - def __init__( - self, - pins: Set[int], - interval: float, - notifier: aiotools.AioNotifier, - ) -> None: - - self.__pins = sorted(pins) - self.__interval = interval - self.__notifier = notifier - - self.__state = {pin: read(pin) for pin in self.__pins} - self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins) - - def get(self, pin: int) -> bool: - return self.__state[pin] - - async def poll(self) -> None: - if not self.__pins: - await aiotools.wait_infinite() - else: - while True: - flags = tuple(map(read, self.__pins)) - if flags != self.__flags: - self.__flags = flags - self.__state = dict(zip(self.__pins, flags)) - await self.__notifier.notify() - await asyncio.sleep(self.__interval) diff --git a/testenv/linters/vulture-wl.py b/testenv/linters/vulture-wl.py index afb95bc2..62338131 100644 --- a/testenv/linters/vulture-wl.py +++ b/testenv/linters/vulture-wl.py @@ -20,8 +20,6 @@ IpmiServer.handle_raw_request _AtxApiPart.switch_power -fake_rpi.RPi.GPIO - _KeyMapping.web_name _KeyMapping.serial_code _KeyMapping.arduino_name diff --git a/testenv/requirements.txt b/testenv/requirements.txt index a88dd993..74cfcdb5 100644 --- a/testenv/requirements.txt +++ b/testenv/requirements.txt @@ -1,5 +1,3 @@ -git+git://github.com/willbuckner/rpi-gpio-development-mock@master#egg=rpi -fake_rpi aiohttp aiofiles passlib diff --git a/testenv/tests/__init__.py b/testenv/tests/__init__.py index d1faace6..1e91f7fa 100644 --- a/testenv/tests/__init__.py +++ b/testenv/tests/__init__.py @@ -18,42 +18,3 @@ # along with this program. If not, see <https://www.gnu.org/licenses/>. # # # # ========================================================================== # - - -import sys - -from typing import Dict -from typing import Optional - -import fake_rpi.RPi - - -# ===== -class _GPIO(fake_rpi.RPi._GPIO): # pylint: disable=protected-access - def __init__(self) -> None: - super().__init__() - self.__states: Dict[int, int] = {} - - @fake_rpi.RPi.printf - def setup(self, channel: int, state: int, initial: int=0, pull_up_down: Optional[int]=None) -> None: - _ = state # Makes linter happy - _ = pull_up_down # Makes linter happy - self.__states[int(channel)] = int(initial) - - @fake_rpi.RPi.printf - def output(self, channel: int, state: int) -> None: - self.__states[int(channel)] = int(state) - - @fake_rpi.RPi.printf - def input(self, channel: int) -> int: # pylint: disable=arguments-differ - return self.__states[int(channel)] - - @fake_rpi.RPi.printf - def cleanup(self, channel: Optional[int]=None) -> None: # pylint: disable=arguments-differ - _ = channel # Makes linter happy - self.__states = {} - - -# ===== -fake_rpi.RPi.GPIO = _GPIO() -sys.modules["RPi"] = fake_rpi.RPi diff --git a/testenv/tests/test_gpio.py b/testenv/tests/test_gpio.py deleted file mode 100644 index 3db61609..00000000 --- a/testenv/tests/test_gpio.py +++ /dev/null @@ -1,58 +0,0 @@ -# ========================================================================== # -# # -# KVMD - The main Pi-KVM daemon. # -# # -# Copyright (C) 2018 Maxim Devaev <[email protected]> # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see <https://www.gnu.org/licenses/>. # -# # -# ========================================================================== # - - -import pytest - -from kvmd import gpio - - -# ===== [email protected]("pin", [0, 1, 13]) -def test_ok__loopback_initial_false(pin: int) -> None: - with gpio.bcm(): - assert gpio.set_output(pin, False) == pin - assert gpio.read(pin) is False - gpio.write(pin, True) - assert gpio.read(pin) is True - - [email protected]("pin", [0, 1, 13]) -def test_ok__loopback_initial_true(pin: int) -> None: - with gpio.bcm(): - assert gpio.set_output(pin, True) == pin - assert gpio.read(pin) is True - gpio.write(pin, False) - assert gpio.read(pin) is False - - [email protected]("pin", [0, 1, 13]) -def test_ok__input(pin: int) -> None: - with gpio.bcm(): - assert gpio.set_input(pin) == pin - assert gpio.read(pin) is False - - -def test_fail__invalid_pin() -> None: - with pytest.raises(AssertionError): - gpio.set_output(-1, False) - with pytest.raises(AssertionError): - gpio.set_input(-1) |