summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kvmd/apps/kvmd/streamer.py80
1 files changed, 43 insertions, 37 deletions
diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py
index ff0d92f0..90c22356 100644
--- a/kvmd/apps/kvmd/streamer.py
+++ b/kvmd/apps/kvmd/streamer.py
@@ -87,6 +87,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
self.__cmd = cmd
self.__streamer_task: Optional[asyncio.Task] = None
+ self.__streamer_proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member
self.__http_session: Optional[aiohttp.ClientSession] = None
@@ -174,12 +175,13 @@ class Streamer: # pylint: disable=too-many-instance-attributes
async def __inner_start(self) -> None:
assert not self.__streamer_task
await self.__set_hw_enabled(True)
- self.__streamer_task = asyncio.create_task(self.__run_streamer())
+ self.__streamer_task = asyncio.create_task(self.__streamer_task_loop())
async def __inner_stop(self) -> None:
assert self.__streamer_task
self.__streamer_task.cancel()
await asyncio.gather(self.__streamer_task, return_exceptions=True)
+ await self.__kill_streamer_proc()
await self.__set_hw_enabled(False)
self.__streamer_task = None
@@ -194,23 +196,15 @@ class Streamer: # pylint: disable=too-many-instance-attributes
if enabled:
await asyncio.sleep(self.__init_delay)
- async def __run_streamer(self) -> None: # pylint: disable=too-many-branches
+ async def __streamer_task_loop(self) -> None: # pylint: disable=too-many-branches
logger = get_logger(0)
while True: # pylint: disable=too-many-nested-blocks
- proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member
try:
- cmd = self.__make_cmd()
- proc = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.STDOUT,
- preexec_fn=(lambda: signal.signal(signal.SIGINT, signal.SIG_IGN)),
- )
- logger.info("Started streamer pid=%d: %s", proc.pid, cmd)
+ await self.__start_streamer_proc()
empty = 0
- async for line_bytes in proc.stdout: # type: ignore
+ async for line_bytes in self.__streamer_proc.stdout: # type: ignore
line = line_bytes.decode(errors="ignore").strip()
if line:
logger.info("Streamer: %s", line)
@@ -226,18 +220,16 @@ class Streamer: # pylint: disable=too-many-instance-attributes
break
except Exception as err:
- if proc:
- logger.exception("Unexpected streamer error: pid=%d", proc.pid)
+ if self.__streamer_proc:
+ logger.exception("Unexpected streamer error: pid=%d", self.__streamer_proc.pid)
else:
logger.exception("Can't start streamer: %s", err)
+ await self.__kill_streamer_proc()
await asyncio.sleep(1)
- finally:
- if proc and proc.returncode is None:
- await self.__kill(proc)
-
- def __make_cmd(self) -> List[str]:
- return [
+ async def __start_streamer_proc(self) -> None:
+ assert self.__streamer_proc is None
+ cmd = [
part.format(
host=self.__host,
port=self.__port,
@@ -246,21 +238,35 @@ class Streamer: # pylint: disable=too-many-instance-attributes
)
for part in self.__cmd
]
+ self.__streamer_proc = await asyncio.create_subprocess_exec(
+ *cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.STDOUT,
+ preexec_fn=(lambda: signal.signal(signal.SIGINT, signal.SIG_IGN)),
+ )
+ get_logger(0).info("Started streamer pid=%d: %s", self.__streamer_proc.pid, cmd)
- async def __kill(self, proc: asyncio.subprocess.Process) -> None: # pylint: disable=no-member
- try:
- proc.terminate()
- await asyncio.sleep(1)
- if proc.returncode is None:
- try:
- proc.kill()
- except Exception:
- if proc.returncode is not None:
- raise
- await proc.wait()
- get_logger().info("Streamer killed: pid=%d; retcode=%d", proc.pid, proc.returncode)
- except Exception:
- if proc.returncode is None:
- get_logger().exception("Can't kill streamer pid=%d", proc.pid)
- else:
- get_logger().info("Streamer killed: pid=%d; retcode=%d", proc.pid, proc.returncode)
+ async def __kill_streamer_proc(self) -> None:
+ logger = get_logger(0)
+ if self.__streamer_proc and self.__streamer_proc.returncode is None:
+ try:
+ self.__streamer_proc.terminate()
+ await asyncio.sleep(1)
+ if self.__streamer_proc.returncode is None:
+ try:
+ self.__streamer_proc.kill()
+ except Exception:
+ if self.__streamer_proc.returncode is not None:
+ raise
+ await self.__streamer_proc.wait()
+ logger.info("Streamer killed: pid=%d; retcode=%d",
+ self.__streamer_proc.pid, self.__streamer_proc.returncode)
+ except asyncio.CancelledError:
+ pass
+ except Exception:
+ if self.__streamer_proc.returncode is None:
+ logger.exception("Can't kill streamer pid=%d", self.__streamer_proc.pid)
+ else:
+ logger.info("Streamer killed: pid=%d; retcode=%d",
+ self.__streamer_proc.pid, self.__streamer_proc.returncode)
+ self.__streamer_proc = None