diff options
author | Devaev Maxim <[email protected]> | 2020-03-15 02:42:10 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2020-03-15 02:42:10 +0300 |
commit | 5b58af4d6f433fcbf16e967404a5cd85a85eefba (patch) | |
tree | 9a40dfc055f59494019eb50fd0e34b232669a665 | |
parent | eb419822cde3c5c482196664a1fd8aed5236f098 (diff) |
proper usage of asyncio.wait() for first completed
-rw-r--r-- | kvmd/aiotools.py | 7 | ||||
-rw-r--r-- | kvmd/apps/kvmd/streamer.py | 9 | ||||
-rw-r--r-- | kvmd/plugins/msd/otg/__init__.py | 9 |
3 files changed, 17 insertions, 8 deletions
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py index 18990c2c..10510b77 100644 --- a/kvmd/aiotools.py +++ b/kvmd/aiotools.py @@ -28,8 +28,11 @@ import types import typing +from typing import Tuple from typing import List +from typing import Set from typing import Callable +from typing import Awaitable from typing import Coroutine from typing import Type from typing import TypeVar @@ -86,6 +89,10 @@ async def wait_infinite() -> None: await asyncio.get_event_loop().create_future() +async def wait_first(*aws: Awaitable) -> Tuple[Set[asyncio.Future], Set[asyncio.Future]]: + return (await asyncio.wait(list(aws), return_when=asyncio.FIRST_COMPLETED)) + + # ===== async def afile_write_now(afile: aiofiles.base.AiofilesContextManager, data: bytes) -> None: await afile.write(data) diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py index ba2e4c53..fec05987 100644 --- a/kvmd/apps/kvmd/streamer.py +++ b/kvmd/apps/kvmd/streamer.py @@ -207,6 +207,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes get_logger(0).info("Installing SIGUSR2 streamer handler ...") asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler) + waiter_task: Optional[asyncio.Task] = None prev_state: Dict = {} while True: state = await self.get_state() @@ -214,10 +215,10 @@ class Streamer: # pylint: disable=too-many-instance-attributes yield state prev_state = state - await asyncio.wait([ - asyncio.sleep(self.__state_poll), - notifier.wait(), - ], return_when=asyncio.FIRST_COMPLETED) + if waiter_task is None: + waiter_task = asyncio.create_task(notifier.wait()) + if waiter_task in (await aiotools.wait_first(asyncio.sleep(self.__state_poll), waiter_task))[0]: + waiter_task = None async def get_info(self) -> Dict: proc = await asyncio.create_subprocess_exec( diff --git a/kvmd/plugins/msd/otg/__init__.py b/kvmd/plugins/msd/otg/__init__.py index a65f989f..97a49dc0 100644 --- a/kvmd/plugins/msd/otg/__init__.py +++ b/kvmd/plugins/msd/otg/__init__.py @@ -206,6 +206,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes async def poll_state(self) -> AsyncGenerator[Dict, None]: inotify_task = asyncio.create_task(self.__watch_inotify()) + waiter_task: Optional[asyncio.Task] = None prev_state: Dict = {} try: while True: @@ -219,10 +220,10 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes yield state prev_state = state - await asyncio.wait([ - inotify_task, - self.__state_notifier.wait(), - ], return_when=asyncio.FIRST_COMPLETED) + if waiter_task is None: + waiter_task = asyncio.create_task(self.__state_notifier.wait()) + if waiter_task in (await aiotools.wait_first(inotify_task, waiter_task))[0]: + waiter_task = None finally: if not inotify_task.done(): inotify_task.cancel() |