summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2018-06-28 07:06:47 +0300
committerDevaev Maxim <[email protected]>2018-06-28 07:06:47 +0300
commitba3c49a816ca627f6cc6390364fd483cabd5ac71 (patch)
treeba17e8705a5956e728eb2bb53fce1931be53e4d3 /kvmd
parent89164b184a69a1ebf168c4bdcee308764ff16f76 (diff)
refactoring and graceful self-kill
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/kvmd/__init__.py171
-rw-r--r--kvmd/kvmd/server.py174
2 files changed, 177 insertions, 168 deletions
diff --git a/kvmd/kvmd/__init__.py b/kvmd/kvmd/__init__.py
index d3d8db7b..03088dcb 100644
--- a/kvmd/kvmd/__init__.py
+++ b/kvmd/kvmd/__init__.py
@@ -1,181 +1,16 @@
import asyncio
import logging
-import time
-
-from typing import List
-from typing import Set
-from typing import Callable
-from typing import Optional
-
-import aiohttp
from .application import init
from .atx import Atx
from .streamer import Streamer
+from .server import Server
from . import gpio
# =====
-_logger = logging.getLogger(__name__)
-
-
-def _system_task(method: Callable) -> Callable:
- async def wrap(self: "_Server") -> None:
- try:
- await method(self)
- except asyncio.CancelledError:
- pass
- except Exception:
- _logger.exception("Unhandled exception")
- raise SystemExit(1)
- return wrap
-
-
-class _Server: # pylint: disable=too-many-instance-attributes
- def __init__(
- self,
- atx: Atx,
- streamer: Streamer,
- heartbeat: float,
- atx_leds_poll: float,
- video_shutdown_delay: float,
- loop: asyncio.AbstractEventLoop,
- ) -> None:
-
- self.__atx = atx
- self.__streamer = streamer
- self.__heartbeat = heartbeat
- self.__video_shutdown_delay = video_shutdown_delay
- self.__atx_leds_poll = atx_leds_poll
- self.__loop = loop
-
- self.__sockets: Set[aiohttp.web.WebSocketResponse] = set()
- self.__sockets_lock = asyncio.Lock()
-
- self.__system_tasks: List[asyncio.Task] = []
-
- def run(self, host: str, port: int) -> None:
- app = aiohttp.web.Application(loop=self.__loop)
- app.router.add_get("/", self.__root_handler)
- app.router.add_get("/ws", self.__ws_handler)
- app.on_shutdown.append(self.__on_shutdown)
- app.on_cleanup.append(self.__on_cleanup)
-
- self.__system_tasks.extend([
- self.__loop.create_task(self.__stream_controller()),
- self.__loop.create_task(self.__poll_dead_sockets()),
- self.__loop.create_task(self.__poll_atx_leds()),
- ])
-
- aiohttp.web.run_app(
- app=app,
- host=host,
- port=port,
- print=(lambda text: [_logger.info(line.strip()) for line in text.strip().splitlines()]), # type: ignore
- )
-
- async def __root_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
- return aiohttp.web.Response(text="OK")
-
- async def __ws_handler(self, request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse:
- ws = aiohttp.web.WebSocketResponse(heartbeat=self.__heartbeat)
- await ws.prepare(request)
- await self.__register_socket(ws)
- async for msg in ws:
- if msg.type == aiohttp.web.WSMsgType.TEXT:
- retval = await self.__execute_command(msg.data)
- if retval:
- await ws.send_str(retval)
- else:
- break
- return ws
-
- async def __on_shutdown(self, _: aiohttp.web.Application) -> None:
- _logger.info("Cancelling system tasks ...")
- for task in self.__system_tasks:
- task.cancel()
- await asyncio.gather(*self.__system_tasks)
-
- _logger.info("Disconnecting clients ...")
- for ws in list(self.__sockets):
- await self.__remove_socket(ws)
-
- async def __on_cleanup(self, _: aiohttp.web.Application) -> None:
- if self.__streamer.is_running():
- await self.__streamer.stop()
-
- @_system_task
- async def __stream_controller(self) -> None:
- prev = 0
- shutdown_at = 0.0
- while True:
- cur = len(self.__sockets)
- if prev == 0 and cur > 0:
- if not self.__streamer.is_running():
- await self.__streamer.start()
- elif prev > 0 and cur == 0:
- shutdown_at = time.time() + self.__video_shutdown_delay
- elif prev == 0 and cur == 0 and time.time() > shutdown_at:
- if self.__streamer.is_running():
- await self.__streamer.stop()
- prev = cur
- await asyncio.sleep(0.1)
-
- @_system_task
- async def __poll_dead_sockets(self) -> None:
- while True:
- for ws in list(self.__sockets):
- if ws.closed or not ws._req.transport: # pylint: disable=protected-access
- await self.__remove_socket(ws)
- await asyncio.sleep(0.1)
-
- @_system_task
- async def __poll_atx_leds(self) -> None:
- while True:
- if self.__sockets:
- await self.__broadcast("EVENT atx_leds %d %d" % (self.__atx.get_leds()))
- await asyncio.sleep(self.__atx_leds_poll)
-
- async def __broadcast(self, msg: str) -> None:
- await asyncio.gather(*[
- ws.send_str(msg)
- for ws in list(self.__sockets)
- if not ws.closed and ws._req.transport # pylint: disable=protected-access
- ], return_exceptions=True)
-
- async def __execute_command(self, command: str) -> Optional[str]:
- (command, args) = (command.strip().split(" ", maxsplit=1) + [""])[:2]
- if command == "CLICK":
- method = {
- "power": self.__atx.click_power,
- "power_long": self.__atx.click_power_long,
- "reset": self.__atx.click_reset,
- }.get(args)
- if method:
- await method()
- return None
- _logger.warning("Received an incorrect command: %r", command)
- return "ERROR incorrect command"
-
- async def __register_socket(self, ws: aiohttp.web.WebSocketResponse) -> None:
- async with self.__sockets_lock:
- self.__sockets.add(ws)
- _logger.info("Registered new client socket: remote=%s; id=%d; active=%d",
- ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access
-
- async def __remove_socket(self, ws: aiohttp.web.WebSocketResponse) -> None:
- async with self.__sockets_lock:
- try:
- self.__sockets.remove(ws)
- _logger.info("Removed client socket: remote=%s; id=%d; active=%d",
- ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access
- await ws.close()
- except Exception:
- pass
-
-
def main() -> None:
config = init()
with gpio.bcm():
@@ -198,7 +33,7 @@ def main() -> None:
loop=loop,
)
- _Server(
+ Server(
atx=atx,
streamer=streamer,
heartbeat=config["server"]["heartbeat"],
@@ -209,4 +44,4 @@ def main() -> None:
host=config["server"]["host"],
port=config["server"]["port"],
)
- _logger.info("Bye-bye")
+ logging.getLogger(__name__).info("Bye-bye")
diff --git a/kvmd/kvmd/server.py b/kvmd/kvmd/server.py
new file mode 100644
index 00000000..3216d3fd
--- /dev/null
+++ b/kvmd/kvmd/server.py
@@ -0,0 +1,174 @@
+import os
+import signal
+import asyncio
+import logging
+import time
+
+from typing import List
+from typing import Set
+from typing import Callable
+from typing import Optional
+
+import aiohttp
+
+from .atx import Atx
+from .streamer import Streamer
+
+
+# =====
+_logger = logging.getLogger(__name__)
+
+
+def _system_task(method: Callable) -> Callable:
+ async def wrap(self: "Server") -> None:
+ try:
+ await method(self)
+ except asyncio.CancelledError:
+ pass
+ except Exception:
+ _logger.exception("Unhandled exception, killing myself ...")
+ os.kill(os.getpid(), signal.SIGTERM)
+ return wrap
+
+
+class Server: # pylint: disable=too-many-instance-attributes
+ def __init__(
+ self,
+ atx: Atx,
+ streamer: Streamer,
+ heartbeat: float,
+ atx_leds_poll: float,
+ video_shutdown_delay: float,
+ loop: asyncio.AbstractEventLoop,
+ ) -> None:
+
+ self.__atx = atx
+ self.__streamer = streamer
+ self.__heartbeat = heartbeat
+ self.__video_shutdown_delay = video_shutdown_delay
+ self.__atx_leds_poll = atx_leds_poll
+ self.__loop = loop
+
+ self.__sockets: Set[aiohttp.web.WebSocketResponse] = set()
+ self.__sockets_lock = asyncio.Lock()
+
+ self.__system_tasks: List[asyncio.Task] = []
+
+ def run(self, host: str, port: int) -> None:
+ app = aiohttp.web.Application(loop=self.__loop)
+ app.router.add_get("/", self.__root_handler)
+ app.router.add_get("/ws", self.__ws_handler)
+ app.on_shutdown.append(self.__on_shutdown)
+ app.on_cleanup.append(self.__on_cleanup)
+
+ self.__system_tasks.extend([
+ self.__loop.create_task(self.__stream_controller()),
+ self.__loop.create_task(self.__poll_dead_sockets()),
+ self.__loop.create_task(self.__poll_atx_leds()),
+ ])
+
+ aiohttp.web.run_app(
+ app=app,
+ host=host,
+ port=port,
+ print=(lambda text: [_logger.info(line.strip()) for line in text.strip().splitlines()]), # type: ignore
+ )
+
+ async def __root_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
+ return aiohttp.web.Response(text="OK")
+
+ async def __ws_handler(self, request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse:
+ ws = aiohttp.web.WebSocketResponse(heartbeat=self.__heartbeat)
+ await ws.prepare(request)
+ await self.__register_socket(ws)
+ async for msg in ws:
+ if msg.type == aiohttp.web.WSMsgType.TEXT:
+ retval = await self.__execute_command(msg.data)
+ if retval:
+ await ws.send_str(retval)
+ else:
+ break
+ return ws
+
+ async def __on_shutdown(self, _: aiohttp.web.Application) -> None:
+ _logger.info("Cancelling system tasks ...")
+ for task in self.__system_tasks:
+ task.cancel()
+ await asyncio.gather(*self.__system_tasks)
+
+ _logger.info("Disconnecting clients ...")
+ for ws in list(self.__sockets):
+ await self.__remove_socket(ws)
+
+ async def __on_cleanup(self, _: aiohttp.web.Application) -> None:
+ if self.__streamer.is_running():
+ await self.__streamer.stop()
+
+ @_system_task
+ async def __stream_controller(self) -> None:
+ prev = 0
+ shutdown_at = 0.0
+ while True:
+ cur = len(self.__sockets)
+ if prev == 0 and cur > 0:
+ if not self.__streamer.is_running():
+ await self.__streamer.start()
+ elif prev > 0 and cur == 0:
+ shutdown_at = time.time() + self.__video_shutdown_delay
+ elif prev == 0 and cur == 0 and time.time() > shutdown_at:
+ if self.__streamer.is_running():
+ await self.__streamer.stop()
+ prev = cur
+ await asyncio.sleep(0.1)
+
+ @_system_task
+ async def __poll_dead_sockets(self) -> None:
+ while True:
+ for ws in list(self.__sockets):
+ if ws.closed or not ws._req.transport: # pylint: disable=protected-access
+ await self.__remove_socket(ws)
+ await asyncio.sleep(0.1)
+
+ @_system_task
+ async def __poll_atx_leds(self) -> None:
+ while True:
+ if self.__sockets:
+ await self.__broadcast("EVENT atx_leds %d %d" % (self.__atx.get_leds()))
+ await asyncio.sleep(self.__atx_leds_poll)
+
+ async def __broadcast(self, msg: str) -> None:
+ await asyncio.gather(*[
+ ws.send_str(msg)
+ for ws in list(self.__sockets)
+ if not ws.closed and ws._req.transport # pylint: disable=protected-access
+ ], return_exceptions=True)
+
+ async def __execute_command(self, command: str) -> Optional[str]:
+ (command, args) = (command.strip().split(" ", maxsplit=1) + [""])[:2]
+ if command == "CLICK":
+ method = {
+ "power": self.__atx.click_power,
+ "power_long": self.__atx.click_power_long,
+ "reset": self.__atx.click_reset,
+ }.get(args)
+ if method:
+ await method()
+ return None
+ _logger.warning("Received an incorrect command: %r", command)
+ return "ERROR incorrect command"
+
+ async def __register_socket(self, ws: aiohttp.web.WebSocketResponse) -> None:
+ async with self.__sockets_lock:
+ self.__sockets.add(ws)
+ _logger.info("Registered new client socket: remote=%s; id=%d; active=%d",
+ ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access
+
+ async def __remove_socket(self, ws: aiohttp.web.WebSocketResponse) -> None:
+ async with self.__sockets_lock:
+ try:
+ self.__sockets.remove(ws)
+ _logger.info("Removed client socket: remote=%s; id=%d; active=%d",
+ ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access
+ await ws.close()
+ except Exception:
+ pass