summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kvmd/aiotools.py15
-rw-r--r--kvmd/apps/janus/runner.py57
2 files changed, 42 insertions, 30 deletions
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py
index d2c73325..f93669f9 100644
--- a/kvmd/aiotools.py
+++ b/kvmd/aiotools.py
@@ -21,6 +21,7 @@
import asyncio
+import signal
import functools
import types
@@ -102,6 +103,20 @@ async def close_writer(writer: asyncio.StreamWriter) -> bool:
# =====
+def run(coro: Coroutine) -> None:
+ def sigint_handler() -> None:
+ raise KeyboardInterrupt()
+
+ def sigterm_handler() -> None:
+ raise SystemExit()
+
+ loop = asyncio.get_event_loop()
+ loop.add_signal_handler(signal.SIGINT, sigint_handler)
+ loop.add_signal_handler(signal.SIGTERM, sigterm_handler)
+ loop.run_until_complete(coro)
+
+
+# =====
class AioNotifier:
def __init__(self) -> None:
self.__queue: "asyncio.Queue[None]" = asyncio.Queue()
diff --git a/kvmd/apps/janus/runner.py b/kvmd/apps/janus/runner.py
index 28956e0a..b62e47bd 100644
--- a/kvmd/apps/janus/runner.py
+++ b/kvmd/apps/janus/runner.py
@@ -62,43 +62,38 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes
logger = get_logger(0)
logger.info("Starting Janus Runner ...")
try:
- asyncio.run(self.__run())
+ aiotools.run(self.__run())
except (SystemExit, KeyboardInterrupt):
- pass
+ aiotools.run(self.__stop_janus())
logger.info("Bye-bye")
# =====
async def __run(self) -> None:
logger = get_logger(0)
- try:
- prev_netcfg: Optional[_Netcfg] = None
- while True:
- retry = 0
- netcfg = _Netcfg()
- for retry in range(self.__check_retries):
- netcfg = await self.__get_netcfg()
- if netcfg.ext_ip:
- break
- await asyncio.sleep(self.__check_retries_delay)
- if retry != 0 and netcfg.ext_ip:
- logger.info("I'm fine, continue working ...")
-
- if netcfg != prev_netcfg:
- logger.info("Got new %s", netcfg)
- if netcfg.src_ip and netcfg.ext_ip:
- logger.info("Okay, restarting Janus ...")
- await self.__stop_janus()
- await self.__start_janus(netcfg)
- else:
- logger.error("Empty src_ip or ext_ip; stopping Janus ...")
- await self.__stop_janus()
- prev_netcfg = netcfg
-
- await asyncio.sleep(self.__check_interval)
- except: # noqa: E722
- await self.__stop_janus()
- raise
+ prev_netcfg: Optional[_Netcfg] = None
+ while True:
+ retry = 0
+ netcfg = _Netcfg()
+ for retry in range(self.__check_retries):
+ netcfg = await self.__get_netcfg()
+ if netcfg.ext_ip:
+ break
+ await asyncio.sleep(self.__check_retries_delay)
+ if retry != 0 and netcfg.ext_ip:
+ logger.info("I'm fine, continue working ...")
+
+ if netcfg != prev_netcfg:
+ logger.info("Got new %s", netcfg)
+ if netcfg.src_ip and netcfg.ext_ip:
+ await self.__stop_janus()
+ await self.__start_janus(netcfg)
+ else:
+ logger.error("Empty src_ip or ext_ip; stopping Janus ...")
+ await self.__stop_janus()
+ prev_netcfg = netcfg
+
+ await asyncio.sleep(self.__check_interval)
async def __get_netcfg(self) -> _Netcfg:
src_ip = (self.__get_default_ip() or "0.0.0.0")
@@ -136,12 +131,14 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes
@aiotools.atomic
async def __start_janus(self, netcfg: _Netcfg) -> None:
+ get_logger(0).info("Starting Janus ...")
assert not self.__janus_task
self.__janus_task = asyncio.create_task(self.__janus_task_loop(netcfg))
@aiotools.atomic
async def __stop_janus(self) -> None:
if self.__janus_task:
+ get_logger(0).info("Stopping Janus ...")
self.__janus_task.cancel()
await asyncio.gather(self.__janus_task, return_exceptions=True)
await self.__kill_janus_proc()