diff options
Diffstat (limited to 'kvmd/aiogp.py')
-rw-r--r-- | kvmd/aiogp.py | 61 |
1 files changed, 27 insertions, 34 deletions
diff --git a/kvmd/aiogp.py b/kvmd/aiogp.py index 7965666b..7559a9d9 100644 --- a/kvmd/aiogp.py +++ b/kvmd/aiogp.py @@ -30,16 +30,6 @@ from . import aiotools # ===== -async def pulse(line: gpiod.Line, delay: float, final: float, inverted: bool=False) -> None: - try: - line.set_value(int(not inverted)) - await asyncio.sleep(delay) - finally: - line.set_value(int(inverted)) - await asyncio.sleep(final) - - -# ===== @dataclasses.dataclass(frozen=True) class AioReaderPinParams: inverted: bool @@ -57,7 +47,7 @@ class AioReader: # pylint: disable=too-many-instance-attributes self.__path = path self.__consumer = consumer - self.__pins = pins + self.__pins = dict(pins) self.__notifier = notifier self.__values: (dict[int, _DebouncedValue] | None) = None @@ -87,44 +77,47 @@ class AioReader: # pylint: disable=too-many-instance-attributes def __run(self) -> None: assert self.__values is None assert self.__loop - with gpiod.Chip(self.__path) as chip: - pins = sorted(self.__pins) - lines = chip.get_lines(pins) - lines.request(self.__consumer, gpiod.LINE_REQ_EV_BOTH_EDGES) - lines.event_wait(nsec=1) + pins = sorted(self.__pins) + with gpiod.request_lines( + self.__path, + consumer=self.__consumer, + config={tuple(pins): gpiod.LineSettings(edge_detection=gpiod.line.Edge.BOTH)}, + ) as line_request: + + line_request.wait_edge_events(0.1) self.__values = { pin: _DebouncedValue( - initial=bool(value), + initial=bool(value.value), debounce=self.__pins[pin].debounce, notifier=self.__notifier, loop=self.__loop, ) - for (pin, value) in zip(pins, lines.get_values()) + for (pin, value) in zip(pins, line_request.get_values(pins)) } self.__loop.call_soon_threadsafe(self.__notifier.notify) while not self.__stop_event.is_set(): - ev_lines = lines.event_wait(1) - if ev_lines: - for ev_line in ev_lines: - events = ev_line.event_read_multiple() - if events: - (pin, value) = self.__parse_event(events[-1]) - self.__values[pin].set(bool(value)) + if line_request.wait_edge_events(1): + new: dict[int, bool] = {} + for event in line_request.read_edge_events(): + (pin, value) = self.__parse_event(event) + new[pin] = value + for (pin, value) in new.items(): + self.__values[pin].set(value) else: # Timeout + # XXX: Лимит был актуален для 1.6. Надо проверить, поменялось ли это в 2.x. # Размер буфера ядра - 16 эвентов на линии. При превышении этого числа, # новые эвенты потеряются. Это не баг, это фича, как мне объяснили в LKML. # Штош. Будем с этим жить и синхронизировать состояния при таймауте. - for (pin, value) in zip(pins, lines.get_values()): - self.__values[pin].set(bool(value)) - - def __parse_event(self, event: gpiod.LineEvent) -> tuple[int, int]: - pin = event.source.offset() - if event.type == gpiod.LineEvent.RISING_EDGE: - return (pin, 1) - elif event.type == gpiod.LineEvent.FALLING_EDGE: - return (pin, 0) + for (pin, value) in zip(pins, line_request.get_values(pins)): + self.__values[pin].set(bool(value.value)) # type: ignore + + def __parse_event(self, event: gpiod.EdgeEvent) -> tuple[int, bool]: + if event.event_type == event.Type.RISING_EDGE: + return (event.line_offset, True) + elif event.event_type == event.Type.FALLING_EDGE: + return (event.line_offset, False) raise RuntimeError(f"Invalid event {event} type: {event.type}") |