summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2020-05-29 19:49:47 +0300
committerDevaev Maxim <[email protected]>2020-05-29 19:49:47 +0300
commit81fec121d08ecea8288b5dd0e39d7ccee8ba4ee3 (patch)
tree14756d744349aec59d048068bbdd40b96aaa45e4 /kvmd
parenta5fcafe2a5a1bd8e18df9b96e2185d1979fc9977 (diff)
new snapshot api
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/apps/kvmd/api/streamer.py110
-rw-r--r--kvmd/apps/kvmd/http.py5
-rw-r--r--kvmd/apps/kvmd/server.py8
-rw-r--r--kvmd/apps/kvmd/streamer.py80
-rw-r--r--kvmd/clients/streamer.py24
5 files changed, 201 insertions, 26 deletions
diff --git a/kvmd/apps/kvmd/api/streamer.py b/kvmd/apps/kvmd/api/streamer.py
new file mode 100644
index 00000000..a3bbbe8b
--- /dev/null
+++ b/kvmd/apps/kvmd/api/streamer.py
@@ -0,0 +1,110 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import io
+import functools
+
+from aiohttp.web import Request
+from aiohttp.web import Response
+
+from PIL import Image
+
+from ....validators.basic import valid_bool
+from ....validators.basic import valid_int_f0
+
+from ....validators.kvm import valid_stream_quality
+
+from .... import aiotools
+
+from ..http import UnavailableError
+from ..http import exposed_http
+from ..http import make_json_response
+
+from ..streamer import StreamerSnapshot
+from ..streamer import Streamer
+
+
+# =====
+class StreamerApi:
+ def __init__(self, streamer: Streamer) -> None:
+ self.__streamer = streamer
+
+ # =====
+
+ @exposed_http("GET", "/streamer")
+ async def __state_handler(self, _: Request) -> Response:
+ return make_json_response(await self.__streamer.get_state())
+
+ @exposed_http("GET", "/streamer/snapshot")
+ async def __make_snapshot_handler(self, request: Request) -> Response:
+ if (snapshot := await self.__streamer.make_snapshot(
+ save=valid_bool(request.query.get("save", "false")),
+ load=valid_bool(request.query.get("load", "false")),
+ allow_offline=valid_bool(request.query.get("allow_offline", "false")),
+ )):
+ if valid_bool(request.query.get("preview", "false")):
+ data = await self.__make_preview(
+ snapshot=snapshot,
+ max_width=valid_int_f0(request.query.get("preview_max_width", "0")),
+ max_height=valid_int_f0(request.query.get("preview_max_height", "0")),
+ quality=valid_stream_quality(request.query.get("preview_quality", "80")),
+ )
+ else:
+ data = snapshot.data
+ return Response(
+ body=data,
+ headers=dict(snapshot.headers),
+ content_type="image/jpeg",
+ )
+ raise UnavailableError()
+
+ @exposed_http("DELETE", "/streamer/snapshot")
+ async def __remove_snapshot_handler(self, _: Request) -> Response:
+ self.__streamer.remove_snapshot()
+ return make_json_response()
+
+ # =====
+
+ async def __make_preview(self, snapshot: StreamerSnapshot, max_width: int, max_height: int, quality: int) -> bytes:
+ if max_width == 0 and max_height == 0:
+ max_width = snapshot.width // 5
+ max_height = snapshot.height // 5
+ else:
+ max_width = min((max_width or snapshot.width), snapshot.width)
+ max_height = min((max_height or snapshot.height), snapshot.height)
+
+ if max_width == snapshot.width and max_height == snapshot.height:
+ return snapshot.data
+ else:
+ return (await aiotools.run_async(self.__inner_make_preview, snapshot, max_width, max_height, quality))
+
+ @functools.lru_cache(maxsize=1)
+ def __inner_make_preview(self, snapshot: StreamerSnapshot, max_width: int, max_height: int, quality: int) -> bytes:
+ assert 0 < max_width <= snapshot.width
+ assert 0 < max_height <= snapshot.height
+ assert not (max_width == snapshot.width and max_height == snapshot.height)
+ with io.BytesIO(snapshot.data) as snapshot_bio:
+ with io.BytesIO() as preview_bio:
+ with Image.open(snapshot_bio) as image:
+ image.thumbnail((max_width, max_height), Image.ANTIALIAS)
+ image.save(preview_bio, format="jpeg", quality=quality)
+ return preview_bio.getvalue()
diff --git a/kvmd/apps/kvmd/http.py b/kvmd/apps/kvmd/http.py
index 00b08014..729c8498 100644
--- a/kvmd/apps/kvmd/http.py
+++ b/kvmd/apps/kvmd/http.py
@@ -39,6 +39,11 @@ class ForbiddenError(HttpError):
super().__init__("Forbidden", 403)
+class UnavailableError(HttpError):
+ def __init__(self) -> None:
+ super().__init__("Service Unavailable", 503)
+
+
# =====
@dataclasses.dataclass(frozen=True)
class HttpExposed:
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
index f6845a06..73200cd8 100644
--- a/kvmd/apps/kvmd/server.py
+++ b/kvmd/apps/kvmd/server.py
@@ -83,6 +83,7 @@ from .api.wol import WolApi
from .api.hid import HidApi
from .api.atx import AtxApi
from .api.msd import MsdApi
+from .api.streamer import StreamerApi
# =====
@@ -133,6 +134,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
HidApi(hid, keymap_path),
AtxApi(atx),
MsdApi(msd, sync_chunk_size),
+ StreamerApi(streamer),
]
self.__ws_handlers: Dict[str, Callable] = {}
@@ -164,11 +166,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
async def __info_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return make_json_response(await self.__make_info())
- # ===== STREAMER
-
- @exposed_http("GET", "/streamer")
- async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- return make_json_response(await self.__streamer.get_state())
+ # ===== STREAMER CONTROLLER
@exposed_http("POST", "/streamer/set_params")
async def __streamer_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py
index 7f527f69..26d14b62 100644
--- a/kvmd/apps/kvmd/streamer.py
+++ b/kvmd/apps/kvmd/streamer.py
@@ -24,7 +24,10 @@ import os
import signal
import asyncio
import asyncio.subprocess
+import dataclasses
+import operator
+from typing import Tuple
from typing import List
from typing import Dict
from typing import AsyncGenerator
@@ -42,6 +45,16 @@ from ... import gpio
# =====
[email protected](frozen=True)
+class StreamerSnapshot:
+ online: bool
+ width: int
+ height: int
+ mtime: float
+ headers: Tuple[Tuple[str, str], ...]
+ data: bytes
+
+
class Streamer: # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments,too-many-locals
self,
@@ -101,6 +114,10 @@ class Streamer: # pylint: disable=too-many-instance-attributes
self.__http_session: Optional[aiohttp.ClientSession] = None
+ self.__snapshot: Optional[StreamerSnapshot] = None
+
+ self.__state_notifier = aiotools.AioNotifier()
+
# =====
@aiotools.atomic
@@ -163,6 +180,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes
# Запущено и не планирует останавливаться
return bool(self.__streamer_task and not self.__stop_task)
+ # =====
+
def set_params(self, params: Dict) -> None:
assert not self.__streamer_task
self.__params = {
@@ -176,6 +195,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes
def get_params(self) -> Dict:
return dict(self.__params)
+ # =====
+
async def get_state(self) -> Dict:
state = None
if self.__streamer_task:
@@ -188,18 +209,24 @@ class Streamer: # pylint: disable=too-many-instance-attributes
pass
except Exception:
get_logger().exception("Invalid streamer response from /state")
+
+ snapshot: Optional[Dict] = None
+ if self.__snapshot:
+ snapshot = dataclasses.asdict(self.__snapshot)
+ del snapshot["headers"]
+ del snapshot["data"]
+
return {
"limits": {"max_fps": self.__max_fps},
"params": self.__params,
+ "snapshot": {"saved": snapshot},
"state": state,
}
async def poll_state(self) -> AsyncGenerator[Dict, None]:
- notifier = aiotools.AioNotifier()
-
def signal_handler(*_: Any) -> None:
get_logger(0).info("Got SIGUSR2, checking the stream state ...")
- asyncio.ensure_future(notifier.notify())
+ asyncio.ensure_future(self.__state_notifier.notify())
get_logger(0).info("Installing SIGUSR2 streamer handler ...")
asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler)
@@ -213,10 +240,12 @@ class Streamer: # pylint: disable=too-many-instance-attributes
prev_state = state
if waiter_task is None:
- waiter_task = asyncio.create_task(notifier.wait())
+ waiter_task = asyncio.create_task(self.__state_notifier.wait())
if waiter_task in (await aiotools.wait_first(asyncio.sleep(self.__state_poll), waiter_task))[0]:
waiter_task = None
+ # =====
+
async def get_info(self) -> Dict:
version = (await aioproc.read_process([self.__cmd[0], "--version"], err_to_null=True))[1]
return {
@@ -224,6 +253,49 @@ class Streamer: # pylint: disable=too-many-instance-attributes
"version": version,
}
+ async def make_snapshot(self, save: bool, load: bool, allow_offline: bool) -> Optional[StreamerSnapshot]:
+ if load:
+ return self.__snapshot
+ else:
+ session = self.__ensure_http_session()
+ try:
+ async with session.get(self.__make_url("snapshot")) as response:
+ htclient.raise_not_200(response)
+ online = (response.headers["X-UStreamer-Online"] == "true")
+ if online or allow_offline:
+ snapshot = StreamerSnapshot(
+ online=online,
+ width=int(response.headers["X-UStreamer-Width"]),
+ height=int(response.headers["X-UStreamer-Height"]),
+ mtime=float(response.headers["X-Timestamp"]),
+ headers=tuple(
+ (key, value)
+ for (key, value) in sorted(response.headers.items(), key=operator.itemgetter(0))
+ if key.lower().startswith("x-ustreamer-") or key.lower() in [
+ "x-timestamp",
+ "access-control-allow-origin",
+ "cache-control",
+ "pragma",
+ "expires",
+ ]
+ ),
+ data=bytes(await response.read()),
+ )
+ if save:
+ self.__snapshot = snapshot
+ await self.__state_notifier.notify()
+ return snapshot
+ except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError):
+ pass
+ except Exception:
+ get_logger().exception("Invalid streamer response from /snapshot")
+ return None
+
+ def remove_snapshot(self) -> None:
+ self.__snapshot = None
+
+ # =====
+
@aiotools.atomic
async def cleanup(self) -> None:
try:
diff --git a/kvmd/clients/streamer.py b/kvmd/clients/streamer.py
index 493ac080..1b3f9433 100644
--- a/kvmd/clients/streamer.py
+++ b/kvmd/clients/streamer.py
@@ -54,7 +54,7 @@ class StreamerClient:
async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]:
try:
- async with self.__make_http_session(infinite=True) as session:
+ async with self.__make_http_session() as session:
async with session.get(
url=self.__make_url("stream"),
params={"extra_headers": "1"},
@@ -84,24 +84,14 @@ class StreamerClient:
raise StreamerError(f"{type(err).__name__}: {err}")
raise StreamerError("Reached EOF")
-# async def get_snapshot(self) -> Tuple[bool, bytes]:
-# async with self.__make_http_session(infinite=False) as session:
-# async with session.get(self.__make_url("snapshot")) as response:
-# htclient.raise_not_200(response)
-# return (
-# (response.headers["X-UStreamer-Online"] == "true"),
-# bytes(await response.read()),
-# )
-
- def __make_http_session(self, infinite: bool) -> aiohttp.ClientSession:
- kwargs: Dict = {"headers": {"User-Agent": self.__user_agent}}
- if infinite:
- kwargs["timeout"] = aiohttp.ClientTimeout(
+ def __make_http_session(self) -> aiohttp.ClientSession:
+ kwargs: Dict = {
+ "headers": {"User-Agent": self.__user_agent},
+ "timeout": aiohttp.ClientTimeout(
connect=self.__timeout,
sock_read=self.__timeout,
- )
- else:
- kwargs["timeout"] = aiohttp.ClientTimeout(total=self.__timeout)
+ ),
+ }
if self.__unix_path:
kwargs["connector"] = aiohttp.UnixConnector(path=self.__unix_path)
return aiohttp.ClientSession(**kwargs)