diff options
-rw-r--r-- | kvmd/apps/kvmd/api/streamer.py | 110 | ||||
-rw-r--r-- | kvmd/apps/kvmd/http.py | 5 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 8 | ||||
-rw-r--r-- | kvmd/apps/kvmd/streamer.py | 80 | ||||
-rw-r--r-- | kvmd/clients/streamer.py | 24 | ||||
-rw-r--r-- | web/share/js/kvm/stream.js | 2 |
6 files changed, 202 insertions, 27 deletions
diff --git a/kvmd/apps/kvmd/api/streamer.py b/kvmd/apps/kvmd/api/streamer.py new file mode 100644 index 00000000..a3bbbe8b --- /dev/null +++ b/kvmd/apps/kvmd/api/streamer.py @@ -0,0 +1,110 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import io +import functools + +from aiohttp.web import Request +from aiohttp.web import Response + +from PIL import Image + +from ....validators.basic import valid_bool +from ....validators.basic import valid_int_f0 + +from ....validators.kvm import valid_stream_quality + +from .... import aiotools + +from ..http import UnavailableError +from ..http import exposed_http +from ..http import make_json_response + +from ..streamer import StreamerSnapshot +from ..streamer import Streamer + + +# ===== +class StreamerApi: + def __init__(self, streamer: Streamer) -> None: + self.__streamer = streamer + + # ===== + + @exposed_http("GET", "/streamer") + async def __state_handler(self, _: Request) -> Response: + return make_json_response(await self.__streamer.get_state()) + + @exposed_http("GET", "/streamer/snapshot") + async def __make_snapshot_handler(self, request: Request) -> Response: + if (snapshot := await self.__streamer.make_snapshot( + save=valid_bool(request.query.get("save", "false")), + load=valid_bool(request.query.get("load", "false")), + allow_offline=valid_bool(request.query.get("allow_offline", "false")), + )): + if valid_bool(request.query.get("preview", "false")): + data = await self.__make_preview( + snapshot=snapshot, + max_width=valid_int_f0(request.query.get("preview_max_width", "0")), + max_height=valid_int_f0(request.query.get("preview_max_height", "0")), + quality=valid_stream_quality(request.query.get("preview_quality", "80")), + ) + else: + data = snapshot.data + return Response( + body=data, + headers=dict(snapshot.headers), + content_type="image/jpeg", + ) + raise UnavailableError() + + @exposed_http("DELETE", "/streamer/snapshot") + async def __remove_snapshot_handler(self, _: Request) -> Response: + self.__streamer.remove_snapshot() + return make_json_response() + + # ===== + + async def __make_preview(self, snapshot: StreamerSnapshot, max_width: int, max_height: int, quality: int) -> bytes: + if max_width == 0 and max_height == 0: + max_width = snapshot.width // 5 + max_height = snapshot.height // 5 + else: + max_width = min((max_width or snapshot.width), snapshot.width) + max_height = min((max_height or snapshot.height), snapshot.height) + + if max_width == snapshot.width and max_height == snapshot.height: + return snapshot.data + else: + return (await aiotools.run_async(self.__inner_make_preview, snapshot, max_width, max_height, quality)) + + @functools.lru_cache(maxsize=1) + def __inner_make_preview(self, snapshot: StreamerSnapshot, max_width: int, max_height: int, quality: int) -> bytes: + assert 0 < max_width <= snapshot.width + assert 0 < max_height <= snapshot.height + assert not (max_width == snapshot.width and max_height == snapshot.height) + with io.BytesIO(snapshot.data) as snapshot_bio: + with io.BytesIO() as preview_bio: + with Image.open(snapshot_bio) as image: + image.thumbnail((max_width, max_height), Image.ANTIALIAS) + image.save(preview_bio, format="jpeg", quality=quality) + return preview_bio.getvalue() diff --git a/kvmd/apps/kvmd/http.py b/kvmd/apps/kvmd/http.py index 00b08014..729c8498 100644 --- a/kvmd/apps/kvmd/http.py +++ b/kvmd/apps/kvmd/http.py @@ -39,6 +39,11 @@ class ForbiddenError(HttpError): super().__init__("Forbidden", 403) +class UnavailableError(HttpError): + def __init__(self) -> None: + super().__init__("Service Unavailable", 503) + + # ===== @dataclasses.dataclass(frozen=True) class HttpExposed: diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index f6845a06..73200cd8 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -83,6 +83,7 @@ from .api.wol import WolApi from .api.hid import HidApi from .api.atx import AtxApi from .api.msd import MsdApi +from .api.streamer import StreamerApi # ===== @@ -133,6 +134,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins HidApi(hid, keymap_path), AtxApi(atx), MsdApi(msd, sync_chunk_size), + StreamerApi(streamer), ] self.__ws_handlers: Dict[str, Callable] = {} @@ -164,11 +166,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins async def __info_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: return make_json_response(await self.__make_info()) - # ===== STREAMER - - @exposed_http("GET", "/streamer") - async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - return make_json_response(await self.__streamer.get_state()) + # ===== STREAMER CONTROLLER @exposed_http("POST", "/streamer/set_params") async def __streamer_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py index 7f527f69..26d14b62 100644 --- a/kvmd/apps/kvmd/streamer.py +++ b/kvmd/apps/kvmd/streamer.py @@ -24,7 +24,10 @@ import os import signal import asyncio import asyncio.subprocess +import dataclasses +import operator +from typing import Tuple from typing import List from typing import Dict from typing import AsyncGenerator @@ -42,6 +45,16 @@ from ... import gpio # ===== [email protected](frozen=True) +class StreamerSnapshot: + online: bool + width: int + height: int + mtime: float + headers: Tuple[Tuple[str, str], ...] + data: bytes + + class Streamer: # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments,too-many-locals self, @@ -101,6 +114,10 @@ class Streamer: # pylint: disable=too-many-instance-attributes self.__http_session: Optional[aiohttp.ClientSession] = None + self.__snapshot: Optional[StreamerSnapshot] = None + + self.__state_notifier = aiotools.AioNotifier() + # ===== @aiotools.atomic @@ -163,6 +180,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes # Запущено и не планирует останавливаться return bool(self.__streamer_task and not self.__stop_task) + # ===== + def set_params(self, params: Dict) -> None: assert not self.__streamer_task self.__params = { @@ -176,6 +195,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes def get_params(self) -> Dict: return dict(self.__params) + # ===== + async def get_state(self) -> Dict: state = None if self.__streamer_task: @@ -188,18 +209,24 @@ class Streamer: # pylint: disable=too-many-instance-attributes pass except Exception: get_logger().exception("Invalid streamer response from /state") + + snapshot: Optional[Dict] = None + if self.__snapshot: + snapshot = dataclasses.asdict(self.__snapshot) + del snapshot["headers"] + del snapshot["data"] + return { "limits": {"max_fps": self.__max_fps}, "params": self.__params, + "snapshot": {"saved": snapshot}, "state": state, } async def poll_state(self) -> AsyncGenerator[Dict, None]: - notifier = aiotools.AioNotifier() - def signal_handler(*_: Any) -> None: get_logger(0).info("Got SIGUSR2, checking the stream state ...") - asyncio.ensure_future(notifier.notify()) + asyncio.ensure_future(self.__state_notifier.notify()) get_logger(0).info("Installing SIGUSR2 streamer handler ...") asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler) @@ -213,10 +240,12 @@ class Streamer: # pylint: disable=too-many-instance-attributes prev_state = state if waiter_task is None: - waiter_task = asyncio.create_task(notifier.wait()) + waiter_task = asyncio.create_task(self.__state_notifier.wait()) if waiter_task in (await aiotools.wait_first(asyncio.sleep(self.__state_poll), waiter_task))[0]: waiter_task = None + # ===== + async def get_info(self) -> Dict: version = (await aioproc.read_process([self.__cmd[0], "--version"], err_to_null=True))[1] return { @@ -224,6 +253,49 @@ class Streamer: # pylint: disable=too-many-instance-attributes "version": version, } + async def make_snapshot(self, save: bool, load: bool, allow_offline: bool) -> Optional[StreamerSnapshot]: + if load: + return self.__snapshot + else: + session = self.__ensure_http_session() + try: + async with session.get(self.__make_url("snapshot")) as response: + htclient.raise_not_200(response) + online = (response.headers["X-UStreamer-Online"] == "true") + if online or allow_offline: + snapshot = StreamerSnapshot( + online=online, + width=int(response.headers["X-UStreamer-Width"]), + height=int(response.headers["X-UStreamer-Height"]), + mtime=float(response.headers["X-Timestamp"]), + headers=tuple( + (key, value) + for (key, value) in sorted(response.headers.items(), key=operator.itemgetter(0)) + if key.lower().startswith("x-ustreamer-") or key.lower() in [ + "x-timestamp", + "access-control-allow-origin", + "cache-control", + "pragma", + "expires", + ] + ), + data=bytes(await response.read()), + ) + if save: + self.__snapshot = snapshot + await self.__state_notifier.notify() + return snapshot + except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError): + pass + except Exception: + get_logger().exception("Invalid streamer response from /snapshot") + return None + + def remove_snapshot(self) -> None: + self.__snapshot = None + + # ===== + @aiotools.atomic async def cleanup(self) -> None: try: diff --git a/kvmd/clients/streamer.py b/kvmd/clients/streamer.py index 493ac080..1b3f9433 100644 --- a/kvmd/clients/streamer.py +++ b/kvmd/clients/streamer.py @@ -54,7 +54,7 @@ class StreamerClient: async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]: try: - async with self.__make_http_session(infinite=True) as session: + async with self.__make_http_session() as session: async with session.get( url=self.__make_url("stream"), params={"extra_headers": "1"}, @@ -84,24 +84,14 @@ class StreamerClient: raise StreamerError(f"{type(err).__name__}: {err}") raise StreamerError("Reached EOF") -# async def get_snapshot(self) -> Tuple[bool, bytes]: -# async with self.__make_http_session(infinite=False) as session: -# async with session.get(self.__make_url("snapshot")) as response: -# htclient.raise_not_200(response) -# return ( -# (response.headers["X-UStreamer-Online"] == "true"), -# bytes(await response.read()), -# ) - - def __make_http_session(self, infinite: bool) -> aiohttp.ClientSession: - kwargs: Dict = {"headers": {"User-Agent": self.__user_agent}} - if infinite: - kwargs["timeout"] = aiohttp.ClientTimeout( + def __make_http_session(self) -> aiohttp.ClientSession: + kwargs: Dict = { + "headers": {"User-Agent": self.__user_agent}, + "timeout": aiohttp.ClientTimeout( connect=self.__timeout, sock_read=self.__timeout, - ) - else: - kwargs["timeout"] = aiohttp.ClientTimeout(total=self.__timeout) + ), + } if self.__unix_path: kwargs["connector"] = aiohttp.UnixConnector(path=self.__unix_path) return aiohttp.ClientSession(**kwargs) diff --git a/web/share/js/kvm/stream.js b/web/share/js/kvm/stream.js index 6ea3362e..7ea7d722 100644 --- a/web/share/js/kvm/stream.js +++ b/web/share/js/kvm/stream.js @@ -193,7 +193,7 @@ export function Streamer() { var __clickScreenshotButton = function() { let el_a = document.createElement("a"); - el_a.href = "/streamer/snapshot"; + el_a.href = "/api/streamer/snapshot?allow_offline=1"; el_a.target = "_blank"; document.body.appendChild(el_a); el_a.click(); |