diff options
Diffstat (limited to 'kvmd/gpio.py')
-rw-r--r-- | kvmd/gpio.py | 53 |
1 files changed, 49 insertions, 4 deletions
diff --git a/kvmd/gpio.py b/kvmd/gpio.py index f3ef8ba2..18b36dfd 100644 --- a/kvmd/gpio.py +++ b/kvmd/gpio.py @@ -23,6 +23,7 @@ import asyncio import contextlib +from typing import List from typing import Tuple from typing import Set from typing import Generator @@ -71,18 +72,62 @@ def write(pin: int, state: bool) -> None: class BatchReader: - def __init__(self, pins: Set[int], interval: float, notifier: aiotools.AioNotifier) -> None: - self.__pins = sorted(pins) - self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins) - self.__state = {pin: read(pin) for pin in self.__pins} + def __init__( + self, + pins: Set[int], + edge_detection: bool, + interval: float, + notifier: aiotools.AioNotifier, + ) -> None: + self.__pins = sorted(pins) + self.__edge_detection = edge_detection self.__interval = interval self.__notifier = notifier + self.__state = {pin: read(pin) for pin in self.__pins} + + self.__loop: Optional[asyncio.AbstractEventLoop] = None # Only for edge detection + + self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins) # Only for busyloop + def get(self, pin: int) -> bool: return self.__state[pin] async def poll(self) -> None: + if self.__edge_detection: + await self.__poll_edge() + else: + await self.__poll_busyloop() + + # ===== + + async def __poll_edge(self) -> None: + assert self.__loop is None + self.__loop = asyncio.get_running_loop() + watched: List[int] = [] + try: + for pin in self.__pins: + GPIO.add_event_detect( + pin, GPIO.BOTH, + callback=self.__poll_edge_callback, + bouncetime=int(self.__interval * 1000), + ) + watched.append(pin) + await self.__notifier.notify() + await aiotools.wait_infinite() + finally: + for pin in watched: + GPIO.remove_event_detect(pin) + + def __poll_edge_callback(self, pin: int) -> None: + assert self.__loop + self.__state[pin] = read(pin) + self.__loop.call_soon_threadsafe(self.__notifier.notify_sync) + + # ===== + + async def __poll_busyloop(self) -> None: if not self.__pins: await aiotools.wait_infinite() else: |