summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2019-06-19 04:15:43 +0300
committerDevaev Maxim <[email protected]>2019-06-19 04:15:43 +0300
commitc7a2e445d0701b981aba3aa0608cb2bd4409a1a6 (patch)
tree4ea1e1c70a389706e074553cdba56a0b08b8e49d /kvmd
parent376ab295bdd28c2d8bf451d91e75d508ce588128 (diff)
many fixes with asyncio
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/aiotools.py67
-rw-r--r--kvmd/apps/kvmd/__init__.py2
-rw-r--r--kvmd/apps/kvmd/atx.py42
-rw-r--r--kvmd/apps/kvmd/hid.py40
-rw-r--r--kvmd/apps/kvmd/msd.py62
-rw-r--r--kvmd/apps/kvmd/server.py15
-rw-r--r--kvmd/apps/kvmd/streamer.py13
7 files changed, 163 insertions, 78 deletions
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py
index ef29a2df..574c8bc7 100644
--- a/kvmd/aiotools.py
+++ b/kvmd/aiotools.py
@@ -22,24 +22,50 @@
import asyncio
import functools
+import contextlib
import typing
+from typing import List
from typing import Callable
from typing import Coroutine
+from typing import Generator
+from typing import AsyncGenerator
from typing import TypeVar
from typing import Any
+from . import aioregion
+
+from .logging import get_logger
+
# =====
-_AtomicF = TypeVar("_AtomicF", bound=Callable[..., Any])
+_ATTR_SHORT_TASK = "_aiotools_short_task"
+
+_MethodT = TypeVar("_MethodT", bound=Callable[..., Any])
+_RetvalT = TypeVar("_RetvalT")
-def atomic(method: _AtomicF) -> _AtomicF:
+# =====
+def atomic(method: _MethodT) -> _MethodT:
@functools.wraps(method)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
return (await asyncio.shield(method(*args, **kwargs)))
- return typing.cast(_AtomicF, wrapper)
+ return typing.cast(_MethodT, wrapper)
+
+
+def muted(msg: str) -> Callable[[_MethodT], Callable[..., None]]:
+ def make_wrapper(method: _MethodT) -> Callable[..., None]:
+ @functools.wraps(method)
+ async def wrapper(*args: Any, **kwargs: Any) -> None:
+ try:
+ await method(*args, **kwargs)
+ except asyncio.CancelledError: # pylint: disable=try-except-raise
+ raise
+ except Exception:
+ get_logger(0).exception(msg)
+ return typing.cast(Callable[..., None], wrapper)
+ return make_wrapper
def tasked(method: Callable[..., Any]) -> Callable[..., asyncio.Task]:
@@ -49,29 +75,46 @@ def tasked(method: Callable[..., Any]) -> Callable[..., asyncio.Task]:
return typing.cast(Callable[..., asyncio.Task], wrapper)
-_ATTR_SHORT_TASK = "_aiotools_short_task"
-
-
+# =====
def create_short_task(coro: Coroutine) -> asyncio.Task:
task = asyncio.create_task(coro)
setattr(task, _ATTR_SHORT_TASK, True)
return task
-async def gather_short_tasks() -> None:
- await asyncio.gather(*[
+def get_short_tasks() -> List[asyncio.Task]:
+ return [
task
for task in asyncio.Task.all_tasks()
if getattr(task, _ATTR_SHORT_TASK, False)
- ])
-
-
-_RetvalT = TypeVar("_RetvalT")
+ ]
+# =====
async def run_async(method: Callable[..., _RetvalT], *args: Any) -> _RetvalT:
return (await asyncio.get_running_loop().run_in_executor(None, method, *args))
def run_sync(coro: Coroutine[Any, Any, _RetvalT]) -> _RetvalT:
return asyncio.get_event_loop().run_until_complete(coro)
+
+
+# =====
+def unregion_only_on_exception(region: aioregion.AioExclusiveRegion) -> Generator[None, None, None]:
+ region.enter()
+ try:
+ yield
+ except: # noqa: E722
+ region.exit()
+ raise
+
+
+async def unlock_only_on_exception(lock: asyncio.Lock) -> AsyncGenerator[None, None]:
+ await lock.acquire()
+ try:
+ yield
+ except: # noqa: E722
+ lock.release()
+ raise
diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py
index 49d7820f..4f60335a 100644
--- a/kvmd/apps/kvmd/__init__.py
+++ b/kvmd/apps/kvmd/__init__.py
@@ -67,4 +67,4 @@ def main(argv: Optional[List[str]]=None) -> None:
streamer=Streamer(**config.streamer._unpack()),
).run(**config.server._unpack())
- get_logger().info("Bye-bye")
+ get_logger(0).info("Bye-bye")
diff --git a/kvmd/apps/kvmd/atx.py b/kvmd/apps/kvmd/atx.py
index aa22d831..9310c071 100644
--- a/kvmd/apps/kvmd/atx.py
+++ b/kvmd/apps/kvmd/atx.py
@@ -129,6 +129,16 @@ class Atx: # pylint: disable=too-many-instance-attributes
else:
await asyncio.sleep(60)
+ async def cleanup(self) -> None:
+ for (name, pin) in [
+ ("power", self.__power_switch_pin),
+ ("reset", self.__reset_switch_pin),
+ ]:
+ try:
+ gpio.write(pin, False)
+ except Exception:
+ get_logger(0).exception("Can't cleanup %s pin %d", name, pin)
+
# =====
@_atx_working
@@ -163,25 +173,33 @@ class Atx: # pylint: disable=too-many-instance-attributes
@_atx_working
async def click_power(self) -> None:
- get_logger().info("Clicking power ...")
- await self.__click(self.__power_switch_pin, self.__click_delay)
+ await self.__click("power", self.__power_switch_pin, self.__click_delay)
@_atx_working
async def click_power_long(self) -> None:
- get_logger().info("Clicking power (long press) ...")
- await self.__click(self.__power_switch_pin, self.__long_click_delay)
+ await self.__click("power_long", self.__power_switch_pin, self.__long_click_delay)
@_atx_working
async def click_reset(self) -> None:
- get_logger().info("Clicking reset")
- await self.__click(self.__reset_switch_pin, self.__click_delay)
+ await self.__click("reset", self.__reset_switch_pin, self.__click_delay)
# =====
- @aiotools.tasked
@aiotools.atomic
- async def __click(self, pin: int, delay: float) -> None:
- with self.__region:
- for flag in [True, False]:
- gpio.write(pin, flag)
- await asyncio.sleep(delay)
+ async def __click(self, name: str, pin: int, delay: float) -> None:
+ with aiotools.unregion_only_on_exception(self.__region):
+ await self.__inner_click(name, pin, delay)
+
+ @aiotools.tasked
+ @aiotools.muted("Can't perform ATX click or operation was not completed")
+ async def __inner_click(self, name: str, pin: int, delay: float) -> None:
+ try:
+ gpio.write(pin, True)
+ await asyncio.sleep(delay)
+ finally:
+ try:
+ gpio.write(pin, False)
+ await asyncio.sleep(1)
+ finally:
+ self.__region.exit()
+ get_logger(0).info("Clicked ATX button %r", name)
diff --git a/kvmd/apps/kvmd/hid.py b/kvmd/apps/kvmd/hid.py
index 5530b188..19ff71e3 100644
--- a/kvmd/apps/kvmd/hid.py
+++ b/kvmd/apps/kvmd/hid.py
@@ -169,13 +169,24 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
prev_state = state
await asyncio.sleep(self.__state_poll)
- @aiotools.tasked
@aiotools.atomic
async def reset(self) -> None:
- async with self.__lock:
+ async with aiotools.unlock_only_on_exception(self.__lock):
+ await self.__inner_reset()
+
+ @aiotools.tasked
+ @aiotools.muted("Can't reset HID or operation was not completed")
+ async def __inner_reset(self) -> None:
+ try:
gpio.write(self.__reset_pin, True)
await asyncio.sleep(self.__reset_delay)
- gpio.write(self.__reset_pin, False)
+ finally:
+ try:
+ gpio.write(self.__reset_pin, False)
+ await asyncio.sleep(1)
+ finally:
+ self.__lock.release()
+ get_logger(0).info("Reset HID performed")
async def send_key_event(self, key: str, state: bool) -> None:
await self.__send_bool_event(_KeyEvent(key, state), self.__pressed_keys)
@@ -196,17 +207,20 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
@aiotools.atomic
async def cleanup(self) -> None:
+ logger = get_logger(0)
async with self.__lock:
- if self.is_alive():
- self.__unsafe_clear_events()
- get_logger(0).info("Stopping HID daemon ...")
- self.__stop_event.set()
- else:
- get_logger(0).warning("Emergency cleaning up HID events ...")
- self.__emergency_clear_events()
- if self.exitcode is not None:
- self.join()
- gpio.write(self.__reset_pin, False)
+ try:
+ if self.is_alive():
+ self.__unsafe_clear_events()
+ logger.info("Stopping HID daemon ...")
+ self.__stop_event.set()
+ else:
+ logger.warning("Emergency cleaning up HID events ...")
+ self.__emergency_clear_events()
+ if self.exitcode is not None:
+ self.join()
+ finally:
+ gpio.write(self.__reset_pin, False)
async def __send_bool_event(self, event: _BoolEvent, pressed: Set[str]) -> None:
if not self.__stop_event.is_set():
diff --git a/kvmd/apps/kvmd/msd.py b/kvmd/apps/kvmd/msd.py
index 5f85662c..377e591c 100644
--- a/kvmd/apps/kvmd/msd.py
+++ b/kvmd/apps/kvmd/msd.py
@@ -58,27 +58,27 @@ class MsdOperationError(MsdError):
class MsdDisabledError(MsdOperationError):
def __init__(self) -> None:
- super().__init__("Mass-storage device is disabled")
+ super().__init__("MSD is disabled")
class MsdOfflineError(MsdOperationError):
def __init__(self) -> None:
- super().__init__("Mass-storage device is not found")
+ super().__init__("MSD is not found")
class MsdAlreadyOnServerError(MsdOperationError):
def __init__(self) -> None:
- super().__init__("Mass-storage is already connected to Server")
+ super().__init__("MSD is already connected to Server")
class MsdAlreadyOnKvmError(MsdOperationError):
def __init__(self) -> None:
- super().__init__("Mass-storage is already connected to KVM")
+ super().__init__("MSD is already connected to KVM")
class MsdNotOnKvmError(MsdOperationError):
def __init__(self) -> None:
- super().__init__("Mass-storage is not connected to KVM")
+ super().__init__("MSD is not connected to KVM")
class MsdIsBusyError(MsdOperationError, aioregion.RegionIsBusyError):
@@ -226,16 +226,16 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
logger = get_logger(0)
if self._enabled:
- logger.info("Using %r as mass-storage device", self.__device_path)
+ logger.info("Using %r as MSD", self.__device_path)
try:
aiotools.run_sync(self.__load_device_info())
if self.__write_meta:
logger.info("Enabled image metadata writing")
except Exception as err:
log = (logger.error if isinstance(err, MsdError) else logger.exception)
- log("Mass-storage device is offline: %s", err)
+ log("MSD is offline: %s", err)
else:
- logger.info("Mass-storage device is disabled")
+ logger.info("MSD is disabled")
def get_state(self) -> Dict:
online = (self._enabled and bool(self._device_info))
@@ -282,7 +282,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
gpio.write(self.__target_pin, True)
raise
self.__on_kvm = True
- get_logger().info("Mass-storage device switched to KVM: %s", self._device_info)
+ get_logger().info("MSD switched to KVM: %s", self._device_info)
state = self.get_state()
return state
@@ -303,7 +303,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
gpio.write(self.__target_pin, True)
self.__on_kvm = False
- get_logger().info("Mass-storage device switched to Server")
+ get_logger().info("MSD switched to Server")
state = self.get_state()
return state
@@ -311,28 +311,34 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
if notify:
await self.__state_queue.put(state or self.get_state())
- @aiotools.tasked
@aiotools.atomic
async def reset(self) -> None:
- notify = False
+ if not self._enabled:
+ raise MsdDisabledError()
+ with aiotools.unregion_only_on_exception(self.__region):
+ await self.__inner_reset()
+
+ @aiotools.tasked
+ @aiotools.muted("Can't reset MSD or operation was not completed")
+ async def __inner_reset(self) -> None:
try:
- with self.__region:
- if not self._enabled:
- raise MsdDisabledError()
- notify = True
+ gpio.write(self.__reset_pin, True)
+ await asyncio.sleep(self.__reset_delay)
+ gpio.write(self.__reset_pin, False)
- gpio.write(self.__reset_pin, True)
- await asyncio.sleep(self.__reset_delay)
- gpio.write(self.__target_pin, False)
- self.__on_kvm = True
- await asyncio.sleep(self.__reset_delay)
- gpio.write(self.__reset_pin, False)
+ gpio.write(self.__target_pin, False)
+ self.__on_kvm = True
- await self.__load_device_info()
- get_logger(0).info("Mass-storage device reset has been successful")
+ await self.__load_device_info()
+ get_logger(0).info("MSD reset has been successful")
finally:
- if notify:
- await self.__state_queue.put(self.get_state())
+ try:
+ gpio.write(self.__reset_pin, False)
+ finally:
+ try:
+ await self.__state_queue.put(self.get_state())
+ finally:
+ self.__region.exit()
@_msd_working
@aiotools.atomic
@@ -392,12 +398,12 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
async def __close_device_file(self) -> None:
try:
if self.__device_file:
- get_logger().info("Closing mass-storage device file ...")
+ get_logger().info("Closing device file ...")
await self.__device_file.close()
except asyncio.CancelledError: # pylint: disable=try-except-raise
raise
except Exception:
- get_logger().exception("Can't close mass-storage device file")
+ get_logger().exception("Can't close device file")
finally:
self.__device_file = None
self.__written = 0
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
index 4ecbc74b..5520b13e 100644
--- a/kvmd/apps/kvmd/server.py
+++ b/kvmd/apps/kvmd/server.py
@@ -480,7 +480,7 @@ class Server: # pylint: disable=too-many-instance-attributes
data_field = await _get_multipart_field(reader, "image_data")
- logger.info("Writing image %r to mass-storage device ...", image_name)
+ logger.info("Writing image %r to MSD ...", image_name)
await self.__msd.write_image_info(image_name, False)
while True:
chunk = await data_field.read_chunk(self.__msd.chunk_size)
@@ -490,7 +490,7 @@ class Server: # pylint: disable=too-many-instance-attributes
await self.__msd.write_image_info(image_name, True)
finally:
if written != 0:
- logger.info("Written %d bytes to mass-storage device", written)
+ logger.info("Written %d bytes to MSD", written)
return _json({"written": written})
@_exposed("POST", "/msd/reset")
@@ -549,12 +549,14 @@ class Server: # pylint: disable=too-many-instance-attributes
logger = get_logger(0)
logger.info("Waiting short tasks ...")
- await aiotools.gather_short_tasks()
+ await asyncio.gather(*aiotools.get_short_tasks(), return_exceptions=True)
logger.info("Cancelling system tasks ...")
for task in self.__system_tasks:
task.cancel()
- await asyncio.gather(*self.__system_tasks)
+
+ logger.info("Waiting system tasks ...")
+ await asyncio.gather(*self.__system_tasks, return_exceptions=True)
logger.info("Disconnecting clients ...")
for ws in list(self.__sockets):
@@ -566,15 +568,14 @@ class Server: # pylint: disable=too-many-instance-attributes
self._auth_manager,
self.__streamer,
self.__msd,
+ self.__atx,
self.__hid,
]:
logger.info("Cleaning up %s ...", type(obj).__name__)
try:
await obj.cleanup() # type: ignore
- except asyncio.CancelledError: # pylint: disable=try-except-raise
- raise
except Exception:
- logger.exception("Cleanup error")
+ logger.exception("Cleanup error on %s", type(obj).__name__)
async def __broadcast_event(self, event_type: _Events, event_attrs: Dict) -> None:
if self.__sockets:
diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py
index 32f3e3dd..c85a53bd 100644
--- a/kvmd/apps/kvmd/streamer.py
+++ b/kvmd/apps/kvmd/streamer.py
@@ -162,11 +162,14 @@ class Streamer: # pylint: disable=too-many-instance-attributes
@aiotools.atomic
async def cleanup(self) -> None:
- if self.is_running():
- await self.stop()
- if self.__http_session:
- await self.__http_session.close()
- self.__http_session = None
+ try:
+ if self.is_running():
+ await self.stop()
+ if self.__http_session:
+ await self.__http_session.close()
+ self.__http_session = None
+ finally:
+ await self.__set_hw_enabled(False)
def __ensure_session(self) -> aiohttp.ClientSession:
if not self.__http_session: