diff options
author | Devaev Maxim <[email protected]> | 2018-11-08 20:42:42 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2018-11-08 20:42:42 +0300 |
commit | 1640725cdc6358a77e4a0d8b5ecaf85310c970d6 (patch) | |
tree | e44c87427a319f2afbd69c9ed37ee849d2f2d991 /kvmd | |
parent | 363bbdac57fb20cda7a261196105ab4b5f61e7ff (diff) |
streamer state over websocket
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/apps/kvmd/__init__.py | 17 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 43 | ||||
-rw-r--r-- | kvmd/apps/kvmd/streamer.py | 61 |
3 files changed, 82 insertions, 39 deletions
diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py index 01737b01..1959f05c 100644 --- a/kvmd/apps/kvmd/__init__.py +++ b/kvmd/apps/kvmd/__init__.py @@ -1,5 +1,7 @@ import asyncio +import aiohttp + from ...application import init from ...logging import get_logger from ...logging import Log @@ -18,6 +20,7 @@ def main() -> None: config = init() with gpio.bcm(): loop = asyncio.get_event_loop() + http_session = aiohttp.ClientSession(loop=loop) log = Log( services=list(config["log"]["services"]), @@ -34,6 +37,7 @@ def main() -> None: atx = Atx( power_led=int(config["atx"]["pinout"]["power_led"]), hdd_led=int(config["atx"]["pinout"]["hdd_led"]), + power_switch=int(config["atx"]["pinout"]["power_switch"]), reset_switch=int(config["atx"]["pinout"]["reset_switch"]), click_delay=float(config["atx"]["click_delay"]), @@ -43,10 +47,12 @@ def main() -> None: msd = MassStorageDevice( target=int(config["msd"]["pinout"]["target"]), reset=int(config["msd"]["pinout"]["reset"]), + device_path=str(config["msd"]["device"]), init_delay=float(config["msd"]["init_delay"]), reset_delay=float(config["msd"]["reset_delay"]), write_meta=bool(config["msd"]["write_meta"]), + loop=loop, ) @@ -56,10 +62,18 @@ def main() -> None: sync_delay=float(config["streamer"]["sync_delay"]), init_delay=float(config["streamer"]["init_delay"]), init_restart_after=float(config["streamer"]["init_restart_after"]), + quality=int(config["streamer"]["quality"]), desired_fps=int(config["streamer"]["desired_fps"]), + + host=str(config["streamer"]["host"]), + port=int(config["streamer"]["port"]), + timeout=float(config["streamer"]["timeout"]), + cmd=list(map(str, config["streamer"]["cmd"])), + loop=loop, + http_session=http_session, ) Server( @@ -68,10 +82,13 @@ def main() -> None: atx=atx, msd=msd, streamer=streamer, + heartbeat=float(config["server"]["heartbeat"]), atx_state_poll=float(config["atx"]["state_poll"]), + streamer_state_poll=float(config["streamer"]["state_poll"]), streamer_shutdown_delay=float(config["streamer"]["shutdown_delay"]), msd_chunk_size=int(config["msd"]["chunk_size"]), + loop=loop, ).run( host=str(config["server"]["host"]), diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index cd89c433..04ce140f 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -113,6 +113,7 @@ class Server: # pylint: disable=too-many-instance-attributes heartbeat: float, atx_state_poll: float, + streamer_state_poll: float, streamer_shutdown_delay: float, msd_chunk_size: int, @@ -126,8 +127,9 @@ class Server: # pylint: disable=too-many-instance-attributes self.__streamer = streamer self.__heartbeat = heartbeat - self.__streamer_shutdown_delay = streamer_shutdown_delay self.__atx_state_poll = atx_state_poll + self.__streamer_state_poll = streamer_state_poll + self.__streamer_shutdown_delay = streamer_shutdown_delay self.__msd_chunk_size = msd_chunk_size self.__loop = loop @@ -138,8 +140,7 @@ class Server: # pylint: disable=too-many-instance-attributes self.__system_tasks: List[asyncio.Task] = [] self.__reset_streamer = False - self.__streamer_quality = streamer.get_current_quality() - self.__streamer_desired_fps = streamer.get_current_desired_fps() + self.__streamer_params = streamer.get_params() def run(self, host: str, port: int) -> None: self.__hid.start() @@ -175,6 +176,7 @@ class Server: # pylint: disable=too-many-instance-attributes self.__loop.create_task(self.__stream_controller()), self.__loop.create_task(self.__poll_dead_sockets()), self.__loop.create_task(self.__poll_atx_state()), + self.__loop.create_task(self.__poll_streamer_state()), ]) aiohttp.web.run_app(app, host=host, port=port, print=self.__run_app_print) @@ -347,16 +349,17 @@ class Server: # pylint: disable=too-many-instance-attributes # ===== STREAMER async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - return _json(self.__streamer.get_state()) + return _json(await self.__streamer.get_state()) @_wrap_exceptions_for_web("Can't set stream params") async def __streamer_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: - quality = request.query.get("quality") - if quality: - self.__streamer_quality = _valid_int("quality", quality, 1, 100) - desired_fps = request.query.get("desired_fps") - if desired_fps: - self.__streamer_desired_fps = _valid_int("desired_fps", desired_fps, 0, 30) + for (name, validator) in [ + ("quality", lambda arg: _valid_int("quality", arg, 1, 100)), + ("desired_fps", lambda arg: _valid_int("desired_fps", arg, 0, 30)), + ]: + value = request.query.get(name) + if value: + self.__streamer_params[name] = validator(value) return _json() async def __streamer_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: @@ -402,24 +405,17 @@ class Server: # pylint: disable=too-many-instance-attributes cur = len(self.__sockets) if prev == 0 and cur > 0: if not self.__streamer.is_running(): - await self.__streamer.start(self.__streamer_quality, self.__streamer_desired_fps) - await self.__broadcast_event("streamer_state", **self.__streamer.get_state()) + await self.__streamer.start(self.__streamer_params) elif prev > 0 and cur == 0: shutdown_at = time.time() + self.__streamer_shutdown_delay elif prev == 0 and cur == 0 and time.time() > shutdown_at: if self.__streamer.is_running(): await self.__streamer.stop() - await self.__broadcast_event("streamer_state", **self.__streamer.get_state()) - if ( - self.__reset_streamer - or self.__streamer_quality != self.__streamer.get_current_quality() - or self.__streamer_desired_fps != self.__streamer.get_current_desired_fps() - ): + if (self.__reset_streamer or self.__streamer_params != self.__streamer.get_params()): if self.__streamer.is_running(): await self.__streamer.stop() - await self.__streamer.start(self.__streamer_quality, self.__streamer_desired_fps, no_init_restart=True) - await self.__broadcast_event("streamer_state", **self.__streamer.get_state()) + await self.__streamer.start(self.__streamer_params, no_init_restart=True) self.__reset_streamer = False prev = cur @@ -440,6 +436,13 @@ class Server: # pylint: disable=too-many-instance-attributes await self.__broadcast_event("atx_state", **self.__atx.get_state()) await asyncio.sleep(self.__atx_state_poll) + @_system_task + async def __poll_streamer_state(self) -> None: + while True: + if self.__sockets: + await self.__broadcast_event("streamer_state", **(await self.__streamer.get_state())) + await asyncio.sleep(self.__streamer_state_poll) + async def __broadcast_event(self, event: str, **kwargs: Dict) -> None: await asyncio.gather(*[ ws.send_str(json.dumps({ diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py index 983dee6e..da82ade1 100644 --- a/kvmd/apps/kvmd/streamer.py +++ b/kvmd/apps/kvmd/streamer.py @@ -6,6 +6,8 @@ from typing import List from typing import Dict from typing import Optional +import aiohttp + from ...logging import get_logger from ... import gpio @@ -13,17 +15,25 @@ from ... import gpio # ===== class Streamer: # pylint: disable=too-many-instance-attributes - def __init__( + def __init__( # pylint: disable=too-many-arguments self, cap_power: int, conv_power: int, sync_delay: float, init_delay: float, init_restart_after: float, + quality: int, desired_fps: int, + + host: str, + port: int, + timeout: float, + cmd: List[str], + loop: asyncio.AbstractEventLoop, + http_session: aiohttp.ClientSession, ) -> None: self.__cap_power = (gpio.set_output(cap_power) if cap_power > 0 else cap_power) @@ -31,23 +41,30 @@ class Streamer: # pylint: disable=too-many-instance-attributes self.__sync_delay = sync_delay self.__init_delay = init_delay self.__init_restart_after = init_restart_after - self.__quality = quality - self.__desired_fps = desired_fps + + self.__params = { + "quality": quality, + "desired_fps": desired_fps, + } + + self.__host = host + self.__port = port + self.__timeout = timeout + self.__cmd = cmd self.__loop = loop + self.__http_session = http_session self.__proc_task: Optional[asyncio.Task] = None - async def start(self, quality: int, desired_fps: int, no_init_restart: bool=False) -> None: + async def start(self, params: Dict, no_init_restart: bool=False) -> None: logger = get_logger() logger.info("Starting streamer ...") - assert 1 <= quality <= 100 - self.__quality = quality - - assert 0 <= desired_fps <= 30 - self.__desired_fps = desired_fps + self.__params = {key: params[key] for key in self.__params} # Only known params + assert 1 <= self.__params["quality"] <= 100 + assert 0 <= self.__params["desired_fps"] <= 30 await self.__inner_start() if self.__init_restart_after > 0.0 and not no_init_restart: @@ -63,17 +80,23 @@ class Streamer: # pylint: disable=too-many-instance-attributes def is_running(self) -> bool: return bool(self.__proc_task) - def get_current_quality(self) -> int: - return self.__quality + def get_params(self) -> Dict: + return dict(self.__params) - def get_current_desired_fps(self) -> int: - return self.__desired_fps - - def get_state(self) -> Dict: + async def get_state(self) -> Dict: + url = "http://%s:%d/state" % (self.__host, self.__port) + state = None + try: + async with self.__http_session.get(url, timeout=self.__timeout) as response: + response.raise_for_status() + state = (await response.json())["result"] + except aiohttp.ClientConnectorError: + pass + except Exception: + get_logger().exception("Invalid streamer response from /state") return { - "is_running": self.is_running(), - "quality": self.__quality, - "desired_fps": self.__desired_fps, + "params": self.get_params(), + "state": state, } def get_app(self) -> str: @@ -121,7 +144,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes while True: # pylint: disable=too-many-nested-blocks proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member try: - cmd = [part.format(quality=self.__quality, desired_fps=self.__desired_fps) for part in self.__cmd] + cmd = [part.format(host=self.__host, port=self.__port, **self.__params) for part in self.__cmd] proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, |