summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2020-03-15 02:42:10 +0300
committerDevaev Maxim <[email protected]>2020-03-15 02:42:10 +0300
commit5b58af4d6f433fcbf16e967404a5cd85a85eefba (patch)
tree9a40dfc055f59494019eb50fd0e34b232669a665
parenteb419822cde3c5c482196664a1fd8aed5236f098 (diff)
proper usage of asyncio.wait() for first completed
-rw-r--r--kvmd/aiotools.py7
-rw-r--r--kvmd/apps/kvmd/streamer.py9
-rw-r--r--kvmd/plugins/msd/otg/__init__.py9
3 files changed, 17 insertions, 8 deletions
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py
index 18990c2c..10510b77 100644
--- a/kvmd/aiotools.py
+++ b/kvmd/aiotools.py
@@ -28,8 +28,11 @@ import types
import typing
+from typing import Tuple
from typing import List
+from typing import Set
from typing import Callable
+from typing import Awaitable
from typing import Coroutine
from typing import Type
from typing import TypeVar
@@ -86,6 +89,10 @@ async def wait_infinite() -> None:
await asyncio.get_event_loop().create_future()
+async def wait_first(*aws: Awaitable) -> Tuple[Set[asyncio.Future], Set[asyncio.Future]]:
+ return (await asyncio.wait(list(aws), return_when=asyncio.FIRST_COMPLETED))
+
+
# =====
async def afile_write_now(afile: aiofiles.base.AiofilesContextManager, data: bytes) -> None:
await afile.write(data)
diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py
index ba2e4c53..fec05987 100644
--- a/kvmd/apps/kvmd/streamer.py
+++ b/kvmd/apps/kvmd/streamer.py
@@ -207,6 +207,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
get_logger(0).info("Installing SIGUSR2 streamer handler ...")
asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler)
+ waiter_task: Optional[asyncio.Task] = None
prev_state: Dict = {}
while True:
state = await self.get_state()
@@ -214,10 +215,10 @@ class Streamer: # pylint: disable=too-many-instance-attributes
yield state
prev_state = state
- await asyncio.wait([
- asyncio.sleep(self.__state_poll),
- notifier.wait(),
- ], return_when=asyncio.FIRST_COMPLETED)
+ if waiter_task is None:
+ waiter_task = asyncio.create_task(notifier.wait())
+ if waiter_task in (await aiotools.wait_first(asyncio.sleep(self.__state_poll), waiter_task))[0]:
+ waiter_task = None
async def get_info(self) -> Dict:
proc = await asyncio.create_subprocess_exec(
diff --git a/kvmd/plugins/msd/otg/__init__.py b/kvmd/plugins/msd/otg/__init__.py
index a65f989f..97a49dc0 100644
--- a/kvmd/plugins/msd/otg/__init__.py
+++ b/kvmd/plugins/msd/otg/__init__.py
@@ -206,6 +206,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
async def poll_state(self) -> AsyncGenerator[Dict, None]:
inotify_task = asyncio.create_task(self.__watch_inotify())
+ waiter_task: Optional[asyncio.Task] = None
prev_state: Dict = {}
try:
while True:
@@ -219,10 +220,10 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
yield state
prev_state = state
- await asyncio.wait([
- inotify_task,
- self.__state_notifier.wait(),
- ], return_when=asyncio.FIRST_COMPLETED)
+ if waiter_task is None:
+ waiter_task = asyncio.create_task(self.__state_notifier.wait())
+ if waiter_task in (await aiotools.wait_first(inotify_task, waiter_task))[0]:
+ waiter_task = None
finally:
if not inotify_task.done():
inotify_task.cancel()