summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2020-09-13 21:43:52 +0300
committerDevaev Maxim <[email protected]>2020-09-13 21:43:52 +0300
commit5ed0c27f1f7201783d60e035ba26f35f62539a08 (patch)
treeb51e154595e9ce14499cd950d902c8dc41da49c8 /kvmd
parent0ad0d17528eeb52ba0dee78d5be440f092fed7d7 (diff)
removed rpi.gpio
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/apps/kvmd/__init__.py79
-rw-r--r--kvmd/gpio.py101
2 files changed, 38 insertions, 142 deletions
diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py
index e257145d..bd08e157 100644
--- a/kvmd/apps/kvmd/__init__.py
+++ b/kvmd/apps/kvmd/__init__.py
@@ -25,8 +25,6 @@ from typing import Optional
from ...logging import get_logger
-from ... import gpio
-
from ...plugins.hid import get_hid_class
from ...plugins.atx import get_atx_class
from ...plugins.msd import get_msd_class
@@ -45,6 +43,8 @@ from .server import KvmdServer
# =====
def main(argv: Optional[List[str]]=None) -> None:
+ # pylint: disable=protected-access
+
config = init(
prog="kvmd",
description="The main Pi-KVM daemon",
@@ -56,48 +56,45 @@ def main(argv: Optional[List[str]]=None) -> None:
load_gpio=True,
)[2]
- with gpio.bcm():
- # pylint: disable=protected-access
-
- msd_kwargs = config.kvmd.msd._unpack(ignore=["type"])
- if config.kvmd.msd.type == "otg":
- msd_kwargs["gadget"] = config.otg.gadget # XXX: Small crutch to pass gadget name to plugin
-
- global_config = config
- config = config.kvmd
-
- hid = get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type", "keymap"]))
- streamer = Streamer(**config.streamer._unpack())
-
- KvmdServer(
- auth_manager=AuthManager(
- internal_type=config.auth.internal.type,
- internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]),
- external_type=config.auth.external.type,
- external_kwargs=(config.auth.external._unpack(ignore=["type"]) if config.auth.external.type else {}),
- force_internal_users=config.auth.internal.force_users,
- enabled=config.auth.enabled,
- ),
- info_manager=InfoManager(global_config),
- log_reader=LogReader(),
- wol=WakeOnLan(**config.wol._unpack()),
- user_gpio=UserGpio(config.gpio),
-
+ msd_kwargs = config.kvmd.msd._unpack(ignore=["type"])
+ if config.kvmd.msd.type == "otg":
+ msd_kwargs["gadget"] = config.otg.gadget # XXX: Small crutch to pass gadget name to plugin
+
+ global_config = config
+ config = config.kvmd
+
+ hid = get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type", "keymap"]))
+ streamer = Streamer(**config.streamer._unpack())
+
+ KvmdServer(
+ auth_manager=AuthManager(
+ internal_type=config.auth.internal.type,
+ internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]),
+ external_type=config.auth.external.type,
+ external_kwargs=(config.auth.external._unpack(ignore=["type"]) if config.auth.external.type else {}),
+ force_internal_users=config.auth.internal.force_users,
+ enabled=config.auth.enabled,
+ ),
+ info_manager=InfoManager(global_config),
+ log_reader=LogReader(),
+ wol=WakeOnLan(**config.wol._unpack()),
+ user_gpio=UserGpio(config.gpio),
+
+ hid=hid,
+ atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])),
+ msd=get_msd_class(config.msd.type)(**msd_kwargs),
+ streamer=streamer,
+
+ snapshoter=Snapshoter(
hid=hid,
- atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])),
- msd=get_msd_class(config.msd.type)(**msd_kwargs),
streamer=streamer,
+ **config.snapshot._unpack(),
+ ),
- snapshoter=Snapshoter(
- hid=hid,
- streamer=streamer,
- **config.snapshot._unpack(),
- ),
-
- heartbeat=config.server.heartbeat,
- sync_chunk_size=config.server.sync_chunk_size,
+ heartbeat=config.server.heartbeat,
+ sync_chunk_size=config.server.sync_chunk_size,
- keymap_path=config.hid.keymap,
- ).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"]))
+ keymap_path=config.hid.keymap,
+ ).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"]))
get_logger(0).info("Bye-bye")
diff --git a/kvmd/gpio.py b/kvmd/gpio.py
deleted file mode 100644
index 8dce12f6..00000000
--- a/kvmd/gpio.py
+++ /dev/null
@@ -1,101 +0,0 @@
-# ========================================================================== #
-# #
-# KVMD - The main Pi-KVM daemon. #
-# #
-# Copyright (C) 2018 Maxim Devaev <[email protected]> #
-# #
-# This program is free software: you can redistribute it and/or modify #
-# it under the terms of the GNU General Public License as published by #
-# the Free Software Foundation, either version 3 of the License, or #
-# (at your option) any later version. #
-# #
-# This program is distributed in the hope that it will be useful, #
-# but WITHOUT ANY WARRANTY; without even the implied warranty of #
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
-# GNU General Public License for more details. #
-# #
-# You should have received a copy of the GNU General Public License #
-# along with this program. If not, see <https://www.gnu.org/licenses/>. #
-# #
-# ========================================================================== #
-
-
-import asyncio
-import contextlib
-
-from typing import Tuple
-from typing import Set
-from typing import Generator
-from typing import Optional
-
-from RPi import GPIO
-
-from .logging import get_logger
-
-from . import aiotools
-
-
-# =====
-def bcm() -> Generator[None, None, None]:
- logger = get_logger(2)
- GPIO.setmode(GPIO.BCM)
- logger.info("Configured GPIO mode as BCM")
- try:
- yield
- finally:
- GPIO.cleanup()
- logger.info("GPIO cleaned")
-
-
-def set_output(pin: int, initial: Optional[bool]) -> int:
- assert pin >= 0, pin
- GPIO.setup(pin, GPIO.OUT, initial=initial)
- return pin
-
-
-def set_input(pin: int) -> int:
- assert pin >= 0, pin
- GPIO.setup(pin, GPIO.IN)
- return pin
-
-
-def read(pin: int) -> bool:
- assert pin >= 0, pin
- return bool(GPIO.input(pin))
-
-
-def write(pin: int, state: bool) -> None:
- assert pin >= 0, pin
- GPIO.output(pin, state)
-
-
-class BatchReader:
- def __init__(
- self,
- pins: Set[int],
- interval: float,
- notifier: aiotools.AioNotifier,
- ) -> None:
-
- self.__pins = sorted(pins)
- self.__interval = interval
- self.__notifier = notifier
-
- self.__state = {pin: read(pin) for pin in self.__pins}
- self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins)
-
- def get(self, pin: int) -> bool:
- return self.__state[pin]
-
- async def poll(self) -> None:
- if not self.__pins:
- await aiotools.wait_infinite()
- else:
- while True:
- flags = tuple(map(read, self.__pins))
- if flags != self.__flags:
- self.__flags = flags
- self.__state = dict(zip(self.__pins, flags))
- await self.__notifier.notify()
- await asyncio.sleep(self.__interval)