diff options
author | Devaev Maxim <[email protected]> | 2020-02-29 16:46:35 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2020-02-29 16:46:35 +0300 |
commit | 831b4fa16c5d6adca3d8ecf46586d96f2f4d1ea7 (patch) | |
tree | 18967bc27133ed3a2a709b7823c9e3f392ee4dc4 | |
parent | 1470ebe6fa3900cb28266edf532d6e68707e1f24 (diff) |
refactoring; reduce cpu consumption in streamer controller
-rw-r--r-- | kvmd/apps/kvmd/server.py | 34 | ||||
-rw-r--r-- | kvmd/apps/kvmd/streamer.py | 144 |
2 files changed, 115 insertions, 63 deletions
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 1b77d345..4972bd6a 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -24,7 +24,6 @@ import os import signal import asyncio import json -import time from enum import Enum @@ -150,15 +149,16 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins self.__system_tasks: List[asyncio.Task] = [] self.__reset_streamer = False - self.__streamer_params = streamer.get_params() + self.__new_streamer_params: Dict = {} async def __make_info(self) -> Dict: + streamer_info = await self.__streamer.get_info() return { "version": { "kvmd": __version__, - "streamer": await self.__streamer.get_version(), + "streamer": streamer_info["version"], }, - "streamer": self.__streamer.get_app(), + "streamer": streamer_info["app"], "meta": await self.__info_manager.get_meta(), "extras": await self.__info_manager.get_extras(), } @@ -209,7 +209,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins ]: value = request.query.get(name) if value: - self.__streamer_params[name] = validator(value) + self.__new_streamer_params[name] = validator(value) return make_json_response() @exposed_http("POST", "/streamer/reset") @@ -400,23 +400,21 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins async def __stream_controller(self) -> None: prev = 0 - shutdown_at = 0.0 - while True: cur = len(self.__sockets) if prev == 0 and cur > 0: - if not self.__streamer.is_running(): - await self.__streamer.start(self.__streamer_params) + await self.__streamer.ensure_start(init_restart=True) elif prev > 0 and cur == 0: - shutdown_at = time.time() + self.__streamer.shutdown_delay - elif prev == 0 and cur == 0 and time.time() > shutdown_at: - if self.__streamer.is_running(): - await self.__streamer.stop() - - if (self.__reset_streamer or self.__streamer_params != self.__streamer.get_params()): - if self.__streamer.is_running(): - await self.__streamer.stop() - await self.__streamer.start(self.__streamer_params, no_init_restart=True) + await self.__streamer.ensure_stop(immediately=False) + + if self.__reset_streamer or self.__new_streamer_params: + start = self.__streamer.is_working() + await self.__streamer.ensure_stop(immediately=True) + if self.__new_streamer_params: + self.__streamer.set_params(self.__new_streamer_params) + self.__new_streamer_params = {} + if start: + await self.__streamer.ensure_start(init_restart=False) self.__reset_streamer = False prev = cur diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py index b7f1826a..999033a7 100644 --- a/kvmd/apps/kvmd/streamer.py +++ b/kvmd/apps/kvmd/streamer.py @@ -73,7 +73,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes self.__sync_delay = sync_delay self.__init_delay = init_delay self.__init_restart_after = init_restart_after - self.shutdown_delay = shutdown_delay + self.__shutdown_delay = shutdown_delay self.__state_poll = state_poll self.__params = { @@ -92,15 +92,78 @@ class Streamer: # pylint: disable=too-many-instance-attributes self.__cmd = cmd + self.__stop_task: Optional[asyncio.Task] = None + self.__stop_wip = False + self.__streamer_task: Optional[asyncio.Task] = None self.__streamer_proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member self.__http_session: Optional[aiohttp.ClientSession] = None - async def start(self, params: Dict, no_init_restart: bool=False) -> None: - logger = get_logger() - logger.info("Starting streamer ...") + # ===== + + @aiotools.atomic + async def ensure_start(self, init_restart: bool) -> None: + if not self.__streamer_task or self.__stop_task: + logger = get_logger(0) + + if self.__stop_task: + if not self.__stop_wip: + self.__stop_task.cancel() + await asyncio.gather(self.__stop_task, return_exceptions=True) + logger.info("Streamer stop cancelled") + return + else: + await asyncio.gather(self.__stop_task, return_exceptions=True) + + logger.info("Starting streamer ...") + await self.__inner_start() + if self.__init_restart_after > 0.0 and init_restart: + await asyncio.sleep(self.__init_restart_after) + logger.info("Stopping streamer to restart ...") + await self.__inner_stop() + logger.info("Starting again ...") + await self.__inner_start() + + @aiotools.atomic + async def ensure_stop(self, immediately: bool) -> None: + if self.__streamer_task: + logger = get_logger(0) + + if immediately: + if self.__stop_task: + if not self.__stop_wip: + self.__stop_task.cancel() + await asyncio.gather(self.__stop_task, return_exceptions=True) + logger.info("Stopping streamer immediately ...") + await self.__inner_stop() + else: + await asyncio.gather(self.__stop_task, return_exceptions=True) + else: + logger.info("Stopping streamer immediately ...") + await self.__inner_stop() + + elif not self.__stop_task: + async def delayed_stop() -> None: + try: + await asyncio.sleep(self.__shutdown_delay) + self.__stop_wip = True + logger.info("Stopping streamer after delay ...") + await self.__inner_stop() + finally: + self.__stop_task = None + self.__stop_wip = False + + logger.info("Planning to stop streamer in %.2f seconds ...", self.__shutdown_delay) + self.__stop_task = asyncio.create_task(delayed_stop()) + + def is_working(self) -> bool: + # Запущено и не планирует останавливаться + return bool(self.__streamer_task and not self.__stop_task) + + def set_params(self, params: Dict) -> None: + assert not self.__streamer_task self.__params = { key: min(max(params.get(key, self.__params[key]), a), b) for (key, a, b) in [ @@ -109,43 +172,27 @@ class Streamer: # pylint: disable=too-many-instance-attributes ] } - await self.__inner_start() - if self.__init_restart_after > 0.0 and not no_init_restart: - logger.info("Stopping streamer to restart ...") - await self.__inner_stop() - logger.info("Starting again ...") - await self.__inner_start() - - async def stop(self) -> None: - get_logger().info("Stopping streamer ...") - await self.__inner_stop() - - def is_running(self) -> bool: - return bool(self.__streamer_task) - - def get_params(self) -> Dict: - return dict(self.__params) - async def get_state(self) -> Dict: - session = self.__ensure_session() state = None - try: - async with session.get( - url=f"http://{self.__host}:{self.__port}/state", - headers={"User-Agent": f"KVMD/{__version__}"}, - timeout=self.__timeout, - ) as response: - response.raise_for_status() - state = (await response.json())["result"] - except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError): - pass - except asyncio.CancelledError: # pylint: disable=try-except-raise - raise - except Exception: - get_logger().exception("Invalid streamer response from /state") + if self.__streamer_task: + session = self.__ensure_session() + try: + async with session.get( + url=f"http://{self.__host}:{self.__port}/state", + headers={"User-Agent": f"KVMD/{__version__}"}, + timeout=self.__timeout, + ) as response: + response.raise_for_status() + state = (await response.json())["result"] + except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError): + pass + except asyncio.CancelledError: # pylint: disable=try-except-raise + raise + except Exception: + get_logger().exception("Invalid streamer response from /state") return { "limits": {"max_fps": self.__max_fps}, - "params": self.get_params(), + "params": self.__params, "state": state, } @@ -158,10 +205,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes prev_state = state await asyncio.sleep(self.__state_poll) - def get_app(self) -> str: - return os.path.basename(self.__cmd[0]) - - async def get_version(self) -> str: + async def get_info(self) -> Dict: proc = await asyncio.create_subprocess_exec( *[self.__cmd[0], "--version"], stdout=asyncio.subprocess.PIPE, @@ -169,19 +213,23 @@ class Streamer: # pylint: disable=too-many-instance-attributes preexec_fn=(lambda: signal.signal(signal.SIGINT, signal.SIG_IGN)), ) (stdout, _) = await proc.communicate() - return stdout.decode(errors="ignore").strip() + return { + "app": os.path.basename(self.__cmd[0]), + "version": stdout.decode(errors="ignore").strip(), + } @aiotools.atomic async def cleanup(self) -> None: try: - if self.is_running(): - await self.stop() + await self.ensure_stop(immediately=True) if self.__http_session: await self.__http_session.close() self.__http_session = None finally: await self.__set_hw_enabled(False) + # ===== + def __ensure_session(self) -> aiohttp.ClientSession: if not self.__http_session: if self.__unix_path: @@ -190,11 +238,15 @@ class Streamer: # pylint: disable=too-many-instance-attributes self.__http_session = aiohttp.ClientSession() return self.__http_session + # ===== + + @aiotools.atomic async def __inner_start(self) -> None: assert not self.__streamer_task await self.__set_hw_enabled(True) self.__streamer_task = asyncio.create_task(self.__streamer_task_loop()) + @aiotools.atomic async def __inner_stop(self) -> None: assert self.__streamer_task self.__streamer_task.cancel() @@ -203,6 +255,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes await self.__set_hw_enabled(False) self.__streamer_task = None + @aiotools.atomic async def __set_hw_enabled(self, enabled: bool) -> None: # XXX: This sequence is very important to enable converter and cap board if self.__cap_pin >= 0: @@ -214,9 +267,10 @@ class Streamer: # pylint: disable=too-many-instance-attributes if enabled: await asyncio.sleep(self.__init_delay) + # ===== + async def __streamer_task_loop(self) -> None: # pylint: disable=too-many-branches logger = get_logger(0) - while True: # pylint: disable=too-many-nested-blocks try: await self.__start_streamer_proc() |