diff options
-rw-r--r-- | kvmd/apps/__init__.py | 8 | ||||
-rw-r--r-- | kvmd/apps/janus/runner.py | 29 | ||||
-rw-r--r-- | kvmd/apps/janus/stun.py | 268 |
3 files changed, 149 insertions, 156 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index 4f6f160d..9fc0c2bb 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -661,9 +661,11 @@ def _get_config_scheme() -> Dict: "janus": { "stun": { - "host": Option("stun.l.google.com", type=valid_ip_or_host, unpack_as="stun_host"), - "port": Option(19302, type=valid_port, unpack_as="stun_port"), - "timeout": Option(5.0, type=valid_float_f01, unpack_as="stun_timeout"), + "host": Option("stun.l.google.com", type=valid_ip_or_host, unpack_as="stun_host"), + "port": Option(19302, type=valid_port, unpack_as="stun_port"), + "timeout": Option(5.0, type=valid_float_f01, unpack_as="stun_timeout"), + "retries": Option(5, type=valid_int_f1, unpack_as="stun_retries"), + "retries_delay": Option(5.0, type=valid_float_f01, unpack_as="stun_retries_delay"), }, "check": { diff --git a/kvmd/apps/janus/runner.py b/kvmd/apps/janus/runner.py index 69d7a33b..d590f368 100644 --- a/kvmd/apps/janus/runner.py +++ b/kvmd/apps/janus/runner.py @@ -15,7 +15,7 @@ from ... import aioproc from ...logging import get_logger -from .stun import stun_get_info +from .stun import Stun # ===== @@ -35,6 +35,8 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes stun_host: str, stun_port: int, stun_timeout: float, + stun_retries: int, + stun_retries_delay: float, check_interval: int, check_retries: int, @@ -45,9 +47,7 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes cmd_append: List[str], ) -> None: - self.__stun_host = stun_host - self.__stun_port = stun_port - self.__stun_timeout = stun_timeout + self.__stun = Stun(stun_host, stun_port, stun_timeout, stun_retries, stun_retries_delay) self.__check_interval = check_interval self.__check_retries = check_retries @@ -74,12 +74,15 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes try: prev_netcfg: Optional[_Netcfg] = None while True: + retry = 0 netcfg = _Netcfg() - for _ in range(self.__check_retries - 1): + for retry in range(self.__check_retries): netcfg = await self.__get_netcfg() if netcfg.ext_ip: break await asyncio.sleep(self.__check_retries_delay) + if retry != 0 and netcfg.ext_ip: + logger.info("I'm fine, continue working ...") if netcfg != prev_netcfg: logger.info("Got new %s", netcfg) @@ -99,8 +102,8 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes async def __get_netcfg(self) -> _Netcfg: src_ip = (self.__get_default_ip() or "0.0.0.0") - (nat_type, ext_ip) = await self.__get_stun_info(src_ip) - return _Netcfg(nat_type, src_ip, ext_ip, self.__stun_host, self.__stun_port) + (stun, (nat_type, ext_ip)) = await self.__get_stun_info(src_ip) + return _Netcfg(nat_type, src_ip, ext_ip, stun.host, stun.port) def __get_default_ip(self) -> str: try: @@ -122,18 +125,12 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes get_logger().error("Can't get default IP: %s", tools.efmt(err)) return "" - async def __get_stun_info(self, src_ip: str) -> Tuple[str, str]: + async def __get_stun_info(self, src_ip: str) -> Tuple[Stun, Tuple[str, str]]: try: - return (await stun_get_info( - stun_host=self.__stun_host, - stun_port=self.__stun_port, - src_ip=src_ip, - src_port=0, - timeout=self.__stun_timeout, - )) + return (self.__stun, (await self.__stun.get_info(src_ip, 0))) except Exception as err: get_logger().error("Can't get STUN info: %s", tools.efmt(err)) - return ("", "") + return (self.__stun, ("", "")) # ===== diff --git a/kvmd/apps/janus/stun.py b/kvmd/apps/janus/stun.py index 954f2d73..9a54e5df 100644 --- a/kvmd/apps/janus/stun.py +++ b/kvmd/apps/janus/stun.py @@ -1,3 +1,4 @@ +import asyncio import socket import struct import secrets @@ -40,144 +41,137 @@ class StunNatType: # ===== -async def stun_get_info( - stun_host: str, - stun_port: int, - src_ip: str, - src_port: int, - timeout: float, -) -> Tuple[str, str]: - - return (await aiotools.run_async(_stun_get_info, stun_host, stun_port, src_ip, src_port, timeout)) - - -def _stun_get_info( - stun_host: str, - stun_port: int, - src_ip: str, - src_port: int, - timeout: float, -) -> Tuple[str, str]: - +class Stun: # Partially based on https://github.com/JohnVillalovos/pystun - (family, _, _, _, addr) = socket.getaddrinfo(src_ip, src_port, type=socket.SOCK_DGRAM)[0] - with socket.socket(family, socket.SOCK_DGRAM) as sock: - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.settimeout(timeout) - sock.bind(addr) - (nat_type, response) = _get_nat_type( - stun_host=stun_host, - stun_port=stun_port, - src_ip=src_ip, - sock=sock, - ) - return (nat_type, (response.ext.ip if response.ext is not None else "")) - - -def _get_nat_type( # pylint: disable=too-many-return-statements - stun_host: str, - stun_port: int, - src_ip: str, - sock: socket.socket, -) -> Tuple[str, StunResponse]: - - first = _stun_request("First probe", stun_host, stun_port, b"", sock) - if not first.ok: - return (StunNatType.BLOCKED, first) - if first.ext is None: - raise RuntimeError(f"Ext addr is None: {first}") - - request = struct.pack(">HHI", 0x0003, 0x0004, 0x00000006) # Change-Request - response = _stun_request("Change request [ext_ip == src_ip]", stun_host, stun_port, request, sock) - - if first.ext.ip == src_ip: - if response.ok: - return (StunNatType.OPEN_INTERNET, response) - return (StunNatType.SYMMETRIC_UDP_FW, response) - - if response.ok: - return (StunNatType.FULL_CONE_NAT, response) - - if first.changed is None: - raise RuntimeError(f"Changed addr is None: {first}") - response = _stun_request("Change request [ext_ip != src_ip]", first.changed.ip, first.changed.port, b"", sock) - if not response.ok: - return (StunNatType.CHANGED_ADDR_ERROR, response) + def __init__( + self, + host: str, + port: int, + timeout: float, + retries: int, + retries_delay: float, + ) -> None: + + self.host = host + self.port = port + self.__timeout = timeout + self.__retries = retries + self.__retries_delay = retries_delay + + self.__sock: Optional[socket.socket] = None + + async def get_info(self, src_ip: str, src_port: int) -> Tuple[str, str]: + + (family, _, _, _, addr) = socket.getaddrinfo(src_ip, src_port, type=socket.SOCK_DGRAM)[0] + try: + with socket.socket(family, socket.SOCK_DGRAM) as self.__sock: + self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.__sock.settimeout(self.__timeout) + self.__sock.bind(addr) + (nat_type, response) = await self.__get_nat_type(src_ip) + return (nat_type, (response.ext.ip if response.ext is not None else "")) + finally: + self.__sock = None + + async def __get_nat_type(self, src_ip: str) -> Tuple[str, StunResponse]: # pylint: disable=too-many-return-statements + first = await self.__make_request("First probe") + if not first.ok: + return (StunNatType.BLOCKED, first) + + request = struct.pack(">HHI", 0x0003, 0x0004, 0x00000006) # Change-Request + response = await self.__make_request("Change request [ext_ip == src_ip]", request) + + if first.ext is not None and first.ext.ip == src_ip: + if response.ok: + return (StunNatType.OPEN_INTERNET, response) + return (StunNatType.SYMMETRIC_UDP_FW, response) - if response.ext == first.ext: - request = struct.pack(">HHI", 0x0003, 0x0004, 0x00000002) - response = _stun_request("Change port", first.changed.ip, stun_port, request, sock) if response.ok: - return (StunNatType.RESTRICTED_NAT, response) - return (StunNatType.RESTRICTED_PORT_NAT, response) - - return (StunNatType.SYMMETRIC_NAT, response) - - -def _stun_request( # pylint: disable=too-many-locals - ctx: str, - host: str, - port: int, - request: bytes, - sock: socket.socket, -) -> StunResponse: - - # TODO: Support IPv6 and RFC 5389 - # The first 4 bytes of the response are the Type (2) and Length (2) - # The 5th byte is Reserved - # The 6th byte is the Family: 0x01 = IPv4, 0x02 = IPv6 - # The remaining bytes are the IP address. 32 bits for IPv4 or 128 bits for - # IPv6. - # More info at: https://tools.ietf.org/html/rfc3489#section-11.2.1 - # And at: https://tools.ietf.org/html/rfc5389#section-15.1 - - trans_id = secrets.token_bytes(16) - request = struct.pack(">HH", 0x0001, len(request)) + trans_id + request # Bind Request - - try: - sock.sendto(request, (host, port)) - except Exception as err: - get_logger().error("%s: Can't send request: %s", ctx, tools.efmt(err)) - return StunResponse(ok=False) - try: - response = sock.recvfrom(2048)[0] - except Exception as err: - get_logger().error("%s: Can't recv response: %s", ctx, tools.efmt(err)) - return StunResponse(ok=False) - - (response_type, payload_len) = struct.unpack(">HH", response[:4]) - if response_type != 0x0101: - get_logger().error("%s: Invalid response type: %#.4x", ctx, response_type) - return StunResponse(ok=False) - if trans_id != response[4:20]: - get_logger().error("%s: Transaction ID mismatch") - return StunResponse(ok=False) - - parsed: Dict[str, StunAddress] = {} - base = 20 - remaining = payload_len - while remaining > 0: - (attr_type, attr_len) = struct.unpack(">HH", response[base:(base + 4)]) - base += 4 - field = { - 0x0001: "ext", # MAPPED-ADDRESS - 0x0004: "src", # SOURCE-ADDRESS - 0x0005: "changed", # CHANGED-ADDRESS - }.get(attr_type) - if field is not None: - parsed[field] = _parse_address(response[base:]) - base += attr_len - remaining -= (4 + attr_len) - return StunResponse(ok=True, **parsed) - - -def _parse_address(data: bytes) -> StunAddress: - family = data[1] - if family == 1: - parts = struct.unpack(">HBBBB", data[2:8]) - return StunAddress( - ip=".".join(map(str, parts[1:])), - port=parts[0], - ) - raise RuntimeError(f"Only IPv4 supported; received: {family}") + return (StunNatType.FULL_CONE_NAT, response) + + if first.changed is None: + raise RuntimeError(f"Changed addr is None: {first}") + response = await self.__make_request("Change request [ext_ip != src_ip]", b"", *first.changed.ip) + if not response.ok: + return (StunNatType.CHANGED_ADDR_ERROR, response) + + if response.ext == first.ext: + request = struct.pack(">HHI", 0x0003, 0x0004, 0x00000002) + response = await self.__make_request("Change port", request, first.changed.ip) + if response.ok: + return (StunNatType.RESTRICTED_NAT, response) + return (StunNatType.RESTRICTED_PORT_NAT, response) + + return (StunNatType.SYMMETRIC_NAT, response) + + async def __make_request(self, ctx: str, request: bytes=b"", host: str="", port: int=0) -> StunResponse: + # TODO: Support IPv6 and RFC 5389 + # The first 4 bytes of the response are the Type (2) and Length (2) + # The 5th byte is Reserved + # The 6th byte is the Family: 0x01 = IPv4, 0x02 = IPv6 + # The remaining bytes are the IP address. 32 bits for IPv4 or 128 bits for + # IPv6. + # More info at: https://tools.ietf.org/html/rfc3489#section-11.2.1 + # And at: https://tools.ietf.org/html/rfc5389#section-15.1 + + (response, error) = (b"", "") + for _ in range(self.__retries): + (response, error) = await self.__inner_make_request(request, host, port) + if not error: + break + await asyncio.sleep(self.__retries_delay) + if error: + get_logger(0).error("%s: Can't perform STUN request after %d retries; last error: %s", + ctx, self.__retries, error) + return StunResponse(ok=False) + + parsed: Dict[str, StunAddress] = {} + offset = 0 + remaining = len(response) + while remaining > 0: + (attr_type, attr_len) = struct.unpack(">HH", response[offset : offset + 4]) # noqa: E203 + offset += 4 + field = { + 0x0001: "ext", # MAPPED-ADDRESS + 0x0004: "src", # SOURCE-ADDRESS + 0x0005: "changed", # CHANGED-ADDRESS + }.get(attr_type) + if field is not None: + parsed[field] = self.__parse_address(response[offset:]) + offset += attr_len + remaining -= (4 + attr_len) + return StunResponse(ok=True, **parsed) + + async def __inner_make_request(self, request: bytes, host: str, port: int) -> Tuple[bytes, str]: + assert self.__sock is not None + + trans_id = secrets.token_bytes(16) + request = struct.pack(">HH", 0x0001, len(request)) + trans_id + request # Bind Request + + try: + await aiotools.run_async(self.__sock.sendto, request, ((host or self.host), (port or self.port))) + except Exception as err: + return (b"", f"Send error: {tools.efmt(err)}") + try: + response = (await aiotools.run_async(self.__sock.recvfrom, 2048))[0] + except Exception as err: + return (b"", f"Recv error: {tools.efmt(err)}") + + (response_type, payload_len) = struct.unpack(">HH", response[:4]) + if response_type != 0x0101: + return (b"", f"Invalid response type: {response_type:#06x}") + if trans_id != response[4:20]: + return (b"", "Transaction ID mismatch") + + return (response[20 : 20 + payload_len], "") # noqa: E203 + + def __parse_address(self, data: bytes) -> StunAddress: + family = data[1] + if family == 1: + parts = struct.unpack(">HBBBB", data[2:8]) + return StunAddress( + ip=".".join(map(str, parts[1:])), + port=parts[0], + ) + raise RuntimeError(f"Only IPv4 supported; received: {family}") |