diff options
author | Devaev Maxim <[email protected]> | 2019-06-19 04:15:43 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2019-06-19 04:15:43 +0300 |
commit | c7a2e445d0701b981aba3aa0608cb2bd4409a1a6 (patch) | |
tree | 4ea1e1c70a389706e074553cdba56a0b08b8e49d /kvmd/aiotools.py | |
parent | 376ab295bdd28c2d8bf451d91e75d508ce588128 (diff) |
many fixes with asyncio
Diffstat (limited to 'kvmd/aiotools.py')
-rw-r--r-- | kvmd/aiotools.py | 67 |
1 files changed, 55 insertions, 12 deletions
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py index ef29a2df..574c8bc7 100644 --- a/kvmd/aiotools.py +++ b/kvmd/aiotools.py @@ -22,24 +22,50 @@ import asyncio import functools +import contextlib import typing +from typing import List from typing import Callable from typing import Coroutine +from typing import Generator +from typing import AsyncGenerator from typing import TypeVar from typing import Any +from . import aioregion + +from .logging import get_logger + # ===== -_AtomicF = TypeVar("_AtomicF", bound=Callable[..., Any]) +_ATTR_SHORT_TASK = "_aiotools_short_task" + +_MethodT = TypeVar("_MethodT", bound=Callable[..., Any]) +_RetvalT = TypeVar("_RetvalT") -def atomic(method: _AtomicF) -> _AtomicF: +# ===== +def atomic(method: _MethodT) -> _MethodT: @functools.wraps(method) async def wrapper(*args: Any, **kwargs: Any) -> Any: return (await asyncio.shield(method(*args, **kwargs))) - return typing.cast(_AtomicF, wrapper) + return typing.cast(_MethodT, wrapper) + + +def muted(msg: str) -> Callable[[_MethodT], Callable[..., None]]: + def make_wrapper(method: _MethodT) -> Callable[..., None]: + @functools.wraps(method) + async def wrapper(*args: Any, **kwargs: Any) -> None: + try: + await method(*args, **kwargs) + except asyncio.CancelledError: # pylint: disable=try-except-raise + raise + except Exception: + get_logger(0).exception(msg) + return typing.cast(Callable[..., None], wrapper) + return make_wrapper def tasked(method: Callable[..., Any]) -> Callable[..., asyncio.Task]: @@ -49,29 +75,46 @@ def tasked(method: Callable[..., Any]) -> Callable[..., asyncio.Task]: return typing.cast(Callable[..., asyncio.Task], wrapper) -_ATTR_SHORT_TASK = "_aiotools_short_task" - - +# ===== def create_short_task(coro: Coroutine) -> asyncio.Task: task = asyncio.create_task(coro) setattr(task, _ATTR_SHORT_TASK, True) return task -async def gather_short_tasks() -> None: - await asyncio.gather(*[ +def get_short_tasks() -> List[asyncio.Task]: + return [ task for task in asyncio.Task.all_tasks() if getattr(task, _ATTR_SHORT_TASK, False) - ]) - - -_RetvalT = TypeVar("_RetvalT") + ] +# ===== async def run_async(method: Callable[..., _RetvalT], *args: Any) -> _RetvalT: return (await asyncio.get_running_loop().run_in_executor(None, method, *args)) def run_sync(coro: Coroutine[Any, Any, _RetvalT]) -> _RetvalT: return asyncio.get_event_loop().run_until_complete(coro) + + +# ===== +def unregion_only_on_exception(region: aioregion.AioExclusiveRegion) -> Generator[None, None, None]: + region.enter() + try: + yield + except: # noqa: E722 + region.exit() + raise + + +async def unlock_only_on_exception(lock: asyncio.Lock) -> AsyncGenerator[None, None]: + await lock.acquire() + try: + yield + except: # noqa: E722 + lock.release() + raise |