diff options
author | Devaev Maxim <[email protected]> | 2019-12-09 01:21:38 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2019-12-09 02:23:05 +0300 |
commit | dd52a85cf6c21c5a7743acad152d8378f1ae0ef4 (patch) | |
tree | b2b4ff6b6e84a572214e0ad1001fe345a33f62c6 /kvmd/apps | |
parent | 272ea08adf8944b385a585a142f91e4d8fccb605 (diff) |
refactoring
Diffstat (limited to 'kvmd/apps')
-rw-r--r-- | kvmd/apps/kvmd/__init__.py | 9 | ||||
-rw-r--r-- | kvmd/apps/kvmd/api/__init__.py | 20 | ||||
-rw-r--r-- | kvmd/apps/kvmd/api/atx.py | 64 | ||||
-rw-r--r-- | kvmd/apps/kvmd/api/hid.py | 93 | ||||
-rw-r--r-- | kvmd/apps/kvmd/api/log.py | 57 | ||||
-rw-r--r-- | kvmd/apps/kvmd/api/msd.py | 106 | ||||
-rw-r--r-- | kvmd/apps/kvmd/api/wol.py | 45 | ||||
-rw-r--r-- | kvmd/apps/kvmd/http.py | 179 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 562 |
9 files changed, 719 insertions, 416 deletions
diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py index 1b498a4e..4275d1a2 100644 --- a/kvmd/apps/kvmd/__init__.py +++ b/kvmd/apps/kvmd/__init__.py @@ -38,7 +38,7 @@ from .info import InfoManager from .logreader import LogReader from .wol import WakeOnLan from .streamer import Streamer -from .server import Server +from .server import KvmdServer # ===== @@ -62,7 +62,7 @@ def main(argv: Optional[List[str]]=None) -> None: config = config.kvmd - Server( + KvmdServer( auth_manager=AuthManager( internal_type=config.auth.internal.type, internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]), @@ -78,6 +78,9 @@ def main(argv: Optional[List[str]]=None) -> None: atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])), msd=get_msd_class(config.msd.type)(**msd_kwargs), streamer=Streamer(**config.streamer._unpack()), - ).run(**config.server._unpack()) + + heartbeat=config.server.heartbeat, + sync_chunk_size=config.server.sync_chunk_size, + ).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"])) get_logger(0).info("Bye-bye") diff --git a/kvmd/apps/kvmd/api/__init__.py b/kvmd/apps/kvmd/api/__init__.py new file mode 100644 index 00000000..1e91f7fa --- /dev/null +++ b/kvmd/apps/kvmd/api/__init__.py @@ -0,0 +1,20 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # diff --git a/kvmd/apps/kvmd/api/atx.py b/kvmd/apps/kvmd/api/atx.py new file mode 100644 index 00000000..176c4e77 --- /dev/null +++ b/kvmd/apps/kvmd/api/atx.py @@ -0,0 +1,64 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import aiohttp.web + +from ....plugins.atx import BaseAtx + +from ....validators.kvm import valid_atx_power_action +from ....validators.kvm import valid_atx_button + +from ..http import exposed_http +from ..http import make_json_response + + +# ===== +class AtxApi: + def __init__(self, atx: BaseAtx) -> None: + self.__atx = atx + + # ===== + + @exposed_http("GET", "/atx") + async def __state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + return make_json_response(self.__atx.get_state()) + + @exposed_http("POST", "/atx/power") + async def __power_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: + action = valid_atx_power_action(request.query.get("action")) + processing = await ({ + "on": self.__atx.power_on, + "off": self.__atx.power_off, + "off_hard": self.__atx.power_off_hard, + "reset_hard": self.__atx.power_reset_hard, + }[action])() + return make_json_response({"processing": processing}) + + @exposed_http("POST", "/atx/click") + async def __click_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: + button = valid_atx_button(request.query.get("button")) + await ({ + "power": self.__atx.click_power, + "power_long": self.__atx.click_power_long, + "reset": self.__atx.click_reset, + }[button])() + return make_json_response() diff --git a/kvmd/apps/kvmd/api/hid.py b/kvmd/apps/kvmd/api/hid.py new file mode 100644 index 00000000..4dc952ba --- /dev/null +++ b/kvmd/apps/kvmd/api/hid.py @@ -0,0 +1,93 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from typing import Dict + +import aiohttp.web + +from ....plugins.hid import BaseHid + +from ....validators.basic import valid_bool + +from ....validators.kvm import valid_hid_key +from ....validators.kvm import valid_hid_mouse_move +from ....validators.kvm import valid_hid_mouse_button +from ....validators.kvm import valid_hid_mouse_wheel + +from ..http import exposed_http +from ..http import exposed_ws +from ..http import make_json_response + + +# ===== +class HidApi: + def __init__(self, hid: BaseHid) -> None: + self.__hid = hid + + # ===== + + @exposed_http("GET", "/hid/state") + async def __state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + return make_json_response(self.__hid.get_state()) + + @exposed_http("GET", "/hid/reset") + async def __reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + await self.__hid.reset() + return make_json_response() + + # ===== + + @exposed_ws("key") + async def __ws_key_handler(self, _: aiohttp.web.WebSocketResponse, event: Dict) -> None: + try: + key = valid_hid_key(event["key"]) + state = valid_bool(event["state"]) + except Exception: + return + await self.__hid.send_key_event(key, state) + + @exposed_ws("mouse_button") + async def __ws_mouse_button_handler(self, _: aiohttp.web.WebSocketResponse, event: Dict) -> None: + try: + button = valid_hid_mouse_button(event["button"]) + state = valid_bool(event["state"]) + except Exception: + return + await self.__hid.send_mouse_button_event(button, state) + + @exposed_ws("mouse_move") + async def __ws_mouse_move_handler(self, _: aiohttp.web.WebSocketResponse, event: Dict) -> None: + try: + to_x = valid_hid_mouse_move(event["to"]["x"]) + to_y = valid_hid_mouse_move(event["to"]["y"]) + except Exception: + return + await self.__hid.send_mouse_move_event(to_x, to_y) + + @exposed_ws("mouse_wheel") + async def __ws_mouse_wheel_handler(self, _: aiohttp.web.WebSocketResponse, event: Dict) -> None: + try: + delta_x = valid_hid_mouse_wheel(event["delta"]["x"]) + delta_y = valid_hid_mouse_wheel(event["delta"]["y"]) + except Exception: + return + await self.__hid.send_mouse_wheel_event(delta_x, delta_y) diff --git a/kvmd/apps/kvmd/api/log.py b/kvmd/apps/kvmd/api/log.py new file mode 100644 index 00000000..9a6d8853 --- /dev/null +++ b/kvmd/apps/kvmd/api/log.py @@ -0,0 +1,57 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from typing import Optional + +import aiohttp.web + +from ....validators.basic import valid_bool + +from ....validators.kvm import valid_log_seek + +from ..logreader import LogReader + +from ..http import exposed_http + + +# ===== +class LogApi: + def __init__(self, log_reader: LogReader) -> None: + self.__log_reader = log_reader + + # ===== + + @exposed_http("GET", "/log") + async def __log_handler(self, request: aiohttp.web.Request) -> aiohttp.web.StreamResponse: + seek = valid_log_seek(request.query.get("seek", "0")) + follow = valid_bool(request.query.get("follow", "false")) + response: Optional[aiohttp.web.StreamResponse] = None + async for record in self.__log_reader.poll_log(seek, follow): + if response is None: + response = aiohttp.web.StreamResponse(status=200, reason="OK", headers={"Content-Type": "text/plain"}) + await response.prepare(request) + await response.write(("[%s %s] --- %s" % ( + record["dt"].strftime("%Y-%m-%d %H:%M:%S"), + record["service"], + record["msg"], + )).encode("utf-8") + b"\r\n") + return response diff --git a/kvmd/apps/kvmd/api/msd.py b/kvmd/apps/kvmd/api/msd.py new file mode 100644 index 00000000..0ab2e9b4 --- /dev/null +++ b/kvmd/apps/kvmd/api/msd.py @@ -0,0 +1,106 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import aiohttp +import aiohttp.web + +from ....logging import get_logger + +from ....plugins.msd import BaseMsd + +from ....validators.basic import valid_bool + +from ....validators.kvm import valid_msd_image_name + +from ..http import exposed_http +from ..http import make_json_response +from ..http import get_multipart_field + + +# ====== +class MsdApi: + def __init__(self, msd: BaseMsd, sync_chunk_size: int) -> None: + self.__msd = msd + self.__sync_chunk_size = sync_chunk_size + + # ===== + + @exposed_http("GET", "/msd") + async def __state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + return make_json_response(await self.__msd.get_state()) + + @exposed_http("POST", "/msd/set_params") + async def __set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: + params = { + key: validator(request.query.get(param)) + for (param, key, validator) in [ + ("image", "name", (lambda arg: str(arg).strip() and valid_msd_image_name(arg))), + ("cdrom", "cdrom", valid_bool), + ] + if request.query.get(param) is not None + } + await self.__msd.set_params(**params) # type: ignore + return make_json_response() + + @exposed_http("POST", "/msd/connect") + async def __connect_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + await self.__msd.connect() + return make_json_response() + + @exposed_http("POST", "/msd/disconnect") + async def __disconnect_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + await self.__msd.disconnect() + return make_json_response() + + @exposed_http("POST", "/msd/write") + async def __write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: + logger = get_logger(0) + reader = await request.multipart() + name = "" + written = 0 + try: + name_field = await get_multipart_field(reader, "image") + name = valid_msd_image_name((await name_field.read()).decode("utf-8")) + + data_field = await get_multipart_field(reader, "data") + + async with self.__msd.write_image(name): + logger.info("Writing image %r to MSD ...", name) + while True: + chunk = await data_field.read_chunk(self.__sync_chunk_size) + if not chunk: + break + written = await self.__msd.write_image_chunk(chunk) + finally: + if written != 0: + logger.info("Written image %r with size=%d bytes to MSD", name, written) + return make_json_response({"image": {"name": name, "size": written}}) + + @exposed_http("POST", "/msd/remove") + async def __remove_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: + await self.__msd.remove(valid_msd_image_name(request.query.get("image"))) + return make_json_response() + + @exposed_http("POST", "/msd/reset") + async def __reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + await self.__msd.reset() + return make_json_response() diff --git a/kvmd/apps/kvmd/api/wol.py b/kvmd/apps/kvmd/api/wol.py new file mode 100644 index 00000000..c919728e --- /dev/null +++ b/kvmd/apps/kvmd/api/wol.py @@ -0,0 +1,45 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import aiohttp.web + +from ..wol import WakeOnLan + +from ..http import exposed_http +from ..http import make_json_response + + +# ===== +class WolApi: + def __init__(self, wol: WakeOnLan) -> None: + self.__wol = wol + + # ===== + + @exposed_http("GET", "/wol") + async def __wol_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + return make_json_response(self.__wol.get_state()) + + @exposed_http("POST", "/wol/wakeup") + async def __wol_wakeup_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + await self.__wol.wakeup() + return make_json_response() diff --git a/kvmd/apps/kvmd/http.py b/kvmd/apps/kvmd/http.py new file mode 100644 index 00000000..ce263063 --- /dev/null +++ b/kvmd/apps/kvmd/http.py @@ -0,0 +1,179 @@ +import os +import socket +import dataclasses +import inspect +import json + +from typing import List +from typing import Dict +from typing import Callable +from typing import Optional + +import aiohttp +import aiohttp.web + +from ...logging import get_logger + +from ...validators import ValidatorError + + +# ===== +class HttpError(Exception): + pass + + +class UnauthorizedError(HttpError): + pass + + +class ForbiddenError(HttpError): + pass + + +# ===== [email protected](frozen=True) +class HttpExposed: + method: str + path: str + auth_required: bool + handler: Callable + + +_HTTP_EXPOSED = "_http_exposed" +_HTTP_METHOD = "_http_method" +_HTTP_PATH = "_http_path" +_HTTP_AUTH_REQUIRED = "_http_auth_required" + + +def exposed_http(http_method: str, path: str, auth_required: bool=True) -> Callable: + def set_attrs(handler: Callable) -> Callable: + setattr(handler, _HTTP_EXPOSED, True) + setattr(handler, _HTTP_METHOD, http_method) + setattr(handler, _HTTP_PATH, path) + setattr(handler, _HTTP_AUTH_REQUIRED, auth_required) + return handler + return set_attrs + + +def get_exposed_http(obj: object) -> List[HttpExposed]: + return [ + HttpExposed( + method=getattr(handler, _HTTP_METHOD), + path=getattr(handler, _HTTP_PATH), + auth_required=getattr(handler, _HTTP_AUTH_REQUIRED), + handler=handler, + ) + for name in dir(obj) + if inspect.ismethod(handler := getattr(obj, name)) and getattr(handler, _HTTP_EXPOSED, False) + ] + + +# ===== [email protected](frozen=True) +class WsExposed: + event_type: str + handler: Callable + + +_WS_EXPOSED = "_ws_exposed" +_WS_EVENT_TYPE = "_ws_event_type" + + +def exposed_ws(event_type: str) -> Callable: + def set_attrs(handler: Callable) -> Callable: + setattr(handler, _WS_EXPOSED, True) + setattr(handler, _WS_EVENT_TYPE, event_type) + return handler + return set_attrs + + +def get_exposed_ws(obj: object) -> List[WsExposed]: + return [ + WsExposed( + event_type=getattr(handler, _WS_EVENT_TYPE), + handler=handler, + ) + for name in dir(obj) + if inspect.ismethod(handler := getattr(obj, name)) and getattr(handler, _WS_EXPOSED, False) + ] + + +# ===== +def make_json_response( + result: Optional[Dict]=None, + status: int=200, + set_cookies: Optional[Dict[str, str]]=None, +) -> aiohttp.web.Response: + + response = aiohttp.web.Response( + text=json.dumps({ + "ok": (status == 200), + "result": (result or {}), + }, sort_keys=True, indent=4), + status=status, + content_type="application/json", + ) + if set_cookies: + for (key, value) in set_cookies.items(): + response.set_cookie(key, value) + return response + + +def make_json_exception(err: Exception, status: int) -> aiohttp.web.Response: + name = type(err).__name__ + msg = str(err) + if not isinstance(err, (UnauthorizedError, ForbiddenError)): + get_logger().error("API error: %s: %s", name, msg) + return make_json_response({ + "error": name, + "error_msg": msg, + }, status=status) + + +# ===== +async def get_multipart_field(reader: aiohttp.MultipartReader, name: str) -> aiohttp.BodyPartReader: + field = await reader.next() + if not field or field.name != name: + raise ValidatorError(f"Missing {name!r} field") + return field + + +# ===== +class HttpServer: + def run( + self, + host: str, + port: int, + unix_path: str, + unix_rm: bool, + unix_mode: int, + access_log_format: str, + ) -> None: + + assert port or unix_path + if unix_path: + socket_kwargs: Dict = {} + if unix_rm and os.path.exists(unix_path): + os.remove(unix_path) + server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + server_socket.bind(unix_path) + if unix_mode: + os.chmod(unix_path, unix_mode) + socket_kwargs = {"sock": server_socket} + else: + socket_kwargs = {"host": host, "port": port} + + aiohttp.web.run_app( + app=self._make_app(), + access_log_format=access_log_format, + print=self.__run_app_print, + **socket_kwargs, + ) + + async def _make_app(self) -> aiohttp.web.Application: + raise NotImplementedError + + def __run_app_print(self, text: str) -> None: + logger = get_logger(0) + for line in text.strip().splitlines(): + logger.info(line.strip()) diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index bd40d76f..cd687918 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -22,9 +22,7 @@ import os import signal -import socket import asyncio -import inspect import json import time @@ -34,7 +32,9 @@ from typing import List from typing import Dict from typing import Set from typing import Callable +from typing import AsyncGenerator from typing import Optional +from typing import Any import aiohttp import aiohttp.web @@ -56,22 +56,12 @@ from ...plugins.msd import BaseMsd from ...validators import ValidatorError -from ...validators.basic import valid_bool - from ...validators.auth import valid_user from ...validators.auth import valid_passwd from ...validators.auth import valid_auth_token -from ...validators.kvm import valid_atx_power_action -from ...validators.kvm import valid_atx_button -from ...validators.kvm import valid_log_seek from ...validators.kvm import valid_stream_quality from ...validators.kvm import valid_stream_fps -from ...validators.kvm import valid_msd_image_name -from ...validators.kvm import valid_hid_key -from ...validators.kvm import valid_hid_mouse_move -from ...validators.kvm import valid_hid_mouse_button -from ...validators.kvm import valid_hid_mouse_wheel from ... import aiotools @@ -85,6 +75,23 @@ from .streamer import Streamer from .wol import WolDisabledError from .wol import WakeOnLan +from .http import UnauthorizedError +from .http import ForbiddenError +from .http import HttpExposed +from .http import exposed_http +from .http import exposed_ws +from .http import get_exposed_http +from .http import get_exposed_ws +from .http import make_json_response +from .http import make_json_exception +from .http import HttpServer + +from .api.log import LogApi +from .api.wol import WolApi +from .api.hid import HidApi +from .api.atx import AtxApi +from .api.msd import MsdApi + # ===== try: @@ -104,125 +111,12 @@ AccessLogger._format_P = staticmethod(_format_P) # type: ignore # pylint: disa # ===== -class HttpError(Exception): - pass - - -class UnauthorizedError(HttpError): - pass - - -class ForbiddenError(HttpError): - pass - - -def _json( - result: Optional[Dict]=None, - status: int=200, - set_cookies: Optional[Dict[str, str]]=None, -) -> aiohttp.web.Response: - - response = aiohttp.web.Response( - text=json.dumps({ - "ok": (status == 200), - "result": (result or {}), - }, sort_keys=True, indent=4), - status=status, - content_type="application/json", - ) - if set_cookies: - for (key, value) in set_cookies.items(): - response.set_cookie(key, value) - return response - - -def _json_exception(err: Exception, status: int) -> aiohttp.web.Response: - name = type(err).__name__ - msg = str(err) - if not isinstance(err, (UnauthorizedError, ForbiddenError)): - get_logger().error("API error: %s: %s", name, msg) - return _json({ - "error": name, - "error_msg": msg, - }, status=status) - - -async def _get_multipart_field(reader: aiohttp.MultipartReader, name: str) -> aiohttp.BodyPartReader: - field = await reader.next() - if not field or field.name != name: - raise ValidatorError(f"Missing {name!r} field") - return field - - -_ATTR_EXPOSED = "_server_exposed" -_ATTR_EXPOSED_METHOD = "_server_exposed_method" -_ATTR_EXPOSED_PATH = "_server_exposed_path" -_ATTR_SYSTEM_TASK = "system_task" - _HEADER_AUTH_USER = "X-KVMD-User" _HEADER_AUTH_PASSWD = "X-KVMD-Passwd" _COOKIE_AUTH_TOKEN = "auth_token" -def _exposed(http_method: str, path: str, auth_required: bool=True) -> Callable: - def make_wrapper(handler: Callable) -> Callable: - async def wrapper(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.Response: - try: - if auth_required: - user = request.headers.get(_HEADER_AUTH_USER, "") - passwd = request.headers.get(_HEADER_AUTH_PASSWD, "") - token = request.cookies.get(_COOKIE_AUTH_TOKEN, "") - - if user: - user = valid_user(user) - setattr(request, _ATTR_KVMD_AUTH_INFO, f"{user} (xhdr)") - if not (await self._auth_manager.authorize(user, valid_passwd(passwd))): - raise ForbiddenError("Forbidden") - - elif token: - user = self._auth_manager.check(valid_auth_token(token)) - if not user: - setattr(request, _ATTR_KVMD_AUTH_INFO, "- (token)") - raise ForbiddenError("Forbidden") - setattr(request, _ATTR_KVMD_AUTH_INFO, f"{user} (token)") - - else: - raise UnauthorizedError("Unauthorized") - - return (await handler(self, request)) - - except (AtxIsBusyError, MsdIsBusyError) as err: - return _json_exception(err, 409) - except (ValidatorError, AtxOperationError, MsdOperationError, WolDisabledError) as err: - return _json_exception(err, 400) - except UnauthorizedError as err: - return _json_exception(err, 401) - except ForbiddenError as err: - return _json_exception(err, 403) - - setattr(wrapper, _ATTR_EXPOSED, True) - setattr(wrapper, _ATTR_EXPOSED_METHOD, http_method) - setattr(wrapper, _ATTR_EXPOSED_PATH, path) - return wrapper - return make_wrapper - - -def _system_task(method: Callable) -> Callable: - async def wrapper(self: "Server") -> None: - try: - await method(self) - raise RuntimeError(f"Dead system task: {method}") - except asyncio.CancelledError: - pass - except Exception: - get_logger().exception("Unhandled exception, killing myself ...") - os.kill(os.getpid(), signal.SIGTERM) - - setattr(wrapper, _ATTR_SYSTEM_TASK, True) - return wrapper - - class _Events(Enum): INFO_STATE = "info_state" WOL_STATE = "wol_state" @@ -232,8 +126,8 @@ class _Events(Enum): STREAMER_STATE = "streamer_state" -class Server: # pylint: disable=too-many-instance-attributes - def __init__( +class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-instance-attributes + def __init__( # pylint: disable=too-many-arguments self, auth_manager: AuthManager, info_manager: InfoManager, @@ -244,11 +138,13 @@ class Server: # pylint: disable=too-many-instance-attributes atx: BaseAtx, msd: BaseMsd, streamer: Streamer, + + heartbeat: float, + sync_chunk_size: int, ) -> None: - self._auth_manager = auth_manager + self.__auth_manager = auth_manager self.__info_manager = info_manager - self.__log_reader = log_reader self.__wol = wol self.__hid = hid @@ -256,8 +152,19 @@ class Server: # pylint: disable=too-many-instance-attributes self.__msd = msd self.__streamer = streamer - self.__heartbeat: Optional[float] = None # Assigned in run() for consistance - self.__sync_chunk_size: Optional[int] = None # Ditto + self.__heartbeat = heartbeat + + self.__apis: List[object] = [ + self, + LogApi(log_reader), + WolApi(wol), + HidApi(hid), + AtxApi(atx), + MsdApi(msd, sync_chunk_size), + ] + + self.__ws_handlers: Dict[str, Callable] = {} + self.__sockets: Set[aiohttp.web.WebSocketResponse] = set() self.__sockets_lock = asyncio.Lock() @@ -266,45 +173,6 @@ class Server: # pylint: disable=too-many-instance-attributes self.__reset_streamer = False self.__streamer_params = streamer.get_params() - def run( - self, - host: str, - port: int, - unix_path: str, - unix_rm: bool, - unix_mode: int, - heartbeat: float, - sync_chunk_size: int, - access_log_format: str, - ) -> None: - - self.__hid.start() - - setproctitle.setproctitle(f"kvmd/main: {setproctitle.getproctitle()}") - - self.__heartbeat = heartbeat - self.__sync_chunk_size = sync_chunk_size - - assert port or unix_path - if unix_path: - socket_kwargs: Dict = {} - if unix_rm and os.path.exists(unix_path): - os.remove(unix_path) - server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - server_socket.bind(unix_path) - if unix_mode: - os.chmod(unix_path, unix_mode) - socket_kwargs = {"sock": server_socket} - else: - socket_kwargs = {"host": host, "port": port} - - aiohttp.web.run_app( - app=self.__make_app(), - access_log_format=access_log_format, - print=self.__run_app_print, - **socket_kwargs, - ) - async def __make_info(self) -> Dict: return { "version": { @@ -318,66 +186,60 @@ class Server: # pylint: disable=too-many-instance-attributes # ===== AUTH - @_exposed("POST", "/auth/login", auth_required=False) + @exposed_http("POST", "/auth/login", auth_required=False) async def __auth_login_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: credentials = await request.post() - token = await self._auth_manager.login( + token = await self.__auth_manager.login( user=valid_user(credentials.get("user", "")), passwd=valid_passwd(credentials.get("passwd", "")), ) if token: - return _json({}, set_cookies={_COOKIE_AUTH_TOKEN: token}) + return make_json_response({}, set_cookies={_COOKIE_AUTH_TOKEN: token}) raise ForbiddenError("Forbidden") - @_exposed("POST", "/auth/logout") + @exposed_http("POST", "/auth/logout") async def __auth_logout_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: token = valid_auth_token(request.cookies.get(_COOKIE_AUTH_TOKEN, "")) - self._auth_manager.logout(token) - return _json({}) + self.__auth_manager.logout(token) + return make_json_response({}) - @_exposed("GET", "/auth/check") + @exposed_http("GET", "/auth/check") async def __auth_check_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - return _json({}) + return make_json_response({}) # ===== SYSTEM - @_exposed("GET", "/info") + @exposed_http("GET", "/info") async def __info_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - return _json(await self.__make_info()) - - @_exposed("GET", "/log") - async def __log_handler(self, request: aiohttp.web.Request) -> aiohttp.web.StreamResponse: - seek = valid_log_seek(request.query.get("seek", "0")) - follow = valid_bool(request.query.get("follow", "false")) - response: Optional[aiohttp.web.StreamResponse] = None - async for record in self.__log_reader.poll_log(seek, follow): - if response is None: - response = aiohttp.web.StreamResponse(status=200, reason="OK", headers={"Content-Type": "text/plain"}) - await response.prepare(request) - await response.write(("[%s %s] --- %s" % ( - record["dt"].strftime("%Y-%m-%d %H:%M:%S"), - record["service"], - record["msg"], - )).encode("utf-8") + b"\r\n") - return response - - # ===== Wake-on-LAN - - @_exposed("GET", "/wol") - async def __wol_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - return _json(self.__wol.get_state()) - - @_exposed("POST", "/wol/wakeup") - async def __wol_wakeup_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - await self.__wol.wakeup() - return _json() + return make_json_response(await self.__make_info()) + + # ===== STREAMER + + @exposed_http("GET", "/streamer") + async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + return make_json_response(await self.__streamer.get_state()) + + @exposed_http("POST", "/streamer/set_params") + async def __streamer_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: + for (name, validator) in [ + ("quality", valid_stream_quality), + ("desired_fps", valid_stream_fps), + ]: + value = request.query.get(name) + if value: + self.__streamer_params[name] = validator(value) + return make_json_response() + + @exposed_http("POST", "/streamer/reset") + async def __streamer_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + self.__reset_streamer = True + return make_json_response() # ===== WEBSOCKET - @_exposed("GET", "/ws") + @exposed_http("GET", "/ws") async def __ws_handler(self, request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse: logger = get_logger(0) - assert self.__heartbeat is not None ws = aiohttp.web.WebSocketResponse(heartbeat=self.__heartbeat) await ws.prepare(request) await self.__register_socket(ws) @@ -396,203 +258,95 @@ class Server: # pylint: disable=too-many-instance-attributes except Exception as err: logger.error("Can't parse JSON event from websocket: %s", err) else: - event_type = event.get("event_type") - if event_type == "ping": - await ws.send_str(json.dumps({"msg_type": "pong"})) - elif event_type == "key": - await self.__handle_ws_key_event(event) - elif event_type == "mouse_button": - await self.__handle_ws_mouse_button_event(event) - elif event_type == "mouse_move": - await self.__handle_ws_mouse_move_event(event) - elif event_type == "mouse_wheel": - await self.__handle_ws_mouse_wheel_event(event) + handler = self.__ws_handlers.get(event.get("event_type")) + if handler: + await handler(ws, event) else: logger.error("Unknown websocket event: %r", event) else: break return ws - async def __handle_ws_key_event(self, event: Dict) -> None: - try: - key = valid_hid_key(event["key"]) - state = valid_bool(event["state"]) - except Exception: - return - await self.__hid.send_key_event(key, state) - - async def __handle_ws_mouse_button_event(self, event: Dict) -> None: - try: - button = valid_hid_mouse_button(event["button"]) - state = valid_bool(event["state"]) - except Exception: - return - await self.__hid.send_mouse_button_event(button, state) - - async def __handle_ws_mouse_move_event(self, event: Dict) -> None: - try: - to_x = valid_hid_mouse_move(event["to"]["x"]) - to_y = valid_hid_mouse_move(event["to"]["y"]) - except Exception: - return - await self.__hid.send_mouse_move_event(to_x, to_y) - - async def __handle_ws_mouse_wheel_event(self, event: Dict) -> None: - try: - delta_x = valid_hid_mouse_wheel(event["delta"]["x"]) - delta_y = valid_hid_mouse_wheel(event["delta"]["y"]) - except Exception: - return - await self.__hid.send_mouse_wheel_event(delta_x, delta_y) - - # ===== HID - - @_exposed("GET", "/hid") - async def __hid_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - return _json(self.__hid.get_state()) - - @_exposed("POST", "/hid/reset") - async def __hid_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - await self.__hid.reset() - return _json() - - # ===== ATX - - @_exposed("GET", "/atx") - async def __atx_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - return _json(self.__atx.get_state()) - - @_exposed("POST", "/atx/power") - async def __atx_power_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: - action = valid_atx_power_action(request.query.get("action")) - processing = await ({ - "on": self.__atx.power_on, - "off": self.__atx.power_off, - "off_hard": self.__atx.power_off_hard, - "reset_hard": self.__atx.power_reset_hard, - }[action])() - return _json({"processing": processing}) - - @_exposed("POST", "/atx/click") - async def __atx_click_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: - button = valid_atx_button(request.query.get("button")) - await ({ - "power": self.__atx.click_power, - "power_long": self.__atx.click_power_long, - "reset": self.__atx.click_reset, - }[button])() - return _json() - - # ===== MSD - - @_exposed("GET", "/msd") - async def __msd_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - return _json(await self.__msd.get_state()) - - @_exposed("POST", "/msd/set_params") - async def __msd_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: - params = { - key: validator(request.query.get(param)) - for (param, key, validator) in [ - ("image", "name", (lambda arg: str(arg).strip() and valid_msd_image_name(arg))), - ("cdrom", "cdrom", valid_bool), - ] - if request.query.get(param) is not None - } - await self.__msd.set_params(**params) # type: ignore - return _json() - - @_exposed("POST", "/msd/connect") - async def __msd_connect_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - await self.__msd.connect() - return _json() - - @_exposed("POST", "/msd/disconnect") - async def __msd_disconnect_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - await self.__msd.disconnect() - return _json() - - @_exposed("POST", "/msd/write") - async def __msd_write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: - assert self.__sync_chunk_size is not None - logger = get_logger(0) - reader = await request.multipart() - name = "" - written = 0 - try: - name_field = await _get_multipart_field(reader, "image") - name = valid_msd_image_name((await name_field.read()).decode("utf-8")) - - data_field = await _get_multipart_field(reader, "data") - - async with self.__msd.write_image(name): - logger.info("Writing image %r to MSD ...", name) - while True: - chunk = await data_field.read_chunk(self.__sync_chunk_size) - if not chunk: - break - written = await self.__msd.write_image_chunk(chunk) - finally: - if written != 0: - logger.info("Written image %r with size=%d bytes to MSD", name, written) - return _json({"image": {"name": name, "size": written}}) - - @_exposed("POST", "/msd/remove") - async def __msd_remove_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: - await self.__msd.remove(valid_msd_image_name(request.query.get("image"))) - return _json() - - @_exposed("POST", "/msd/reset") - async def __msd_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - await self.__msd.reset() - return _json() - - # ===== STREAMER - - @_exposed("GET", "/streamer") - async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - return _json(await self.__streamer.get_state()) - - @_exposed("POST", "/streamer/set_params") - async def __streamer_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: - for (name, validator) in [ - ("quality", valid_stream_quality), - ("desired_fps", valid_stream_fps), - ]: - value = request.query.get(name) - if value: - self.__streamer_params[name] = validator(value) - return _json() - - @_exposed("POST", "/streamer/reset") - async def __streamer_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - self.__reset_streamer = True - return _json() + @exposed_ws("ping") + async def __ws_ping_handler(self, ws: aiohttp.web.WebSocketResponse, _: Dict) -> None: + await ws.send_str(json.dumps({"msg_type": "pong"})) # ===== SYSTEM STUFF - async def __make_app(self) -> aiohttp.web.Application: + def run(self, **kwargs: Any) -> None: # type: ignore # pylint: disable=arguments-differ + self.__hid.start() + setproctitle.setproctitle(f"kvmd/main: {setproctitle.getproctitle()}") + super().run(**kwargs) + + async def _make_app(self) -> aiohttp.web.Application: app = aiohttp.web.Application() app.on_shutdown.append(self.__on_shutdown) app.on_cleanup.append(self.__on_cleanup) - for name in dir(self): - method = getattr(self, name) - if inspect.ismethod(method): - if getattr(method, _ATTR_SYSTEM_TASK, False): - self.__system_tasks.append(asyncio.create_task(method())) - elif getattr(method, _ATTR_EXPOSED, False): - app.router.add_route( - getattr(method, _ATTR_EXPOSED_METHOD), - getattr(method, _ATTR_EXPOSED_PATH), - method, - ) + self.__run_system_task(self.__stream_controller) + self.__run_system_task(self.__poll_dead_sockets) + self.__run_system_task(self.__poll_state, _Events.HID_STATE, self.__hid.poll_state()) + self.__run_system_task(self.__poll_state, _Events.ATX_STATE, self.__atx.poll_state()) + self.__run_system_task(self.__poll_state, _Events.MSD_STATE, self.__msd.poll_state()) + self.__run_system_task(self.__poll_state, _Events.STREAMER_STATE, self.__streamer.poll_state()) + + for api in self.__apis: + for http_exposed in get_exposed_http(api): + self.__add_app_route(app, http_exposed) + for ws_exposed in get_exposed_ws(api): + self.__ws_handlers[ws_exposed.event_type] = ws_exposed.handler + return app - def __run_app_print(self, text: str) -> None: - logger = get_logger() - for line in text.strip().splitlines(): - logger.info(line.strip()) + def __run_system_task(self, method: Callable, *args: Any) -> None: + async def wrapper() -> None: + try: + await method(*args) + raise RuntimeError(f"Dead system task: {method.__name__}" + f"({', '.join(getattr(arg, '__name__', str(arg)) for arg in args)})") + except asyncio.CancelledError: + pass + except Exception: + get_logger().exception("Unhandled exception, killing myself ...") + os.kill(os.getpid(), signal.SIGTERM) + self.__system_tasks.append(asyncio.create_task(wrapper())) + + def __add_app_route(self, app: aiohttp.web.Application, exposed: HttpExposed) -> None: + async def wrapper(request: aiohttp.web.Request) -> aiohttp.web.Response: + try: + if exposed.auth_required: + user = request.headers.get("X-KVMD-User", "") + passwd = request.headers.get("X-KVMD-Passwd", "") + token = request.cookies.get(_COOKIE_AUTH_TOKEN, "") + + if user: + user = valid_user(user) + setattr(request, _ATTR_KVMD_AUTH_INFO, f"{user} (xhdr)") + if not (await self.__auth_manager.authorize(user, valid_passwd(passwd))): + raise ForbiddenError("Forbidden") + + elif token: + user = self.__auth_manager.check(valid_auth_token(token)) + if not user: + setattr(request, _ATTR_KVMD_AUTH_INFO, "- (token)") + raise ForbiddenError("Forbidden") + setattr(request, _ATTR_KVMD_AUTH_INFO, f"{user} (token)") + + else: + raise UnauthorizedError("Unauthorized") + + return (await exposed.handler(request)) + + except (AtxIsBusyError, MsdIsBusyError) as err: + return make_json_exception(err, 409) + except (ValidatorError, AtxOperationError, MsdOperationError, WolDisabledError) as err: + return make_json_exception(err, 400) + except UnauthorizedError as err: + return make_json_exception(err, 401) + except ForbiddenError as err: + return make_json_exception(err, 403) + + app.router.add_route(exposed.method, exposed.path, wrapper) async def __on_shutdown(self, _: aiohttp.web.Application) -> None: logger = get_logger(0) @@ -614,7 +368,7 @@ class Server: # pylint: disable=too-many-instance-attributes async def __on_cleanup(self, _: aiohttp.web.Application) -> None: logger = get_logger(0) for (name, obj) in [ - ("Auth manager", self._auth_manager), + ("Auth manager", self.__auth_manager), ("Streamer", self.__streamer), ("MSD", self.__msd), ("ATX", self.__atx), @@ -663,7 +417,6 @@ class Server: # pylint: disable=too-many-instance-attributes # ===== SYSTEM TASKS - @_system_task async def __stream_controller(self) -> None: prev = 0 shutdown_at = 0.0 @@ -688,7 +441,6 @@ class Server: # pylint: disable=too-many-instance-attributes prev = cur await asyncio.sleep(0.1) - @_system_task async def __poll_dead_sockets(self) -> None: while True: for ws in list(self.__sockets): @@ -696,22 +448,6 @@ class Server: # pylint: disable=too-many-instance-attributes await self.__remove_socket(ws) await asyncio.sleep(0.1) - @_system_task - async def __poll_hid_state(self) -> None: - async for state in self.__hid.poll_state(): - await self.__broadcast_event(_Events.HID_STATE, state) - - @_system_task - async def __poll_atx_state(self) -> None: - async for state in self.__atx.poll_state(): - await self.__broadcast_event(_Events.ATX_STATE, state) - - @_system_task - async def __poll_msd_state(self) -> None: - async for state in self.__msd.poll_state(): - await self.__broadcast_event(_Events.MSD_STATE, state) - - @_system_task - async def __poll_streamer_state(self) -> None: - async for state in self.__streamer.poll_state(): - await self.__broadcast_event(_Events.STREAMER_STATE, state) + async def __poll_state(self, event_type: _Events, poller: AsyncGenerator[Dict, None]) -> None: + async for state in poller: + await self.__broadcast_event(event_type, state) |