diff options
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/aiotools.py | 3 | ||||
-rw-r--r-- | kvmd/gpio.py | 53 | ||||
-rw-r--r-- | kvmd/plugins/atx/gpio.py | 5 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/gpio.py | 7 |
4 files changed, 62 insertions, 6 deletions
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py index 84c6f314..dfd67f44 100644 --- a/kvmd/aiotools.py +++ b/kvmd/aiotools.py @@ -97,6 +97,9 @@ class AioNotifier: async def notify(self) -> None: await self.__queue.put(None) + def notify_sync(self) -> None: + self.__queue.put_nowait(None) + async def wait(self) -> None: await self.__queue.get() while not self.__queue.empty(): diff --git a/kvmd/gpio.py b/kvmd/gpio.py index f3ef8ba2..18b36dfd 100644 --- a/kvmd/gpio.py +++ b/kvmd/gpio.py @@ -23,6 +23,7 @@ import asyncio import contextlib +from typing import List from typing import Tuple from typing import Set from typing import Generator @@ -71,18 +72,62 @@ def write(pin: int, state: bool) -> None: class BatchReader: - def __init__(self, pins: Set[int], interval: float, notifier: aiotools.AioNotifier) -> None: - self.__pins = sorted(pins) - self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins) - self.__state = {pin: read(pin) for pin in self.__pins} + def __init__( + self, + pins: Set[int], + edge_detection: bool, + interval: float, + notifier: aiotools.AioNotifier, + ) -> None: + self.__pins = sorted(pins) + self.__edge_detection = edge_detection self.__interval = interval self.__notifier = notifier + self.__state = {pin: read(pin) for pin in self.__pins} + + self.__loop: Optional[asyncio.AbstractEventLoop] = None # Only for edge detection + + self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins) # Only for busyloop + def get(self, pin: int) -> bool: return self.__state[pin] async def poll(self) -> None: + if self.__edge_detection: + await self.__poll_edge() + else: + await self.__poll_busyloop() + + # ===== + + async def __poll_edge(self) -> None: + assert self.__loop is None + self.__loop = asyncio.get_running_loop() + watched: List[int] = [] + try: + for pin in self.__pins: + GPIO.add_event_detect( + pin, GPIO.BOTH, + callback=self.__poll_edge_callback, + bouncetime=int(self.__interval * 1000), + ) + watched.append(pin) + await self.__notifier.notify() + await aiotools.wait_infinite() + finally: + for pin in watched: + GPIO.remove_event_detect(pin) + + def __poll_edge_callback(self, pin: int) -> None: + assert self.__loop + self.__state[pin] = read(pin) + self.__loop.call_soon_threadsafe(self.__notifier.notify_sync) + + # ===== + + async def __poll_busyloop(self) -> None: if not self.__pins: await aiotools.wait_infinite() else: diff --git a/kvmd/plugins/atx/gpio.py b/kvmd/plugins/atx/gpio.py index df65e88e..8401d2c0 100644 --- a/kvmd/plugins/atx/gpio.py +++ b/kvmd/plugins/atx/gpio.py @@ -57,6 +57,7 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes click_delay: float, long_click_delay: float, + edge_detection: bool, state_poll: float, ) -> None: @@ -76,6 +77,7 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes self.__reader = gpio.BatchReader( pins=set([self.__power_led_pin, self.__hdd_led_pin]), + edge_detection=edge_detection, interval=state_poll, notifier=self.__notifier, ) @@ -93,7 +95,8 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes "click_delay": Option(0.1, type=valid_float_f01), "long_click_delay": Option(5.5, type=valid_float_f01), - "state_poll": Option(0.1, type=valid_float_f01), + "edge_detection": Option(False, type=valid_bool), + "state_poll": Option(0.1, type=valid_float_f01), } async def get_state(self) -> Dict: diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py index 90426e13..d9056d59 100644 --- a/kvmd/plugins/ugpio/gpio.py +++ b/kvmd/plugins/ugpio/gpio.py @@ -29,6 +29,7 @@ from ... import gpio from ...yamlconf import Option +from ...validators.basic import valid_bool from ...validators.basic import valid_float_f01 from . import BaseUserGpioDriver @@ -41,11 +42,13 @@ class Plugin(BaseUserGpioDriver): instance_name: str, notifier: aiotools.AioNotifier, + edge_detection: bool, state_poll: float, ) -> None: super().__init__(instance_name, notifier) + self.__edge_detection = edge_detection self.__state_poll = state_poll self.__input_pins: Set[int] = set() @@ -56,7 +59,8 @@ class Plugin(BaseUserGpioDriver): @classmethod def get_plugin_options(cls) -> Dict: return { - "state_poll": Option(0.1, type=valid_float_f01), + "edge_detection": Option(False, type=valid_bool), + "state_poll": Option(0.1, type=valid_float_f01), } def register_input(self, pin: int) -> None: @@ -75,6 +79,7 @@ class Plugin(BaseUserGpioDriver): for (pin, initial) in self.__output_pins.items() ], ]), + edge_detection=self.__edge_detection, interval=self.__state_poll, notifier=self._notifier, ) |