summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2021-05-25 10:45:55 +0300
committerDevaev Maxim <[email protected]>2021-05-25 10:45:55 +0300
commitbed223bd865dac0c7716db0fbfb30920acc52d7d (patch)
tree79afc05314c8b41ad55e265458ae2531d8125bad
parent787e5ddbaed9a79a253a06ac0f0bcb2b619c1126 (diff)
rewrited stun
-rw-r--r--kvmd/apps/__init__.py8
-rw-r--r--kvmd/apps/janus/runner.py29
-rw-r--r--kvmd/apps/janus/stun.py268
3 files changed, 149 insertions, 156 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py
index 4f6f160d..9fc0c2bb 100644
--- a/kvmd/apps/__init__.py
+++ b/kvmd/apps/__init__.py
@@ -661,9 +661,11 @@ def _get_config_scheme() -> Dict:
"janus": {
"stun": {
- "host": Option("stun.l.google.com", type=valid_ip_or_host, unpack_as="stun_host"),
- "port": Option(19302, type=valid_port, unpack_as="stun_port"),
- "timeout": Option(5.0, type=valid_float_f01, unpack_as="stun_timeout"),
+ "host": Option("stun.l.google.com", type=valid_ip_or_host, unpack_as="stun_host"),
+ "port": Option(19302, type=valid_port, unpack_as="stun_port"),
+ "timeout": Option(5.0, type=valid_float_f01, unpack_as="stun_timeout"),
+ "retries": Option(5, type=valid_int_f1, unpack_as="stun_retries"),
+ "retries_delay": Option(5.0, type=valid_float_f01, unpack_as="stun_retries_delay"),
},
"check": {
diff --git a/kvmd/apps/janus/runner.py b/kvmd/apps/janus/runner.py
index 69d7a33b..d590f368 100644
--- a/kvmd/apps/janus/runner.py
+++ b/kvmd/apps/janus/runner.py
@@ -15,7 +15,7 @@ from ... import aioproc
from ...logging import get_logger
-from .stun import stun_get_info
+from .stun import Stun
# =====
@@ -35,6 +35,8 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes
stun_host: str,
stun_port: int,
stun_timeout: float,
+ stun_retries: int,
+ stun_retries_delay: float,
check_interval: int,
check_retries: int,
@@ -45,9 +47,7 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes
cmd_append: List[str],
) -> None:
- self.__stun_host = stun_host
- self.__stun_port = stun_port
- self.__stun_timeout = stun_timeout
+ self.__stun = Stun(stun_host, stun_port, stun_timeout, stun_retries, stun_retries_delay)
self.__check_interval = check_interval
self.__check_retries = check_retries
@@ -74,12 +74,15 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes
try:
prev_netcfg: Optional[_Netcfg] = None
while True:
+ retry = 0
netcfg = _Netcfg()
- for _ in range(self.__check_retries - 1):
+ for retry in range(self.__check_retries):
netcfg = await self.__get_netcfg()
if netcfg.ext_ip:
break
await asyncio.sleep(self.__check_retries_delay)
+ if retry != 0 and netcfg.ext_ip:
+ logger.info("I'm fine, continue working ...")
if netcfg != prev_netcfg:
logger.info("Got new %s", netcfg)
@@ -99,8 +102,8 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes
async def __get_netcfg(self) -> _Netcfg:
src_ip = (self.__get_default_ip() or "0.0.0.0")
- (nat_type, ext_ip) = await self.__get_stun_info(src_ip)
- return _Netcfg(nat_type, src_ip, ext_ip, self.__stun_host, self.__stun_port)
+ (stun, (nat_type, ext_ip)) = await self.__get_stun_info(src_ip)
+ return _Netcfg(nat_type, src_ip, ext_ip, stun.host, stun.port)
def __get_default_ip(self) -> str:
try:
@@ -122,18 +125,12 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes
get_logger().error("Can't get default IP: %s", tools.efmt(err))
return ""
- async def __get_stun_info(self, src_ip: str) -> Tuple[str, str]:
+ async def __get_stun_info(self, src_ip: str) -> Tuple[Stun, Tuple[str, str]]:
try:
- return (await stun_get_info(
- stun_host=self.__stun_host,
- stun_port=self.__stun_port,
- src_ip=src_ip,
- src_port=0,
- timeout=self.__stun_timeout,
- ))
+ return (self.__stun, (await self.__stun.get_info(src_ip, 0)))
except Exception as err:
get_logger().error("Can't get STUN info: %s", tools.efmt(err))
- return ("", "")
+ return (self.__stun, ("", ""))
# =====
diff --git a/kvmd/apps/janus/stun.py b/kvmd/apps/janus/stun.py
index 954f2d73..9a54e5df 100644
--- a/kvmd/apps/janus/stun.py
+++ b/kvmd/apps/janus/stun.py
@@ -1,3 +1,4 @@
+import asyncio
import socket
import struct
import secrets
@@ -40,144 +41,137 @@ class StunNatType:
# =====
-async def stun_get_info(
- stun_host: str,
- stun_port: int,
- src_ip: str,
- src_port: int,
- timeout: float,
-) -> Tuple[str, str]:
-
- return (await aiotools.run_async(_stun_get_info, stun_host, stun_port, src_ip, src_port, timeout))
-
-
-def _stun_get_info(
- stun_host: str,
- stun_port: int,
- src_ip: str,
- src_port: int,
- timeout: float,
-) -> Tuple[str, str]:
-
+class Stun:
# Partially based on https://github.com/JohnVillalovos/pystun
- (family, _, _, _, addr) = socket.getaddrinfo(src_ip, src_port, type=socket.SOCK_DGRAM)[0]
- with socket.socket(family, socket.SOCK_DGRAM) as sock:
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.settimeout(timeout)
- sock.bind(addr)
- (nat_type, response) = _get_nat_type(
- stun_host=stun_host,
- stun_port=stun_port,
- src_ip=src_ip,
- sock=sock,
- )
- return (nat_type, (response.ext.ip if response.ext is not None else ""))
-
-
-def _get_nat_type( # pylint: disable=too-many-return-statements
- stun_host: str,
- stun_port: int,
- src_ip: str,
- sock: socket.socket,
-) -> Tuple[str, StunResponse]:
-
- first = _stun_request("First probe", stun_host, stun_port, b"", sock)
- if not first.ok:
- return (StunNatType.BLOCKED, first)
- if first.ext is None:
- raise RuntimeError(f"Ext addr is None: {first}")
-
- request = struct.pack(">HHI", 0x0003, 0x0004, 0x00000006) # Change-Request
- response = _stun_request("Change request [ext_ip == src_ip]", stun_host, stun_port, request, sock)
-
- if first.ext.ip == src_ip:
- if response.ok:
- return (StunNatType.OPEN_INTERNET, response)
- return (StunNatType.SYMMETRIC_UDP_FW, response)
-
- if response.ok:
- return (StunNatType.FULL_CONE_NAT, response)
-
- if first.changed is None:
- raise RuntimeError(f"Changed addr is None: {first}")
- response = _stun_request("Change request [ext_ip != src_ip]", first.changed.ip, first.changed.port, b"", sock)
- if not response.ok:
- return (StunNatType.CHANGED_ADDR_ERROR, response)
+ def __init__(
+ self,
+ host: str,
+ port: int,
+ timeout: float,
+ retries: int,
+ retries_delay: float,
+ ) -> None:
+
+ self.host = host
+ self.port = port
+ self.__timeout = timeout
+ self.__retries = retries
+ self.__retries_delay = retries_delay
+
+ self.__sock: Optional[socket.socket] = None
+
+ async def get_info(self, src_ip: str, src_port: int) -> Tuple[str, str]:
+
+ (family, _, _, _, addr) = socket.getaddrinfo(src_ip, src_port, type=socket.SOCK_DGRAM)[0]
+ try:
+ with socket.socket(family, socket.SOCK_DGRAM) as self.__sock:
+ self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.__sock.settimeout(self.__timeout)
+ self.__sock.bind(addr)
+ (nat_type, response) = await self.__get_nat_type(src_ip)
+ return (nat_type, (response.ext.ip if response.ext is not None else ""))
+ finally:
+ self.__sock = None
+
+ async def __get_nat_type(self, src_ip: str) -> Tuple[str, StunResponse]: # pylint: disable=too-many-return-statements
+ first = await self.__make_request("First probe")
+ if not first.ok:
+ return (StunNatType.BLOCKED, first)
+
+ request = struct.pack(">HHI", 0x0003, 0x0004, 0x00000006) # Change-Request
+ response = await self.__make_request("Change request [ext_ip == src_ip]", request)
+
+ if first.ext is not None and first.ext.ip == src_ip:
+ if response.ok:
+ return (StunNatType.OPEN_INTERNET, response)
+ return (StunNatType.SYMMETRIC_UDP_FW, response)
- if response.ext == first.ext:
- request = struct.pack(">HHI", 0x0003, 0x0004, 0x00000002)
- response = _stun_request("Change port", first.changed.ip, stun_port, request, sock)
if response.ok:
- return (StunNatType.RESTRICTED_NAT, response)
- return (StunNatType.RESTRICTED_PORT_NAT, response)
-
- return (StunNatType.SYMMETRIC_NAT, response)
-
-
-def _stun_request( # pylint: disable=too-many-locals
- ctx: str,
- host: str,
- port: int,
- request: bytes,
- sock: socket.socket,
-) -> StunResponse:
-
- # TODO: Support IPv6 and RFC 5389
- # The first 4 bytes of the response are the Type (2) and Length (2)
- # The 5th byte is Reserved
- # The 6th byte is the Family: 0x01 = IPv4, 0x02 = IPv6
- # The remaining bytes are the IP address. 32 bits for IPv4 or 128 bits for
- # IPv6.
- # More info at: https://tools.ietf.org/html/rfc3489#section-11.2.1
- # And at: https://tools.ietf.org/html/rfc5389#section-15.1
-
- trans_id = secrets.token_bytes(16)
- request = struct.pack(">HH", 0x0001, len(request)) + trans_id + request # Bind Request
-
- try:
- sock.sendto(request, (host, port))
- except Exception as err:
- get_logger().error("%s: Can't send request: %s", ctx, tools.efmt(err))
- return StunResponse(ok=False)
- try:
- response = sock.recvfrom(2048)[0]
- except Exception as err:
- get_logger().error("%s: Can't recv response: %s", ctx, tools.efmt(err))
- return StunResponse(ok=False)
-
- (response_type, payload_len) = struct.unpack(">HH", response[:4])
- if response_type != 0x0101:
- get_logger().error("%s: Invalid response type: %#.4x", ctx, response_type)
- return StunResponse(ok=False)
- if trans_id != response[4:20]:
- get_logger().error("%s: Transaction ID mismatch")
- return StunResponse(ok=False)
-
- parsed: Dict[str, StunAddress] = {}
- base = 20
- remaining = payload_len
- while remaining > 0:
- (attr_type, attr_len) = struct.unpack(">HH", response[base:(base + 4)])
- base += 4
- field = {
- 0x0001: "ext", # MAPPED-ADDRESS
- 0x0004: "src", # SOURCE-ADDRESS
- 0x0005: "changed", # CHANGED-ADDRESS
- }.get(attr_type)
- if field is not None:
- parsed[field] = _parse_address(response[base:])
- base += attr_len
- remaining -= (4 + attr_len)
- return StunResponse(ok=True, **parsed)
-
-
-def _parse_address(data: bytes) -> StunAddress:
- family = data[1]
- if family == 1:
- parts = struct.unpack(">HBBBB", data[2:8])
- return StunAddress(
- ip=".".join(map(str, parts[1:])),
- port=parts[0],
- )
- raise RuntimeError(f"Only IPv4 supported; received: {family}")
+ return (StunNatType.FULL_CONE_NAT, response)
+
+ if first.changed is None:
+ raise RuntimeError(f"Changed addr is None: {first}")
+ response = await self.__make_request("Change request [ext_ip != src_ip]", b"", *first.changed.ip)
+ if not response.ok:
+ return (StunNatType.CHANGED_ADDR_ERROR, response)
+
+ if response.ext == first.ext:
+ request = struct.pack(">HHI", 0x0003, 0x0004, 0x00000002)
+ response = await self.__make_request("Change port", request, first.changed.ip)
+ if response.ok:
+ return (StunNatType.RESTRICTED_NAT, response)
+ return (StunNatType.RESTRICTED_PORT_NAT, response)
+
+ return (StunNatType.SYMMETRIC_NAT, response)
+
+ async def __make_request(self, ctx: str, request: bytes=b"", host: str="", port: int=0) -> StunResponse:
+ # TODO: Support IPv6 and RFC 5389
+ # The first 4 bytes of the response are the Type (2) and Length (2)
+ # The 5th byte is Reserved
+ # The 6th byte is the Family: 0x01 = IPv4, 0x02 = IPv6
+ # The remaining bytes are the IP address. 32 bits for IPv4 or 128 bits for
+ # IPv6.
+ # More info at: https://tools.ietf.org/html/rfc3489#section-11.2.1
+ # And at: https://tools.ietf.org/html/rfc5389#section-15.1
+
+ (response, error) = (b"", "")
+ for _ in range(self.__retries):
+ (response, error) = await self.__inner_make_request(request, host, port)
+ if not error:
+ break
+ await asyncio.sleep(self.__retries_delay)
+ if error:
+ get_logger(0).error("%s: Can't perform STUN request after %d retries; last error: %s",
+ ctx, self.__retries, error)
+ return StunResponse(ok=False)
+
+ parsed: Dict[str, StunAddress] = {}
+ offset = 0
+ remaining = len(response)
+ while remaining > 0:
+ (attr_type, attr_len) = struct.unpack(">HH", response[offset : offset + 4]) # noqa: E203
+ offset += 4
+ field = {
+ 0x0001: "ext", # MAPPED-ADDRESS
+ 0x0004: "src", # SOURCE-ADDRESS
+ 0x0005: "changed", # CHANGED-ADDRESS
+ }.get(attr_type)
+ if field is not None:
+ parsed[field] = self.__parse_address(response[offset:])
+ offset += attr_len
+ remaining -= (4 + attr_len)
+ return StunResponse(ok=True, **parsed)
+
+ async def __inner_make_request(self, request: bytes, host: str, port: int) -> Tuple[bytes, str]:
+ assert self.__sock is not None
+
+ trans_id = secrets.token_bytes(16)
+ request = struct.pack(">HH", 0x0001, len(request)) + trans_id + request # Bind Request
+
+ try:
+ await aiotools.run_async(self.__sock.sendto, request, ((host or self.host), (port or self.port)))
+ except Exception as err:
+ return (b"", f"Send error: {tools.efmt(err)}")
+ try:
+ response = (await aiotools.run_async(self.__sock.recvfrom, 2048))[0]
+ except Exception as err:
+ return (b"", f"Recv error: {tools.efmt(err)}")
+
+ (response_type, payload_len) = struct.unpack(">HH", response[:4])
+ if response_type != 0x0101:
+ return (b"", f"Invalid response type: {response_type:#06x}")
+ if trans_id != response[4:20]:
+ return (b"", "Transaction ID mismatch")
+
+ return (response[20 : 20 + payload_len], "") # noqa: E203
+
+ def __parse_address(self, data: bytes) -> StunAddress:
+ family = data[1]
+ if family == 1:
+ parts = struct.unpack(">HBBBB", data[2:8])
+ return StunAddress(
+ ip=".".join(map(str, parts[1:])),
+ port=parts[0],
+ )
+ raise RuntimeError(f"Only IPv4 supported; received: {family}")