diff options
author | Devaev Maxim <[email protected]> | 2020-09-13 10:47:53 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2020-09-13 10:47:53 +0300 |
commit | 002823b6e1ae6a261e77ee83b358307c682ce65a (patch) | |
tree | 7f697c83a6523c9546ff8ef277c3669f90a73f48 /kvmd | |
parent | bddabc4742ebd130020efc14c53675bf5c96e134 (diff) |
using libgpiod for the gpio atx
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/aiogp.py | 101 | ||||
-rw-r--r-- | kvmd/aiotools.py | 3 | ||||
-rw-r--r-- | kvmd/apps/cleanup/__init__.py | 25 | ||||
-rw-r--r-- | kvmd/plugins/atx/gpio.py | 81 |
4 files changed, 148 insertions, 62 deletions
diff --git a/kvmd/aiogp.py b/kvmd/aiogp.py new file mode 100644 index 00000000..dbc39e69 --- /dev/null +++ b/kvmd/aiogp.py @@ -0,0 +1,101 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import asyncio +import threading + +from typing import List +from typing import Optional + +import gpiod + +from . import aiotools + + +# ===== +class AioPinsReader(threading.Thread): + def __init__( + self, + path: str, + consumer: str, + pins: List[int], + inverted: List[bool], + notifier: aiotools.AioNotifier, + ) -> None: + + assert len(pins) == len(inverted) + super().__init__(daemon=True) + + self.__path = path + self.__consumer = consumer + self.__pins = pins + self.__inverted = dict(zip(pins, inverted)) + self.__notifier = notifier + + self.__state = dict.fromkeys(pins, False) + + self.__stop_event = threading.Event() + self.__loop: Optional[asyncio.AbstractEventLoop] = None + + def get(self, pin: int) -> bool: + return (self.__state[pin] ^ self.__inverted[pin]) + + async def poll(self) -> None: + if not self.__pins: + await aiotools.wait_infinite() + else: + assert self.__loop is None + self.__loop = asyncio.get_running_loop() + self.start() + try: + await aiotools.run_async(self.join) + finally: + self.__stop_event.set() + await aiotools.run_async(self.join) + + def run(self) -> None: + assert self.__loop + with gpiod.Chip(self.__path) as chip: + lines = chip.get_lines(self.__pins) + lines.request(self.__consumer, gpiod.LINE_REQ_EV_BOTH_EDGES) + + lines.event_wait(nsec=1) + self.__state = { + pin: bool(value) + for (pin, value) in zip(self.__pins, lines.get_values()) + } + self.__loop.call_soon_threadsafe(self.__notifier.notify_sync) + + while not self.__stop_event.is_set(): + ev_lines = lines.event_wait(10) + if ev_lines: + for ev_lines in lines.event_wait(1): + for ev_line in ev_lines: + event = ev_line.event_read() + if event.type == gpiod.LineEvent.RISING_EDGE: + value = True + elif event.type == gpiod.LineEvent.FALLING_EDGE: + value = False + else: + raise RuntimeError(f"Invalid event {event} type: {event.type}") + self.__state[event.source.offset()] = value + self.__loop.call_soon_threadsafe(self.__notifier.notify_sync) diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py index 84c6f314..dfd67f44 100644 --- a/kvmd/aiotools.py +++ b/kvmd/aiotools.py @@ -97,6 +97,9 @@ class AioNotifier: async def notify(self) -> None: await self.__queue.put(None) + def notify_sync(self) -> None: + self.__queue.put_nowait(None) + async def wait(self) -> None: await self.__queue.get() while not self.__queue.empty(): diff --git a/kvmd/apps/cleanup/__init__.py b/kvmd/apps/cleanup/__init__.py index 795841c5..ec6a43c6 100644 --- a/kvmd/apps/cleanup/__init__.py +++ b/kvmd/apps/cleanup/__init__.py @@ -33,30 +33,10 @@ from ...logging import get_logger from ...yamlconf import Section -from ... import gpio - from .. import init # ===== -def _clear_gpio(config: Section) -> None: - logger = get_logger(0) - - with gpio.bcm(): - for (name, pin) in [ - *([ - ("atx_gpio/power_switch", config.atx.power_switch_pin), - ("atx_gpio/reset_switch", config.atx.reset_switch_pin), - ] if config.atx.type == "gpio" else []), - ]: - if pin >= 0: - logger.info("Writing 0 to GPIO pin=%d (%s)", pin, name) - try: - gpio.set_output(pin, False) - except Exception: - logger.exception("Can't clear GPIO pin=%d (%s)", pin, name) - - def _kill_streamer(config: Section) -> None: logger = get_logger(0) @@ -99,17 +79,12 @@ def main(argv: Optional[List[str]]=None) -> None: prog="kvmd-cleanup", description="Kill KVMD and clear resources", argv=argv, - load_hid=True, - load_atx=True, - load_msd=True, - load_gpio=True, )[2].kvmd logger = get_logger(0) logger.info("Cleaning up ...") for method in [ - _clear_gpio, _kill_streamer, _remove_sockets, ]: diff --git a/kvmd/plugins/atx/gpio.py b/kvmd/plugins/atx/gpio.py index e3225cf0..bd49513c 100644 --- a/kvmd/plugins/atx/gpio.py +++ b/kvmd/plugins/atx/gpio.py @@ -24,11 +24,14 @@ import asyncio from typing import Dict from typing import AsyncGenerator +from typing import Optional + +import gpiod from ...logging import get_logger from ... import aiotools -from ... import gpio +from ... import aiogp from ...yamlconf import Option @@ -37,7 +40,6 @@ from ...validators.basic import valid_float_f01 from ...validators.hw import valid_gpio_pin - from . import AtxIsBusyError from . import BaseAtx @@ -46,7 +48,6 @@ from . import BaseAtx class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments,super-init-not-called self, - power_led_pin: int, hdd_led_pin: int, power_led_inverted: bool, @@ -56,17 +57,12 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes reset_switch_pin: int, click_delay: float, long_click_delay: float, - - state_poll: float, ) -> None: - self.__power_led_pin = gpio.set_input(power_led_pin) - self.__hdd_led_pin = gpio.set_input(hdd_led_pin) - self.__power_switch_pin = gpio.set_output(power_switch_pin, False) - self.__reset_switch_pin = gpio.set_output(reset_switch_pin, False) - - self.__power_led_inverted = power_led_inverted - self.__hdd_led_inverted = hdd_led_inverted + self.__power_led_pin = power_led_pin + self.__hdd_led_pin = hdd_led_pin + self.__power_switch_pin = power_switch_pin + self.__reset_switch_pin = reset_switch_pin self.__click_delay = click_delay self.__long_click_delay = long_click_delay @@ -74,9 +70,15 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes self.__notifier = aiotools.AioNotifier() self.__region = aiotools.AioExclusiveRegion(AtxIsBusyError, self.__notifier) - self.__reader = gpio.BatchReader( - pins=set([self.__power_led_pin, self.__hdd_led_pin]), - interval=state_poll, + self.__chip: Optional[gpiod.Chip] = None + self.__power_switch_line: Optional[gpiod.Line] = None + self.__reset_switch_line: Optional[gpiod.Line] = None + + self.__reader = aiogp.AioPinsReader( + path="/dev/gpiochip0", + consumer="kvmd/atx-gpio/leds", + pins=[power_led_pin, hdd_led_pin], + inverted=[power_led_inverted, hdd_led_inverted], notifier=self.__notifier, ) @@ -92,17 +94,28 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes "reset_switch_pin": Option(-1, type=valid_gpio_pin), "click_delay": Option(0.1, type=valid_float_f01), "long_click_delay": Option(5.5, type=valid_float_f01), - - "state_poll": Option(0.1, type=valid_float_f01), } + def sysprep(self) -> None: + assert self.__chip is None + assert self.__power_switch_line is None + assert self.__reset_switch_line is None + + self.__chip = gpiod.Chip("/dev/gpiochip0") + + self.__power_switch_line = self.__chip.get_line(self.__power_switch_pin) + self.__power_switch_line.request("kvmd/atx-gpio/power_switch", gpiod.LINE_REQ_DIR_OUT, default_val=0) + + self.__reset_switch_line = self.__chip.get_line(self.__reset_switch_pin) + self.__reset_switch_line.request("kvmd/atx-gpio/reset_switch", gpiod.LINE_REQ_DIR_OUT, default_val=0) + async def get_state(self) -> Dict: return { "enabled": True, "busy": self.__region.is_busy(), "leds": { - "power": (self.__reader.get(self.__power_led_pin) ^ self.__power_led_inverted), - "hdd": (self.__reader.get(self.__hdd_led_pin) ^ self.__hdd_led_inverted), + "power": self.__reader.get(self.__power_led_pin), + "hdd": self.__reader.get(self.__hdd_led_pin), }, } @@ -119,14 +132,8 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes await self.__reader.poll() async def cleanup(self) -> None: - for (name, pin) in [ - ("power", self.__power_switch_pin), - ("reset", self.__reset_switch_pin), - ]: - try: - gpio.write(pin, False) - except Exception: - get_logger(0).exception("Can't cleanup %s pin %d", name, pin) + if self.__chip: + self.__chip.close() # ===== @@ -149,13 +156,13 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes # ===== async def click_power(self, wait: bool) -> None: - await self.__click("power", self.__power_switch_pin, self.__click_delay, wait) + await self.__click("power", self.__power_switch_line, self.__click_delay, wait) async def click_power_long(self, wait: bool) -> None: - await self.__click("power_long", self.__power_switch_pin, self.__long_click_delay, wait) + await self.__click("power_long", self.__power_switch_line, self.__long_click_delay, wait) async def click_reset(self, wait: bool) -> None: - await self.__click("reset", self.__reset_switch_pin, self.__click_delay, wait) + await self.__click("reset", self.__reset_switch_line, self.__click_delay, wait) # ===== @@ -163,22 +170,22 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes return (await self.get_state())["leds"]["power"] @aiotools.atomic - async def __click(self, name: str, pin: int, delay: float, wait: bool) -> None: + async def __click(self, name: str, line: gpiod.Line, delay: float, wait: bool) -> None: if wait: async with self.__region: - await self.__inner_click(name, pin, delay) + await self.__inner_click(name, line, delay) else: await aiotools.run_region_task( - "Can't perform ATX click or operation was not completed", - self.__region, self.__inner_click, name, pin, delay, + f"Can't perform ATX {name} click or operation was not completed", + self.__region, self.__inner_click, name, line, delay, ) @aiotools.atomic - async def __inner_click(self, name: str, pin: int, delay: float) -> None: + async def __inner_click(self, name: str, line: gpiod.Line, delay: float) -> None: try: - gpio.write(pin, True) + line.set_value(1) await asyncio.sleep(delay) finally: - gpio.write(pin, False) + line.set_value(0) await asyncio.sleep(1) get_logger(0).info("Clicked ATX button %r", name) |