summaryrefslogtreecommitdiff
path: root/kvmd/streamer.py
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2018-09-26 02:57:24 +0300
committerDevaev Maxim <[email protected]>2018-09-26 02:57:24 +0300
commit81a5311349564a1016c4af2bf18ae872b650e85b (patch)
treed01fd027948494e22ae2c14334b98c1515e5e8a4 /kvmd/streamer.py
parentf3946f102fc167efdc53c73412b2c0d6ac6c72c5 (diff)
moved kvmd to the root
Diffstat (limited to 'kvmd/streamer.py')
-rw-r--r--kvmd/streamer.py151
1 files changed, 151 insertions, 0 deletions
diff --git a/kvmd/streamer.py b/kvmd/streamer.py
new file mode 100644
index 00000000..24097547
--- /dev/null
+++ b/kvmd/streamer.py
@@ -0,0 +1,151 @@
+import asyncio
+import asyncio.subprocess
+
+from typing import List
+from typing import Dict
+from typing import Optional
+
+from .logging import get_logger
+
+from . import gpio
+
+
+# =====
+class Streamer: # pylint: disable=too-many-instance-attributes
+ def __init__(
+ self,
+ cap_power: int,
+ conv_power: int,
+ sync_delay: float,
+ init_delay: float,
+ init_restart_after: float,
+ quality: int,
+ cmd: List[str],
+ loop: asyncio.AbstractEventLoop,
+ ) -> None:
+
+ self.__cap_power = (gpio.set_output(cap_power) if cap_power > 0 else cap_power)
+ self.__conv_power = (gpio.set_output(conv_power) if conv_power > 0 else conv_power)
+ self.__sync_delay = sync_delay
+ self.__init_delay = init_delay
+ self.__init_restart_after = init_restart_after
+ self.__quality = quality
+ self.__cmd = cmd
+
+ self.__loop = loop
+
+ self.__proc_task: Optional[asyncio.Task] = None
+
+ async def start(self, quality: int, no_init_restart: bool=False) -> None:
+ logger = get_logger()
+ logger.info("Starting streamer ...")
+ assert 1 <= quality <= 100
+ self.__quality = quality
+ await self.__inner_start()
+ if self.__init_restart_after > 0.0 and not no_init_restart:
+ logger.info("Stopping streamer to restart ...")
+ await self.__inner_stop()
+ logger.info("Starting again ...")
+ await self.__inner_start()
+
+ async def stop(self) -> None:
+ get_logger().info("Stopping streamer ...")
+ await self.__inner_stop()
+
+ def is_running(self) -> bool:
+ return bool(self.__proc_task)
+
+ def get_current_quality(self) -> int:
+ return self.__quality
+
+ def get_state(self) -> Dict:
+ return {
+ "is_running": self.is_running(),
+ "quality": self.__quality,
+ }
+
+ async def cleanup(self) -> None:
+ if self.is_running():
+ await self.stop()
+
+ async def __inner_start(self) -> None:
+ assert not self.__proc_task
+ await self.__set_hw_enabled(True)
+ self.__proc_task = self.__loop.create_task(self.__process())
+
+ async def __inner_stop(self) -> None:
+ assert self.__proc_task
+ self.__proc_task.cancel()
+ await asyncio.gather(self.__proc_task, return_exceptions=True)
+ await self.__set_hw_enabled(False)
+ self.__proc_task = None
+
+ async def __set_hw_enabled(self, enabled: bool) -> None:
+ # XXX: This sequence is very important to enable converter and cap board
+ if self.__cap_power > 0:
+ gpio.write(self.__cap_power, enabled)
+ if self.__conv_power > 0:
+ if enabled:
+ await asyncio.sleep(self.__sync_delay)
+ gpio.write(self.__conv_power, enabled)
+ if enabled:
+ await asyncio.sleep(self.__init_delay)
+
+ async def __process(self) -> None: # pylint: disable=too-many-branches
+ logger = get_logger(0)
+
+ while True: # pylint: disable=too-many-nested-blocks
+ proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member
+ try:
+ cmd = [part.format(quality=self.__quality) for part in self.__cmd]
+ proc = await asyncio.create_subprocess_exec(
+ *cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.STDOUT,
+ )
+ logger.info("Started streamer pid=%d: %s", proc.pid, cmd)
+
+ empty = 0
+ while proc.returncode is None:
+ line = (await proc.stdout.readline()).decode(errors="ignore").strip()
+ if line:
+ logger.info("streamer: %s", line)
+ empty = 0
+ else:
+ empty += 1
+ if empty == 100: # asyncio bug
+ break
+
+ raise RuntimeError("WTF")
+
+ except asyncio.CancelledError:
+ break
+
+ except Exception as err:
+ if proc:
+ logger.exception("Unexpected streamer error: pid=%d", proc.pid)
+ else:
+ logger.exception("Can't start streamer: %s", err)
+ await asyncio.sleep(1)
+
+ finally:
+ if proc and proc.returncode is None:
+ await self.__kill(proc)
+
+ async def __kill(self, proc: asyncio.subprocess.Process) -> None: # pylint: disable=no-member
+ try:
+ proc.terminate()
+ await asyncio.sleep(1)
+ if proc.returncode is None:
+ try:
+ proc.kill()
+ except Exception:
+ if proc.returncode is not None:
+ raise
+ await proc.wait()
+ get_logger().info("Streamer killed: pid=%d; retcode=%d", proc.pid, proc.returncode)
+ except Exception:
+ if proc.returncode is None:
+ get_logger().exception("Can't kill streamer pid=%d", proc.pid)
+ else:
+ get_logger().info("Streamer killed: pid=%d; retcode=%d", proc.pid, proc.returncode)