diff options
-rw-r--r-- | kvmd/aiotools.py | 3 | ||||
-rw-r--r-- | kvmd/gpio.py | 41 | ||||
-rw-r--r-- | kvmd/plugins/atx/gpio.py | 3 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/gpio.py | 1 |
4 files changed, 1 insertions, 47 deletions
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py index dfd67f44..84c6f314 100644 --- a/kvmd/aiotools.py +++ b/kvmd/aiotools.py @@ -97,9 +97,6 @@ class AioNotifier: async def notify(self) -> None: await self.__queue.put(None) - def notify_sync(self) -> None: - self.__queue.put_nowait(None) - async def wait(self) -> None: await self.__queue.get() while not self.__queue.empty(): diff --git a/kvmd/gpio.py b/kvmd/gpio.py index 18b36dfd..8dce12f6 100644 --- a/kvmd/gpio.py +++ b/kvmd/gpio.py @@ -23,7 +23,6 @@ import asyncio import contextlib -from typing import List from typing import Tuple from typing import Set from typing import Generator @@ -75,59 +74,21 @@ class BatchReader: def __init__( self, pins: Set[int], - edge_detection: bool, interval: float, notifier: aiotools.AioNotifier, ) -> None: self.__pins = sorted(pins) - self.__edge_detection = edge_detection self.__interval = interval self.__notifier = notifier self.__state = {pin: read(pin) for pin in self.__pins} - - self.__loop: Optional[asyncio.AbstractEventLoop] = None # Only for edge detection - - self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins) # Only for busyloop + self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins) def get(self, pin: int) -> bool: return self.__state[pin] async def poll(self) -> None: - if self.__edge_detection: - await self.__poll_edge() - else: - await self.__poll_busyloop() - - # ===== - - async def __poll_edge(self) -> None: - assert self.__loop is None - self.__loop = asyncio.get_running_loop() - watched: List[int] = [] - try: - for pin in self.__pins: - GPIO.add_event_detect( - pin, GPIO.BOTH, - callback=self.__poll_edge_callback, - bouncetime=int(self.__interval * 1000), - ) - watched.append(pin) - await self.__notifier.notify() - await aiotools.wait_infinite() - finally: - for pin in watched: - GPIO.remove_event_detect(pin) - - def __poll_edge_callback(self, pin: int) -> None: - assert self.__loop - self.__state[pin] = read(pin) - self.__loop.call_soon_threadsafe(self.__notifier.notify_sync) - - # ===== - - async def __poll_busyloop(self) -> None: if not self.__pins: await aiotools.wait_infinite() else: diff --git a/kvmd/plugins/atx/gpio.py b/kvmd/plugins/atx/gpio.py index 8401d2c0..e3225cf0 100644 --- a/kvmd/plugins/atx/gpio.py +++ b/kvmd/plugins/atx/gpio.py @@ -57,7 +57,6 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes click_delay: float, long_click_delay: float, - edge_detection: bool, state_poll: float, ) -> None: @@ -77,7 +76,6 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes self.__reader = gpio.BatchReader( pins=set([self.__power_led_pin, self.__hdd_led_pin]), - edge_detection=edge_detection, interval=state_poll, notifier=self.__notifier, ) @@ -95,7 +93,6 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes "click_delay": Option(0.1, type=valid_float_f01), "long_click_delay": Option(5.5, type=valid_float_f01), - "edge_detection": Option(False, type=valid_bool), "state_poll": Option(0.1, type=valid_float_f01), } diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py index 2af24907..90426e13 100644 --- a/kvmd/plugins/ugpio/gpio.py +++ b/kvmd/plugins/ugpio/gpio.py @@ -75,7 +75,6 @@ class Plugin(BaseUserGpioDriver): for (pin, initial) in self.__output_pins.items() ], ]), - edge_detection=False, interval=self.__state_poll, notifier=self._notifier, ) |