diff options
author | Devaev Maxim <[email protected]> | 2020-09-13 18:23:28 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2020-09-13 18:23:28 +0300 |
commit | 1e6ab4672f0980f073ab7e1c6f48fb9bf43b0856 (patch) | |
tree | e2b1253063d813ca6119097ce0cdfea576a2f8fa /kvmd/aiogp.py | |
parent | 41223fa8b24be7978f320781c4b0e3d7b2347c75 (diff) |
refactoring and reuse gpio pulse code
Diffstat (limited to 'kvmd/aiogp.py')
-rw-r--r-- | kvmd/aiogp.py | 23 |
1 files changed, 15 insertions, 8 deletions
diff --git a/kvmd/aiogp.py b/kvmd/aiogp.py index dbc39e69..9b515d9a 100644 --- a/kvmd/aiogp.py +++ b/kvmd/aiogp.py @@ -23,7 +23,7 @@ import asyncio import threading -from typing import List +from typing import Dict from typing import Optional import gpiod @@ -32,23 +32,29 @@ from . import aiotools # ===== +async def pulse(line: gpiod.Line, delay: float, final: float) -> None: + try: + line.set_value(1) + await asyncio.sleep(delay) + finally: + line.set_value(0) + await asyncio.sleep(final) + + class AioPinsReader(threading.Thread): def __init__( self, path: str, consumer: str, - pins: List[int], - inverted: List[bool], + pins: Dict[int, bool], notifier: aiotools.AioNotifier, ) -> None: - assert len(pins) == len(inverted) super().__init__(daemon=True) self.__path = path self.__consumer = consumer self.__pins = pins - self.__inverted = dict(zip(pins, inverted)) self.__notifier = notifier self.__state = dict.fromkeys(pins, False) @@ -57,7 +63,7 @@ class AioPinsReader(threading.Thread): self.__loop: Optional[asyncio.AbstractEventLoop] = None def get(self, pin: int) -> bool: - return (self.__state[pin] ^ self.__inverted[pin]) + return (self.__state[pin] ^ self.__pins[pin]) async def poll(self) -> None: if not self.__pins: @@ -75,13 +81,14 @@ class AioPinsReader(threading.Thread): def run(self) -> None: assert self.__loop with gpiod.Chip(self.__path) as chip: - lines = chip.get_lines(self.__pins) + pins = sorted(self.__pins) + lines = chip.get_lines(pins) lines.request(self.__consumer, gpiod.LINE_REQ_EV_BOTH_EDGES) lines.event_wait(nsec=1) self.__state = { pin: bool(value) - for (pin, value) in zip(self.__pins, lines.get_values()) + for (pin, value) in zip(pins, lines.get_values()) } self.__loop.call_soon_threadsafe(self.__notifier.notify_sync) |