diff options
author | Devaev Maxim <[email protected]> | 2019-06-19 04:15:43 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2019-06-19 04:15:43 +0300 |
commit | c7a2e445d0701b981aba3aa0608cb2bd4409a1a6 (patch) | |
tree | 4ea1e1c70a389706e074553cdba56a0b08b8e49d | |
parent | 376ab295bdd28c2d8bf451d91e75d508ce588128 (diff) |
many fixes with asyncio
-rw-r--r-- | kvmd/aiotools.py | 67 | ||||
-rw-r--r-- | kvmd/apps/kvmd/__init__.py | 2 | ||||
-rw-r--r-- | kvmd/apps/kvmd/atx.py | 42 | ||||
-rw-r--r-- | kvmd/apps/kvmd/hid.py | 40 | ||||
-rw-r--r-- | kvmd/apps/kvmd/msd.py | 62 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 15 | ||||
-rw-r--r-- | kvmd/apps/kvmd/streamer.py | 13 |
7 files changed, 163 insertions, 78 deletions
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py index ef29a2df..574c8bc7 100644 --- a/kvmd/aiotools.py +++ b/kvmd/aiotools.py @@ -22,24 +22,50 @@ import asyncio import functools +import contextlib import typing +from typing import List from typing import Callable from typing import Coroutine +from typing import Generator +from typing import AsyncGenerator from typing import TypeVar from typing import Any +from . import aioregion + +from .logging import get_logger + # ===== -_AtomicF = TypeVar("_AtomicF", bound=Callable[..., Any]) +_ATTR_SHORT_TASK = "_aiotools_short_task" + +_MethodT = TypeVar("_MethodT", bound=Callable[..., Any]) +_RetvalT = TypeVar("_RetvalT") -def atomic(method: _AtomicF) -> _AtomicF: +# ===== +def atomic(method: _MethodT) -> _MethodT: @functools.wraps(method) async def wrapper(*args: Any, **kwargs: Any) -> Any: return (await asyncio.shield(method(*args, **kwargs))) - return typing.cast(_AtomicF, wrapper) + return typing.cast(_MethodT, wrapper) + + +def muted(msg: str) -> Callable[[_MethodT], Callable[..., None]]: + def make_wrapper(method: _MethodT) -> Callable[..., None]: + @functools.wraps(method) + async def wrapper(*args: Any, **kwargs: Any) -> None: + try: + await method(*args, **kwargs) + except asyncio.CancelledError: # pylint: disable=try-except-raise + raise + except Exception: + get_logger(0).exception(msg) + return typing.cast(Callable[..., None], wrapper) + return make_wrapper def tasked(method: Callable[..., Any]) -> Callable[..., asyncio.Task]: @@ -49,29 +75,46 @@ def tasked(method: Callable[..., Any]) -> Callable[..., asyncio.Task]: return typing.cast(Callable[..., asyncio.Task], wrapper) -_ATTR_SHORT_TASK = "_aiotools_short_task" - - +# ===== def create_short_task(coro: Coroutine) -> asyncio.Task: task = asyncio.create_task(coro) setattr(task, _ATTR_SHORT_TASK, True) return task -async def gather_short_tasks() -> None: - await asyncio.gather(*[ +def get_short_tasks() -> List[asyncio.Task]: + return [ task for task in asyncio.Task.all_tasks() if getattr(task, _ATTR_SHORT_TASK, False) - ]) - - -_RetvalT = TypeVar("_RetvalT") + ] +# ===== async def run_async(method: Callable[..., _RetvalT], *args: Any) -> _RetvalT: return (await asyncio.get_running_loop().run_in_executor(None, method, *args)) def run_sync(coro: Coroutine[Any, Any, _RetvalT]) -> _RetvalT: return asyncio.get_event_loop().run_until_complete(coro) + + +# ===== +def unregion_only_on_exception(region: aioregion.AioExclusiveRegion) -> Generator[None, None, None]: + region.enter() + try: + yield + except: # noqa: E722 + region.exit() + raise + + +async def unlock_only_on_exception(lock: asyncio.Lock) -> AsyncGenerator[None, None]: + await lock.acquire() + try: + yield + except: # noqa: E722 + lock.release() + raise diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py index 49d7820f..4f60335a 100644 --- a/kvmd/apps/kvmd/__init__.py +++ b/kvmd/apps/kvmd/__init__.py @@ -67,4 +67,4 @@ def main(argv: Optional[List[str]]=None) -> None: streamer=Streamer(**config.streamer._unpack()), ).run(**config.server._unpack()) - get_logger().info("Bye-bye") + get_logger(0).info("Bye-bye") diff --git a/kvmd/apps/kvmd/atx.py b/kvmd/apps/kvmd/atx.py index aa22d831..9310c071 100644 --- a/kvmd/apps/kvmd/atx.py +++ b/kvmd/apps/kvmd/atx.py @@ -129,6 +129,16 @@ class Atx: # pylint: disable=too-many-instance-attributes else: await asyncio.sleep(60) + async def cleanup(self) -> None: + for (name, pin) in [ + ("power", self.__power_switch_pin), + ("reset", self.__reset_switch_pin), + ]: + try: + gpio.write(pin, False) + except Exception: + get_logger(0).exception("Can't cleanup %s pin %d", name, pin) + # ===== @_atx_working @@ -163,25 +173,33 @@ class Atx: # pylint: disable=too-many-instance-attributes @_atx_working async def click_power(self) -> None: - get_logger().info("Clicking power ...") - await self.__click(self.__power_switch_pin, self.__click_delay) + await self.__click("power", self.__power_switch_pin, self.__click_delay) @_atx_working async def click_power_long(self) -> None: - get_logger().info("Clicking power (long press) ...") - await self.__click(self.__power_switch_pin, self.__long_click_delay) + await self.__click("power_long", self.__power_switch_pin, self.__long_click_delay) @_atx_working async def click_reset(self) -> None: - get_logger().info("Clicking reset") - await self.__click(self.__reset_switch_pin, self.__click_delay) + await self.__click("reset", self.__reset_switch_pin, self.__click_delay) # ===== - @aiotools.tasked @aiotools.atomic - async def __click(self, pin: int, delay: float) -> None: - with self.__region: - for flag in [True, False]: - gpio.write(pin, flag) - await asyncio.sleep(delay) + async def __click(self, name: str, pin: int, delay: float) -> None: + with aiotools.unregion_only_on_exception(self.__region): + await self.__inner_click(name, pin, delay) + + @aiotools.tasked + @aiotools.muted("Can't perform ATX click or operation was not completed") + async def __inner_click(self, name: str, pin: int, delay: float) -> None: + try: + gpio.write(pin, True) + await asyncio.sleep(delay) + finally: + try: + gpio.write(pin, False) + await asyncio.sleep(1) + finally: + self.__region.exit() + get_logger(0).info("Clicked ATX button %r", name) diff --git a/kvmd/apps/kvmd/hid.py b/kvmd/apps/kvmd/hid.py index 5530b188..19ff71e3 100644 --- a/kvmd/apps/kvmd/hid.py +++ b/kvmd/apps/kvmd/hid.py @@ -169,13 +169,24 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu prev_state = state await asyncio.sleep(self.__state_poll) - @aiotools.tasked @aiotools.atomic async def reset(self) -> None: - async with self.__lock: + async with aiotools.unlock_only_on_exception(self.__lock): + await self.__inner_reset() + + @aiotools.tasked + @aiotools.muted("Can't reset HID or operation was not completed") + async def __inner_reset(self) -> None: + try: gpio.write(self.__reset_pin, True) await asyncio.sleep(self.__reset_delay) - gpio.write(self.__reset_pin, False) + finally: + try: + gpio.write(self.__reset_pin, False) + await asyncio.sleep(1) + finally: + self.__lock.release() + get_logger(0).info("Reset HID performed") async def send_key_event(self, key: str, state: bool) -> None: await self.__send_bool_event(_KeyEvent(key, state), self.__pressed_keys) @@ -196,17 +207,20 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu @aiotools.atomic async def cleanup(self) -> None: + logger = get_logger(0) async with self.__lock: - if self.is_alive(): - self.__unsafe_clear_events() - get_logger(0).info("Stopping HID daemon ...") - self.__stop_event.set() - else: - get_logger(0).warning("Emergency cleaning up HID events ...") - self.__emergency_clear_events() - if self.exitcode is not None: - self.join() - gpio.write(self.__reset_pin, False) + try: + if self.is_alive(): + self.__unsafe_clear_events() + logger.info("Stopping HID daemon ...") + self.__stop_event.set() + else: + logger.warning("Emergency cleaning up HID events ...") + self.__emergency_clear_events() + if self.exitcode is not None: + self.join() + finally: + gpio.write(self.__reset_pin, False) async def __send_bool_event(self, event: _BoolEvent, pressed: Set[str]) -> None: if not self.__stop_event.is_set(): diff --git a/kvmd/apps/kvmd/msd.py b/kvmd/apps/kvmd/msd.py index 5f85662c..377e591c 100644 --- a/kvmd/apps/kvmd/msd.py +++ b/kvmd/apps/kvmd/msd.py @@ -58,27 +58,27 @@ class MsdOperationError(MsdError): class MsdDisabledError(MsdOperationError): def __init__(self) -> None: - super().__init__("Mass-storage device is disabled") + super().__init__("MSD is disabled") class MsdOfflineError(MsdOperationError): def __init__(self) -> None: - super().__init__("Mass-storage device is not found") + super().__init__("MSD is not found") class MsdAlreadyOnServerError(MsdOperationError): def __init__(self) -> None: - super().__init__("Mass-storage is already connected to Server") + super().__init__("MSD is already connected to Server") class MsdAlreadyOnKvmError(MsdOperationError): def __init__(self) -> None: - super().__init__("Mass-storage is already connected to KVM") + super().__init__("MSD is already connected to KVM") class MsdNotOnKvmError(MsdOperationError): def __init__(self) -> None: - super().__init__("Mass-storage is not connected to KVM") + super().__init__("MSD is not connected to KVM") class MsdIsBusyError(MsdOperationError, aioregion.RegionIsBusyError): @@ -226,16 +226,16 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes logger = get_logger(0) if self._enabled: - logger.info("Using %r as mass-storage device", self.__device_path) + logger.info("Using %r as MSD", self.__device_path) try: aiotools.run_sync(self.__load_device_info()) if self.__write_meta: logger.info("Enabled image metadata writing") except Exception as err: log = (logger.error if isinstance(err, MsdError) else logger.exception) - log("Mass-storage device is offline: %s", err) + log("MSD is offline: %s", err) else: - logger.info("Mass-storage device is disabled") + logger.info("MSD is disabled") def get_state(self) -> Dict: online = (self._enabled and bool(self._device_info)) @@ -282,7 +282,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes gpio.write(self.__target_pin, True) raise self.__on_kvm = True - get_logger().info("Mass-storage device switched to KVM: %s", self._device_info) + get_logger().info("MSD switched to KVM: %s", self._device_info) state = self.get_state() return state @@ -303,7 +303,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes gpio.write(self.__target_pin, True) self.__on_kvm = False - get_logger().info("Mass-storage device switched to Server") + get_logger().info("MSD switched to Server") state = self.get_state() return state @@ -311,28 +311,34 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes if notify: await self.__state_queue.put(state or self.get_state()) - @aiotools.tasked @aiotools.atomic async def reset(self) -> None: - notify = False + if not self._enabled: + raise MsdDisabledError() + with aiotools.unregion_only_on_exception(self.__region): + await self.__inner_reset() + + @aiotools.tasked + @aiotools.muted("Can't reset MSD or operation was not completed") + async def __inner_reset(self) -> None: try: - with self.__region: - if not self._enabled: - raise MsdDisabledError() - notify = True + gpio.write(self.__reset_pin, True) + await asyncio.sleep(self.__reset_delay) + gpio.write(self.__reset_pin, False) - gpio.write(self.__reset_pin, True) - await asyncio.sleep(self.__reset_delay) - gpio.write(self.__target_pin, False) - self.__on_kvm = True - await asyncio.sleep(self.__reset_delay) - gpio.write(self.__reset_pin, False) + gpio.write(self.__target_pin, False) + self.__on_kvm = True - await self.__load_device_info() - get_logger(0).info("Mass-storage device reset has been successful") + await self.__load_device_info() + get_logger(0).info("MSD reset has been successful") finally: - if notify: - await self.__state_queue.put(self.get_state()) + try: + gpio.write(self.__reset_pin, False) + finally: + try: + await self.__state_queue.put(self.get_state()) + finally: + self.__region.exit() @_msd_working @aiotools.atomic @@ -392,12 +398,12 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes async def __close_device_file(self) -> None: try: if self.__device_file: - get_logger().info("Closing mass-storage device file ...") + get_logger().info("Closing device file ...") await self.__device_file.close() except asyncio.CancelledError: # pylint: disable=try-except-raise raise except Exception: - get_logger().exception("Can't close mass-storage device file") + get_logger().exception("Can't close device file") finally: self.__device_file = None self.__written = 0 diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 4ecbc74b..5520b13e 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -480,7 +480,7 @@ class Server: # pylint: disable=too-many-instance-attributes data_field = await _get_multipart_field(reader, "image_data") - logger.info("Writing image %r to mass-storage device ...", image_name) + logger.info("Writing image %r to MSD ...", image_name) await self.__msd.write_image_info(image_name, False) while True: chunk = await data_field.read_chunk(self.__msd.chunk_size) @@ -490,7 +490,7 @@ class Server: # pylint: disable=too-many-instance-attributes await self.__msd.write_image_info(image_name, True) finally: if written != 0: - logger.info("Written %d bytes to mass-storage device", written) + logger.info("Written %d bytes to MSD", written) return _json({"written": written}) @_exposed("POST", "/msd/reset") @@ -549,12 +549,14 @@ class Server: # pylint: disable=too-many-instance-attributes logger = get_logger(0) logger.info("Waiting short tasks ...") - await aiotools.gather_short_tasks() + await asyncio.gather(*aiotools.get_short_tasks(), return_exceptions=True) logger.info("Cancelling system tasks ...") for task in self.__system_tasks: task.cancel() - await asyncio.gather(*self.__system_tasks) + + logger.info("Waiting system tasks ...") + await asyncio.gather(*self.__system_tasks, return_exceptions=True) logger.info("Disconnecting clients ...") for ws in list(self.__sockets): @@ -566,15 +568,14 @@ class Server: # pylint: disable=too-many-instance-attributes self._auth_manager, self.__streamer, self.__msd, + self.__atx, self.__hid, ]: logger.info("Cleaning up %s ...", type(obj).__name__) try: await obj.cleanup() # type: ignore - except asyncio.CancelledError: # pylint: disable=try-except-raise - raise except Exception: - logger.exception("Cleanup error") + logger.exception("Cleanup error on %s", type(obj).__name__) async def __broadcast_event(self, event_type: _Events, event_attrs: Dict) -> None: if self.__sockets: diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py index 32f3e3dd..c85a53bd 100644 --- a/kvmd/apps/kvmd/streamer.py +++ b/kvmd/apps/kvmd/streamer.py @@ -162,11 +162,14 @@ class Streamer: # pylint: disable=too-many-instance-attributes @aiotools.atomic async def cleanup(self) -> None: - if self.is_running(): - await self.stop() - if self.__http_session: - await self.__http_session.close() - self.__http_session = None + try: + if self.is_running(): + await self.stop() + if self.__http_session: + await self.__http_session.close() + self.__http_session = None + finally: + await self.__set_hw_enabled(False) def __ensure_session(self) -> aiohttp.ClientSession: if not self.__http_session: |