diff options
author | Devaev Maxim <[email protected]> | 2021-05-24 05:03:45 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2021-05-24 05:08:53 +0300 |
commit | 19a68887e4083755af9f3edcb59c69e89b34b6f7 (patch) | |
tree | eee102f2cf1312121d8af606e0bdeabb507fb236 /kvmd/apps/janus | |
parent | 9cead6203295e55c25b9a011a6da93509d05e79f (diff) |
janus runner draft
Diffstat (limited to 'kvmd/apps/janus')
-rw-r--r-- | kvmd/apps/janus/__init__.py | 44 | ||||
-rw-r--r-- | kvmd/apps/janus/__main__.py | 24 | ||||
-rw-r--r-- | kvmd/apps/janus/runner.py | 189 | ||||
-rw-r--r-- | kvmd/apps/janus/stun.py | 183 |
4 files changed, 440 insertions, 0 deletions
diff --git a/kvmd/apps/janus/__init__.py b/kvmd/apps/janus/__init__.py new file mode 100644 index 00000000..c3210aa1 --- /dev/null +++ b/kvmd/apps/janus/__init__.py @@ -0,0 +1,44 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018-2021 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from typing import List +from typing import Optional + +from .. import init + +from .runner import JanusRunner + + +# ===== +def main(argv: Optional[List[str]]=None) -> None: + config = init( + prog="kvmd-Janus", + description="Janus WebRTC Gateway Runner", + check_run=True, + argv=argv, + )[2].janus + + JanusRunner( + **config.stun._unpack(), + **config.check._unpack(), + **config._unpack(ignore=["stun", "check"]), + ).run() diff --git a/kvmd/apps/janus/__main__.py b/kvmd/apps/janus/__main__.py new file mode 100644 index 00000000..5742376e --- /dev/null +++ b/kvmd/apps/janus/__main__.py @@ -0,0 +1,24 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018-2021 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from . import main +main() diff --git a/kvmd/apps/janus/runner.py b/kvmd/apps/janus/runner.py new file mode 100644 index 00000000..cbb4b582 --- /dev/null +++ b/kvmd/apps/janus/runner.py @@ -0,0 +1,189 @@ +import asyncio +import asyncio.subprocess +import socket +import dataclasses + +from typing import Tuple +from typing import List +from typing import Optional + +import netifaces + +from ... import tools +from ... import aiotools +from ... import aioproc + +from ...logging import get_logger + +from .stun import stun_get_info + + +# ===== [email protected](frozen=True) +class _Netcfg: + nat_type: str = dataclasses.field(default="") + src_ip: str = dataclasses.field(default="") + ext_ip: str = dataclasses.field(default="") + stun_host: str = dataclasses.field(default="") + stun_port: int = dataclasses.field(default=0) + + +# ===== +class JanusRunner: + def __init__( # pylint: disable=too-many-instance-attributes + self, + stun_host: str, + stun_port: int, + stun_timeout: float, + + check_interval: int, + check_retries: int, + check_retries_delay: float, + + cmd: List[str], + cmd_remove: List[str], + cmd_append: List[str], + ) -> None: + + self.__stun_host = stun_host + self.__stun_port = stun_port + self.__stun_timeout = stun_timeout + + self.__check_interval = check_interval + self.__check_retries = check_retries + self.__check_retries_delay = check_retries_delay + + self.__cmd = tools.build_cmd(cmd, cmd_remove, cmd_append) + + self.__janus_task: Optional[asyncio.Task] = None + self.__janus_proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member + + def run(self) -> None: + logger = get_logger(0) + logger.info("Starting Janus Runner ...") + try: + asyncio.run(self.__run()) + except (SystemExit, KeyboardInterrupt): + pass + logger.info("Bye-bye") + + # ===== + + async def __run(self) -> None: + logger = get_logger(0) + try: + prev_netcfg: Optional[_Netcfg] = None + while True: + netcfg = _Netcfg() + for _ in range(self.__check_retries - 1): + netcfg = await self.__get_netcfg() + if netcfg.ext_ip: + break + await asyncio.sleep(self.__check_retries_delay) + + if netcfg != prev_netcfg: + logger.info("Got new %s", netcfg) + if netcfg.src_ip and netcfg.ext_ip: + logger.info("Okay, restarting Janus ...") + await self.__stop_janus() + await self.__start_janus(netcfg) + else: + logger.error("Empty src_ip or ext_ip; stopping Janus ...") + await self.__stop_janus() + prev_netcfg = netcfg + + await asyncio.sleep(self.__check_interval) + except: # noqa: E722 + await self.__stop_janus() + raise + + async def __get_netcfg(self) -> _Netcfg: + src_ip = (self.__get_default_ip() or "0.0.0.0") + (nat_type, ext_ip) = await self.__get_stun_info(src_ip) + return _Netcfg(nat_type, src_ip, ext_ip, self.__stun_host, self.__stun_port) + + def __get_default_ip(self) -> str: + try: + gws = netifaces.gateways() + if "default" not in gws: + raise RuntimeError(f"No default gateway: {gws}") + + iface = "" + for proto in [socket.AF_INET, socket.AF_INET6]: + if proto in gws["default"]: + iface = gws["default"][proto][1] + break + else: + raise RuntimeError(f"No iface for the gateway {gws['default']}") + + for addr in netifaces.ifaddresses(iface).get(proto, []): + return addr["addr"] + except Exception as err: + get_logger().error("Can't get default IP: %s", tools.efmt(err)) + return "" + + async def __get_stun_info(self, src_ip: str) -> Tuple[str, str]: + try: + return (await stun_get_info( + stun_host=self.__stun_host, + stun_port=self.__stun_port, + src_ip=src_ip, + src_port=0, + timeout=self.__stun_timeout, + )) + except Exception as err: + get_logger().error("Can't get STUN info: %s", tools.efmt(err)) + return ("", "") + + # ===== + + @aiotools.atomic + async def __start_janus(self, netcfg: _Netcfg) -> None: + assert not self.__janus_task + self.__janus_task = asyncio.create_task(self.__janus_task_loop(netcfg)) + + @aiotools.atomic + async def __stop_janus(self) -> None: + if self.__janus_task: + self.__janus_task.cancel() + await asyncio.gather(self.__janus_task, return_exceptions=True) + await self.__kill_janus_proc() + self.__janus_task = None + + # ===== + + async def __janus_task_loop(self, netcfg: _Netcfg) -> None: # pylint: disable=too-many-branches + logger = get_logger(0) + while True: # pylint: disable=too-many-nested-blocks + try: + await self.__start_janus_proc(netcfg) + assert self.__janus_proc is not None + await aioproc.log_stdout_infinite(self.__janus_proc, logger) + raise RuntimeError("Janus unexpectedly died") + except asyncio.CancelledError: + break + except Exception: + if self.__janus_proc: + logger.exception("Unexpected Janus error: pid=%d", self.__janus_proc.pid) + else: + logger.exception("Can't start Janus") + await self.__kill_janus_proc() + await asyncio.sleep(1) + + async def __start_janus_proc(self, netcfg: _Netcfg) -> None: + assert self.__janus_proc is None + placeholders = { + key: str(value) + for (key, value) in dataclasses.asdict(netcfg).items() + } + cmd = [ + part.format(**placeholders) + for part in self.__cmd + ] + self.__janus_proc = await aioproc.run_process(cmd) + get_logger(0).info("Started Janus pid=%d: %s", self.__janus_proc.pid, cmd) + + async def __kill_janus_proc(self) -> None: + if self.__janus_proc: + await aioproc.kill_process(self.__janus_proc, 5, get_logger(0)) + self.__janus_proc = None diff --git a/kvmd/apps/janus/stun.py b/kvmd/apps/janus/stun.py new file mode 100644 index 00000000..954f2d73 --- /dev/null +++ b/kvmd/apps/janus/stun.py @@ -0,0 +1,183 @@ +import socket +import struct +import secrets +import dataclasses + +from typing import Tuple +from typing import Dict +from typing import Optional + +from ... import tools +from ... import aiotools + +from ...logging import get_logger + + +# ===== [email protected](frozen=True) +class StunAddress: + ip: str + port: int + + [email protected](frozen=True) +class StunResponse: + ok: bool + ext: Optional[StunAddress] = dataclasses.field(default=None) + src: Optional[StunAddress] = dataclasses.field(default=None) + changed: Optional[StunAddress] = dataclasses.field(default=None) + + +class StunNatType: + BLOCKED = "Blocked" + OPEN_INTERNET = "Open Internet" + SYMMETRIC_UDP_FW = "Symmetric UDP Firewall" + FULL_CONE_NAT = "Full Cone NAT" + RESTRICTED_NAT = "Restricted NAT" + RESTRICTED_PORT_NAT = "Restricted Port NAT" + SYMMETRIC_NAT = "Symmetric NAT" + CHANGED_ADDR_ERROR = "Error when testing on Changed-IP and Port" + + +# ===== +async def stun_get_info( + stun_host: str, + stun_port: int, + src_ip: str, + src_port: int, + timeout: float, +) -> Tuple[str, str]: + + return (await aiotools.run_async(_stun_get_info, stun_host, stun_port, src_ip, src_port, timeout)) + + +def _stun_get_info( + stun_host: str, + stun_port: int, + src_ip: str, + src_port: int, + timeout: float, +) -> Tuple[str, str]: + + # Partially based on https://github.com/JohnVillalovos/pystun + + (family, _, _, _, addr) = socket.getaddrinfo(src_ip, src_port, type=socket.SOCK_DGRAM)[0] + with socket.socket(family, socket.SOCK_DGRAM) as sock: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.settimeout(timeout) + sock.bind(addr) + (nat_type, response) = _get_nat_type( + stun_host=stun_host, + stun_port=stun_port, + src_ip=src_ip, + sock=sock, + ) + return (nat_type, (response.ext.ip if response.ext is not None else "")) + + +def _get_nat_type( # pylint: disable=too-many-return-statements + stun_host: str, + stun_port: int, + src_ip: str, + sock: socket.socket, +) -> Tuple[str, StunResponse]: + + first = _stun_request("First probe", stun_host, stun_port, b"", sock) + if not first.ok: + return (StunNatType.BLOCKED, first) + if first.ext is None: + raise RuntimeError(f"Ext addr is None: {first}") + + request = struct.pack(">HHI", 0x0003, 0x0004, 0x00000006) # Change-Request + response = _stun_request("Change request [ext_ip == src_ip]", stun_host, stun_port, request, sock) + + if first.ext.ip == src_ip: + if response.ok: + return (StunNatType.OPEN_INTERNET, response) + return (StunNatType.SYMMETRIC_UDP_FW, response) + + if response.ok: + return (StunNatType.FULL_CONE_NAT, response) + + if first.changed is None: + raise RuntimeError(f"Changed addr is None: {first}") + response = _stun_request("Change request [ext_ip != src_ip]", first.changed.ip, first.changed.port, b"", sock) + if not response.ok: + return (StunNatType.CHANGED_ADDR_ERROR, response) + + if response.ext == first.ext: + request = struct.pack(">HHI", 0x0003, 0x0004, 0x00000002) + response = _stun_request("Change port", first.changed.ip, stun_port, request, sock) + if response.ok: + return (StunNatType.RESTRICTED_NAT, response) + return (StunNatType.RESTRICTED_PORT_NAT, response) + + return (StunNatType.SYMMETRIC_NAT, response) + + +def _stun_request( # pylint: disable=too-many-locals + ctx: str, + host: str, + port: int, + request: bytes, + sock: socket.socket, +) -> StunResponse: + + # TODO: Support IPv6 and RFC 5389 + # The first 4 bytes of the response are the Type (2) and Length (2) + # The 5th byte is Reserved + # The 6th byte is the Family: 0x01 = IPv4, 0x02 = IPv6 + # The remaining bytes are the IP address. 32 bits for IPv4 or 128 bits for + # IPv6. + # More info at: https://tools.ietf.org/html/rfc3489#section-11.2.1 + # And at: https://tools.ietf.org/html/rfc5389#section-15.1 + + trans_id = secrets.token_bytes(16) + request = struct.pack(">HH", 0x0001, len(request)) + trans_id + request # Bind Request + + try: + sock.sendto(request, (host, port)) + except Exception as err: + get_logger().error("%s: Can't send request: %s", ctx, tools.efmt(err)) + return StunResponse(ok=False) + try: + response = sock.recvfrom(2048)[0] + except Exception as err: + get_logger().error("%s: Can't recv response: %s", ctx, tools.efmt(err)) + return StunResponse(ok=False) + + (response_type, payload_len) = struct.unpack(">HH", response[:4]) + if response_type != 0x0101: + get_logger().error("%s: Invalid response type: %#.4x", ctx, response_type) + return StunResponse(ok=False) + if trans_id != response[4:20]: + get_logger().error("%s: Transaction ID mismatch") + return StunResponse(ok=False) + + parsed: Dict[str, StunAddress] = {} + base = 20 + remaining = payload_len + while remaining > 0: + (attr_type, attr_len) = struct.unpack(">HH", response[base:(base + 4)]) + base += 4 + field = { + 0x0001: "ext", # MAPPED-ADDRESS + 0x0004: "src", # SOURCE-ADDRESS + 0x0005: "changed", # CHANGED-ADDRESS + }.get(attr_type) + if field is not None: + parsed[field] = _parse_address(response[base:]) + base += attr_len + remaining -= (4 + attr_len) + return StunResponse(ok=True, **parsed) + + +def _parse_address(data: bytes) -> StunAddress: + family = data[1] + if family == 1: + parts = struct.unpack(">HBBBB", data[2:8]) + return StunAddress( + ip=".".join(map(str, parts[1:])), + port=parts[0], + ) + raise RuntimeError(f"Only IPv4 supported; received: {family}") |