summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2021-05-24 05:03:45 +0300
committerDevaev Maxim <[email protected]>2021-05-24 05:08:53 +0300
commit19a68887e4083755af9f3edcb59c69e89b34b6f7 (patch)
treeeee102f2cf1312121d8af606e0bdeabb507fb236 /kvmd
parent9cead6203295e55c25b9a011a6da93509d05e79f (diff)
janus runner draft
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/aioproc.py28
-rw-r--r--kvmd/apps/__init__.py25
-rw-r--r--kvmd/apps/janus/__init__.py44
-rw-r--r--kvmd/apps/janus/__main__.py24
-rw-r--r--kvmd/apps/janus/runner.py189
-rw-r--r--kvmd/apps/janus/stun.py183
-rw-r--r--kvmd/apps/kvmd/streamer.py24
7 files changed, 492 insertions, 25 deletions
diff --git a/kvmd/aioproc.py b/kvmd/aioproc.py
index 19925f10..d7d33f05 100644
--- a/kvmd/aioproc.py
+++ b/kvmd/aioproc.py
@@ -72,7 +72,7 @@ async def log_process(
if stdout:
log = (logger.info if proc.returncode == 0 else logger.error)
for line in stdout.split("\n"):
- log("Console: %s", line)
+ log("=> %s", line)
return proc
@@ -81,12 +81,34 @@ async def log_stdout_infinite(proc: asyncio.subprocess.Process, logger: logging.
async for line_bytes in proc.stdout: # type: ignore
line = line_bytes.decode(errors="ignore").strip()
if line:
- logger.info("Console: %s", line)
+ logger.info("=> %s", line)
empty = 0
else:
empty += 1
if empty == 100: # asyncio bug
- raise RuntimeError("asyncio process: too many empty lines")
+ raise RuntimeError("Asyncio process: too many empty lines")
+
+
+async def kill_process(proc: asyncio.subprocess.Process, wait: float, logger: logging.Logger) -> None: # pylint: disable=no-member
+ if proc.returncode is None:
+ try:
+ proc.terminate()
+ await asyncio.sleep(wait)
+ if proc.returncode is None:
+ try:
+ proc.kill()
+ except Exception:
+ if proc.returncode is not None:
+ raise
+ await proc.wait()
+ logger.info("Process killed: pid=%d; retcode=%d", proc.pid, proc.returncode)
+ except asyncio.CancelledError:
+ pass
+ except Exception:
+ if proc.returncode is None:
+ logger.exception("Can't kill process pid=%d", proc.pid)
+ else:
+ logger.info("Process killed: pid=%d; retcode=%d", proc.pid, proc.returncode)
def rename_process(suffix: str, prefix: str="kvmd") -> None:
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py
index 30992f33..4f6f160d 100644
--- a/kvmd/apps/__init__.py
+++ b/kvmd/apps/__init__.py
@@ -658,4 +658,29 @@ def _get_config_scheme() -> Dict:
},
},
},
+
+ "janus": {
+ "stun": {
+ "host": Option("stun.l.google.com", type=valid_ip_or_host, unpack_as="stun_host"),
+ "port": Option(19302, type=valid_port, unpack_as="stun_port"),
+ "timeout": Option(5.0, type=valid_float_f01, unpack_as="stun_timeout"),
+ },
+
+ "check": {
+ "interval": Option(10.0, type=valid_float_f01, unpack_as="check_interval"),
+ "retries": Option(5, type=valid_int_f1, unpack_as="check_retries"),
+ "retries_delay": Option(5.0, type=valid_float_f01, unpack_as="check_retries_delay"),
+ },
+
+ "cmd": Option([
+ "/usr/bin/janus",
+ "--disable-colors",
+ "--plugins-folder=/usr/lib/ustreamer/janus",
+ "--configs-folder=/etc/kvmd/janus",
+ "--interface={src_ip}",
+ "--stun-server={stun_host}:{stun_port}",
+ ], type=valid_command),
+ "cmd_remove": Option([], type=valid_options),
+ "cmd_append": Option([], type=valid_options),
+ },
}
diff --git a/kvmd/apps/janus/__init__.py b/kvmd/apps/janus/__init__.py
new file mode 100644
index 00000000..c3210aa1
--- /dev/null
+++ b/kvmd/apps/janus/__init__.py
@@ -0,0 +1,44 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018-2021 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from typing import List
+from typing import Optional
+
+from .. import init
+
+from .runner import JanusRunner
+
+
+# =====
+def main(argv: Optional[List[str]]=None) -> None:
+ config = init(
+ prog="kvmd-Janus",
+ description="Janus WebRTC Gateway Runner",
+ check_run=True,
+ argv=argv,
+ )[2].janus
+
+ JanusRunner(
+ **config.stun._unpack(),
+ **config.check._unpack(),
+ **config._unpack(ignore=["stun", "check"]),
+ ).run()
diff --git a/kvmd/apps/janus/__main__.py b/kvmd/apps/janus/__main__.py
new file mode 100644
index 00000000..5742376e
--- /dev/null
+++ b/kvmd/apps/janus/__main__.py
@@ -0,0 +1,24 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018-2021 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from . import main
+main()
diff --git a/kvmd/apps/janus/runner.py b/kvmd/apps/janus/runner.py
new file mode 100644
index 00000000..cbb4b582
--- /dev/null
+++ b/kvmd/apps/janus/runner.py
@@ -0,0 +1,189 @@
+import asyncio
+import asyncio.subprocess
+import socket
+import dataclasses
+
+from typing import Tuple
+from typing import List
+from typing import Optional
+
+import netifaces
+
+from ... import tools
+from ... import aiotools
+from ... import aioproc
+
+from ...logging import get_logger
+
+from .stun import stun_get_info
+
+
+# =====
[email protected](frozen=True)
+class _Netcfg:
+ nat_type: str = dataclasses.field(default="")
+ src_ip: str = dataclasses.field(default="")
+ ext_ip: str = dataclasses.field(default="")
+ stun_host: str = dataclasses.field(default="")
+ stun_port: int = dataclasses.field(default=0)
+
+
+# =====
+class JanusRunner:
+ def __init__( # pylint: disable=too-many-instance-attributes
+ self,
+ stun_host: str,
+ stun_port: int,
+ stun_timeout: float,
+
+ check_interval: int,
+ check_retries: int,
+ check_retries_delay: float,
+
+ cmd: List[str],
+ cmd_remove: List[str],
+ cmd_append: List[str],
+ ) -> None:
+
+ self.__stun_host = stun_host
+ self.__stun_port = stun_port
+ self.__stun_timeout = stun_timeout
+
+ self.__check_interval = check_interval
+ self.__check_retries = check_retries
+ self.__check_retries_delay = check_retries_delay
+
+ self.__cmd = tools.build_cmd(cmd, cmd_remove, cmd_append)
+
+ self.__janus_task: Optional[asyncio.Task] = None
+ self.__janus_proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member
+
+ def run(self) -> None:
+ logger = get_logger(0)
+ logger.info("Starting Janus Runner ...")
+ try:
+ asyncio.run(self.__run())
+ except (SystemExit, KeyboardInterrupt):
+ pass
+ logger.info("Bye-bye")
+
+ # =====
+
+ async def __run(self) -> None:
+ logger = get_logger(0)
+ try:
+ prev_netcfg: Optional[_Netcfg] = None
+ while True:
+ netcfg = _Netcfg()
+ for _ in range(self.__check_retries - 1):
+ netcfg = await self.__get_netcfg()
+ if netcfg.ext_ip:
+ break
+ await asyncio.sleep(self.__check_retries_delay)
+
+ if netcfg != prev_netcfg:
+ logger.info("Got new %s", netcfg)
+ if netcfg.src_ip and netcfg.ext_ip:
+ logger.info("Okay, restarting Janus ...")
+ await self.__stop_janus()
+ await self.__start_janus(netcfg)
+ else:
+ logger.error("Empty src_ip or ext_ip; stopping Janus ...")
+ await self.__stop_janus()
+ prev_netcfg = netcfg
+
+ await asyncio.sleep(self.__check_interval)
+ except: # noqa: E722
+ await self.__stop_janus()
+ raise
+
+ async def __get_netcfg(self) -> _Netcfg:
+ src_ip = (self.__get_default_ip() or "0.0.0.0")
+ (nat_type, ext_ip) = await self.__get_stun_info(src_ip)
+ return _Netcfg(nat_type, src_ip, ext_ip, self.__stun_host, self.__stun_port)
+
+ def __get_default_ip(self) -> str:
+ try:
+ gws = netifaces.gateways()
+ if "default" not in gws:
+ raise RuntimeError(f"No default gateway: {gws}")
+
+ iface = ""
+ for proto in [socket.AF_INET, socket.AF_INET6]:
+ if proto in gws["default"]:
+ iface = gws["default"][proto][1]
+ break
+ else:
+ raise RuntimeError(f"No iface for the gateway {gws['default']}")
+
+ for addr in netifaces.ifaddresses(iface).get(proto, []):
+ return addr["addr"]
+ except Exception as err:
+ get_logger().error("Can't get default IP: %s", tools.efmt(err))
+ return ""
+
+ async def __get_stun_info(self, src_ip: str) -> Tuple[str, str]:
+ try:
+ return (await stun_get_info(
+ stun_host=self.__stun_host,
+ stun_port=self.__stun_port,
+ src_ip=src_ip,
+ src_port=0,
+ timeout=self.__stun_timeout,
+ ))
+ except Exception as err:
+ get_logger().error("Can't get STUN info: %s", tools.efmt(err))
+ return ("", "")
+
+ # =====
+
+ @aiotools.atomic
+ async def __start_janus(self, netcfg: _Netcfg) -> None:
+ assert not self.__janus_task
+ self.__janus_task = asyncio.create_task(self.__janus_task_loop(netcfg))
+
+ @aiotools.atomic
+ async def __stop_janus(self) -> None:
+ if self.__janus_task:
+ self.__janus_task.cancel()
+ await asyncio.gather(self.__janus_task, return_exceptions=True)
+ await self.__kill_janus_proc()
+ self.__janus_task = None
+
+ # =====
+
+ async def __janus_task_loop(self, netcfg: _Netcfg) -> None: # pylint: disable=too-many-branches
+ logger = get_logger(0)
+ while True: # pylint: disable=too-many-nested-blocks
+ try:
+ await self.__start_janus_proc(netcfg)
+ assert self.__janus_proc is not None
+ await aioproc.log_stdout_infinite(self.__janus_proc, logger)
+ raise RuntimeError("Janus unexpectedly died")
+ except asyncio.CancelledError:
+ break
+ except Exception:
+ if self.__janus_proc:
+ logger.exception("Unexpected Janus error: pid=%d", self.__janus_proc.pid)
+ else:
+ logger.exception("Can't start Janus")
+ await self.__kill_janus_proc()
+ await asyncio.sleep(1)
+
+ async def __start_janus_proc(self, netcfg: _Netcfg) -> None:
+ assert self.__janus_proc is None
+ placeholders = {
+ key: str(value)
+ for (key, value) in dataclasses.asdict(netcfg).items()
+ }
+ cmd = [
+ part.format(**placeholders)
+ for part in self.__cmd
+ ]
+ self.__janus_proc = await aioproc.run_process(cmd)
+ get_logger(0).info("Started Janus pid=%d: %s", self.__janus_proc.pid, cmd)
+
+ async def __kill_janus_proc(self) -> None:
+ if self.__janus_proc:
+ await aioproc.kill_process(self.__janus_proc, 5, get_logger(0))
+ self.__janus_proc = None
diff --git a/kvmd/apps/janus/stun.py b/kvmd/apps/janus/stun.py
new file mode 100644
index 00000000..954f2d73
--- /dev/null
+++ b/kvmd/apps/janus/stun.py
@@ -0,0 +1,183 @@
+import socket
+import struct
+import secrets
+import dataclasses
+
+from typing import Tuple
+from typing import Dict
+from typing import Optional
+
+from ... import tools
+from ... import aiotools
+
+from ...logging import get_logger
+
+
+# =====
[email protected](frozen=True)
+class StunAddress:
+ ip: str
+ port: int
+
+
[email protected](frozen=True)
+class StunResponse:
+ ok: bool
+ ext: Optional[StunAddress] = dataclasses.field(default=None)
+ src: Optional[StunAddress] = dataclasses.field(default=None)
+ changed: Optional[StunAddress] = dataclasses.field(default=None)
+
+
+class StunNatType:
+ BLOCKED = "Blocked"
+ OPEN_INTERNET = "Open Internet"
+ SYMMETRIC_UDP_FW = "Symmetric UDP Firewall"
+ FULL_CONE_NAT = "Full Cone NAT"
+ RESTRICTED_NAT = "Restricted NAT"
+ RESTRICTED_PORT_NAT = "Restricted Port NAT"
+ SYMMETRIC_NAT = "Symmetric NAT"
+ CHANGED_ADDR_ERROR = "Error when testing on Changed-IP and Port"
+
+
+# =====
+async def stun_get_info(
+ stun_host: str,
+ stun_port: int,
+ src_ip: str,
+ src_port: int,
+ timeout: float,
+) -> Tuple[str, str]:
+
+ return (await aiotools.run_async(_stun_get_info, stun_host, stun_port, src_ip, src_port, timeout))
+
+
+def _stun_get_info(
+ stun_host: str,
+ stun_port: int,
+ src_ip: str,
+ src_port: int,
+ timeout: float,
+) -> Tuple[str, str]:
+
+ # Partially based on https://github.com/JohnVillalovos/pystun
+
+ (family, _, _, _, addr) = socket.getaddrinfo(src_ip, src_port, type=socket.SOCK_DGRAM)[0]
+ with socket.socket(family, socket.SOCK_DGRAM) as sock:
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.settimeout(timeout)
+ sock.bind(addr)
+ (nat_type, response) = _get_nat_type(
+ stun_host=stun_host,
+ stun_port=stun_port,
+ src_ip=src_ip,
+ sock=sock,
+ )
+ return (nat_type, (response.ext.ip if response.ext is not None else ""))
+
+
+def _get_nat_type( # pylint: disable=too-many-return-statements
+ stun_host: str,
+ stun_port: int,
+ src_ip: str,
+ sock: socket.socket,
+) -> Tuple[str, StunResponse]:
+
+ first = _stun_request("First probe", stun_host, stun_port, b"", sock)
+ if not first.ok:
+ return (StunNatType.BLOCKED, first)
+ if first.ext is None:
+ raise RuntimeError(f"Ext addr is None: {first}")
+
+ request = struct.pack(">HHI", 0x0003, 0x0004, 0x00000006) # Change-Request
+ response = _stun_request("Change request [ext_ip == src_ip]", stun_host, stun_port, request, sock)
+
+ if first.ext.ip == src_ip:
+ if response.ok:
+ return (StunNatType.OPEN_INTERNET, response)
+ return (StunNatType.SYMMETRIC_UDP_FW, response)
+
+ if response.ok:
+ return (StunNatType.FULL_CONE_NAT, response)
+
+ if first.changed is None:
+ raise RuntimeError(f"Changed addr is None: {first}")
+ response = _stun_request("Change request [ext_ip != src_ip]", first.changed.ip, first.changed.port, b"", sock)
+ if not response.ok:
+ return (StunNatType.CHANGED_ADDR_ERROR, response)
+
+ if response.ext == first.ext:
+ request = struct.pack(">HHI", 0x0003, 0x0004, 0x00000002)
+ response = _stun_request("Change port", first.changed.ip, stun_port, request, sock)
+ if response.ok:
+ return (StunNatType.RESTRICTED_NAT, response)
+ return (StunNatType.RESTRICTED_PORT_NAT, response)
+
+ return (StunNatType.SYMMETRIC_NAT, response)
+
+
+def _stun_request( # pylint: disable=too-many-locals
+ ctx: str,
+ host: str,
+ port: int,
+ request: bytes,
+ sock: socket.socket,
+) -> StunResponse:
+
+ # TODO: Support IPv6 and RFC 5389
+ # The first 4 bytes of the response are the Type (2) and Length (2)
+ # The 5th byte is Reserved
+ # The 6th byte is the Family: 0x01 = IPv4, 0x02 = IPv6
+ # The remaining bytes are the IP address. 32 bits for IPv4 or 128 bits for
+ # IPv6.
+ # More info at: https://tools.ietf.org/html/rfc3489#section-11.2.1
+ # And at: https://tools.ietf.org/html/rfc5389#section-15.1
+
+ trans_id = secrets.token_bytes(16)
+ request = struct.pack(">HH", 0x0001, len(request)) + trans_id + request # Bind Request
+
+ try:
+ sock.sendto(request, (host, port))
+ except Exception as err:
+ get_logger().error("%s: Can't send request: %s", ctx, tools.efmt(err))
+ return StunResponse(ok=False)
+ try:
+ response = sock.recvfrom(2048)[0]
+ except Exception as err:
+ get_logger().error("%s: Can't recv response: %s", ctx, tools.efmt(err))
+ return StunResponse(ok=False)
+
+ (response_type, payload_len) = struct.unpack(">HH", response[:4])
+ if response_type != 0x0101:
+ get_logger().error("%s: Invalid response type: %#.4x", ctx, response_type)
+ return StunResponse(ok=False)
+ if trans_id != response[4:20]:
+ get_logger().error("%s: Transaction ID mismatch")
+ return StunResponse(ok=False)
+
+ parsed: Dict[str, StunAddress] = {}
+ base = 20
+ remaining = payload_len
+ while remaining > 0:
+ (attr_type, attr_len) = struct.unpack(">HH", response[base:(base + 4)])
+ base += 4
+ field = {
+ 0x0001: "ext", # MAPPED-ADDRESS
+ 0x0004: "src", # SOURCE-ADDRESS
+ 0x0005: "changed", # CHANGED-ADDRESS
+ }.get(attr_type)
+ if field is not None:
+ parsed[field] = _parse_address(response[base:])
+ base += attr_len
+ remaining -= (4 + attr_len)
+ return StunResponse(ok=True, **parsed)
+
+
+def _parse_address(data: bytes) -> StunAddress:
+ family = data[1]
+ if family == 1:
+ parts = struct.unpack(">HBBBB", data[2:8])
+ return StunAddress(
+ ip=".".join(map(str, parts[1:])),
+ port=parts[0],
+ )
+ raise RuntimeError(f"Only IPv4 supported; received: {family}")
diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py
index ea14e41a..7fb2944b 100644
--- a/kvmd/apps/kvmd/streamer.py
+++ b/kvmd/apps/kvmd/streamer.py
@@ -435,26 +435,6 @@ class Streamer: # pylint: disable=too-many-instance-attributes
get_logger(0).info("Started streamer pid=%d: %s", self.__streamer_proc.pid, cmd)
async def __kill_streamer_proc(self) -> None:
- logger = get_logger(0)
- if self.__streamer_proc and self.__streamer_proc.returncode is None:
- try:
- self.__streamer_proc.terminate()
- await asyncio.sleep(1)
- if self.__streamer_proc.returncode is None:
- try:
- self.__streamer_proc.kill()
- except Exception:
- if self.__streamer_proc.returncode is not None:
- raise
- await self.__streamer_proc.wait()
- logger.info("Streamer killed: pid=%d; retcode=%d",
- self.__streamer_proc.pid, self.__streamer_proc.returncode)
- except asyncio.CancelledError:
- pass
- except Exception:
- if self.__streamer_proc.returncode is None:
- logger.exception("Can't kill streamer pid=%d", self.__streamer_proc.pid)
- else:
- logger.info("Streamer killed: pid=%d; retcode=%d",
- self.__streamer_proc.pid, self.__streamer_proc.returncode)
+ if self.__streamer_proc:
+ await aioproc.kill_process(self.__streamer_proc, 1, get_logger(0))
self.__streamer_proc = None