diff options
author | Devaev Maxim <[email protected]> | 2019-06-05 06:30:21 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2019-06-05 06:30:21 +0300 |
commit | 8aa333ba895e7b498e867f7b430335852a4a9f84 (patch) | |
tree | f7726c35505d5054115cf01192250c3198f3243e | |
parent | 234aa8bda4978a9d0bed70237e7548ea5a051422 (diff) |
better atomic ops
-rw-r--r-- | kvmd/aiotools.py | 48 | ||||
-rw-r--r-- | kvmd/apps/kvmd/atx.py | 13 | ||||
-rw-r--r-- | kvmd/apps/kvmd/auth.py | 3 | ||||
-rw-r--r-- | kvmd/apps/kvmd/hid.py | 4 | ||||
-rw-r--r-- | kvmd/apps/kvmd/msd.py | 13 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 29 | ||||
-rw-r--r-- | kvmd/apps/kvmd/streamer.py | 2 |
7 files changed, 83 insertions, 29 deletions
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py new file mode 100644 index 00000000..d0c12537 --- /dev/null +++ b/kvmd/aiotools.py @@ -0,0 +1,48 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import asyncio +import functools + +import typing + +from typing import Callable +from typing import TypeVar +from typing import Any + + +# ===== +_AtomicF = TypeVar("_AtomicF", bound=Callable[..., Any]) + + +def atomic(method: _AtomicF) -> _AtomicF: + @functools.wraps(method) + async def wrapper(*args: object, **kwargs: object) -> Any: + return (await asyncio.shield(method(*args, **kwargs))) + return typing.cast(_AtomicF, wrapper) + + +def task(method: Callable[..., Any]) -> Callable[..., asyncio.Task]: + @functools.wraps(method) + async def wrapper(*args: object, **kwargs: object) -> asyncio.Task: + return asyncio.create_task(method(*args, **kwargs)) + return typing.cast(Callable[..., asyncio.Task], wrapper) diff --git a/kvmd/apps/kvmd/atx.py b/kvmd/apps/kvmd/atx.py index 3bf2bd62..861c3dc2 100644 --- a/kvmd/apps/kvmd/atx.py +++ b/kvmd/apps/kvmd/atx.py @@ -30,6 +30,7 @@ from typing import Any from ...logging import get_logger +from ... import aiotools from ... import aioregion from ... import gpio @@ -174,14 +175,10 @@ class Atx: # pylint: disable=too-many-instance-attributes # ===== + @aiotools.task + @aiotools.atomic async def __click(self, pin: int, delay: float) -> None: - self.__region.enter() - asyncio.ensure_future(self.__inner_click(pin, delay)) - - async def __inner_click(self, pin: int, delay: float) -> None: - try: - for flag in (True, False): + with self.__region: + for flag in [True, False]: gpio.write(pin, flag) await asyncio.sleep(delay) - finally: - self.__region.exit() diff --git a/kvmd/apps/kvmd/auth.py b/kvmd/apps/kvmd/auth.py index d1d21db4..6125db29 100644 --- a/kvmd/apps/kvmd/auth.py +++ b/kvmd/apps/kvmd/auth.py @@ -26,6 +26,8 @@ from typing import List from typing import Dict from typing import Optional +from ... import aiotools + from ...logging import get_logger from ...plugins.auth import BaseAuthService @@ -91,6 +93,7 @@ class AuthManager: def check(self, token: str) -> Optional[str]: return self.__tokens.get(token) + @aiotools.atomic async def cleanup(self) -> None: await self.__internal_service.cleanup() if self.__external_service: diff --git a/kvmd/apps/kvmd/hid.py b/kvmd/apps/kvmd/hid.py index dbfd5c58..da1c7d43 100644 --- a/kvmd/apps/kvmd/hid.py +++ b/kvmd/apps/kvmd/hid.py @@ -40,6 +40,7 @@ import setproctitle from ...logging import get_logger +from ... import aiotools from ... import gpio from ... import keymap @@ -164,6 +165,8 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu yield self.get_state() await asyncio.sleep(self.__state_poll) + @aiotools.task + @aiotools.atomic async def reset(self) -> None: async with self.__lock: gpio.write(self.__reset_pin, True) @@ -187,6 +190,7 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu async with self.__lock: self.__unsafe_clear_events() + @aiotools.atomic async def cleanup(self) -> None: async with self.__lock: if self.is_alive(): diff --git a/kvmd/apps/kvmd/msd.py b/kvmd/apps/kvmd/msd.py index 7bf9dff1..6033b25a 100644 --- a/kvmd/apps/kvmd/msd.py +++ b/kvmd/apps/kvmd/msd.py @@ -42,6 +42,7 @@ import aiofiles.base from ...logging import get_logger from ... import aioregion +from ... import aiotools from ... import gpio @@ -273,6 +274,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes else: await asyncio.sleep(60) + @aiotools.atomic async def cleanup(self) -> None: if self._enabled: await self.__close_device_file() @@ -280,6 +282,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes gpio.write(self.__reset_pin, False) @_msd_working + @aiotools.atomic async def connect_to_kvm(self, initial: bool=False) -> Dict: with self.__region: if self.__device_info: @@ -299,6 +302,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes return state @_msd_working + @aiotools.atomic async def connect_to_pc(self) -> Dict: with self.__region: if not self.__device_info: @@ -311,15 +315,17 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes return state @_msd_working + @aiotools.task + @aiotools.atomic async def reset(self) -> None: with self.__region: get_logger().info("Mass-storage device reset") gpio.write(self.__reset_pin, True) await asyncio.sleep(self.__reset_delay) gpio.write(self.__reset_pin, False) - await self.__state_queue.put(self.get_state()) @_msd_working + @aiotools.atomic async def __aenter__(self) -> "MassStorageDevice": self.__region.enter() try: @@ -332,6 +338,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes await self.__state_queue.put(self.get_state()) self.__region.exit() + @aiotools.atomic async def write_image_info(self, name: str, complete: bool) -> None: assert self.__device_file assert self.__device_info @@ -344,11 +351,13 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes else: get_logger().error("Can't write image info because device is full") + @aiotools.atomic async def write_image_chunk(self, chunk: bytes) -> int: await self.__write_to_device_file(chunk) self.__written += len(chunk) return self.__written + @aiotools.atomic async def __aexit__( self, _exc_type: Type[BaseException], @@ -380,6 +389,6 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes await self.__device_file.close() except Exception: get_logger().exception("Can't close mass-storage device file") - await self.reset() + await (await self.reset()) self.__device_file = None self.__written = 0 diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 25111e11..d4c4c13a 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -155,13 +155,7 @@ _HEADER_AUTH_PASSWD = "X-KVMD-Passwd" _COOKIE_AUTH_TOKEN = "auth_token" -def _atomic(handler: Callable) -> Callable: - async def wrapper(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.Response: - return (await asyncio.shield(handler(self, request))) - return wrapper - - -def _exposed(http_method: str, path: str, atomic: bool=False, auth_required: bool=True) -> Callable: +def _exposed(http_method: str, path: str, auth_required: bool=True) -> Callable: def make_wrapper(handler: Callable) -> Callable: async def wrapper(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.Response: try: @@ -197,9 +191,6 @@ def _exposed(http_method: str, path: str, atomic: bool=False, auth_required: boo except ForbiddenError as err: return _json_exception(err, 403) - if atomic: - wrapper = _atomic(wrapper) - setattr(wrapper, _ATTR_EXPOSED, True) setattr(wrapper, _ATTR_EXPOSED_METHOD, http_method) setattr(wrapper, _ATTR_EXPOSED_PATH, path) @@ -311,7 +302,7 @@ class Server: # pylint: disable=too-many-instance-attributes # ===== AUTH - @_exposed("POST", "/auth/login", atomic=True, auth_required=False) + @_exposed("POST", "/auth/login", auth_required=False) async def __auth_login_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: credentials = await request.post() token = await self._auth_manager.login( @@ -429,7 +420,7 @@ class Server: # pylint: disable=too-many-instance-attributes async def __hid_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: return _json(self.__hid.get_state()) - @_exposed("POST", "/hid/reset", atomic=True) + @_exposed("POST", "/hid/reset") async def __hid_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: await self.__hid.reset() return _json() @@ -440,18 +431,18 @@ class Server: # pylint: disable=too-many-instance-attributes async def __atx_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: return _json(self.__atx.get_state()) - @_exposed("POST", "/atx/power", atomic=True) + @_exposed("POST", "/atx/power") async def __atx_power_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: action = valid_atx_power_action(request.query.get("action")) - done = await ({ + processing = await ({ "on": self.__atx.power_on, "off": self.__atx.power_off, "off_hard": self.__atx.power_off_hard, "reset_hard": self.__atx.power_reset_hard, }[action])() - return _json({"action": action, "done": done}) + return _json({"processing": processing}) - @_exposed("POST", "/atx/click", atomic=True) + @_exposed("POST", "/atx/click") async def __atx_click_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: button = valid_atx_button(request.query.get("button")) await ({ @@ -459,7 +450,7 @@ class Server: # pylint: disable=too-many-instance-attributes "power_long": self.__atx.click_power_long, "reset": self.__atx.click_reset, }[button])() - return _json({"clicked": button}) + return _json() # ===== MSD @@ -467,7 +458,7 @@ class Server: # pylint: disable=too-many-instance-attributes async def __msd_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: return _json(self.__msd.get_state()) - @_exposed("POST", "/msd/connect", atomic=True) + @_exposed("POST", "/msd/connect") async def __msd_connect_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: to = valid_kvm_target(request.query.get("to")) return _json(await ({ @@ -500,7 +491,7 @@ class Server: # pylint: disable=too-many-instance-attributes logger.info("Written %d bytes to mass-storage device", written) return _json({"written": written}) - @_exposed("POST", "/msd/reset", atomic=True) + @_exposed("POST", "/msd/reset") async def __msd_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: await self.__msd.reset() return _json() diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py index a62609a9..c8c89c50 100644 --- a/kvmd/apps/kvmd/streamer.py +++ b/kvmd/apps/kvmd/streamer.py @@ -34,6 +34,7 @@ import aiohttp from ...logging import get_logger +from ... import aiotools from ... import gpio from ... import __version__ @@ -152,6 +153,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes (stdout, _) = await proc.communicate() return stdout.decode(errors="ignore").strip() + @aiotools.atomic async def cleanup(self) -> None: if self.is_running(): await self.stop() |