summaryrefslogtreecommitdiff
path: root/kvmd/aiomulti.py
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2020-10-23 03:33:57 +0300
committerDevaev Maxim <[email protected]>2020-10-25 02:44:43 +0300
commitad943811f9b0fb89c08196c08f307f7a5a374feb (patch)
treea1e2c18094df34350900cf0570d6915c2f470f8d /kvmd/aiomulti.py
parent4a211ffc1087ea689d3f1549504746887a279ef2 (diff)
ezcoo sw41ha as gpio
Diffstat (limited to 'kvmd/aiomulti.py')
-rw-r--r--kvmd/aiomulti.py40
1 files changed, 30 insertions, 10 deletions
diff --git a/kvmd/aiomulti.py b/kvmd/aiomulti.py
index edfc5c34..c0fca419 100644
--- a/kvmd/aiomulti.py
+++ b/kvmd/aiomulti.py
@@ -23,12 +23,41 @@
import multiprocessing
import queue
+from typing import Tuple
from typing import Dict
+from typing import TypeVar
+from typing import Optional
from . import aiotools
# =====
+_QueueItemT = TypeVar("_QueueItemT")
+
+
+async def queue_get_last( # pylint: disable=invalid-name
+ q: "multiprocessing.Queue[_QueueItemT]",
+ timeout: float,
+) -> Tuple[bool, Optional[_QueueItemT]]:
+
+ return (await aiotools.run_async(queue_get_last_sync, q, timeout))
+
+
+def queue_get_last_sync( # pylint: disable=invalid-name
+ q: "multiprocessing.Queue[_QueueItemT]",
+ timeout: float,
+) -> Tuple[bool, Optional[_QueueItemT]]:
+
+ try:
+ item = q.get(timeout=timeout)
+ while not q.empty():
+ item = q.get()
+ return (True, item)
+ except queue.Empty:
+ return (False, None)
+
+
+# =====
class AioProcessNotifier:
def __init__(self) -> None:
self.__queue: "multiprocessing.Queue[None]" = multiprocessing.Queue()
@@ -37,18 +66,9 @@ class AioProcessNotifier:
self.__queue.put_nowait(None)
async def wait(self) -> None:
- while not (await aiotools.run_async(self.__inner_wait)):
+ while not (await queue_get_last(self.__queue, 0.1))[0]:
pass
- def __inner_wait(self) -> bool:
- try:
- self.__queue.get(timeout=0.1)
- while not self.__queue.empty():
- self.__queue.get()
- return True
- except queue.Empty:
- return False
-
# =====
class AioSharedFlags: