summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2019-06-05 06:30:21 +0300
committerDevaev Maxim <[email protected]>2019-06-05 06:30:21 +0300
commit8aa333ba895e7b498e867f7b430335852a4a9f84 (patch)
treef7726c35505d5054115cf01192250c3198f3243e
parent234aa8bda4978a9d0bed70237e7548ea5a051422 (diff)
better atomic ops
-rw-r--r--kvmd/aiotools.py48
-rw-r--r--kvmd/apps/kvmd/atx.py13
-rw-r--r--kvmd/apps/kvmd/auth.py3
-rw-r--r--kvmd/apps/kvmd/hid.py4
-rw-r--r--kvmd/apps/kvmd/msd.py13
-rw-r--r--kvmd/apps/kvmd/server.py29
-rw-r--r--kvmd/apps/kvmd/streamer.py2
7 files changed, 83 insertions, 29 deletions
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py
new file mode 100644
index 00000000..d0c12537
--- /dev/null
+++ b/kvmd/aiotools.py
@@ -0,0 +1,48 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import asyncio
+import functools
+
+import typing
+
+from typing import Callable
+from typing import TypeVar
+from typing import Any
+
+
+# =====
+_AtomicF = TypeVar("_AtomicF", bound=Callable[..., Any])
+
+
+def atomic(method: _AtomicF) -> _AtomicF:
+ @functools.wraps(method)
+ async def wrapper(*args: object, **kwargs: object) -> Any:
+ return (await asyncio.shield(method(*args, **kwargs)))
+ return typing.cast(_AtomicF, wrapper)
+
+
+def task(method: Callable[..., Any]) -> Callable[..., asyncio.Task]:
+ @functools.wraps(method)
+ async def wrapper(*args: object, **kwargs: object) -> asyncio.Task:
+ return asyncio.create_task(method(*args, **kwargs))
+ return typing.cast(Callable[..., asyncio.Task], wrapper)
diff --git a/kvmd/apps/kvmd/atx.py b/kvmd/apps/kvmd/atx.py
index 3bf2bd62..861c3dc2 100644
--- a/kvmd/apps/kvmd/atx.py
+++ b/kvmd/apps/kvmd/atx.py
@@ -30,6 +30,7 @@ from typing import Any
from ...logging import get_logger
+from ... import aiotools
from ... import aioregion
from ... import gpio
@@ -174,14 +175,10 @@ class Atx: # pylint: disable=too-many-instance-attributes
# =====
+ @aiotools.task
+ @aiotools.atomic
async def __click(self, pin: int, delay: float) -> None:
- self.__region.enter()
- asyncio.ensure_future(self.__inner_click(pin, delay))
-
- async def __inner_click(self, pin: int, delay: float) -> None:
- try:
- for flag in (True, False):
+ with self.__region:
+ for flag in [True, False]:
gpio.write(pin, flag)
await asyncio.sleep(delay)
- finally:
- self.__region.exit()
diff --git a/kvmd/apps/kvmd/auth.py b/kvmd/apps/kvmd/auth.py
index d1d21db4..6125db29 100644
--- a/kvmd/apps/kvmd/auth.py
+++ b/kvmd/apps/kvmd/auth.py
@@ -26,6 +26,8 @@ from typing import List
from typing import Dict
from typing import Optional
+from ... import aiotools
+
from ...logging import get_logger
from ...plugins.auth import BaseAuthService
@@ -91,6 +93,7 @@ class AuthManager:
def check(self, token: str) -> Optional[str]:
return self.__tokens.get(token)
+ @aiotools.atomic
async def cleanup(self) -> None:
await self.__internal_service.cleanup()
if self.__external_service:
diff --git a/kvmd/apps/kvmd/hid.py b/kvmd/apps/kvmd/hid.py
index dbfd5c58..da1c7d43 100644
--- a/kvmd/apps/kvmd/hid.py
+++ b/kvmd/apps/kvmd/hid.py
@@ -40,6 +40,7 @@ import setproctitle
from ...logging import get_logger
+from ... import aiotools
from ... import gpio
from ... import keymap
@@ -164,6 +165,8 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
yield self.get_state()
await asyncio.sleep(self.__state_poll)
+ @aiotools.task
+ @aiotools.atomic
async def reset(self) -> None:
async with self.__lock:
gpio.write(self.__reset_pin, True)
@@ -187,6 +190,7 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
async with self.__lock:
self.__unsafe_clear_events()
+ @aiotools.atomic
async def cleanup(self) -> None:
async with self.__lock:
if self.is_alive():
diff --git a/kvmd/apps/kvmd/msd.py b/kvmd/apps/kvmd/msd.py
index 7bf9dff1..6033b25a 100644
--- a/kvmd/apps/kvmd/msd.py
+++ b/kvmd/apps/kvmd/msd.py
@@ -42,6 +42,7 @@ import aiofiles.base
from ...logging import get_logger
from ... import aioregion
+from ... import aiotools
from ... import gpio
@@ -273,6 +274,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
else:
await asyncio.sleep(60)
+ @aiotools.atomic
async def cleanup(self) -> None:
if self._enabled:
await self.__close_device_file()
@@ -280,6 +282,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
gpio.write(self.__reset_pin, False)
@_msd_working
+ @aiotools.atomic
async def connect_to_kvm(self, initial: bool=False) -> Dict:
with self.__region:
if self.__device_info:
@@ -299,6 +302,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
return state
@_msd_working
+ @aiotools.atomic
async def connect_to_pc(self) -> Dict:
with self.__region:
if not self.__device_info:
@@ -311,15 +315,17 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
return state
@_msd_working
+ @aiotools.task
+ @aiotools.atomic
async def reset(self) -> None:
with self.__region:
get_logger().info("Mass-storage device reset")
gpio.write(self.__reset_pin, True)
await asyncio.sleep(self.__reset_delay)
gpio.write(self.__reset_pin, False)
- await self.__state_queue.put(self.get_state())
@_msd_working
+ @aiotools.atomic
async def __aenter__(self) -> "MassStorageDevice":
self.__region.enter()
try:
@@ -332,6 +338,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
await self.__state_queue.put(self.get_state())
self.__region.exit()
+ @aiotools.atomic
async def write_image_info(self, name: str, complete: bool) -> None:
assert self.__device_file
assert self.__device_info
@@ -344,11 +351,13 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
else:
get_logger().error("Can't write image info because device is full")
+ @aiotools.atomic
async def write_image_chunk(self, chunk: bytes) -> int:
await self.__write_to_device_file(chunk)
self.__written += len(chunk)
return self.__written
+ @aiotools.atomic
async def __aexit__(
self,
_exc_type: Type[BaseException],
@@ -380,6 +389,6 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
await self.__device_file.close()
except Exception:
get_logger().exception("Can't close mass-storage device file")
- await self.reset()
+ await (await self.reset())
self.__device_file = None
self.__written = 0
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
index 25111e11..d4c4c13a 100644
--- a/kvmd/apps/kvmd/server.py
+++ b/kvmd/apps/kvmd/server.py
@@ -155,13 +155,7 @@ _HEADER_AUTH_PASSWD = "X-KVMD-Passwd"
_COOKIE_AUTH_TOKEN = "auth_token"
-def _atomic(handler: Callable) -> Callable:
- async def wrapper(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.Response:
- return (await asyncio.shield(handler(self, request)))
- return wrapper
-
-
-def _exposed(http_method: str, path: str, atomic: bool=False, auth_required: bool=True) -> Callable:
+def _exposed(http_method: str, path: str, auth_required: bool=True) -> Callable:
def make_wrapper(handler: Callable) -> Callable:
async def wrapper(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.Response:
try:
@@ -197,9 +191,6 @@ def _exposed(http_method: str, path: str, atomic: bool=False, auth_required: boo
except ForbiddenError as err:
return _json_exception(err, 403)
- if atomic:
- wrapper = _atomic(wrapper)
-
setattr(wrapper, _ATTR_EXPOSED, True)
setattr(wrapper, _ATTR_EXPOSED_METHOD, http_method)
setattr(wrapper, _ATTR_EXPOSED_PATH, path)
@@ -311,7 +302,7 @@ class Server: # pylint: disable=too-many-instance-attributes
# ===== AUTH
- @_exposed("POST", "/auth/login", atomic=True, auth_required=False)
+ @_exposed("POST", "/auth/login", auth_required=False)
async def __auth_login_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
credentials = await request.post()
token = await self._auth_manager.login(
@@ -429,7 +420,7 @@ class Server: # pylint: disable=too-many-instance-attributes
async def __hid_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json(self.__hid.get_state())
- @_exposed("POST", "/hid/reset", atomic=True)
+ @_exposed("POST", "/hid/reset")
async def __hid_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
await self.__hid.reset()
return _json()
@@ -440,18 +431,18 @@ class Server: # pylint: disable=too-many-instance-attributes
async def __atx_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json(self.__atx.get_state())
- @_exposed("POST", "/atx/power", atomic=True)
+ @_exposed("POST", "/atx/power")
async def __atx_power_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
action = valid_atx_power_action(request.query.get("action"))
- done = await ({
+ processing = await ({
"on": self.__atx.power_on,
"off": self.__atx.power_off,
"off_hard": self.__atx.power_off_hard,
"reset_hard": self.__atx.power_reset_hard,
}[action])()
- return _json({"action": action, "done": done})
+ return _json({"processing": processing})
- @_exposed("POST", "/atx/click", atomic=True)
+ @_exposed("POST", "/atx/click")
async def __atx_click_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
button = valid_atx_button(request.query.get("button"))
await ({
@@ -459,7 +450,7 @@ class Server: # pylint: disable=too-many-instance-attributes
"power_long": self.__atx.click_power_long,
"reset": self.__atx.click_reset,
}[button])()
- return _json({"clicked": button})
+ return _json()
# ===== MSD
@@ -467,7 +458,7 @@ class Server: # pylint: disable=too-many-instance-attributes
async def __msd_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json(self.__msd.get_state())
- @_exposed("POST", "/msd/connect", atomic=True)
+ @_exposed("POST", "/msd/connect")
async def __msd_connect_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
to = valid_kvm_target(request.query.get("to"))
return _json(await ({
@@ -500,7 +491,7 @@ class Server: # pylint: disable=too-many-instance-attributes
logger.info("Written %d bytes to mass-storage device", written)
return _json({"written": written})
- @_exposed("POST", "/msd/reset", atomic=True)
+ @_exposed("POST", "/msd/reset")
async def __msd_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
await self.__msd.reset()
return _json()
diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py
index a62609a9..c8c89c50 100644
--- a/kvmd/apps/kvmd/streamer.py
+++ b/kvmd/apps/kvmd/streamer.py
@@ -34,6 +34,7 @@ import aiohttp
from ...logging import get_logger
+from ... import aiotools
from ... import gpio
from ... import __version__
@@ -152,6 +153,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
(stdout, _) = await proc.communicate()
return stdout.decode(errors="ignore").strip()
+ @aiotools.atomic
async def cleanup(self) -> None:
if self.is_running():
await self.stop()