diff options
author | Maxim Devaev <[email protected]> | 2020-09-17 01:12:09 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2020-09-17 01:12:09 +0300 |
commit | 1f3cdd03be80b5a297db58c8118f63c1415f4584 (patch) | |
tree | e2e9321830f8fff5be3b03451c4468bee9c70cf5 /kvmd | |
parent | 1c31b8f80d90c6d4b1311e7527343d54316974d5 (diff) | |
parent | 3f79f55a9ef6368b0bd2c76bf55c7e139ecfb9c7 (diff) |
Merge pull request #8 from pikvm/libgpiod
Libgpiod
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/aiogp.py | 180 | ||||
-rw-r--r-- | kvmd/aiotools.py | 3 | ||||
-rw-r--r-- | kvmd/apps/__init__.py | 4 | ||||
-rw-r--r-- | kvmd/apps/cleanup/__init__.py | 34 | ||||
-rw-r--r-- | kvmd/apps/kvmd/__init__.py | 79 | ||||
-rw-r--r-- | kvmd/apps/kvmd/ugpio.py | 9 | ||||
-rw-r--r-- | kvmd/gpio.py | 101 | ||||
-rw-r--r-- | kvmd/plugins/atx/gpio.py | 100 | ||||
-rw-r--r-- | kvmd/plugins/hid/serial.py | 71 | ||||
-rw-r--r-- | kvmd/plugins/msd/relay.py | 87 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/__init__.py | 2 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/gpio.py | 62 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/hidrelay.py | 2 |
13 files changed, 428 insertions, 306 deletions
diff --git a/kvmd/aiogp.py b/kvmd/aiogp.py new file mode 100644 index 00000000..aa7c2778 --- /dev/null +++ b/kvmd/aiogp.py @@ -0,0 +1,180 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import os +import asyncio +import asyncio.queues +import threading +import dataclasses + +from typing import Tuple +from typing import Dict +from typing import Optional + +import gpiod + +from . import aiotools + + +# ===== +# XXX: Do not use this variable for any purpose other than testing. +# It can be removed at any time. +DEVICE_PATH = os.getenv("KVMD_GPIO_DEVICE_PATH", "/dev/gpiochip0") + + +# ===== +async def pulse(line: gpiod.Line, delay: float, final: float) -> None: + try: + line.set_value(1) + await asyncio.sleep(delay) + finally: + line.set_value(0) + await asyncio.sleep(final) + + +# ===== [email protected](frozen=True) +class AioReaderPinParams: + inverted: bool + debounce: float + + +class AioReader: # pylint: disable=too-many-instance-attributes + def __init__( + self, + path: str, + consumer: str, + pins: Dict[int, AioReaderPinParams], + notifier: aiotools.AioNotifier, + ) -> None: + + self.__path = path + self.__consumer = consumer + self.__pins = pins + self.__notifier = notifier + + self.__values: Optional[Dict[int, _DebouncedValue]] = None + + self.__thread = threading.Thread(target=self.__run, daemon=True) + self.__stop_event = threading.Event() + + self.__loop: Optional[asyncio.AbstractEventLoop] = None + + def get(self, pin: int) -> bool: + value = (self.__values[pin].get() if self.__values is not None else False) + return (value ^ self.__pins[pin].inverted) + + async def poll(self) -> None: + if not self.__pins: + await aiotools.wait_infinite() + else: + assert self.__loop is None + self.__loop = asyncio.get_running_loop() + self.__thread.start() + try: + await aiotools.run_async(self.__thread.join) + finally: + self.__stop_event.set() + await aiotools.run_async(self.__thread.join) + + def __run(self) -> None: + assert self.__values is None + assert self.__loop + with gpiod.Chip(self.__path) as chip: + pins = sorted(self.__pins) + lines = chip.get_lines(pins) + lines.request(self.__consumer, gpiod.LINE_REQ_EV_BOTH_EDGES) + + lines.event_wait(nsec=1) + self.__values = { + pin: _DebouncedValue( + initial=bool(value), + debounce=self.__pins[pin].debounce, + notifier=self.__notifier, + loop=self.__loop, + ) + for (pin, value) in zip(pins, lines.get_values()) + } + self.__loop.call_soon_threadsafe(self.__notifier.notify_sync) + + while not self.__stop_event.is_set(): + ev_lines = lines.event_wait(1) + if ev_lines: + for ev_line in ev_lines: + events = ev_line.event_read_multiple() + if events: + (pin, value) = self.__parse_event(events[-1]) + self.__values[pin].set(bool(value)) + else: # Timeout + # Размер буфера ядра - 16 эвентов на линии. При превышении этого числа, + # новые эвенты потеряются. Это не баг, это фича, как мне объяснили в LKML. + # Штош. Будем с этим жить и синхронизировать состояния при таймауте. + for (pin, value) in zip(pins, lines.get_values()): + self.__values[pin].set(bool(value)) + + def __parse_event(self, event: gpiod.LineEvent) -> Tuple[int, int]: + pin = event.source.offset() + if event.type == gpiod.LineEvent.RISING_EDGE: + return (pin, 1) + elif event.type == gpiod.LineEvent.FALLING_EDGE: + return (pin, 0) + raise RuntimeError(f"Invalid event {event} type: {event.type}") + + +class _DebouncedValue: + def __init__( + self, + initial: bool, + debounce: float, + notifier: aiotools.AioNotifier, + loop: asyncio.AbstractEventLoop, + ) -> None: + + self.__value = initial + self.__debounce = debounce + self.__notifier = notifier + self.__loop = loop + + self.__queue: asyncio.queues.Queue = asyncio.Queue(loop=loop) + self.__task = loop.create_task(self.__consumer_task_loop()) + + def set(self, value: bool) -> None: + if self.__loop.is_running(): + self.__check_alive() + self.__loop.call_soon_threadsafe(self.__queue.put_nowait, value) + + def get(self) -> bool: + return self.__value + + def __check_alive(self) -> None: + if self.__task.done() and not self.__task.cancelled(): + raise RuntimeError("Dead debounce consumer") + + async def __consumer_task_loop(self) -> None: + while True: + value = await self.__queue.get() + while not self.__queue.empty(): + value = await self.__queue.get() + if self.__value != value: + self.__value = value + await self.__notifier.notify() + await asyncio.sleep(self.__debounce) diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py index 84c6f314..dfd67f44 100644 --- a/kvmd/aiotools.py +++ b/kvmd/aiotools.py @@ -97,6 +97,9 @@ class AioNotifier: async def notify(self) -> None: await self.__queue.put(None) + def notify_sync(self) -> None: + self.__queue.put_nowait(None) + async def wait(self) -> None: await self.__queue.get() while not self.__queue.empty(): diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index ac140466..ccede372 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -227,7 +227,9 @@ def _patch_dynamic( # pylint: disable=too-many-locals "min_delay": Option(0.1, type=valid_float_f01), "max_delay": Option(0.1, type=valid_float_f01), }, - } if mode == UserGpioModes.OUTPUT else {}) + } if mode == UserGpioModes.OUTPUT else { # input + "debounce": Option(0.1, type=valid_float_f0), + }) } rebuild = True diff --git a/kvmd/apps/cleanup/__init__.py b/kvmd/apps/cleanup/__init__.py index 5b6f2860..ec6a43c6 100644 --- a/kvmd/apps/cleanup/__init__.py +++ b/kvmd/apps/cleanup/__init__.py @@ -33,39 +33,10 @@ from ...logging import get_logger from ...yamlconf import Section -from ... import gpio - from .. import init # ===== -def _clear_gpio(config: Section) -> None: - logger = get_logger(0) - - with gpio.bcm(): - for (name, pin) in [ - *([ - ("hid_serial/reset", config.hid.reset_pin), - ] if config.hid.type == "serial" else []), - - *([ - ("atx_gpio/power_switch", config.atx.power_switch_pin), - ("atx_gpio/reset_switch", config.atx.reset_switch_pin), - ] if config.atx.type == "gpio" else []), - - *([ - ("msd_relay/target", config.msd.target_pin), - ("msd_relay/reset", config.msd.reset_pin), - ] if config.msd.type == "relay" else []), - ]: - if pin >= 0: - logger.info("Writing 0 to GPIO pin=%d (%s)", pin, name) - try: - gpio.set_output(pin, False) - except Exception: - logger.exception("Can't clear GPIO pin=%d (%s)", pin, name) - - def _kill_streamer(config: Section) -> None: logger = get_logger(0) @@ -108,17 +79,12 @@ def main(argv: Optional[List[str]]=None) -> None: prog="kvmd-cleanup", description="Kill KVMD and clear resources", argv=argv, - load_hid=True, - load_atx=True, - load_msd=True, - load_gpio=True, )[2].kvmd logger = get_logger(0) logger.info("Cleaning up ...") for method in [ - _clear_gpio, _kill_streamer, _remove_sockets, ]: diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py index e257145d..bd08e157 100644 --- a/kvmd/apps/kvmd/__init__.py +++ b/kvmd/apps/kvmd/__init__.py @@ -25,8 +25,6 @@ from typing import Optional from ...logging import get_logger -from ... import gpio - from ...plugins.hid import get_hid_class from ...plugins.atx import get_atx_class from ...plugins.msd import get_msd_class @@ -45,6 +43,8 @@ from .server import KvmdServer # ===== def main(argv: Optional[List[str]]=None) -> None: + # pylint: disable=protected-access + config = init( prog="kvmd", description="The main Pi-KVM daemon", @@ -56,48 +56,45 @@ def main(argv: Optional[List[str]]=None) -> None: load_gpio=True, )[2] - with gpio.bcm(): - # pylint: disable=protected-access - - msd_kwargs = config.kvmd.msd._unpack(ignore=["type"]) - if config.kvmd.msd.type == "otg": - msd_kwargs["gadget"] = config.otg.gadget # XXX: Small crutch to pass gadget name to plugin - - global_config = config - config = config.kvmd - - hid = get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type", "keymap"])) - streamer = Streamer(**config.streamer._unpack()) - - KvmdServer( - auth_manager=AuthManager( - internal_type=config.auth.internal.type, - internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]), - external_type=config.auth.external.type, - external_kwargs=(config.auth.external._unpack(ignore=["type"]) if config.auth.external.type else {}), - force_internal_users=config.auth.internal.force_users, - enabled=config.auth.enabled, - ), - info_manager=InfoManager(global_config), - log_reader=LogReader(), - wol=WakeOnLan(**config.wol._unpack()), - user_gpio=UserGpio(config.gpio), - + msd_kwargs = config.kvmd.msd._unpack(ignore=["type"]) + if config.kvmd.msd.type == "otg": + msd_kwargs["gadget"] = config.otg.gadget # XXX: Small crutch to pass gadget name to plugin + + global_config = config + config = config.kvmd + + hid = get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type", "keymap"])) + streamer = Streamer(**config.streamer._unpack()) + + KvmdServer( + auth_manager=AuthManager( + internal_type=config.auth.internal.type, + internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]), + external_type=config.auth.external.type, + external_kwargs=(config.auth.external._unpack(ignore=["type"]) if config.auth.external.type else {}), + force_internal_users=config.auth.internal.force_users, + enabled=config.auth.enabled, + ), + info_manager=InfoManager(global_config), + log_reader=LogReader(), + wol=WakeOnLan(**config.wol._unpack()), + user_gpio=UserGpio(config.gpio), + + hid=hid, + atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])), + msd=get_msd_class(config.msd.type)(**msd_kwargs), + streamer=streamer, + + snapshoter=Snapshoter( hid=hid, - atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])), - msd=get_msd_class(config.msd.type)(**msd_kwargs), streamer=streamer, + **config.snapshot._unpack(), + ), - snapshoter=Snapshoter( - hid=hid, - streamer=streamer, - **config.snapshot._unpack(), - ), - - heartbeat=config.server.heartbeat, - sync_chunk_size=config.server.sync_chunk_size, + heartbeat=config.server.heartbeat, + sync_chunk_size=config.server.sync_chunk_size, - keymap_path=config.hid.keymap, - ).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"])) + keymap_path=config.hid.keymap, + ).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"])) get_logger(0).info("Bye-bye") diff --git a/kvmd/apps/kvmd/ugpio.py b/kvmd/apps/kvmd/ugpio.py index f2ff2b1b..cb328115 100644 --- a/kvmd/apps/kvmd/ugpio.py +++ b/kvmd/apps/kvmd/ugpio.py @@ -81,7 +81,7 @@ class _GpioInput: self.__inverted: bool = config.inverted self.__driver = driver - self.__driver.register_input(self.__pin) + self.__driver.register_input(self.__pin, config.debounce) def get_scheme(self) -> Dict: return { @@ -201,10 +201,9 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes @aiotools.atomic async def __inner_switch(self, state: bool) -> None: - if state != self.__read(): - self.__write(state) - get_logger(0).info("Switched %s to state=%d", self, state) - await asyncio.sleep(self.__busy_delay) + self.__write(state) + get_logger(0).info("Ensured switch %s to state=%d", self, state) + await asyncio.sleep(self.__busy_delay) @aiotools.atomic async def __inner_pulse(self, delay: float) -> None: diff --git a/kvmd/gpio.py b/kvmd/gpio.py deleted file mode 100644 index 8dce12f6..00000000 --- a/kvmd/gpio.py +++ /dev/null @@ -1,101 +0,0 @@ -# ========================================================================== # -# # -# KVMD - The main Pi-KVM daemon. # -# # -# Copyright (C) 2018 Maxim Devaev <[email protected]> # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see <https://www.gnu.org/licenses/>. # -# # -# ========================================================================== # - - -import asyncio -import contextlib - -from typing import Tuple -from typing import Set -from typing import Generator -from typing import Optional - -from RPi import GPIO - -from .logging import get_logger - -from . import aiotools - - -# ===== -def bcm() -> Generator[None, None, None]: - logger = get_logger(2) - GPIO.setmode(GPIO.BCM) - logger.info("Configured GPIO mode as BCM") - try: - yield - finally: - GPIO.cleanup() - logger.info("GPIO cleaned") - - -def set_output(pin: int, initial: Optional[bool]) -> int: - assert pin >= 0, pin - GPIO.setup(pin, GPIO.OUT, initial=initial) - return pin - - -def set_input(pin: int) -> int: - assert pin >= 0, pin - GPIO.setup(pin, GPIO.IN) - return pin - - -def read(pin: int) -> bool: - assert pin >= 0, pin - return bool(GPIO.input(pin)) - - -def write(pin: int, state: bool) -> None: - assert pin >= 0, pin - GPIO.output(pin, state) - - -class BatchReader: - def __init__( - self, - pins: Set[int], - interval: float, - notifier: aiotools.AioNotifier, - ) -> None: - - self.__pins = sorted(pins) - self.__interval = interval - self.__notifier = notifier - - self.__state = {pin: read(pin) for pin in self.__pins} - self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins) - - def get(self, pin: int) -> bool: - return self.__state[pin] - - async def poll(self) -> None: - if not self.__pins: - await aiotools.wait_infinite() - else: - while True: - flags = tuple(map(read, self.__pins)) - if flags != self.__flags: - self.__flags = flags - self.__state = dict(zip(self.__pins, flags)) - await self.__notifier.notify() - await asyncio.sleep(self.__interval) diff --git a/kvmd/plugins/atx/gpio.py b/kvmd/plugins/atx/gpio.py index e3225cf0..b49dd689 100644 --- a/kvmd/plugins/atx/gpio.py +++ b/kvmd/plugins/atx/gpio.py @@ -20,24 +20,25 @@ # ========================================================================== # -import asyncio - from typing import Dict from typing import AsyncGenerator +from typing import Optional + +import gpiod from ...logging import get_logger from ... import aiotools -from ... import gpio +from ... import aiogp from ...yamlconf import Option from ...validators.basic import valid_bool +from ...validators.basic import valid_float_f0 from ...validators.basic import valid_float_f01 from ...validators.hw import valid_gpio_pin - from . import AtxIsBusyError from . import BaseAtx @@ -46,27 +47,24 @@ from . import BaseAtx class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments,super-init-not-called self, - power_led_pin: int, - hdd_led_pin: int, power_led_inverted: bool, + power_led_debounce: float, + + hdd_led_pin: int, hdd_led_inverted: bool, + hdd_led_debounce: float, power_switch_pin: int, reset_switch_pin: int, click_delay: float, long_click_delay: float, - - state_poll: float, ) -> None: - self.__power_led_pin = gpio.set_input(power_led_pin) - self.__hdd_led_pin = gpio.set_input(hdd_led_pin) - self.__power_switch_pin = gpio.set_output(power_switch_pin, False) - self.__reset_switch_pin = gpio.set_output(reset_switch_pin, False) - - self.__power_led_inverted = power_led_inverted - self.__hdd_led_inverted = hdd_led_inverted + self.__power_led_pin = power_led_pin + self.__hdd_led_pin = hdd_led_pin + self.__power_switch_pin = power_switch_pin + self.__reset_switch_pin = reset_switch_pin self.__click_delay = click_delay self.__long_click_delay = long_click_delay @@ -74,9 +72,17 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes self.__notifier = aiotools.AioNotifier() self.__region = aiotools.AioExclusiveRegion(AtxIsBusyError, self.__notifier) - self.__reader = gpio.BatchReader( - pins=set([self.__power_led_pin, self.__hdd_led_pin]), - interval=state_poll, + self.__chip: Optional[gpiod.Chip] = None + self.__power_switch_line: Optional[gpiod.Line] = None + self.__reset_switch_line: Optional[gpiod.Line] = None + + self.__reader = aiogp.AioReader( + path=aiogp.DEVICE_PATH, + consumer="kvmd/atx-gpio/leds", + pins={ + power_led_pin: aiogp.AioReaderPinParams(power_led_inverted, power_led_debounce), + hdd_led_pin: aiogp.AioReaderPinParams(hdd_led_inverted, hdd_led_debounce), + }, notifier=self.__notifier, ) @@ -84,25 +90,39 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes def get_plugin_options(cls) -> Dict: return { "power_led_pin": Option(-1, type=valid_gpio_pin), - "hdd_led_pin": Option(-1, type=valid_gpio_pin), "power_led_inverted": Option(False, type=valid_bool), - "hdd_led_inverted": Option(False, type=valid_bool), + "power_led_debounce": Option(0.1, type=valid_float_f0), + + "hdd_led_pin": Option(-1, type=valid_gpio_pin), + "hdd_led_inverted": Option(False, type=valid_bool), + "hdd_led_debounce": Option(0.1, type=valid_float_f0), "power_switch_pin": Option(-1, type=valid_gpio_pin), "reset_switch_pin": Option(-1, type=valid_gpio_pin), "click_delay": Option(0.1, type=valid_float_f01), "long_click_delay": Option(5.5, type=valid_float_f01), - - "state_poll": Option(0.1, type=valid_float_f01), } + def sysprep(self) -> None: + assert self.__chip is None + assert self.__power_switch_line is None + assert self.__reset_switch_line is None + + self.__chip = gpiod.Chip(aiogp.DEVICE_PATH) + + self.__power_switch_line = self.__chip.get_line(self.__power_switch_pin) + self.__power_switch_line.request("kvmd/atx-gpio/power_switch", gpiod.LINE_REQ_DIR_OUT, default_val=0) + + self.__reset_switch_line = self.__chip.get_line(self.__reset_switch_pin) + self.__reset_switch_line.request("kvmd/atx-gpio/reset_switch", gpiod.LINE_REQ_DIR_OUT, default_val=0) + async def get_state(self) -> Dict: return { "enabled": True, "busy": self.__region.is_busy(), "leds": { - "power": (self.__reader.get(self.__power_led_pin) ^ self.__power_led_inverted), - "hdd": (self.__reader.get(self.__hdd_led_pin) ^ self.__hdd_led_inverted), + "power": self.__reader.get(self.__power_led_pin), + "hdd": self.__reader.get(self.__hdd_led_pin), }, } @@ -119,14 +139,11 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes await self.__reader.poll() async def cleanup(self) -> None: - for (name, pin) in [ - ("power", self.__power_switch_pin), - ("reset", self.__reset_switch_pin), - ]: + if self.__chip: try: - gpio.write(pin, False) + self.__chip.close() except Exception: - get_logger(0).exception("Can't cleanup %s pin %d", name, pin) + pass # ===== @@ -149,13 +166,13 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes # ===== async def click_power(self, wait: bool) -> None: - await self.__click("power", self.__power_switch_pin, self.__click_delay, wait) + await self.__click("power", self.__power_switch_line, self.__click_delay, wait) async def click_power_long(self, wait: bool) -> None: - await self.__click("power_long", self.__power_switch_pin, self.__long_click_delay, wait) + await self.__click("power_long", self.__power_switch_line, self.__long_click_delay, wait) async def click_reset(self, wait: bool) -> None: - await self.__click("reset", self.__reset_switch_pin, self.__click_delay, wait) + await self.__click("reset", self.__reset_switch_line, self.__click_delay, wait) # ===== @@ -163,22 +180,17 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes return (await self.get_state())["leds"]["power"] @aiotools.atomic - async def __click(self, name: str, pin: int, delay: float, wait: bool) -> None: + async def __click(self, name: str, line: gpiod.Line, delay: float, wait: bool) -> None: if wait: async with self.__region: - await self.__inner_click(name, pin, delay) + await self.__inner_click(name, line, delay) else: await aiotools.run_region_task( - "Can't perform ATX click or operation was not completed", - self.__region, self.__inner_click, name, pin, delay, + f"Can't perform ATX {name} click or operation was not completed", + self.__region, self.__inner_click, name, line, delay, ) @aiotools.atomic - async def __inner_click(self, name: str, pin: int, delay: float) -> None: - try: - gpio.write(pin, True) - await asyncio.sleep(delay) - finally: - gpio.write(pin, False) - await asyncio.sleep(1) + async def __inner_click(self, name: str, line: gpiod.Line, delay: float) -> None: + await aiogp.pulse(line, delay, 1) get_logger(0).info("Clicked ATX button %r", name) diff --git a/kvmd/plugins/hid/serial.py b/kvmd/plugins/hid/serial.py index bb97aec4..c07ea45c 100644 --- a/kvmd/plugins/hid/serial.py +++ b/kvmd/plugins/hid/serial.py @@ -21,7 +21,6 @@ import os -import asyncio import multiprocessing import multiprocessing.queues import dataclasses @@ -35,7 +34,9 @@ from typing import List from typing import Dict from typing import Iterable from typing import AsyncGenerator +from typing import Optional +import gpiod import serial from ...logging import get_logger @@ -45,7 +46,7 @@ from ...keyboard.mappings import KEYMAP from ... import aiotools from ... import aiomulti from ... import aioproc -from ... import gpio +from ... import aiogp from ...yamlconf import Option @@ -57,7 +58,7 @@ from ...validators.basic import valid_float_f01 from ...validators.os import valid_abs_path from ...validators.hw import valid_tty_speed -from ...validators.hw import valid_gpio_pin +from ...validators.hw import valid_gpio_pin_optional from . import BaseHid @@ -156,6 +157,45 @@ class _MouseWheelEvent(_BaseEvent): return struct.pack(">Bxbxx", 0x14, self.delta_y) +class _Gpio: + def __init__(self, reset_pin: int, reset_delay: float) -> None: + self.__reset_pin = reset_pin + self.__reset_delay = reset_delay + + self.__chip: Optional[gpiod.Chip] = None + self.__reset_line: Optional[gpiod.Line] = None + self.__reset_wip = False + + def open(self) -> None: + if self.__reset_pin >= 0: + assert self.__chip is None + assert self.__reset_line is None + self.__chip = gpiod.Chip(aiogp.DEVICE_PATH) + self.__reset_line = self.__chip.get_line(self.__reset_pin) + self.__reset_line.request("kvmd/hid-serial/reset", gpiod.LINE_REQ_DIR_OUT, default_val=0) + + def close(self) -> None: + if self.__chip: + try: + self.__chip.close() + except Exception: + pass + + @aiotools.atomic + async def reset(self) -> None: + if self.__reset_pin >= 0: + assert self.__reset_line + if not self.__reset_wip: + self.__reset_wip = True + try: + await aiogp.pulse(self.__reset_line, self.__reset_delay, 1) + finally: + self.__reset_wip = False + get_logger(0).info("Reset HID performed") + else: + get_logger(0).info("Another reset HID in progress") + + # ===== class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments,super-init-not-called @@ -175,9 +215,6 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst multiprocessing.Process.__init__(self, daemon=True) - self.__reset_pin = gpio.set_output(reset_pin, False) - self.__reset_delay = reset_delay - self.__device_path = device_path self.__speed = speed self.__read_timeout = read_timeout @@ -187,7 +224,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst self.__errors_threshold = errors_threshold self.__noop = noop - self.__reset_wip = False + self.__gpio = _Gpio(reset_pin, reset_delay) self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue() @@ -204,7 +241,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst @classmethod def get_plugin_options(cls) -> Dict: return { - "reset_pin": Option(-1, type=valid_gpio_pin), + "reset_pin": Option(-1, type=valid_gpio_pin_optional), "reset_delay": Option(0.1, type=valid_float_f01), "device": Option("", type=valid_abs_path, unpack_as="device_path"), @@ -218,6 +255,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst } def sysprep(self) -> None: + self.__gpio.open() get_logger(0).info("Starting HID daemon ...") self.start() @@ -247,20 +285,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst @aiotools.atomic async def reset(self) -> None: - if not self.__reset_wip: - try: - self.__reset_wip = True - gpio.write(self.__reset_pin, True) - await asyncio.sleep(self.__reset_delay) - finally: - try: - gpio.write(self.__reset_pin, False) - await asyncio.sleep(1) - finally: - self.__reset_wip = False - get_logger().info("Reset HID performed") - else: - get_logger().info("Another reset HID in progress") + await self.__gpio.reset() @aiotools.atomic async def cleanup(self) -> None: @@ -279,7 +304,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst except Exception: logger.exception("Can't clear HID events") finally: - gpio.write(self.__reset_pin, False) + self.__gpio.close() # ===== diff --git a/kvmd/plugins/msd/relay.py b/kvmd/plugins/msd/relay.py index 409d3d71..cec03377 100644 --- a/kvmd/plugins/msd/relay.py +++ b/kvmd/plugins/msd/relay.py @@ -35,12 +35,13 @@ from typing import Optional import aiofiles import aiofiles.base +import gpiod from ...logging import get_logger from ... import aiotools from ... import aiofs -from ... import gpio +from ... import aiogp from ...yamlconf import Option @@ -152,6 +153,55 @@ def _explore_device(device_path: str) -> _DeviceInfo: ) +class _Gpio: + def __init__( + self, + target_pin: int, + reset_pin: int, + reset_delay: float, + ) -> None: + + self.__target_pin = target_pin + self.__reset_pin = reset_pin + self.__reset_delay = reset_delay + + self.__chip: Optional[gpiod.Chip] = None + self.__target_line: Optional[gpiod.Line] = None + self.__reset_line: Optional[gpiod.Line] = None + + def open(self) -> None: + assert self.__chip is None + assert self.__target_line is None + assert self.__reset_line is None + + self.__chip = gpiod.Chip(aiogp.DEVICE_PATH) + + self.__target_line = self.__chip.get_line(self.__target_pin) + self.__target_line.request("kvmd/msd-relay/target", gpiod.LINE_REQ_DIR_OUT, default_val=0) + + self.__reset_line = self.__chip.get_line(self.__reset_pin) + self.__reset_line.request("kvmd/msd-relay/reset", gpiod.LINE_REQ_DIR_OUT, default_val=0) + + def close(self) -> None: + if self.__chip: + try: + self.__chip.close() + except Exception: + pass + + def switch_to_local(self) -> None: + assert self.__target_line + self.__target_line.set_value(0) + + def switch_to_server(self) -> None: + assert self.__target_line + self.__target_line.set_value(1) + + async def reset(self) -> None: + assert self.__reset_line + await aiogp.pulse(self.__reset_line, self.__reset_delay, 0) + + # ===== class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=super-init-not-called @@ -165,13 +215,11 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes reset_delay: float, ) -> None: - self.__target_pin = gpio.set_output(target_pin, False) - self.__reset_pin = gpio.set_output(reset_pin, False) - self.__device_path = device_path self.__init_delay = init_delay self.__init_retries = init_retries - self.__reset_delay = reset_delay + + self.__gpio = _Gpio(target_pin, reset_pin, reset_delay) self.__device_info: Optional[_DeviceInfo] = None self.__connected = False @@ -202,6 +250,9 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes "reset_delay": Option(1.0, type=valid_float_f01), } + def sysprep(self) -> None: + self.__gpio.open() + async def get_state(self) -> Dict: storage: Optional[Dict] = None drive: Optional[Dict] = None @@ -245,26 +296,18 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes @aiotools.atomic async def __inner_reset(self) -> None: - try: - gpio.write(self.__reset_pin, True) - await asyncio.sleep(self.__reset_delay) - gpio.write(self.__reset_pin, False) - - gpio.write(self.__target_pin, False) - self.__connected = False - - await self.__load_device_info() - get_logger(0).info("MSD reset has been successful") - finally: - gpio.write(self.__reset_pin, False) + await self.__gpio.reset() + self.__gpio.switch_to_local() + self.__connected = False + await self.__load_device_info() + get_logger(0).info("MSD reset has been successful") @aiotools.atomic async def cleanup(self) -> None: try: await self.__close_device_file() finally: - gpio.write(self.__target_pin, False) - gpio.write(self.__reset_pin, False) + self.__gpio.close() # ===== @@ -283,7 +326,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes if self.__connected: raise MsdConnectedError() - gpio.write(self.__target_pin, True) + self.__gpio.switch_to_server() self.__connected = True get_logger(0).info("MSD switched to Server") @@ -294,12 +337,12 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes if not self.__connected: raise MsdDisconnectedError() - gpio.write(self.__target_pin, False) + self.__gpio.switch_to_local() try: await self.__load_device_info() except Exception: if self.__connected: - gpio.write(self.__target_pin, True) + self.__gpio.switch_to_server() raise self.__connected = False get_logger(0).info("MSD switched to KVM: %s", self.__device_info) diff --git a/kvmd/plugins/ugpio/__init__.py b/kvmd/plugins/ugpio/__init__.py index 9ed48a5f..c280b2a8 100644 --- a/kvmd/plugins/ugpio/__init__.py +++ b/kvmd/plugins/ugpio/__init__.py @@ -74,7 +74,7 @@ class BaseUserGpioDriver(BasePlugin): def get_modes(cls) -> Set[str]: return set(UserGpioModes.ALL) - def register_input(self, pin: int) -> None: + def register_input(self, pin: int, debounce: float) -> None: raise NotImplementedError def register_output(self, pin: int, initial: Optional[bool]) -> None: diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py index 90426e13..4b951057 100644 --- a/kvmd/plugins/ugpio/gpio.py +++ b/kvmd/plugins/ugpio/gpio.py @@ -21,15 +21,12 @@ from typing import Dict -from typing import Set from typing import Optional -from ... import aiotools -from ... import gpio - -from ...yamlconf import Option +import gpiod -from ...validators.basic import valid_float_f01 +from ... import aiotools +from ... import aiogp from . import BaseUserGpioDriver @@ -40,59 +37,58 @@ class Plugin(BaseUserGpioDriver): self, instance_name: str, notifier: aiotools.AioNotifier, - - state_poll: float, ) -> None: super().__init__(instance_name, notifier) - self.__state_poll = state_poll - - self.__input_pins: Set[int] = set() + self.__input_pins: Dict[int, aiogp.AioReaderPinParams] = {} self.__output_pins: Dict[int, Optional[bool]] = {} - self.__reader: Optional[gpio.BatchReader] = None + self.__reader: Optional[aiogp.AioReader] = None - @classmethod - def get_plugin_options(cls) -> Dict: - return { - "state_poll": Option(0.1, type=valid_float_f01), - } + self.__chip: Optional[gpiod.Chip] = None + self.__output_lines: Dict[int, gpiod.Line] = {} - def register_input(self, pin: int) -> None: - self.__input_pins.add(pin) + def register_input(self, pin: int, debounce: float) -> None: + self.__input_pins[pin] = aiogp.AioReaderPinParams(False, debounce) def register_output(self, pin: int, initial: Optional[bool]) -> None: self.__output_pins[pin] = initial def prepare(self) -> None: assert self.__reader is None - self.__reader = gpio.BatchReader( - pins=set([ - *map(gpio.set_input, self.__input_pins), - *[ - gpio.set_output(pin, initial) - for (pin, initial) in self.__output_pins.items() - ], - ]), - interval=self.__state_poll, + self.__reader = aiogp.AioReader( + path=aiogp.DEVICE_PATH, + consumer="kvmd/ugpio-gpio/inputs", + pins=self.__input_pins, notifier=self._notifier, ) + self.__chip = gpiod.Chip(aiogp.DEVICE_PATH) + for (pin, initial) in self.__output_pins.items(): + line = self.__chip.get_line(pin) + line.request("kvmd/ugpio-gpio/outputs", gpiod.LINE_REQ_DIR_OUT, default_val=int(initial or False)) + self.__output_lines[pin] = line + async def run(self) -> None: assert self.__reader await self.__reader.poll() def cleanup(self) -> None: - for (pin, initial) in self.__output_pins.items(): - if initial is not None: - gpio.write(pin, initial) + if self.__chip: + try: + self.__chip.close() + except Exception: + pass def read(self, pin: int) -> bool: - return gpio.read(pin) + assert self.__reader + if pin in self.__input_pins: + return self.__reader.get(pin) + return bool(self.__output_lines[pin].get_value()) def write(self, pin: int, state: bool) -> None: - gpio.write(pin, state) + self.__output_lines[pin].set_value(int(state)) def __str__(self) -> str: return f"GPIO({self._instance_name})" diff --git a/kvmd/plugins/ugpio/hidrelay.py b/kvmd/plugins/ugpio/hidrelay.py index b7f88cb8..32c4a3eb 100644 --- a/kvmd/plugins/ugpio/hidrelay.py +++ b/kvmd/plugins/ugpio/hidrelay.py @@ -79,7 +79,7 @@ class Plugin(BaseUserGpioDriver): def get_modes(cls) -> Set[str]: return set([UserGpioModes.OUTPUT]) - def register_input(self, pin: int) -> None: + def register_input(self, pin: int, debounce: float) -> None: raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}") def register_output(self, pin: int, initial: Optional[bool]) -> None: |