diff options
author | Devaev Maxim <[email protected]> | 2020-09-10 13:40:56 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2020-09-10 13:40:56 +0300 |
commit | 44c50aa4de96b7cc3fb6d057a62a1cd994315e63 (patch) | |
tree | dd0a96629b23b87cb5c8c5a1bf501c3a297977d8 /kvmd/gpio.py | |
parent | ff36ff203e2228125c41a5cdd3df3d50b1a17cac (diff) |
removed edge detection
Diffstat (limited to 'kvmd/gpio.py')
-rw-r--r-- | kvmd/gpio.py | 41 |
1 files changed, 1 insertions, 40 deletions
diff --git a/kvmd/gpio.py b/kvmd/gpio.py index 18b36dfd..8dce12f6 100644 --- a/kvmd/gpio.py +++ b/kvmd/gpio.py @@ -23,7 +23,6 @@ import asyncio import contextlib -from typing import List from typing import Tuple from typing import Set from typing import Generator @@ -75,59 +74,21 @@ class BatchReader: def __init__( self, pins: Set[int], - edge_detection: bool, interval: float, notifier: aiotools.AioNotifier, ) -> None: self.__pins = sorted(pins) - self.__edge_detection = edge_detection self.__interval = interval self.__notifier = notifier self.__state = {pin: read(pin) for pin in self.__pins} - - self.__loop: Optional[asyncio.AbstractEventLoop] = None # Only for edge detection - - self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins) # Only for busyloop + self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins) def get(self, pin: int) -> bool: return self.__state[pin] async def poll(self) -> None: - if self.__edge_detection: - await self.__poll_edge() - else: - await self.__poll_busyloop() - - # ===== - - async def __poll_edge(self) -> None: - assert self.__loop is None - self.__loop = asyncio.get_running_loop() - watched: List[int] = [] - try: - for pin in self.__pins: - GPIO.add_event_detect( - pin, GPIO.BOTH, - callback=self.__poll_edge_callback, - bouncetime=int(self.__interval * 1000), - ) - watched.append(pin) - await self.__notifier.notify() - await aiotools.wait_infinite() - finally: - for pin in watched: - GPIO.remove_event_detect(pin) - - def __poll_edge_callback(self, pin: int) -> None: - assert self.__loop - self.__state[pin] = read(pin) - self.__loop.call_soon_threadsafe(self.__notifier.notify_sync) - - # ===== - - async def __poll_busyloop(self) -> None: if not self.__pins: await aiotools.wait_infinite() else: |