summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2019-12-09 01:21:38 +0300
committerDevaev Maxim <[email protected]>2019-12-09 02:23:05 +0300
commitdd52a85cf6c21c5a7743acad152d8378f1ae0ef4 (patch)
treeb2b4ff6b6e84a572214e0ad1001fe345a33f62c6 /kvmd
parent272ea08adf8944b385a585a142f91e4d8fccb605 (diff)
refactoring
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/apps/kvmd/__init__.py9
-rw-r--r--kvmd/apps/kvmd/api/__init__.py20
-rw-r--r--kvmd/apps/kvmd/api/atx.py64
-rw-r--r--kvmd/apps/kvmd/api/hid.py93
-rw-r--r--kvmd/apps/kvmd/api/log.py57
-rw-r--r--kvmd/apps/kvmd/api/msd.py106
-rw-r--r--kvmd/apps/kvmd/api/wol.py45
-rw-r--r--kvmd/apps/kvmd/http.py179
-rw-r--r--kvmd/apps/kvmd/server.py562
9 files changed, 719 insertions, 416 deletions
diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py
index 1b498a4e..4275d1a2 100644
--- a/kvmd/apps/kvmd/__init__.py
+++ b/kvmd/apps/kvmd/__init__.py
@@ -38,7 +38,7 @@ from .info import InfoManager
from .logreader import LogReader
from .wol import WakeOnLan
from .streamer import Streamer
-from .server import Server
+from .server import KvmdServer
# =====
@@ -62,7 +62,7 @@ def main(argv: Optional[List[str]]=None) -> None:
config = config.kvmd
- Server(
+ KvmdServer(
auth_manager=AuthManager(
internal_type=config.auth.internal.type,
internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]),
@@ -78,6 +78,9 @@ def main(argv: Optional[List[str]]=None) -> None:
atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])),
msd=get_msd_class(config.msd.type)(**msd_kwargs),
streamer=Streamer(**config.streamer._unpack()),
- ).run(**config.server._unpack())
+
+ heartbeat=config.server.heartbeat,
+ sync_chunk_size=config.server.sync_chunk_size,
+ ).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"]))
get_logger(0).info("Bye-bye")
diff --git a/kvmd/apps/kvmd/api/__init__.py b/kvmd/apps/kvmd/api/__init__.py
new file mode 100644
index 00000000..1e91f7fa
--- /dev/null
+++ b/kvmd/apps/kvmd/api/__init__.py
@@ -0,0 +1,20 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
diff --git a/kvmd/apps/kvmd/api/atx.py b/kvmd/apps/kvmd/api/atx.py
new file mode 100644
index 00000000..176c4e77
--- /dev/null
+++ b/kvmd/apps/kvmd/api/atx.py
@@ -0,0 +1,64 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import aiohttp.web
+
+from ....plugins.atx import BaseAtx
+
+from ....validators.kvm import valid_atx_power_action
+from ....validators.kvm import valid_atx_button
+
+from ..http import exposed_http
+from ..http import make_json_response
+
+
+# =====
+class AtxApi:
+ def __init__(self, atx: BaseAtx) -> None:
+ self.__atx = atx
+
+ # =====
+
+ @exposed_http("GET", "/atx")
+ async def __state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ return make_json_response(self.__atx.get_state())
+
+ @exposed_http("POST", "/atx/power")
+ async def __power_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
+ action = valid_atx_power_action(request.query.get("action"))
+ processing = await ({
+ "on": self.__atx.power_on,
+ "off": self.__atx.power_off,
+ "off_hard": self.__atx.power_off_hard,
+ "reset_hard": self.__atx.power_reset_hard,
+ }[action])()
+ return make_json_response({"processing": processing})
+
+ @exposed_http("POST", "/atx/click")
+ async def __click_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
+ button = valid_atx_button(request.query.get("button"))
+ await ({
+ "power": self.__atx.click_power,
+ "power_long": self.__atx.click_power_long,
+ "reset": self.__atx.click_reset,
+ }[button])()
+ return make_json_response()
diff --git a/kvmd/apps/kvmd/api/hid.py b/kvmd/apps/kvmd/api/hid.py
new file mode 100644
index 00000000..4dc952ba
--- /dev/null
+++ b/kvmd/apps/kvmd/api/hid.py
@@ -0,0 +1,93 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from typing import Dict
+
+import aiohttp.web
+
+from ....plugins.hid import BaseHid
+
+from ....validators.basic import valid_bool
+
+from ....validators.kvm import valid_hid_key
+from ....validators.kvm import valid_hid_mouse_move
+from ....validators.kvm import valid_hid_mouse_button
+from ....validators.kvm import valid_hid_mouse_wheel
+
+from ..http import exposed_http
+from ..http import exposed_ws
+from ..http import make_json_response
+
+
+# =====
+class HidApi:
+ def __init__(self, hid: BaseHid) -> None:
+ self.__hid = hid
+
+ # =====
+
+ @exposed_http("GET", "/hid/state")
+ async def __state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ return make_json_response(self.__hid.get_state())
+
+ @exposed_http("GET", "/hid/reset")
+ async def __reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ await self.__hid.reset()
+ return make_json_response()
+
+ # =====
+
+ @exposed_ws("key")
+ async def __ws_key_handler(self, _: aiohttp.web.WebSocketResponse, event: Dict) -> None:
+ try:
+ key = valid_hid_key(event["key"])
+ state = valid_bool(event["state"])
+ except Exception:
+ return
+ await self.__hid.send_key_event(key, state)
+
+ @exposed_ws("mouse_button")
+ async def __ws_mouse_button_handler(self, _: aiohttp.web.WebSocketResponse, event: Dict) -> None:
+ try:
+ button = valid_hid_mouse_button(event["button"])
+ state = valid_bool(event["state"])
+ except Exception:
+ return
+ await self.__hid.send_mouse_button_event(button, state)
+
+ @exposed_ws("mouse_move")
+ async def __ws_mouse_move_handler(self, _: aiohttp.web.WebSocketResponse, event: Dict) -> None:
+ try:
+ to_x = valid_hid_mouse_move(event["to"]["x"])
+ to_y = valid_hid_mouse_move(event["to"]["y"])
+ except Exception:
+ return
+ await self.__hid.send_mouse_move_event(to_x, to_y)
+
+ @exposed_ws("mouse_wheel")
+ async def __ws_mouse_wheel_handler(self, _: aiohttp.web.WebSocketResponse, event: Dict) -> None:
+ try:
+ delta_x = valid_hid_mouse_wheel(event["delta"]["x"])
+ delta_y = valid_hid_mouse_wheel(event["delta"]["y"])
+ except Exception:
+ return
+ await self.__hid.send_mouse_wheel_event(delta_x, delta_y)
diff --git a/kvmd/apps/kvmd/api/log.py b/kvmd/apps/kvmd/api/log.py
new file mode 100644
index 00000000..9a6d8853
--- /dev/null
+++ b/kvmd/apps/kvmd/api/log.py
@@ -0,0 +1,57 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from typing import Optional
+
+import aiohttp.web
+
+from ....validators.basic import valid_bool
+
+from ....validators.kvm import valid_log_seek
+
+from ..logreader import LogReader
+
+from ..http import exposed_http
+
+
+# =====
+class LogApi:
+ def __init__(self, log_reader: LogReader) -> None:
+ self.__log_reader = log_reader
+
+ # =====
+
+ @exposed_http("GET", "/log")
+ async def __log_handler(self, request: aiohttp.web.Request) -> aiohttp.web.StreamResponse:
+ seek = valid_log_seek(request.query.get("seek", "0"))
+ follow = valid_bool(request.query.get("follow", "false"))
+ response: Optional[aiohttp.web.StreamResponse] = None
+ async for record in self.__log_reader.poll_log(seek, follow):
+ if response is None:
+ response = aiohttp.web.StreamResponse(status=200, reason="OK", headers={"Content-Type": "text/plain"})
+ await response.prepare(request)
+ await response.write(("[%s %s] --- %s" % (
+ record["dt"].strftime("%Y-%m-%d %H:%M:%S"),
+ record["service"],
+ record["msg"],
+ )).encode("utf-8") + b"\r\n")
+ return response
diff --git a/kvmd/apps/kvmd/api/msd.py b/kvmd/apps/kvmd/api/msd.py
new file mode 100644
index 00000000..0ab2e9b4
--- /dev/null
+++ b/kvmd/apps/kvmd/api/msd.py
@@ -0,0 +1,106 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import aiohttp
+import aiohttp.web
+
+from ....logging import get_logger
+
+from ....plugins.msd import BaseMsd
+
+from ....validators.basic import valid_bool
+
+from ....validators.kvm import valid_msd_image_name
+
+from ..http import exposed_http
+from ..http import make_json_response
+from ..http import get_multipart_field
+
+
+# ======
+class MsdApi:
+ def __init__(self, msd: BaseMsd, sync_chunk_size: int) -> None:
+ self.__msd = msd
+ self.__sync_chunk_size = sync_chunk_size
+
+ # =====
+
+ @exposed_http("GET", "/msd")
+ async def __state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ return make_json_response(await self.__msd.get_state())
+
+ @exposed_http("POST", "/msd/set_params")
+ async def __set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
+ params = {
+ key: validator(request.query.get(param))
+ for (param, key, validator) in [
+ ("image", "name", (lambda arg: str(arg).strip() and valid_msd_image_name(arg))),
+ ("cdrom", "cdrom", valid_bool),
+ ]
+ if request.query.get(param) is not None
+ }
+ await self.__msd.set_params(**params) # type: ignore
+ return make_json_response()
+
+ @exposed_http("POST", "/msd/connect")
+ async def __connect_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ await self.__msd.connect()
+ return make_json_response()
+
+ @exposed_http("POST", "/msd/disconnect")
+ async def __disconnect_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ await self.__msd.disconnect()
+ return make_json_response()
+
+ @exposed_http("POST", "/msd/write")
+ async def __write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
+ logger = get_logger(0)
+ reader = await request.multipart()
+ name = ""
+ written = 0
+ try:
+ name_field = await get_multipart_field(reader, "image")
+ name = valid_msd_image_name((await name_field.read()).decode("utf-8"))
+
+ data_field = await get_multipart_field(reader, "data")
+
+ async with self.__msd.write_image(name):
+ logger.info("Writing image %r to MSD ...", name)
+ while True:
+ chunk = await data_field.read_chunk(self.__sync_chunk_size)
+ if not chunk:
+ break
+ written = await self.__msd.write_image_chunk(chunk)
+ finally:
+ if written != 0:
+ logger.info("Written image %r with size=%d bytes to MSD", name, written)
+ return make_json_response({"image": {"name": name, "size": written}})
+
+ @exposed_http("POST", "/msd/remove")
+ async def __remove_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
+ await self.__msd.remove(valid_msd_image_name(request.query.get("image")))
+ return make_json_response()
+
+ @exposed_http("POST", "/msd/reset")
+ async def __reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ await self.__msd.reset()
+ return make_json_response()
diff --git a/kvmd/apps/kvmd/api/wol.py b/kvmd/apps/kvmd/api/wol.py
new file mode 100644
index 00000000..c919728e
--- /dev/null
+++ b/kvmd/apps/kvmd/api/wol.py
@@ -0,0 +1,45 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import aiohttp.web
+
+from ..wol import WakeOnLan
+
+from ..http import exposed_http
+from ..http import make_json_response
+
+
+# =====
+class WolApi:
+ def __init__(self, wol: WakeOnLan) -> None:
+ self.__wol = wol
+
+ # =====
+
+ @exposed_http("GET", "/wol")
+ async def __wol_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ return make_json_response(self.__wol.get_state())
+
+ @exposed_http("POST", "/wol/wakeup")
+ async def __wol_wakeup_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ await self.__wol.wakeup()
+ return make_json_response()
diff --git a/kvmd/apps/kvmd/http.py b/kvmd/apps/kvmd/http.py
new file mode 100644
index 00000000..ce263063
--- /dev/null
+++ b/kvmd/apps/kvmd/http.py
@@ -0,0 +1,179 @@
+import os
+import socket
+import dataclasses
+import inspect
+import json
+
+from typing import List
+from typing import Dict
+from typing import Callable
+from typing import Optional
+
+import aiohttp
+import aiohttp.web
+
+from ...logging import get_logger
+
+from ...validators import ValidatorError
+
+
+# =====
+class HttpError(Exception):
+ pass
+
+
+class UnauthorizedError(HttpError):
+ pass
+
+
+class ForbiddenError(HttpError):
+ pass
+
+
+# =====
[email protected](frozen=True)
+class HttpExposed:
+ method: str
+ path: str
+ auth_required: bool
+ handler: Callable
+
+
+_HTTP_EXPOSED = "_http_exposed"
+_HTTP_METHOD = "_http_method"
+_HTTP_PATH = "_http_path"
+_HTTP_AUTH_REQUIRED = "_http_auth_required"
+
+
+def exposed_http(http_method: str, path: str, auth_required: bool=True) -> Callable:
+ def set_attrs(handler: Callable) -> Callable:
+ setattr(handler, _HTTP_EXPOSED, True)
+ setattr(handler, _HTTP_METHOD, http_method)
+ setattr(handler, _HTTP_PATH, path)
+ setattr(handler, _HTTP_AUTH_REQUIRED, auth_required)
+ return handler
+ return set_attrs
+
+
+def get_exposed_http(obj: object) -> List[HttpExposed]:
+ return [
+ HttpExposed(
+ method=getattr(handler, _HTTP_METHOD),
+ path=getattr(handler, _HTTP_PATH),
+ auth_required=getattr(handler, _HTTP_AUTH_REQUIRED),
+ handler=handler,
+ )
+ for name in dir(obj)
+ if inspect.ismethod(handler := getattr(obj, name)) and getattr(handler, _HTTP_EXPOSED, False)
+ ]
+
+
+# =====
[email protected](frozen=True)
+class WsExposed:
+ event_type: str
+ handler: Callable
+
+
+_WS_EXPOSED = "_ws_exposed"
+_WS_EVENT_TYPE = "_ws_event_type"
+
+
+def exposed_ws(event_type: str) -> Callable:
+ def set_attrs(handler: Callable) -> Callable:
+ setattr(handler, _WS_EXPOSED, True)
+ setattr(handler, _WS_EVENT_TYPE, event_type)
+ return handler
+ return set_attrs
+
+
+def get_exposed_ws(obj: object) -> List[WsExposed]:
+ return [
+ WsExposed(
+ event_type=getattr(handler, _WS_EVENT_TYPE),
+ handler=handler,
+ )
+ for name in dir(obj)
+ if inspect.ismethod(handler := getattr(obj, name)) and getattr(handler, _WS_EXPOSED, False)
+ ]
+
+
+# =====
+def make_json_response(
+ result: Optional[Dict]=None,
+ status: int=200,
+ set_cookies: Optional[Dict[str, str]]=None,
+) -> aiohttp.web.Response:
+
+ response = aiohttp.web.Response(
+ text=json.dumps({
+ "ok": (status == 200),
+ "result": (result or {}),
+ }, sort_keys=True, indent=4),
+ status=status,
+ content_type="application/json",
+ )
+ if set_cookies:
+ for (key, value) in set_cookies.items():
+ response.set_cookie(key, value)
+ return response
+
+
+def make_json_exception(err: Exception, status: int) -> aiohttp.web.Response:
+ name = type(err).__name__
+ msg = str(err)
+ if not isinstance(err, (UnauthorizedError, ForbiddenError)):
+ get_logger().error("API error: %s: %s", name, msg)
+ return make_json_response({
+ "error": name,
+ "error_msg": msg,
+ }, status=status)
+
+
+# =====
+async def get_multipart_field(reader: aiohttp.MultipartReader, name: str) -> aiohttp.BodyPartReader:
+ field = await reader.next()
+ if not field or field.name != name:
+ raise ValidatorError(f"Missing {name!r} field")
+ return field
+
+
+# =====
+class HttpServer:
+ def run(
+ self,
+ host: str,
+ port: int,
+ unix_path: str,
+ unix_rm: bool,
+ unix_mode: int,
+ access_log_format: str,
+ ) -> None:
+
+ assert port or unix_path
+ if unix_path:
+ socket_kwargs: Dict = {}
+ if unix_rm and os.path.exists(unix_path):
+ os.remove(unix_path)
+ server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ server_socket.bind(unix_path)
+ if unix_mode:
+ os.chmod(unix_path, unix_mode)
+ socket_kwargs = {"sock": server_socket}
+ else:
+ socket_kwargs = {"host": host, "port": port}
+
+ aiohttp.web.run_app(
+ app=self._make_app(),
+ access_log_format=access_log_format,
+ print=self.__run_app_print,
+ **socket_kwargs,
+ )
+
+ async def _make_app(self) -> aiohttp.web.Application:
+ raise NotImplementedError
+
+ def __run_app_print(self, text: str) -> None:
+ logger = get_logger(0)
+ for line in text.strip().splitlines():
+ logger.info(line.strip())
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
index bd40d76f..cd687918 100644
--- a/kvmd/apps/kvmd/server.py
+++ b/kvmd/apps/kvmd/server.py
@@ -22,9 +22,7 @@
import os
import signal
-import socket
import asyncio
-import inspect
import json
import time
@@ -34,7 +32,9 @@ from typing import List
from typing import Dict
from typing import Set
from typing import Callable
+from typing import AsyncGenerator
from typing import Optional
+from typing import Any
import aiohttp
import aiohttp.web
@@ -56,22 +56,12 @@ from ...plugins.msd import BaseMsd
from ...validators import ValidatorError
-from ...validators.basic import valid_bool
-
from ...validators.auth import valid_user
from ...validators.auth import valid_passwd
from ...validators.auth import valid_auth_token
-from ...validators.kvm import valid_atx_power_action
-from ...validators.kvm import valid_atx_button
-from ...validators.kvm import valid_log_seek
from ...validators.kvm import valid_stream_quality
from ...validators.kvm import valid_stream_fps
-from ...validators.kvm import valid_msd_image_name
-from ...validators.kvm import valid_hid_key
-from ...validators.kvm import valid_hid_mouse_move
-from ...validators.kvm import valid_hid_mouse_button
-from ...validators.kvm import valid_hid_mouse_wheel
from ... import aiotools
@@ -85,6 +75,23 @@ from .streamer import Streamer
from .wol import WolDisabledError
from .wol import WakeOnLan
+from .http import UnauthorizedError
+from .http import ForbiddenError
+from .http import HttpExposed
+from .http import exposed_http
+from .http import exposed_ws
+from .http import get_exposed_http
+from .http import get_exposed_ws
+from .http import make_json_response
+from .http import make_json_exception
+from .http import HttpServer
+
+from .api.log import LogApi
+from .api.wol import WolApi
+from .api.hid import HidApi
+from .api.atx import AtxApi
+from .api.msd import MsdApi
+
# =====
try:
@@ -104,125 +111,12 @@ AccessLogger._format_P = staticmethod(_format_P) # type: ignore # pylint: disa
# =====
-class HttpError(Exception):
- pass
-
-
-class UnauthorizedError(HttpError):
- pass
-
-
-class ForbiddenError(HttpError):
- pass
-
-
-def _json(
- result: Optional[Dict]=None,
- status: int=200,
- set_cookies: Optional[Dict[str, str]]=None,
-) -> aiohttp.web.Response:
-
- response = aiohttp.web.Response(
- text=json.dumps({
- "ok": (status == 200),
- "result": (result or {}),
- }, sort_keys=True, indent=4),
- status=status,
- content_type="application/json",
- )
- if set_cookies:
- for (key, value) in set_cookies.items():
- response.set_cookie(key, value)
- return response
-
-
-def _json_exception(err: Exception, status: int) -> aiohttp.web.Response:
- name = type(err).__name__
- msg = str(err)
- if not isinstance(err, (UnauthorizedError, ForbiddenError)):
- get_logger().error("API error: %s: %s", name, msg)
- return _json({
- "error": name,
- "error_msg": msg,
- }, status=status)
-
-
-async def _get_multipart_field(reader: aiohttp.MultipartReader, name: str) -> aiohttp.BodyPartReader:
- field = await reader.next()
- if not field or field.name != name:
- raise ValidatorError(f"Missing {name!r} field")
- return field
-
-
-_ATTR_EXPOSED = "_server_exposed"
-_ATTR_EXPOSED_METHOD = "_server_exposed_method"
-_ATTR_EXPOSED_PATH = "_server_exposed_path"
-_ATTR_SYSTEM_TASK = "system_task"
-
_HEADER_AUTH_USER = "X-KVMD-User"
_HEADER_AUTH_PASSWD = "X-KVMD-Passwd"
_COOKIE_AUTH_TOKEN = "auth_token"
-def _exposed(http_method: str, path: str, auth_required: bool=True) -> Callable:
- def make_wrapper(handler: Callable) -> Callable:
- async def wrapper(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.Response:
- try:
- if auth_required:
- user = request.headers.get(_HEADER_AUTH_USER, "")
- passwd = request.headers.get(_HEADER_AUTH_PASSWD, "")
- token = request.cookies.get(_COOKIE_AUTH_TOKEN, "")
-
- if user:
- user = valid_user(user)
- setattr(request, _ATTR_KVMD_AUTH_INFO, f"{user} (xhdr)")
- if not (await self._auth_manager.authorize(user, valid_passwd(passwd))):
- raise ForbiddenError("Forbidden")
-
- elif token:
- user = self._auth_manager.check(valid_auth_token(token))
- if not user:
- setattr(request, _ATTR_KVMD_AUTH_INFO, "- (token)")
- raise ForbiddenError("Forbidden")
- setattr(request, _ATTR_KVMD_AUTH_INFO, f"{user} (token)")
-
- else:
- raise UnauthorizedError("Unauthorized")
-
- return (await handler(self, request))
-
- except (AtxIsBusyError, MsdIsBusyError) as err:
- return _json_exception(err, 409)
- except (ValidatorError, AtxOperationError, MsdOperationError, WolDisabledError) as err:
- return _json_exception(err, 400)
- except UnauthorizedError as err:
- return _json_exception(err, 401)
- except ForbiddenError as err:
- return _json_exception(err, 403)
-
- setattr(wrapper, _ATTR_EXPOSED, True)
- setattr(wrapper, _ATTR_EXPOSED_METHOD, http_method)
- setattr(wrapper, _ATTR_EXPOSED_PATH, path)
- return wrapper
- return make_wrapper
-
-
-def _system_task(method: Callable) -> Callable:
- async def wrapper(self: "Server") -> None:
- try:
- await method(self)
- raise RuntimeError(f"Dead system task: {method}")
- except asyncio.CancelledError:
- pass
- except Exception:
- get_logger().exception("Unhandled exception, killing myself ...")
- os.kill(os.getpid(), signal.SIGTERM)
-
- setattr(wrapper, _ATTR_SYSTEM_TASK, True)
- return wrapper
-
-
class _Events(Enum):
INFO_STATE = "info_state"
WOL_STATE = "wol_state"
@@ -232,8 +126,8 @@ class _Events(Enum):
STREAMER_STATE = "streamer_state"
-class Server: # pylint: disable=too-many-instance-attributes
- def __init__(
+class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-instance-attributes
+ def __init__( # pylint: disable=too-many-arguments
self,
auth_manager: AuthManager,
info_manager: InfoManager,
@@ -244,11 +138,13 @@ class Server: # pylint: disable=too-many-instance-attributes
atx: BaseAtx,
msd: BaseMsd,
streamer: Streamer,
+
+ heartbeat: float,
+ sync_chunk_size: int,
) -> None:
- self._auth_manager = auth_manager
+ self.__auth_manager = auth_manager
self.__info_manager = info_manager
- self.__log_reader = log_reader
self.__wol = wol
self.__hid = hid
@@ -256,8 +152,19 @@ class Server: # pylint: disable=too-many-instance-attributes
self.__msd = msd
self.__streamer = streamer
- self.__heartbeat: Optional[float] = None # Assigned in run() for consistance
- self.__sync_chunk_size: Optional[int] = None # Ditto
+ self.__heartbeat = heartbeat
+
+ self.__apis: List[object] = [
+ self,
+ LogApi(log_reader),
+ WolApi(wol),
+ HidApi(hid),
+ AtxApi(atx),
+ MsdApi(msd, sync_chunk_size),
+ ]
+
+ self.__ws_handlers: Dict[str, Callable] = {}
+
self.__sockets: Set[aiohttp.web.WebSocketResponse] = set()
self.__sockets_lock = asyncio.Lock()
@@ -266,45 +173,6 @@ class Server: # pylint: disable=too-many-instance-attributes
self.__reset_streamer = False
self.__streamer_params = streamer.get_params()
- def run(
- self,
- host: str,
- port: int,
- unix_path: str,
- unix_rm: bool,
- unix_mode: int,
- heartbeat: float,
- sync_chunk_size: int,
- access_log_format: str,
- ) -> None:
-
- self.__hid.start()
-
- setproctitle.setproctitle(f"kvmd/main: {setproctitle.getproctitle()}")
-
- self.__heartbeat = heartbeat
- self.__sync_chunk_size = sync_chunk_size
-
- assert port or unix_path
- if unix_path:
- socket_kwargs: Dict = {}
- if unix_rm and os.path.exists(unix_path):
- os.remove(unix_path)
- server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- server_socket.bind(unix_path)
- if unix_mode:
- os.chmod(unix_path, unix_mode)
- socket_kwargs = {"sock": server_socket}
- else:
- socket_kwargs = {"host": host, "port": port}
-
- aiohttp.web.run_app(
- app=self.__make_app(),
- access_log_format=access_log_format,
- print=self.__run_app_print,
- **socket_kwargs,
- )
-
async def __make_info(self) -> Dict:
return {
"version": {
@@ -318,66 +186,60 @@ class Server: # pylint: disable=too-many-instance-attributes
# ===== AUTH
- @_exposed("POST", "/auth/login", auth_required=False)
+ @exposed_http("POST", "/auth/login", auth_required=False)
async def __auth_login_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
credentials = await request.post()
- token = await self._auth_manager.login(
+ token = await self.__auth_manager.login(
user=valid_user(credentials.get("user", "")),
passwd=valid_passwd(credentials.get("passwd", "")),
)
if token:
- return _json({}, set_cookies={_COOKIE_AUTH_TOKEN: token})
+ return make_json_response({}, set_cookies={_COOKIE_AUTH_TOKEN: token})
raise ForbiddenError("Forbidden")
- @_exposed("POST", "/auth/logout")
+ @exposed_http("POST", "/auth/logout")
async def __auth_logout_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
token = valid_auth_token(request.cookies.get(_COOKIE_AUTH_TOKEN, ""))
- self._auth_manager.logout(token)
- return _json({})
+ self.__auth_manager.logout(token)
+ return make_json_response({})
- @_exposed("GET", "/auth/check")
+ @exposed_http("GET", "/auth/check")
async def __auth_check_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- return _json({})
+ return make_json_response({})
# ===== SYSTEM
- @_exposed("GET", "/info")
+ @exposed_http("GET", "/info")
async def __info_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- return _json(await self.__make_info())
-
- @_exposed("GET", "/log")
- async def __log_handler(self, request: aiohttp.web.Request) -> aiohttp.web.StreamResponse:
- seek = valid_log_seek(request.query.get("seek", "0"))
- follow = valid_bool(request.query.get("follow", "false"))
- response: Optional[aiohttp.web.StreamResponse] = None
- async for record in self.__log_reader.poll_log(seek, follow):
- if response is None:
- response = aiohttp.web.StreamResponse(status=200, reason="OK", headers={"Content-Type": "text/plain"})
- await response.prepare(request)
- await response.write(("[%s %s] --- %s" % (
- record["dt"].strftime("%Y-%m-%d %H:%M:%S"),
- record["service"],
- record["msg"],
- )).encode("utf-8") + b"\r\n")
- return response
-
- # ===== Wake-on-LAN
-
- @_exposed("GET", "/wol")
- async def __wol_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- return _json(self.__wol.get_state())
-
- @_exposed("POST", "/wol/wakeup")
- async def __wol_wakeup_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- await self.__wol.wakeup()
- return _json()
+ return make_json_response(await self.__make_info())
+
+ # ===== STREAMER
+
+ @exposed_http("GET", "/streamer")
+ async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ return make_json_response(await self.__streamer.get_state())
+
+ @exposed_http("POST", "/streamer/set_params")
+ async def __streamer_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
+ for (name, validator) in [
+ ("quality", valid_stream_quality),
+ ("desired_fps", valid_stream_fps),
+ ]:
+ value = request.query.get(name)
+ if value:
+ self.__streamer_params[name] = validator(value)
+ return make_json_response()
+
+ @exposed_http("POST", "/streamer/reset")
+ async def __streamer_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ self.__reset_streamer = True
+ return make_json_response()
# ===== WEBSOCKET
- @_exposed("GET", "/ws")
+ @exposed_http("GET", "/ws")
async def __ws_handler(self, request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse:
logger = get_logger(0)
- assert self.__heartbeat is not None
ws = aiohttp.web.WebSocketResponse(heartbeat=self.__heartbeat)
await ws.prepare(request)
await self.__register_socket(ws)
@@ -396,203 +258,95 @@ class Server: # pylint: disable=too-many-instance-attributes
except Exception as err:
logger.error("Can't parse JSON event from websocket: %s", err)
else:
- event_type = event.get("event_type")
- if event_type == "ping":
- await ws.send_str(json.dumps({"msg_type": "pong"}))
- elif event_type == "key":
- await self.__handle_ws_key_event(event)
- elif event_type == "mouse_button":
- await self.__handle_ws_mouse_button_event(event)
- elif event_type == "mouse_move":
- await self.__handle_ws_mouse_move_event(event)
- elif event_type == "mouse_wheel":
- await self.__handle_ws_mouse_wheel_event(event)
+ handler = self.__ws_handlers.get(event.get("event_type"))
+ if handler:
+ await handler(ws, event)
else:
logger.error("Unknown websocket event: %r", event)
else:
break
return ws
- async def __handle_ws_key_event(self, event: Dict) -> None:
- try:
- key = valid_hid_key(event["key"])
- state = valid_bool(event["state"])
- except Exception:
- return
- await self.__hid.send_key_event(key, state)
-
- async def __handle_ws_mouse_button_event(self, event: Dict) -> None:
- try:
- button = valid_hid_mouse_button(event["button"])
- state = valid_bool(event["state"])
- except Exception:
- return
- await self.__hid.send_mouse_button_event(button, state)
-
- async def __handle_ws_mouse_move_event(self, event: Dict) -> None:
- try:
- to_x = valid_hid_mouse_move(event["to"]["x"])
- to_y = valid_hid_mouse_move(event["to"]["y"])
- except Exception:
- return
- await self.__hid.send_mouse_move_event(to_x, to_y)
-
- async def __handle_ws_mouse_wheel_event(self, event: Dict) -> None:
- try:
- delta_x = valid_hid_mouse_wheel(event["delta"]["x"])
- delta_y = valid_hid_mouse_wheel(event["delta"]["y"])
- except Exception:
- return
- await self.__hid.send_mouse_wheel_event(delta_x, delta_y)
-
- # ===== HID
-
- @_exposed("GET", "/hid")
- async def __hid_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- return _json(self.__hid.get_state())
-
- @_exposed("POST", "/hid/reset")
- async def __hid_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- await self.__hid.reset()
- return _json()
-
- # ===== ATX
-
- @_exposed("GET", "/atx")
- async def __atx_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- return _json(self.__atx.get_state())
-
- @_exposed("POST", "/atx/power")
- async def __atx_power_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
- action = valid_atx_power_action(request.query.get("action"))
- processing = await ({
- "on": self.__atx.power_on,
- "off": self.__atx.power_off,
- "off_hard": self.__atx.power_off_hard,
- "reset_hard": self.__atx.power_reset_hard,
- }[action])()
- return _json({"processing": processing})
-
- @_exposed("POST", "/atx/click")
- async def __atx_click_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
- button = valid_atx_button(request.query.get("button"))
- await ({
- "power": self.__atx.click_power,
- "power_long": self.__atx.click_power_long,
- "reset": self.__atx.click_reset,
- }[button])()
- return _json()
-
- # ===== MSD
-
- @_exposed("GET", "/msd")
- async def __msd_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- return _json(await self.__msd.get_state())
-
- @_exposed("POST", "/msd/set_params")
- async def __msd_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
- params = {
- key: validator(request.query.get(param))
- for (param, key, validator) in [
- ("image", "name", (lambda arg: str(arg).strip() and valid_msd_image_name(arg))),
- ("cdrom", "cdrom", valid_bool),
- ]
- if request.query.get(param) is not None
- }
- await self.__msd.set_params(**params) # type: ignore
- return _json()
-
- @_exposed("POST", "/msd/connect")
- async def __msd_connect_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- await self.__msd.connect()
- return _json()
-
- @_exposed("POST", "/msd/disconnect")
- async def __msd_disconnect_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- await self.__msd.disconnect()
- return _json()
-
- @_exposed("POST", "/msd/write")
- async def __msd_write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
- assert self.__sync_chunk_size is not None
- logger = get_logger(0)
- reader = await request.multipart()
- name = ""
- written = 0
- try:
- name_field = await _get_multipart_field(reader, "image")
- name = valid_msd_image_name((await name_field.read()).decode("utf-8"))
-
- data_field = await _get_multipart_field(reader, "data")
-
- async with self.__msd.write_image(name):
- logger.info("Writing image %r to MSD ...", name)
- while True:
- chunk = await data_field.read_chunk(self.__sync_chunk_size)
- if not chunk:
- break
- written = await self.__msd.write_image_chunk(chunk)
- finally:
- if written != 0:
- logger.info("Written image %r with size=%d bytes to MSD", name, written)
- return _json({"image": {"name": name, "size": written}})
-
- @_exposed("POST", "/msd/remove")
- async def __msd_remove_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
- await self.__msd.remove(valid_msd_image_name(request.query.get("image")))
- return _json()
-
- @_exposed("POST", "/msd/reset")
- async def __msd_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- await self.__msd.reset()
- return _json()
-
- # ===== STREAMER
-
- @_exposed("GET", "/streamer")
- async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- return _json(await self.__streamer.get_state())
-
- @_exposed("POST", "/streamer/set_params")
- async def __streamer_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
- for (name, validator) in [
- ("quality", valid_stream_quality),
- ("desired_fps", valid_stream_fps),
- ]:
- value = request.query.get(name)
- if value:
- self.__streamer_params[name] = validator(value)
- return _json()
-
- @_exposed("POST", "/streamer/reset")
- async def __streamer_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- self.__reset_streamer = True
- return _json()
+ @exposed_ws("ping")
+ async def __ws_ping_handler(self, ws: aiohttp.web.WebSocketResponse, _: Dict) -> None:
+ await ws.send_str(json.dumps({"msg_type": "pong"}))
# ===== SYSTEM STUFF
- async def __make_app(self) -> aiohttp.web.Application:
+ def run(self, **kwargs: Any) -> None: # type: ignore # pylint: disable=arguments-differ
+ self.__hid.start()
+ setproctitle.setproctitle(f"kvmd/main: {setproctitle.getproctitle()}")
+ super().run(**kwargs)
+
+ async def _make_app(self) -> aiohttp.web.Application:
app = aiohttp.web.Application()
app.on_shutdown.append(self.__on_shutdown)
app.on_cleanup.append(self.__on_cleanup)
- for name in dir(self):
- method = getattr(self, name)
- if inspect.ismethod(method):
- if getattr(method, _ATTR_SYSTEM_TASK, False):
- self.__system_tasks.append(asyncio.create_task(method()))
- elif getattr(method, _ATTR_EXPOSED, False):
- app.router.add_route(
- getattr(method, _ATTR_EXPOSED_METHOD),
- getattr(method, _ATTR_EXPOSED_PATH),
- method,
- )
+ self.__run_system_task(self.__stream_controller)
+ self.__run_system_task(self.__poll_dead_sockets)
+ self.__run_system_task(self.__poll_state, _Events.HID_STATE, self.__hid.poll_state())
+ self.__run_system_task(self.__poll_state, _Events.ATX_STATE, self.__atx.poll_state())
+ self.__run_system_task(self.__poll_state, _Events.MSD_STATE, self.__msd.poll_state())
+ self.__run_system_task(self.__poll_state, _Events.STREAMER_STATE, self.__streamer.poll_state())
+
+ for api in self.__apis:
+ for http_exposed in get_exposed_http(api):
+ self.__add_app_route(app, http_exposed)
+ for ws_exposed in get_exposed_ws(api):
+ self.__ws_handlers[ws_exposed.event_type] = ws_exposed.handler
+
return app
- def __run_app_print(self, text: str) -> None:
- logger = get_logger()
- for line in text.strip().splitlines():
- logger.info(line.strip())
+ def __run_system_task(self, method: Callable, *args: Any) -> None:
+ async def wrapper() -> None:
+ try:
+ await method(*args)
+ raise RuntimeError(f"Dead system task: {method.__name__}"
+ f"({', '.join(getattr(arg, '__name__', str(arg)) for arg in args)})")
+ except asyncio.CancelledError:
+ pass
+ except Exception:
+ get_logger().exception("Unhandled exception, killing myself ...")
+ os.kill(os.getpid(), signal.SIGTERM)
+ self.__system_tasks.append(asyncio.create_task(wrapper()))
+
+ def __add_app_route(self, app: aiohttp.web.Application, exposed: HttpExposed) -> None:
+ async def wrapper(request: aiohttp.web.Request) -> aiohttp.web.Response:
+ try:
+ if exposed.auth_required:
+ user = request.headers.get("X-KVMD-User", "")
+ passwd = request.headers.get("X-KVMD-Passwd", "")
+ token = request.cookies.get(_COOKIE_AUTH_TOKEN, "")
+
+ if user:
+ user = valid_user(user)
+ setattr(request, _ATTR_KVMD_AUTH_INFO, f"{user} (xhdr)")
+ if not (await self.__auth_manager.authorize(user, valid_passwd(passwd))):
+ raise ForbiddenError("Forbidden")
+
+ elif token:
+ user = self.__auth_manager.check(valid_auth_token(token))
+ if not user:
+ setattr(request, _ATTR_KVMD_AUTH_INFO, "- (token)")
+ raise ForbiddenError("Forbidden")
+ setattr(request, _ATTR_KVMD_AUTH_INFO, f"{user} (token)")
+
+ else:
+ raise UnauthorizedError("Unauthorized")
+
+ return (await exposed.handler(request))
+
+ except (AtxIsBusyError, MsdIsBusyError) as err:
+ return make_json_exception(err, 409)
+ except (ValidatorError, AtxOperationError, MsdOperationError, WolDisabledError) as err:
+ return make_json_exception(err, 400)
+ except UnauthorizedError as err:
+ return make_json_exception(err, 401)
+ except ForbiddenError as err:
+ return make_json_exception(err, 403)
+
+ app.router.add_route(exposed.method, exposed.path, wrapper)
async def __on_shutdown(self, _: aiohttp.web.Application) -> None:
logger = get_logger(0)
@@ -614,7 +368,7 @@ class Server: # pylint: disable=too-many-instance-attributes
async def __on_cleanup(self, _: aiohttp.web.Application) -> None:
logger = get_logger(0)
for (name, obj) in [
- ("Auth manager", self._auth_manager),
+ ("Auth manager", self.__auth_manager),
("Streamer", self.__streamer),
("MSD", self.__msd),
("ATX", self.__atx),
@@ -663,7 +417,6 @@ class Server: # pylint: disable=too-many-instance-attributes
# ===== SYSTEM TASKS
- @_system_task
async def __stream_controller(self) -> None:
prev = 0
shutdown_at = 0.0
@@ -688,7 +441,6 @@ class Server: # pylint: disable=too-many-instance-attributes
prev = cur
await asyncio.sleep(0.1)
- @_system_task
async def __poll_dead_sockets(self) -> None:
while True:
for ws in list(self.__sockets):
@@ -696,22 +448,6 @@ class Server: # pylint: disable=too-many-instance-attributes
await self.__remove_socket(ws)
await asyncio.sleep(0.1)
- @_system_task
- async def __poll_hid_state(self) -> None:
- async for state in self.__hid.poll_state():
- await self.__broadcast_event(_Events.HID_STATE, state)
-
- @_system_task
- async def __poll_atx_state(self) -> None:
- async for state in self.__atx.poll_state():
- await self.__broadcast_event(_Events.ATX_STATE, state)
-
- @_system_task
- async def __poll_msd_state(self) -> None:
- async for state in self.__msd.poll_state():
- await self.__broadcast_event(_Events.MSD_STATE, state)
-
- @_system_task
- async def __poll_streamer_state(self) -> None:
- async for state in self.__streamer.poll_state():
- await self.__broadcast_event(_Events.STREAMER_STATE, state)
+ async def __poll_state(self, event_type: _Events, poller: AsyncGenerator[Dict, None]) -> None:
+ async for state in poller:
+ await self.__broadcast_event(event_type, state)