diff options
author | Devaev Maxim <[email protected]> | 2020-03-03 23:48:53 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2020-03-03 23:48:53 +0300 |
commit | 552bb93212ef2a890b1493b2d9be022d3ece802b (patch) | |
tree | 7ce3dd397b5fa6a78fac54f5d4c9d3bf71366008 /kvmd | |
parent | 3b16242cfa4b656ba7d396a600b452184aac6c76 (diff) |
atomic fixes, removed tasked and muted
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/aiotools.py | 77 | ||||
-rw-r--r-- | kvmd/apps/kvmd/wol.py | 5 | ||||
-rw-r--r-- | kvmd/plugins/atx/gpio.py | 16 | ||||
-rw-r--r-- | kvmd/plugins/hid/serial.py | 62 | ||||
-rw-r--r-- | kvmd/plugins/msd/otg/__init__.py | 1 | ||||
-rw-r--r-- | kvmd/plugins/msd/relay.py | 116 |
6 files changed, 124 insertions, 153 deletions
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py index 24021495..18990c2c 100644 --- a/kvmd/aiotools.py +++ b/kvmd/aiotools.py @@ -24,7 +24,6 @@ import os import asyncio import asyncio.queues import functools -import contextlib import types import typing @@ -32,7 +31,6 @@ import typing from typing import List from typing import Callable from typing import Coroutine -from typing import AsyncGenerator from typing import Type from typing import TypeVar from typing import Optional @@ -59,27 +57,6 @@ def atomic(method: _MethodT) -> _MethodT: return typing.cast(_MethodT, wrapper) -def muted(msg: str) -> Callable[[_MethodT], Callable[..., None]]: - def make_wrapper(method: _MethodT) -> Callable[..., None]: - @functools.wraps(method) - async def wrapper(*args: Any, **kwargs: Any) -> None: - try: - await method(*args, **kwargs) - except asyncio.CancelledError: # pylint: disable=try-except-raise - raise - except Exception: - get_logger(0).exception(msg) - return typing.cast(Callable[..., None], wrapper) - return make_wrapper - - -def tasked(method: Callable[..., Any]) -> Callable[..., asyncio.Task]: - @functools.wraps(method) - async def wrapper(*args: Any, **kwargs: Any) -> asyncio.Task: - return create_short_task(method(*args, **kwargs)) - return typing.cast(Callable[..., asyncio.Task], wrapper) - - # ===== def create_short_task(coro: Coroutine) -> asyncio.Task: task = asyncio.create_task(coro) @@ -110,17 +87,6 @@ async def wait_infinite() -> None: # ===== -async def unlock_only_on_exception(lock: asyncio.Lock) -> AsyncGenerator[None, None]: - await lock.acquire() - try: - yield - except: # noqa: E722 - lock.release() - raise - - -# ===== async def afile_write_now(afile: aiofiles.base.AiofilesContextManager, data: bytes) -> None: await afile.write(data) await afile.flush() @@ -154,6 +120,9 @@ class AioExclusiveRegion: self.__busy = False + def get_exc_type(self) -> Type[Exception]: + return self.__exc_type + def is_busy(self) -> bool: return self.__busy @@ -174,15 +143,6 @@ class AioExclusiveRegion: if self.__notifier: await self.__notifier.notify() - @contextlib.asynccontextmanager - async def exit_only_on_exception(self) -> AsyncGenerator[None, None]: - await self.enter() - try: - yield - except: # noqa: E722 - await self.exit() - raise - async def __aenter__(self) -> None: await self.enter() @@ -194,3 +154,34 @@ class AioExclusiveRegion: ) -> None: await self.exit() + + +async def run_region_task( + msg: str, + region: AioExclusiveRegion, + method: Callable[..., Coroutine[Any, Any, None]], + *args: Any, + **kwargs: Any, +) -> None: + + entered = asyncio.Future() # type: ignore + + async def wrapper() -> None: + try: + async with region: + entered.set_result(None) + await method(*args, **kwargs) + except asyncio.CancelledError: # pylint: disable=try-except-raise + raise + except region.get_exc_type(): + raise + except Exception: + get_logger(0).exception(msg) + + task = create_short_task(wrapper()) + await asyncio.wait([entered, task], return_when=asyncio.FIRST_COMPLETED) + + if entered.done(): + return + if (exc := task.exception()) is not None: # noqa: E203,E231 + raise exc diff --git a/kvmd/apps/kvmd/wol.py b/kvmd/apps/kvmd/wol.py index ea62aea1..f388f136 100644 --- a/kvmd/apps/kvmd/wol.py +++ b/kvmd/apps/kvmd/wol.py @@ -64,13 +64,10 @@ class WakeOnLan: async def wakeup(self) -> None: if not self.__magic: raise WolDisabledError() - await self.__inner_wakeup() - @aiotools.tasked - @aiotools.muted("Can't perform Wake-on-LAN or operation was not completed") - async def __inner_wakeup(self) -> None: logger = get_logger(0) logger.info("Waking up %s (%s:%s) using Wake-on-LAN ...", self.__mac, self.__ip, self.__port) + sock: Optional[socket.socket] = None try: # TODO: IPv6 support: http://lists.cluenet.de/pipermail/ipv6-ops/2014-September/010139.html diff --git a/kvmd/plugins/atx/gpio.py b/kvmd/plugins/atx/gpio.py index 51b063bc..0ed29b49 100644 --- a/kvmd/plugins/atx/gpio.py +++ b/kvmd/plugins/atx/gpio.py @@ -162,19 +162,17 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes @aiotools.atomic async def __click(self, name: str, pin: int, delay: float) -> None: - async with self.__region.exit_only_on_exception(): - await self.__inner_click(name, pin, delay) + await aiotools.run_region_task( + "Can't perform ATX click or operation was not completed", + self.__region, self.__inner_click, name, pin, delay, + ) - @aiotools.tasked - @aiotools.muted("Can't perform ATX click or operation was not completed") + @aiotools.atomic async def __inner_click(self, name: str, pin: int, delay: float) -> None: try: gpio.write(pin, True) await asyncio.sleep(delay) finally: - try: - gpio.write(pin, False) - await asyncio.sleep(1) - finally: - await self.__region.exit() + gpio.write(pin, False) + await asyncio.sleep(1) get_logger(0).info("Clicked ATX button %r", name) diff --git a/kvmd/plugins/hid/serial.py b/kvmd/plugins/hid/serial.py index b98bd101..1725483b 100644 --- a/kvmd/plugins/hid/serial.py +++ b/kvmd/plugins/hid/serial.py @@ -157,7 +157,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst self.__retries_delay = retries_delay self.__noop = noop - self.__lock = asyncio.Lock() + self.__reset_wip = False self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue() @@ -216,42 +216,39 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst @aiotools.atomic async def reset(self) -> None: - async with aiotools.unlock_only_on_exception(self.__lock): - await self.__inner_reset() - - @aiotools.tasked - @aiotools.muted("Can't reset HID or operation was not completed") - async def __inner_reset(self) -> None: - try: - gpio.write(self.__reset_pin, True) - await asyncio.sleep(self.__reset_delay) - finally: + if not self.__reset_wip: try: - gpio.write(self.__reset_pin, False) - await asyncio.sleep(1) + self.__reset_wip = True + gpio.write(self.__reset_pin, True) + await asyncio.sleep(self.__reset_delay) finally: - self.__lock.release() - get_logger(0).info("Reset HID performed") + try: + gpio.write(self.__reset_pin, False) + await asyncio.sleep(1) + finally: + self.__reset_wip = False + get_logger().info("Reset HID performed") + else: + get_logger().info("Another reset HID in progress") @aiotools.atomic async def cleanup(self) -> None: logger = get_logger(0) - async with self.__lock: - try: - if self.is_alive(): - logger.info("Stopping HID daemon ...") - self.__stop_event.set() - if self.exitcode is not None: - self.join() - if os.path.exists(self.__device_path): - get_logger().info("Clearing HID events ...") - try: - with self.__get_serial() as tty: - self.__process_command(tty, b"\x10\x00\x00\x00\x00") - except Exception: - logger.exception("Can't clear HID events") - finally: - gpio.write(self.__reset_pin, False) + try: + if self.is_alive(): + logger.info("Stopping HID daemon ...") + self.__stop_event.set() + if self.exitcode is not None: + self.join() + if os.path.exists(self.__device_path): + get_logger().info("Clearing HID events ...") + try: + with self.__get_serial() as tty: + self.__process_command(tty, b"\x10\x00\x00\x00\x00") + except Exception: + logger.exception("Can't clear HID events") + finally: + gpio.write(self.__reset_pin, False) # ===== @@ -272,8 +269,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst async def __queue_event(self, event: _BaseEvent) -> None: if not self.__stop_event.is_set(): - async with self.__lock: - self.__events_queue.put(event) + self.__events_queue.put_nowait(event) def run(self) -> None: # pylint: disable=too-many-branches logger = get_logger(0) diff --git a/kvmd/plugins/msd/otg/__init__.py b/kvmd/plugins/msd/otg/__init__.py index ff5014cb..a65f989f 100644 --- a/kvmd/plugins/msd/otg/__init__.py +++ b/kvmd/plugins/msd/otg/__init__.py @@ -352,6 +352,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes await self.__state_notifier.notify() return self.__new_file_written + @aiotools.atomic async def remove(self, name: str) -> None: async with self.__state.busy(): assert self.__state.storage diff --git a/kvmd/plugins/msd/relay.py b/kvmd/plugins/msd/relay.py index 71a76e9c..34899f42 100644 --- a/kvmd/plugins/msd/relay.py +++ b/kvmd/plugins/msd/relay.py @@ -25,7 +25,6 @@ import stat import fcntl import struct import asyncio -import asyncio.queues import contextlib import dataclasses @@ -173,15 +172,14 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes self.__init_retries = init_retries self.__reset_delay = reset_delay - self.__region = aiotools.AioExclusiveRegion(MsdIsBusyError) - self.__device_info: Optional[_DeviceInfo] = None self.__connected = False self.__device_file: Optional[aiofiles.base.AiofilesContextManager] = None self.__written = 0 - self.__state_queue: asyncio.queues.Queue = asyncio.Queue() + self.__state_notifier = aiotools.AioNotifier() + self.__region = aiotools.AioExclusiveRegion(MsdIsBusyError, self.__state_notifier) logger = get_logger(0) logger.info("Using %r as MSD", self.__device_path) @@ -229,16 +227,22 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes } async def poll_state(self) -> AsyncGenerator[Dict, None]: + prev_state: Dict = {} while True: - yield (await self.__state_queue.get()) + state = await self.get_state() + if state != prev_state: + yield state + prev_state = state + await self.__state_notifier.wait() @aiotools.atomic async def reset(self) -> None: - async with self.__region.exit_only_on_exception(): - await self.__inner_reset() + await aiotools.run_region_task( + "Can't reset MSD or operation was not completed", + self.__region, self.__inner_reset, + ) - @aiotools.tasked - @aiotools.muted("Can't reset MSD or operation was not completed") + @aiotools.atomic async def __inner_reset(self) -> None: try: gpio.write(self.__reset_pin, True) @@ -251,20 +255,19 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes await self.__load_device_info() get_logger(0).info("MSD reset has been successful") finally: - try: - gpio.write(self.__reset_pin, False) - finally: - await self.__region.exit() - await self.__state_queue.put(await self.get_state()) + gpio.write(self.__reset_pin, False) @aiotools.atomic async def cleanup(self) -> None: - await self.__close_device_file() - gpio.write(self.__target_pin, False) - gpio.write(self.__reset_pin, False) + try: + await self.__close_device_file() + finally: + gpio.write(self.__target_pin, False) + gpio.write(self.__reset_pin, False) # ===== + @aiotools.atomic async def set_params(self, name: Optional[str]=None, cdrom: Optional[bool]=None) -> None: async with self.__working(): if name is not None: @@ -275,66 +278,50 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes @aiotools.atomic async def connect(self) -> None: async with self.__working(): - notify = False - try: - async with self.__region: - if self.__connected: - raise MsdConnectedError() - notify = True + async with self.__region: + if self.__connected: + raise MsdConnectedError() - gpio.write(self.__target_pin, True) - self.__connected = True - get_logger(0).info("MSD switched to Server") - finally: - if notify: - await self.__state_queue.put(await self.get_state()) + gpio.write(self.__target_pin, True) + self.__connected = True + get_logger(0).info("MSD switched to Server") @aiotools.atomic async def disconnect(self) -> None: async with self.__working(): - notify = False - try: - async with self.__region: - if not self.__connected: - raise MsdDisconnectedError() - notify = True - - gpio.write(self.__target_pin, False) - try: - await self.__load_device_info() - except Exception: - if self.__connected: - gpio.write(self.__target_pin, True) - raise - self.__connected = False - get_logger(0).info("MSD switched to KVM: %s", self.__device_info) - finally: - if notify: - await self.__state_queue.put(await self.get_state()) + async with self.__region: + if not self.__connected: + raise MsdDisconnectedError() + + gpio.write(self.__target_pin, False) + try: + await self.__load_device_info() + except Exception: + if self.__connected: + gpio.write(self.__target_pin, True) + raise + self.__connected = False + get_logger(0).info("MSD switched to KVM: %s", self.__device_info) @contextlib.asynccontextmanager async def write_image(self, name: str) -> AsyncGenerator[None, None]: async with self.__working(): - await self.__region.enter() - try: - assert self.__device_info - if self.__connected: - raise MsdConnectedError() + async with self.__region: + try: + assert self.__device_info + if self.__connected: + raise MsdConnectedError() - self.__device_file = await aiofiles.open(self.__device_info.path, mode="w+b", buffering=0) - self.__written = 0 + self.__device_file = await aiofiles.open(self.__device_info.path, mode="w+b", buffering=0) + self.__written = 0 - await self.__write_image_info(name, complete=False) - await self.__state_queue.put(await self.get_state()) - yield - await self.__write_image_info(name, complete=True) - finally: - try: + await self.__write_image_info(name, complete=False) + await self.__state_notifier.notify() + yield + await self.__write_image_info(name, complete=True) + finally: await self.__close_device_file() await self.__load_device_info() - finally: - await self.__region.exit() - await self.__state_queue.put(await self.get_state()) async def write_image_chunk(self, chunk: bytes) -> int: assert self.__device_file @@ -342,6 +329,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes self.__written += len(chunk) return self.__written + @aiotools.atomic async def remove(self, name: str) -> None: async with self.__working(): raise MsdMultiNotSupported() |