diff options
author | Devaev Maxim <[email protected]> | 2020-09-13 21:43:52 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2020-09-13 21:43:52 +0300 |
commit | 5ed0c27f1f7201783d60e035ba26f35f62539a08 (patch) | |
tree | b51e154595e9ce14499cd950d902c8dc41da49c8 /kvmd | |
parent | 0ad0d17528eeb52ba0dee78d5be440f092fed7d7 (diff) |
removed rpi.gpio
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/apps/kvmd/__init__.py | 79 | ||||
-rw-r--r-- | kvmd/gpio.py | 101 |
2 files changed, 38 insertions, 142 deletions
diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py index e257145d..bd08e157 100644 --- a/kvmd/apps/kvmd/__init__.py +++ b/kvmd/apps/kvmd/__init__.py @@ -25,8 +25,6 @@ from typing import Optional from ...logging import get_logger -from ... import gpio - from ...plugins.hid import get_hid_class from ...plugins.atx import get_atx_class from ...plugins.msd import get_msd_class @@ -45,6 +43,8 @@ from .server import KvmdServer # ===== def main(argv: Optional[List[str]]=None) -> None: + # pylint: disable=protected-access + config = init( prog="kvmd", description="The main Pi-KVM daemon", @@ -56,48 +56,45 @@ def main(argv: Optional[List[str]]=None) -> None: load_gpio=True, )[2] - with gpio.bcm(): - # pylint: disable=protected-access - - msd_kwargs = config.kvmd.msd._unpack(ignore=["type"]) - if config.kvmd.msd.type == "otg": - msd_kwargs["gadget"] = config.otg.gadget # XXX: Small crutch to pass gadget name to plugin - - global_config = config - config = config.kvmd - - hid = get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type", "keymap"])) - streamer = Streamer(**config.streamer._unpack()) - - KvmdServer( - auth_manager=AuthManager( - internal_type=config.auth.internal.type, - internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]), - external_type=config.auth.external.type, - external_kwargs=(config.auth.external._unpack(ignore=["type"]) if config.auth.external.type else {}), - force_internal_users=config.auth.internal.force_users, - enabled=config.auth.enabled, - ), - info_manager=InfoManager(global_config), - log_reader=LogReader(), - wol=WakeOnLan(**config.wol._unpack()), - user_gpio=UserGpio(config.gpio), - + msd_kwargs = config.kvmd.msd._unpack(ignore=["type"]) + if config.kvmd.msd.type == "otg": + msd_kwargs["gadget"] = config.otg.gadget # XXX: Small crutch to pass gadget name to plugin + + global_config = config + config = config.kvmd + + hid = get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type", "keymap"])) + streamer = Streamer(**config.streamer._unpack()) + + KvmdServer( + auth_manager=AuthManager( + internal_type=config.auth.internal.type, + internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]), + external_type=config.auth.external.type, + external_kwargs=(config.auth.external._unpack(ignore=["type"]) if config.auth.external.type else {}), + force_internal_users=config.auth.internal.force_users, + enabled=config.auth.enabled, + ), + info_manager=InfoManager(global_config), + log_reader=LogReader(), + wol=WakeOnLan(**config.wol._unpack()), + user_gpio=UserGpio(config.gpio), + + hid=hid, + atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])), + msd=get_msd_class(config.msd.type)(**msd_kwargs), + streamer=streamer, + + snapshoter=Snapshoter( hid=hid, - atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])), - msd=get_msd_class(config.msd.type)(**msd_kwargs), streamer=streamer, + **config.snapshot._unpack(), + ), - snapshoter=Snapshoter( - hid=hid, - streamer=streamer, - **config.snapshot._unpack(), - ), - - heartbeat=config.server.heartbeat, - sync_chunk_size=config.server.sync_chunk_size, + heartbeat=config.server.heartbeat, + sync_chunk_size=config.server.sync_chunk_size, - keymap_path=config.hid.keymap, - ).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"])) + keymap_path=config.hid.keymap, + ).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"])) get_logger(0).info("Bye-bye") diff --git a/kvmd/gpio.py b/kvmd/gpio.py deleted file mode 100644 index 8dce12f6..00000000 --- a/kvmd/gpio.py +++ /dev/null @@ -1,101 +0,0 @@ -# ========================================================================== # -# # -# KVMD - The main Pi-KVM daemon. # -# # -# Copyright (C) 2018 Maxim Devaev <[email protected]> # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see <https://www.gnu.org/licenses/>. # -# # -# ========================================================================== # - - -import asyncio -import contextlib - -from typing import Tuple -from typing import Set -from typing import Generator -from typing import Optional - -from RPi import GPIO - -from .logging import get_logger - -from . import aiotools - - -# ===== -def bcm() -> Generator[None, None, None]: - logger = get_logger(2) - GPIO.setmode(GPIO.BCM) - logger.info("Configured GPIO mode as BCM") - try: - yield - finally: - GPIO.cleanup() - logger.info("GPIO cleaned") - - -def set_output(pin: int, initial: Optional[bool]) -> int: - assert pin >= 0, pin - GPIO.setup(pin, GPIO.OUT, initial=initial) - return pin - - -def set_input(pin: int) -> int: - assert pin >= 0, pin - GPIO.setup(pin, GPIO.IN) - return pin - - -def read(pin: int) -> bool: - assert pin >= 0, pin - return bool(GPIO.input(pin)) - - -def write(pin: int, state: bool) -> None: - assert pin >= 0, pin - GPIO.output(pin, state) - - -class BatchReader: - def __init__( - self, - pins: Set[int], - interval: float, - notifier: aiotools.AioNotifier, - ) -> None: - - self.__pins = sorted(pins) - self.__interval = interval - self.__notifier = notifier - - self.__state = {pin: read(pin) for pin in self.__pins} - self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins) - - def get(self, pin: int) -> bool: - return self.__state[pin] - - async def poll(self) -> None: - if not self.__pins: - await aiotools.wait_infinite() - else: - while True: - flags = tuple(map(read, self.__pins)) - if flags != self.__flags: - self.__flags = flags - self.__state = dict(zip(self.__pins, flags)) - await self.__notifier.notify() - await asyncio.sleep(self.__interval) |