summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--PKGBUILD1
-rw-r--r--kvmd/apps/kvmd/__init__.py79
-rw-r--r--kvmd/gpio.py101
-rw-r--r--testenv/linters/vulture-wl.py2
-rw-r--r--testenv/requirements.txt2
-rw-r--r--testenv/tests/__init__.py39
-rw-r--r--testenv/tests/test_gpio.py58
7 files changed, 38 insertions, 244 deletions
diff --git a/PKGBUILD b/PKGBUILD
index 062e16b1..c4de0621 100644
--- a/PKGBUILD
+++ b/PKGBUILD
@@ -39,7 +39,6 @@ depends=(
python-aiohttp
python-aiofiles
python-passlib
- python-raspberry-gpio
python-pyserial
python-setproctitle
python-psutil
diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py
index e257145d..bd08e157 100644
--- a/kvmd/apps/kvmd/__init__.py
+++ b/kvmd/apps/kvmd/__init__.py
@@ -25,8 +25,6 @@ from typing import Optional
from ...logging import get_logger
-from ... import gpio
-
from ...plugins.hid import get_hid_class
from ...plugins.atx import get_atx_class
from ...plugins.msd import get_msd_class
@@ -45,6 +43,8 @@ from .server import KvmdServer
# =====
def main(argv: Optional[List[str]]=None) -> None:
+ # pylint: disable=protected-access
+
config = init(
prog="kvmd",
description="The main Pi-KVM daemon",
@@ -56,48 +56,45 @@ def main(argv: Optional[List[str]]=None) -> None:
load_gpio=True,
)[2]
- with gpio.bcm():
- # pylint: disable=protected-access
-
- msd_kwargs = config.kvmd.msd._unpack(ignore=["type"])
- if config.kvmd.msd.type == "otg":
- msd_kwargs["gadget"] = config.otg.gadget # XXX: Small crutch to pass gadget name to plugin
-
- global_config = config
- config = config.kvmd
-
- hid = get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type", "keymap"]))
- streamer = Streamer(**config.streamer._unpack())
-
- KvmdServer(
- auth_manager=AuthManager(
- internal_type=config.auth.internal.type,
- internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]),
- external_type=config.auth.external.type,
- external_kwargs=(config.auth.external._unpack(ignore=["type"]) if config.auth.external.type else {}),
- force_internal_users=config.auth.internal.force_users,
- enabled=config.auth.enabled,
- ),
- info_manager=InfoManager(global_config),
- log_reader=LogReader(),
- wol=WakeOnLan(**config.wol._unpack()),
- user_gpio=UserGpio(config.gpio),
-
+ msd_kwargs = config.kvmd.msd._unpack(ignore=["type"])
+ if config.kvmd.msd.type == "otg":
+ msd_kwargs["gadget"] = config.otg.gadget # XXX: Small crutch to pass gadget name to plugin
+
+ global_config = config
+ config = config.kvmd
+
+ hid = get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type", "keymap"]))
+ streamer = Streamer(**config.streamer._unpack())
+
+ KvmdServer(
+ auth_manager=AuthManager(
+ internal_type=config.auth.internal.type,
+ internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]),
+ external_type=config.auth.external.type,
+ external_kwargs=(config.auth.external._unpack(ignore=["type"]) if config.auth.external.type else {}),
+ force_internal_users=config.auth.internal.force_users,
+ enabled=config.auth.enabled,
+ ),
+ info_manager=InfoManager(global_config),
+ log_reader=LogReader(),
+ wol=WakeOnLan(**config.wol._unpack()),
+ user_gpio=UserGpio(config.gpio),
+
+ hid=hid,
+ atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])),
+ msd=get_msd_class(config.msd.type)(**msd_kwargs),
+ streamer=streamer,
+
+ snapshoter=Snapshoter(
hid=hid,
- atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])),
- msd=get_msd_class(config.msd.type)(**msd_kwargs),
streamer=streamer,
+ **config.snapshot._unpack(),
+ ),
- snapshoter=Snapshoter(
- hid=hid,
- streamer=streamer,
- **config.snapshot._unpack(),
- ),
-
- heartbeat=config.server.heartbeat,
- sync_chunk_size=config.server.sync_chunk_size,
+ heartbeat=config.server.heartbeat,
+ sync_chunk_size=config.server.sync_chunk_size,
- keymap_path=config.hid.keymap,
- ).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"]))
+ keymap_path=config.hid.keymap,
+ ).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"]))
get_logger(0).info("Bye-bye")
diff --git a/kvmd/gpio.py b/kvmd/gpio.py
deleted file mode 100644
index 8dce12f6..00000000
--- a/kvmd/gpio.py
+++ /dev/null
@@ -1,101 +0,0 @@
-# ========================================================================== #
-# #
-# KVMD - The main Pi-KVM daemon. #
-# #
-# Copyright (C) 2018 Maxim Devaev <[email protected]> #
-# #
-# This program is free software: you can redistribute it and/or modify #
-# it under the terms of the GNU General Public License as published by #
-# the Free Software Foundation, either version 3 of the License, or #
-# (at your option) any later version. #
-# #
-# This program is distributed in the hope that it will be useful, #
-# but WITHOUT ANY WARRANTY; without even the implied warranty of #
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
-# GNU General Public License for more details. #
-# #
-# You should have received a copy of the GNU General Public License #
-# along with this program. If not, see <https://www.gnu.org/licenses/>. #
-# #
-# ========================================================================== #
-
-
-import asyncio
-import contextlib
-
-from typing import Tuple
-from typing import Set
-from typing import Generator
-from typing import Optional
-
-from RPi import GPIO
-
-from .logging import get_logger
-
-from . import aiotools
-
-
-# =====
-def bcm() -> Generator[None, None, None]:
- logger = get_logger(2)
- GPIO.setmode(GPIO.BCM)
- logger.info("Configured GPIO mode as BCM")
- try:
- yield
- finally:
- GPIO.cleanup()
- logger.info("GPIO cleaned")
-
-
-def set_output(pin: int, initial: Optional[bool]) -> int:
- assert pin >= 0, pin
- GPIO.setup(pin, GPIO.OUT, initial=initial)
- return pin
-
-
-def set_input(pin: int) -> int:
- assert pin >= 0, pin
- GPIO.setup(pin, GPIO.IN)
- return pin
-
-
-def read(pin: int) -> bool:
- assert pin >= 0, pin
- return bool(GPIO.input(pin))
-
-
-def write(pin: int, state: bool) -> None:
- assert pin >= 0, pin
- GPIO.output(pin, state)
-
-
-class BatchReader:
- def __init__(
- self,
- pins: Set[int],
- interval: float,
- notifier: aiotools.AioNotifier,
- ) -> None:
-
- self.__pins = sorted(pins)
- self.__interval = interval
- self.__notifier = notifier
-
- self.__state = {pin: read(pin) for pin in self.__pins}
- self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins)
-
- def get(self, pin: int) -> bool:
- return self.__state[pin]
-
- async def poll(self) -> None:
- if not self.__pins:
- await aiotools.wait_infinite()
- else:
- while True:
- flags = tuple(map(read, self.__pins))
- if flags != self.__flags:
- self.__flags = flags
- self.__state = dict(zip(self.__pins, flags))
- await self.__notifier.notify()
- await asyncio.sleep(self.__interval)
diff --git a/testenv/linters/vulture-wl.py b/testenv/linters/vulture-wl.py
index afb95bc2..62338131 100644
--- a/testenv/linters/vulture-wl.py
+++ b/testenv/linters/vulture-wl.py
@@ -20,8 +20,6 @@ IpmiServer.handle_raw_request
_AtxApiPart.switch_power
-fake_rpi.RPi.GPIO
-
_KeyMapping.web_name
_KeyMapping.serial_code
_KeyMapping.arduino_name
diff --git a/testenv/requirements.txt b/testenv/requirements.txt
index a88dd993..74cfcdb5 100644
--- a/testenv/requirements.txt
+++ b/testenv/requirements.txt
@@ -1,5 +1,3 @@
-git+git://github.com/willbuckner/rpi-gpio-development-mock@master#egg=rpi
-fake_rpi
aiohttp
aiofiles
passlib
diff --git a/testenv/tests/__init__.py b/testenv/tests/__init__.py
index d1faace6..1e91f7fa 100644
--- a/testenv/tests/__init__.py
+++ b/testenv/tests/__init__.py
@@ -18,42 +18,3 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
-
-
-import sys
-
-from typing import Dict
-from typing import Optional
-
-import fake_rpi.RPi
-
-
-# =====
-class _GPIO(fake_rpi.RPi._GPIO): # pylint: disable=protected-access
- def __init__(self) -> None:
- super().__init__()
- self.__states: Dict[int, int] = {}
-
- @fake_rpi.RPi.printf
- def setup(self, channel: int, state: int, initial: int=0, pull_up_down: Optional[int]=None) -> None:
- _ = state # Makes linter happy
- _ = pull_up_down # Makes linter happy
- self.__states[int(channel)] = int(initial)
-
- @fake_rpi.RPi.printf
- def output(self, channel: int, state: int) -> None:
- self.__states[int(channel)] = int(state)
-
- @fake_rpi.RPi.printf
- def input(self, channel: int) -> int: # pylint: disable=arguments-differ
- return self.__states[int(channel)]
-
- @fake_rpi.RPi.printf
- def cleanup(self, channel: Optional[int]=None) -> None: # pylint: disable=arguments-differ
- _ = channel # Makes linter happy
- self.__states = {}
-
-
-# =====
-fake_rpi.RPi.GPIO = _GPIO()
-sys.modules["RPi"] = fake_rpi.RPi
diff --git a/testenv/tests/test_gpio.py b/testenv/tests/test_gpio.py
deleted file mode 100644
index 3db61609..00000000
--- a/testenv/tests/test_gpio.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# ========================================================================== #
-# #
-# KVMD - The main Pi-KVM daemon. #
-# #
-# Copyright (C) 2018 Maxim Devaev <[email protected]> #
-# #
-# This program is free software: you can redistribute it and/or modify #
-# it under the terms of the GNU General Public License as published by #
-# the Free Software Foundation, either version 3 of the License, or #
-# (at your option) any later version. #
-# #
-# This program is distributed in the hope that it will be useful, #
-# but WITHOUT ANY WARRANTY; without even the implied warranty of #
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
-# GNU General Public License for more details. #
-# #
-# You should have received a copy of the GNU General Public License #
-# along with this program. If not, see <https://www.gnu.org/licenses/>. #
-# #
-# ========================================================================== #
-
-
-import pytest
-
-from kvmd import gpio
-
-
-# =====
[email protected]("pin", [0, 1, 13])
-def test_ok__loopback_initial_false(pin: int) -> None:
- with gpio.bcm():
- assert gpio.set_output(pin, False) == pin
- assert gpio.read(pin) is False
- gpio.write(pin, True)
- assert gpio.read(pin) is True
-
-
[email protected]("pin", [0, 1, 13])
-def test_ok__loopback_initial_true(pin: int) -> None:
- with gpio.bcm():
- assert gpio.set_output(pin, True) == pin
- assert gpio.read(pin) is True
- gpio.write(pin, False)
- assert gpio.read(pin) is False
-
-
[email protected]("pin", [0, 1, 13])
-def test_ok__input(pin: int) -> None:
- with gpio.bcm():
- assert gpio.set_input(pin) == pin
- assert gpio.read(pin) is False
-
-
-def test_fail__invalid_pin() -> None:
- with pytest.raises(AssertionError):
- gpio.set_output(-1, False)
- with pytest.raises(AssertionError):
- gpio.set_input(-1)