From 9b14e8b3e2ce319e90703fddd3c470304f5dd4f5 Mon Sep 17 00:00:00 2001 From: Devaev Maxim Date: Fri, 28 Aug 2020 17:01:35 +0300 Subject: user gpio --- kvmd/gpio.py | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) (limited to 'kvmd/gpio.py') diff --git a/kvmd/gpio.py b/kvmd/gpio.py index ef9d1c5e..6959a640 100644 --- a/kvmd/gpio.py +++ b/kvmd/gpio.py @@ -20,14 +20,20 @@ # ========================================================================== # +import asyncio import contextlib +from typing import Tuple +from typing import List from typing import Generator +from typing import Optional from RPi import GPIO from .logging import get_logger +from . import aiotools + # ===== @contextlib.contextmanager @@ -59,6 +65,31 @@ def read(pin: int) -> bool: return bool(GPIO.input(pin)) -def write(pin: int, flag: bool) -> None: +def write(pin: int, state: bool) -> None: assert pin >= 0, pin - GPIO.output(pin, flag) + GPIO.output(pin, state) + + +class BatchReader: + def __init__(self, pins: List[int], interval: float, notifier: aiotools.AioNotifier) -> None: + self.__pins = pins + self.__flags: Tuple[Optional[bool], ...] = (None,) * len(pins) + self.__state = dict.fromkeys(pins, False) + + self.__interval = interval + self.__notifier = notifier + + def get(self, pin: int) -> bool: + return self.__state[pin] + + async def poll(self) -> None: + if not self.__pins: + await aiotools.wait_infinite() + else: + while True: + flags = tuple(map(read, self.__pins)) + if flags != self.__flags: + self.__flags = flags + self.__state = dict(zip(self.__pins, flags)) + await self.__notifier.notify() + await asyncio.sleep(self.__interval) -- cgit v1.2.3