diff options
-rw-r--r-- | kvmd/apps/kvmd/streamer.py | 80 |
1 files changed, 43 insertions, 37 deletions
diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py index ff0d92f0..90c22356 100644 --- a/kvmd/apps/kvmd/streamer.py +++ b/kvmd/apps/kvmd/streamer.py @@ -87,6 +87,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes self.__cmd = cmd self.__streamer_task: Optional[asyncio.Task] = None + self.__streamer_proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member self.__http_session: Optional[aiohttp.ClientSession] = None @@ -174,12 +175,13 @@ class Streamer: # pylint: disable=too-many-instance-attributes async def __inner_start(self) -> None: assert not self.__streamer_task await self.__set_hw_enabled(True) - self.__streamer_task = asyncio.create_task(self.__run_streamer()) + self.__streamer_task = asyncio.create_task(self.__streamer_task_loop()) async def __inner_stop(self) -> None: assert self.__streamer_task self.__streamer_task.cancel() await asyncio.gather(self.__streamer_task, return_exceptions=True) + await self.__kill_streamer_proc() await self.__set_hw_enabled(False) self.__streamer_task = None @@ -194,23 +196,15 @@ class Streamer: # pylint: disable=too-many-instance-attributes if enabled: await asyncio.sleep(self.__init_delay) - async def __run_streamer(self) -> None: # pylint: disable=too-many-branches + async def __streamer_task_loop(self) -> None: # pylint: disable=too-many-branches logger = get_logger(0) while True: # pylint: disable=too-many-nested-blocks - proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member try: - cmd = self.__make_cmd() - proc = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - preexec_fn=(lambda: signal.signal(signal.SIGINT, signal.SIG_IGN)), - ) - logger.info("Started streamer pid=%d: %s", proc.pid, cmd) + await self.__start_streamer_proc() empty = 0 - async for line_bytes in proc.stdout: # type: ignore + async for line_bytes in self.__streamer_proc.stdout: # type: ignore line = line_bytes.decode(errors="ignore").strip() if line: logger.info("Streamer: %s", line) @@ -226,18 +220,16 @@ class Streamer: # pylint: disable=too-many-instance-attributes break except Exception as err: - if proc: - logger.exception("Unexpected streamer error: pid=%d", proc.pid) + if self.__streamer_proc: + logger.exception("Unexpected streamer error: pid=%d", self.__streamer_proc.pid) else: logger.exception("Can't start streamer: %s", err) + await self.__kill_streamer_proc() await asyncio.sleep(1) - finally: - if proc and proc.returncode is None: - await self.__kill(proc) - - def __make_cmd(self) -> List[str]: - return [ + async def __start_streamer_proc(self) -> None: + assert self.__streamer_proc is None + cmd = [ part.format( host=self.__host, port=self.__port, @@ -246,21 +238,35 @@ class Streamer: # pylint: disable=too-many-instance-attributes ) for part in self.__cmd ] + self.__streamer_proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + preexec_fn=(lambda: signal.signal(signal.SIGINT, signal.SIG_IGN)), + ) + get_logger(0).info("Started streamer pid=%d: %s", self.__streamer_proc.pid, cmd) - async def __kill(self, proc: asyncio.subprocess.Process) -> None: # pylint: disable=no-member - try: - proc.terminate() - await asyncio.sleep(1) - if proc.returncode is None: - try: - proc.kill() - except Exception: - if proc.returncode is not None: - raise - await proc.wait() - get_logger().info("Streamer killed: pid=%d; retcode=%d", proc.pid, proc.returncode) - except Exception: - if proc.returncode is None: - get_logger().exception("Can't kill streamer pid=%d", proc.pid) - else: - get_logger().info("Streamer killed: pid=%d; retcode=%d", proc.pid, proc.returncode) + async def __kill_streamer_proc(self) -> None: + logger = get_logger(0) + if self.__streamer_proc and self.__streamer_proc.returncode is None: + try: + self.__streamer_proc.terminate() + await asyncio.sleep(1) + if self.__streamer_proc.returncode is None: + try: + self.__streamer_proc.kill() + except Exception: + if self.__streamer_proc.returncode is not None: + raise + await self.__streamer_proc.wait() + logger.info("Streamer killed: pid=%d; retcode=%d", + self.__streamer_proc.pid, self.__streamer_proc.returncode) + except asyncio.CancelledError: + pass + except Exception: + if self.__streamer_proc.returncode is None: + logger.exception("Can't kill streamer pid=%d", self.__streamer_proc.pid) + else: + logger.info("Streamer killed: pid=%d; retcode=%d", + self.__streamer_proc.pid, self.__streamer_proc.returncode) + self.__streamer_proc = None |