summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kvmd/aiotools.py3
-rw-r--r--kvmd/gpio.py41
-rw-r--r--kvmd/plugins/atx/gpio.py3
-rw-r--r--kvmd/plugins/ugpio/gpio.py1
4 files changed, 1 insertions, 47 deletions
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py
index dfd67f44..84c6f314 100644
--- a/kvmd/aiotools.py
+++ b/kvmd/aiotools.py
@@ -97,9 +97,6 @@ class AioNotifier:
async def notify(self) -> None:
await self.__queue.put(None)
- def notify_sync(self) -> None:
- self.__queue.put_nowait(None)
-
async def wait(self) -> None:
await self.__queue.get()
while not self.__queue.empty():
diff --git a/kvmd/gpio.py b/kvmd/gpio.py
index 18b36dfd..8dce12f6 100644
--- a/kvmd/gpio.py
+++ b/kvmd/gpio.py
@@ -23,7 +23,6 @@
import asyncio
import contextlib
-from typing import List
from typing import Tuple
from typing import Set
from typing import Generator
@@ -75,59 +74,21 @@ class BatchReader:
def __init__(
self,
pins: Set[int],
- edge_detection: bool,
interval: float,
notifier: aiotools.AioNotifier,
) -> None:
self.__pins = sorted(pins)
- self.__edge_detection = edge_detection
self.__interval = interval
self.__notifier = notifier
self.__state = {pin: read(pin) for pin in self.__pins}
-
- self.__loop: Optional[asyncio.AbstractEventLoop] = None # Only for edge detection
-
- self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins) # Only for busyloop
+ self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins)
def get(self, pin: int) -> bool:
return self.__state[pin]
async def poll(self) -> None:
- if self.__edge_detection:
- await self.__poll_edge()
- else:
- await self.__poll_busyloop()
-
- # =====
-
- async def __poll_edge(self) -> None:
- assert self.__loop is None
- self.__loop = asyncio.get_running_loop()
- watched: List[int] = []
- try:
- for pin in self.__pins:
- GPIO.add_event_detect(
- pin, GPIO.BOTH,
- callback=self.__poll_edge_callback,
- bouncetime=int(self.__interval * 1000),
- )
- watched.append(pin)
- await self.__notifier.notify()
- await aiotools.wait_infinite()
- finally:
- for pin in watched:
- GPIO.remove_event_detect(pin)
-
- def __poll_edge_callback(self, pin: int) -> None:
- assert self.__loop
- self.__state[pin] = read(pin)
- self.__loop.call_soon_threadsafe(self.__notifier.notify_sync)
-
- # =====
-
- async def __poll_busyloop(self) -> None:
if not self.__pins:
await aiotools.wait_infinite()
else:
diff --git a/kvmd/plugins/atx/gpio.py b/kvmd/plugins/atx/gpio.py
index 8401d2c0..e3225cf0 100644
--- a/kvmd/plugins/atx/gpio.py
+++ b/kvmd/plugins/atx/gpio.py
@@ -57,7 +57,6 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
click_delay: float,
long_click_delay: float,
- edge_detection: bool,
state_poll: float,
) -> None:
@@ -77,7 +76,6 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
self.__reader = gpio.BatchReader(
pins=set([self.__power_led_pin, self.__hdd_led_pin]),
- edge_detection=edge_detection,
interval=state_poll,
notifier=self.__notifier,
)
@@ -95,7 +93,6 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
"click_delay": Option(0.1, type=valid_float_f01),
"long_click_delay": Option(5.5, type=valid_float_f01),
- "edge_detection": Option(False, type=valid_bool),
"state_poll": Option(0.1, type=valid_float_f01),
}
diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py
index 2af24907..90426e13 100644
--- a/kvmd/plugins/ugpio/gpio.py
+++ b/kvmd/plugins/ugpio/gpio.py
@@ -75,7 +75,6 @@ class Plugin(BaseUserGpioDriver):
for (pin, initial) in self.__output_pins.items()
],
]),
- edge_detection=False,
interval=self.__state_poll,
notifier=self._notifier,
)