summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2018-11-08 20:42:42 +0300
committerDevaev Maxim <[email protected]>2018-11-08 20:42:42 +0300
commit1640725cdc6358a77e4a0d8b5ecaf85310c970d6 (patch)
treee44c87427a319f2afbd69c9ed37ee849d2f2d991 /kvmd
parent363bbdac57fb20cda7a261196105ab4b5f61e7ff (diff)
streamer state over websocket
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/apps/kvmd/__init__.py17
-rw-r--r--kvmd/apps/kvmd/server.py43
-rw-r--r--kvmd/apps/kvmd/streamer.py61
3 files changed, 82 insertions, 39 deletions
diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py
index 01737b01..1959f05c 100644
--- a/kvmd/apps/kvmd/__init__.py
+++ b/kvmd/apps/kvmd/__init__.py
@@ -1,5 +1,7 @@
import asyncio
+import aiohttp
+
from ...application import init
from ...logging import get_logger
from ...logging import Log
@@ -18,6 +20,7 @@ def main() -> None:
config = init()
with gpio.bcm():
loop = asyncio.get_event_loop()
+ http_session = aiohttp.ClientSession(loop=loop)
log = Log(
services=list(config["log"]["services"]),
@@ -34,6 +37,7 @@ def main() -> None:
atx = Atx(
power_led=int(config["atx"]["pinout"]["power_led"]),
hdd_led=int(config["atx"]["pinout"]["hdd_led"]),
+
power_switch=int(config["atx"]["pinout"]["power_switch"]),
reset_switch=int(config["atx"]["pinout"]["reset_switch"]),
click_delay=float(config["atx"]["click_delay"]),
@@ -43,10 +47,12 @@ def main() -> None:
msd = MassStorageDevice(
target=int(config["msd"]["pinout"]["target"]),
reset=int(config["msd"]["pinout"]["reset"]),
+
device_path=str(config["msd"]["device"]),
init_delay=float(config["msd"]["init_delay"]),
reset_delay=float(config["msd"]["reset_delay"]),
write_meta=bool(config["msd"]["write_meta"]),
+
loop=loop,
)
@@ -56,10 +62,18 @@ def main() -> None:
sync_delay=float(config["streamer"]["sync_delay"]),
init_delay=float(config["streamer"]["init_delay"]),
init_restart_after=float(config["streamer"]["init_restart_after"]),
+
quality=int(config["streamer"]["quality"]),
desired_fps=int(config["streamer"]["desired_fps"]),
+
+ host=str(config["streamer"]["host"]),
+ port=int(config["streamer"]["port"]),
+ timeout=float(config["streamer"]["timeout"]),
+
cmd=list(map(str, config["streamer"]["cmd"])),
+
loop=loop,
+ http_session=http_session,
)
Server(
@@ -68,10 +82,13 @@ def main() -> None:
atx=atx,
msd=msd,
streamer=streamer,
+
heartbeat=float(config["server"]["heartbeat"]),
atx_state_poll=float(config["atx"]["state_poll"]),
+ streamer_state_poll=float(config["streamer"]["state_poll"]),
streamer_shutdown_delay=float(config["streamer"]["shutdown_delay"]),
msd_chunk_size=int(config["msd"]["chunk_size"]),
+
loop=loop,
).run(
host=str(config["server"]["host"]),
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
index cd89c433..04ce140f 100644
--- a/kvmd/apps/kvmd/server.py
+++ b/kvmd/apps/kvmd/server.py
@@ -113,6 +113,7 @@ class Server: # pylint: disable=too-many-instance-attributes
heartbeat: float,
atx_state_poll: float,
+ streamer_state_poll: float,
streamer_shutdown_delay: float,
msd_chunk_size: int,
@@ -126,8 +127,9 @@ class Server: # pylint: disable=too-many-instance-attributes
self.__streamer = streamer
self.__heartbeat = heartbeat
- self.__streamer_shutdown_delay = streamer_shutdown_delay
self.__atx_state_poll = atx_state_poll
+ self.__streamer_state_poll = streamer_state_poll
+ self.__streamer_shutdown_delay = streamer_shutdown_delay
self.__msd_chunk_size = msd_chunk_size
self.__loop = loop
@@ -138,8 +140,7 @@ class Server: # pylint: disable=too-many-instance-attributes
self.__system_tasks: List[asyncio.Task] = []
self.__reset_streamer = False
- self.__streamer_quality = streamer.get_current_quality()
- self.__streamer_desired_fps = streamer.get_current_desired_fps()
+ self.__streamer_params = streamer.get_params()
def run(self, host: str, port: int) -> None:
self.__hid.start()
@@ -175,6 +176,7 @@ class Server: # pylint: disable=too-many-instance-attributes
self.__loop.create_task(self.__stream_controller()),
self.__loop.create_task(self.__poll_dead_sockets()),
self.__loop.create_task(self.__poll_atx_state()),
+ self.__loop.create_task(self.__poll_streamer_state()),
])
aiohttp.web.run_app(app, host=host, port=port, print=self.__run_app_print)
@@ -347,16 +349,17 @@ class Server: # pylint: disable=too-many-instance-attributes
# ===== STREAMER
async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- return _json(self.__streamer.get_state())
+ return _json(await self.__streamer.get_state())
@_wrap_exceptions_for_web("Can't set stream params")
async def __streamer_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
- quality = request.query.get("quality")
- if quality:
- self.__streamer_quality = _valid_int("quality", quality, 1, 100)
- desired_fps = request.query.get("desired_fps")
- if desired_fps:
- self.__streamer_desired_fps = _valid_int("desired_fps", desired_fps, 0, 30)
+ for (name, validator) in [
+ ("quality", lambda arg: _valid_int("quality", arg, 1, 100)),
+ ("desired_fps", lambda arg: _valid_int("desired_fps", arg, 0, 30)),
+ ]:
+ value = request.query.get(name)
+ if value:
+ self.__streamer_params[name] = validator(value)
return _json()
async def __streamer_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
@@ -402,24 +405,17 @@ class Server: # pylint: disable=too-many-instance-attributes
cur = len(self.__sockets)
if prev == 0 and cur > 0:
if not self.__streamer.is_running():
- await self.__streamer.start(self.__streamer_quality, self.__streamer_desired_fps)
- await self.__broadcast_event("streamer_state", **self.__streamer.get_state())
+ await self.__streamer.start(self.__streamer_params)
elif prev > 0 and cur == 0:
shutdown_at = time.time() + self.__streamer_shutdown_delay
elif prev == 0 and cur == 0 and time.time() > shutdown_at:
if self.__streamer.is_running():
await self.__streamer.stop()
- await self.__broadcast_event("streamer_state", **self.__streamer.get_state())
- if (
- self.__reset_streamer
- or self.__streamer_quality != self.__streamer.get_current_quality()
- or self.__streamer_desired_fps != self.__streamer.get_current_desired_fps()
- ):
+ if (self.__reset_streamer or self.__streamer_params != self.__streamer.get_params()):
if self.__streamer.is_running():
await self.__streamer.stop()
- await self.__streamer.start(self.__streamer_quality, self.__streamer_desired_fps, no_init_restart=True)
- await self.__broadcast_event("streamer_state", **self.__streamer.get_state())
+ await self.__streamer.start(self.__streamer_params, no_init_restart=True)
self.__reset_streamer = False
prev = cur
@@ -440,6 +436,13 @@ class Server: # pylint: disable=too-many-instance-attributes
await self.__broadcast_event("atx_state", **self.__atx.get_state())
await asyncio.sleep(self.__atx_state_poll)
+ @_system_task
+ async def __poll_streamer_state(self) -> None:
+ while True:
+ if self.__sockets:
+ await self.__broadcast_event("streamer_state", **(await self.__streamer.get_state()))
+ await asyncio.sleep(self.__streamer_state_poll)
+
async def __broadcast_event(self, event: str, **kwargs: Dict) -> None:
await asyncio.gather(*[
ws.send_str(json.dumps({
diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py
index 983dee6e..da82ade1 100644
--- a/kvmd/apps/kvmd/streamer.py
+++ b/kvmd/apps/kvmd/streamer.py
@@ -6,6 +6,8 @@ from typing import List
from typing import Dict
from typing import Optional
+import aiohttp
+
from ...logging import get_logger
from ... import gpio
@@ -13,17 +15,25 @@ from ... import gpio
# =====
class Streamer: # pylint: disable=too-many-instance-attributes
- def __init__(
+ def __init__( # pylint: disable=too-many-arguments
self,
cap_power: int,
conv_power: int,
sync_delay: float,
init_delay: float,
init_restart_after: float,
+
quality: int,
desired_fps: int,
+
+ host: str,
+ port: int,
+ timeout: float,
+
cmd: List[str],
+
loop: asyncio.AbstractEventLoop,
+ http_session: aiohttp.ClientSession,
) -> None:
self.__cap_power = (gpio.set_output(cap_power) if cap_power > 0 else cap_power)
@@ -31,23 +41,30 @@ class Streamer: # pylint: disable=too-many-instance-attributes
self.__sync_delay = sync_delay
self.__init_delay = init_delay
self.__init_restart_after = init_restart_after
- self.__quality = quality
- self.__desired_fps = desired_fps
+
+ self.__params = {
+ "quality": quality,
+ "desired_fps": desired_fps,
+ }
+
+ self.__host = host
+ self.__port = port
+ self.__timeout = timeout
+
self.__cmd = cmd
self.__loop = loop
+ self.__http_session = http_session
self.__proc_task: Optional[asyncio.Task] = None
- async def start(self, quality: int, desired_fps: int, no_init_restart: bool=False) -> None:
+ async def start(self, params: Dict, no_init_restart: bool=False) -> None:
logger = get_logger()
logger.info("Starting streamer ...")
- assert 1 <= quality <= 100
- self.__quality = quality
-
- assert 0 <= desired_fps <= 30
- self.__desired_fps = desired_fps
+ self.__params = {key: params[key] for key in self.__params} # Only known params
+ assert 1 <= self.__params["quality"] <= 100
+ assert 0 <= self.__params["desired_fps"] <= 30
await self.__inner_start()
if self.__init_restart_after > 0.0 and not no_init_restart:
@@ -63,17 +80,23 @@ class Streamer: # pylint: disable=too-many-instance-attributes
def is_running(self) -> bool:
return bool(self.__proc_task)
- def get_current_quality(self) -> int:
- return self.__quality
+ def get_params(self) -> Dict:
+ return dict(self.__params)
- def get_current_desired_fps(self) -> int:
- return self.__desired_fps
-
- def get_state(self) -> Dict:
+ async def get_state(self) -> Dict:
+ url = "http://%s:%d/state" % (self.__host, self.__port)
+ state = None
+ try:
+ async with self.__http_session.get(url, timeout=self.__timeout) as response:
+ response.raise_for_status()
+ state = (await response.json())["result"]
+ except aiohttp.ClientConnectorError:
+ pass
+ except Exception:
+ get_logger().exception("Invalid streamer response from /state")
return {
- "is_running": self.is_running(),
- "quality": self.__quality,
- "desired_fps": self.__desired_fps,
+ "params": self.get_params(),
+ "state": state,
}
def get_app(self) -> str:
@@ -121,7 +144,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
while True: # pylint: disable=too-many-nested-blocks
proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member
try:
- cmd = [part.format(quality=self.__quality, desired_fps=self.__desired_fps) for part in self.__cmd]
+ cmd = [part.format(host=self.__host, port=self.__port, **self.__params) for part in self.__cmd]
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,