summaryrefslogtreecommitdiff
path: root/kvmd/apps/janus/runner.py
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2021-05-24 05:03:45 +0300
committerDevaev Maxim <[email protected]>2021-05-24 05:08:53 +0300
commit19a68887e4083755af9f3edcb59c69e89b34b6f7 (patch)
treeeee102f2cf1312121d8af606e0bdeabb507fb236 /kvmd/apps/janus/runner.py
parent9cead6203295e55c25b9a011a6da93509d05e79f (diff)
janus runner draft
Diffstat (limited to 'kvmd/apps/janus/runner.py')
-rw-r--r--kvmd/apps/janus/runner.py189
1 files changed, 189 insertions, 0 deletions
diff --git a/kvmd/apps/janus/runner.py b/kvmd/apps/janus/runner.py
new file mode 100644
index 00000000..cbb4b582
--- /dev/null
+++ b/kvmd/apps/janus/runner.py
@@ -0,0 +1,189 @@
+import asyncio
+import asyncio.subprocess
+import socket
+import dataclasses
+
+from typing import Tuple
+from typing import List
+from typing import Optional
+
+import netifaces
+
+from ... import tools
+from ... import aiotools
+from ... import aioproc
+
+from ...logging import get_logger
+
+from .stun import stun_get_info
+
+
+# =====
[email protected](frozen=True)
+class _Netcfg:
+ nat_type: str = dataclasses.field(default="")
+ src_ip: str = dataclasses.field(default="")
+ ext_ip: str = dataclasses.field(default="")
+ stun_host: str = dataclasses.field(default="")
+ stun_port: int = dataclasses.field(default=0)
+
+
+# =====
+class JanusRunner:
+ def __init__( # pylint: disable=too-many-instance-attributes
+ self,
+ stun_host: str,
+ stun_port: int,
+ stun_timeout: float,
+
+ check_interval: int,
+ check_retries: int,
+ check_retries_delay: float,
+
+ cmd: List[str],
+ cmd_remove: List[str],
+ cmd_append: List[str],
+ ) -> None:
+
+ self.__stun_host = stun_host
+ self.__stun_port = stun_port
+ self.__stun_timeout = stun_timeout
+
+ self.__check_interval = check_interval
+ self.__check_retries = check_retries
+ self.__check_retries_delay = check_retries_delay
+
+ self.__cmd = tools.build_cmd(cmd, cmd_remove, cmd_append)
+
+ self.__janus_task: Optional[asyncio.Task] = None
+ self.__janus_proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member
+
+ def run(self) -> None:
+ logger = get_logger(0)
+ logger.info("Starting Janus Runner ...")
+ try:
+ asyncio.run(self.__run())
+ except (SystemExit, KeyboardInterrupt):
+ pass
+ logger.info("Bye-bye")
+
+ # =====
+
+ async def __run(self) -> None:
+ logger = get_logger(0)
+ try:
+ prev_netcfg: Optional[_Netcfg] = None
+ while True:
+ netcfg = _Netcfg()
+ for _ in range(self.__check_retries - 1):
+ netcfg = await self.__get_netcfg()
+ if netcfg.ext_ip:
+ break
+ await asyncio.sleep(self.__check_retries_delay)
+
+ if netcfg != prev_netcfg:
+ logger.info("Got new %s", netcfg)
+ if netcfg.src_ip and netcfg.ext_ip:
+ logger.info("Okay, restarting Janus ...")
+ await self.__stop_janus()
+ await self.__start_janus(netcfg)
+ else:
+ logger.error("Empty src_ip or ext_ip; stopping Janus ...")
+ await self.__stop_janus()
+ prev_netcfg = netcfg
+
+ await asyncio.sleep(self.__check_interval)
+ except: # noqa: E722
+ await self.__stop_janus()
+ raise
+
+ async def __get_netcfg(self) -> _Netcfg:
+ src_ip = (self.__get_default_ip() or "0.0.0.0")
+ (nat_type, ext_ip) = await self.__get_stun_info(src_ip)
+ return _Netcfg(nat_type, src_ip, ext_ip, self.__stun_host, self.__stun_port)
+
+ def __get_default_ip(self) -> str:
+ try:
+ gws = netifaces.gateways()
+ if "default" not in gws:
+ raise RuntimeError(f"No default gateway: {gws}")
+
+ iface = ""
+ for proto in [socket.AF_INET, socket.AF_INET6]:
+ if proto in gws["default"]:
+ iface = gws["default"][proto][1]
+ break
+ else:
+ raise RuntimeError(f"No iface for the gateway {gws['default']}")
+
+ for addr in netifaces.ifaddresses(iface).get(proto, []):
+ return addr["addr"]
+ except Exception as err:
+ get_logger().error("Can't get default IP: %s", tools.efmt(err))
+ return ""
+
+ async def __get_stun_info(self, src_ip: str) -> Tuple[str, str]:
+ try:
+ return (await stun_get_info(
+ stun_host=self.__stun_host,
+ stun_port=self.__stun_port,
+ src_ip=src_ip,
+ src_port=0,
+ timeout=self.__stun_timeout,
+ ))
+ except Exception as err:
+ get_logger().error("Can't get STUN info: %s", tools.efmt(err))
+ return ("", "")
+
+ # =====
+
+ @aiotools.atomic
+ async def __start_janus(self, netcfg: _Netcfg) -> None:
+ assert not self.__janus_task
+ self.__janus_task = asyncio.create_task(self.__janus_task_loop(netcfg))
+
+ @aiotools.atomic
+ async def __stop_janus(self) -> None:
+ if self.__janus_task:
+ self.__janus_task.cancel()
+ await asyncio.gather(self.__janus_task, return_exceptions=True)
+ await self.__kill_janus_proc()
+ self.__janus_task = None
+
+ # =====
+
+ async def __janus_task_loop(self, netcfg: _Netcfg) -> None: # pylint: disable=too-many-branches
+ logger = get_logger(0)
+ while True: # pylint: disable=too-many-nested-blocks
+ try:
+ await self.__start_janus_proc(netcfg)
+ assert self.__janus_proc is not None
+ await aioproc.log_stdout_infinite(self.__janus_proc, logger)
+ raise RuntimeError("Janus unexpectedly died")
+ except asyncio.CancelledError:
+ break
+ except Exception:
+ if self.__janus_proc:
+ logger.exception("Unexpected Janus error: pid=%d", self.__janus_proc.pid)
+ else:
+ logger.exception("Can't start Janus")
+ await self.__kill_janus_proc()
+ await asyncio.sleep(1)
+
+ async def __start_janus_proc(self, netcfg: _Netcfg) -> None:
+ assert self.__janus_proc is None
+ placeholders = {
+ key: str(value)
+ for (key, value) in dataclasses.asdict(netcfg).items()
+ }
+ cmd = [
+ part.format(**placeholders)
+ for part in self.__cmd
+ ]
+ self.__janus_proc = await aioproc.run_process(cmd)
+ get_logger(0).info("Started Janus pid=%d: %s", self.__janus_proc.pid, cmd)
+
+ async def __kill_janus_proc(self) -> None:
+ if self.__janus_proc:
+ await aioproc.kill_process(self.__janus_proc, 5, get_logger(0))
+ self.__janus_proc = None