diff options
author | Devaev Maxim <[email protected]> | 2018-06-28 07:06:47 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2018-06-28 07:06:47 +0300 |
commit | ba3c49a816ca627f6cc6390364fd483cabd5ac71 (patch) | |
tree | ba17e8705a5956e728eb2bb53fce1931be53e4d3 | |
parent | 89164b184a69a1ebf168c4bdcee308764ff16f76 (diff) |
refactoring and graceful self-kill
-rw-r--r-- | kvmd/kvmd/__init__.py | 171 | ||||
-rw-r--r-- | kvmd/kvmd/server.py | 174 |
2 files changed, 177 insertions, 168 deletions
diff --git a/kvmd/kvmd/__init__.py b/kvmd/kvmd/__init__.py index d3d8db7b..03088dcb 100644 --- a/kvmd/kvmd/__init__.py +++ b/kvmd/kvmd/__init__.py @@ -1,181 +1,16 @@ import asyncio import logging -import time - -from typing import List -from typing import Set -from typing import Callable -from typing import Optional - -import aiohttp from .application import init from .atx import Atx from .streamer import Streamer +from .server import Server from . import gpio # ===== -_logger = logging.getLogger(__name__) - - -def _system_task(method: Callable) -> Callable: - async def wrap(self: "_Server") -> None: - try: - await method(self) - except asyncio.CancelledError: - pass - except Exception: - _logger.exception("Unhandled exception") - raise SystemExit(1) - return wrap - - -class _Server: # pylint: disable=too-many-instance-attributes - def __init__( - self, - atx: Atx, - streamer: Streamer, - heartbeat: float, - atx_leds_poll: float, - video_shutdown_delay: float, - loop: asyncio.AbstractEventLoop, - ) -> None: - - self.__atx = atx - self.__streamer = streamer - self.__heartbeat = heartbeat - self.__video_shutdown_delay = video_shutdown_delay - self.__atx_leds_poll = atx_leds_poll - self.__loop = loop - - self.__sockets: Set[aiohttp.web.WebSocketResponse] = set() - self.__sockets_lock = asyncio.Lock() - - self.__system_tasks: List[asyncio.Task] = [] - - def run(self, host: str, port: int) -> None: - app = aiohttp.web.Application(loop=self.__loop) - app.router.add_get("/", self.__root_handler) - app.router.add_get("/ws", self.__ws_handler) - app.on_shutdown.append(self.__on_shutdown) - app.on_cleanup.append(self.__on_cleanup) - - self.__system_tasks.extend([ - self.__loop.create_task(self.__stream_controller()), - self.__loop.create_task(self.__poll_dead_sockets()), - self.__loop.create_task(self.__poll_atx_leds()), - ]) - - aiohttp.web.run_app( - app=app, - host=host, - port=port, - print=(lambda text: [_logger.info(line.strip()) for line in text.strip().splitlines()]), # type: ignore - ) - - async def __root_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - return aiohttp.web.Response(text="OK") - - async def __ws_handler(self, request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse: - ws = aiohttp.web.WebSocketResponse(heartbeat=self.__heartbeat) - await ws.prepare(request) - await self.__register_socket(ws) - async for msg in ws: - if msg.type == aiohttp.web.WSMsgType.TEXT: - retval = await self.__execute_command(msg.data) - if retval: - await ws.send_str(retval) - else: - break - return ws - - async def __on_shutdown(self, _: aiohttp.web.Application) -> None: - _logger.info("Cancelling system tasks ...") - for task in self.__system_tasks: - task.cancel() - await asyncio.gather(*self.__system_tasks) - - _logger.info("Disconnecting clients ...") - for ws in list(self.__sockets): - await self.__remove_socket(ws) - - async def __on_cleanup(self, _: aiohttp.web.Application) -> None: - if self.__streamer.is_running(): - await self.__streamer.stop() - - @_system_task - async def __stream_controller(self) -> None: - prev = 0 - shutdown_at = 0.0 - while True: - cur = len(self.__sockets) - if prev == 0 and cur > 0: - if not self.__streamer.is_running(): - await self.__streamer.start() - elif prev > 0 and cur == 0: - shutdown_at = time.time() + self.__video_shutdown_delay - elif prev == 0 and cur == 0 and time.time() > shutdown_at: - if self.__streamer.is_running(): - await self.__streamer.stop() - prev = cur - await asyncio.sleep(0.1) - - @_system_task - async def __poll_dead_sockets(self) -> None: - while True: - for ws in list(self.__sockets): - if ws.closed or not ws._req.transport: # pylint: disable=protected-access - await self.__remove_socket(ws) - await asyncio.sleep(0.1) - - @_system_task - async def __poll_atx_leds(self) -> None: - while True: - if self.__sockets: - await self.__broadcast("EVENT atx_leds %d %d" % (self.__atx.get_leds())) - await asyncio.sleep(self.__atx_leds_poll) - - async def __broadcast(self, msg: str) -> None: - await asyncio.gather(*[ - ws.send_str(msg) - for ws in list(self.__sockets) - if not ws.closed and ws._req.transport # pylint: disable=protected-access - ], return_exceptions=True) - - async def __execute_command(self, command: str) -> Optional[str]: - (command, args) = (command.strip().split(" ", maxsplit=1) + [""])[:2] - if command == "CLICK": - method = { - "power": self.__atx.click_power, - "power_long": self.__atx.click_power_long, - "reset": self.__atx.click_reset, - }.get(args) - if method: - await method() - return None - _logger.warning("Received an incorrect command: %r", command) - return "ERROR incorrect command" - - async def __register_socket(self, ws: aiohttp.web.WebSocketResponse) -> None: - async with self.__sockets_lock: - self.__sockets.add(ws) - _logger.info("Registered new client socket: remote=%s; id=%d; active=%d", - ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access - - async def __remove_socket(self, ws: aiohttp.web.WebSocketResponse) -> None: - async with self.__sockets_lock: - try: - self.__sockets.remove(ws) - _logger.info("Removed client socket: remote=%s; id=%d; active=%d", - ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access - await ws.close() - except Exception: - pass - - def main() -> None: config = init() with gpio.bcm(): @@ -198,7 +33,7 @@ def main() -> None: loop=loop, ) - _Server( + Server( atx=atx, streamer=streamer, heartbeat=config["server"]["heartbeat"], @@ -209,4 +44,4 @@ def main() -> None: host=config["server"]["host"], port=config["server"]["port"], ) - _logger.info("Bye-bye") + logging.getLogger(__name__).info("Bye-bye") diff --git a/kvmd/kvmd/server.py b/kvmd/kvmd/server.py new file mode 100644 index 00000000..3216d3fd --- /dev/null +++ b/kvmd/kvmd/server.py @@ -0,0 +1,174 @@ +import os +import signal +import asyncio +import logging +import time + +from typing import List +from typing import Set +from typing import Callable +from typing import Optional + +import aiohttp + +from .atx import Atx +from .streamer import Streamer + + +# ===== +_logger = logging.getLogger(__name__) + + +def _system_task(method: Callable) -> Callable: + async def wrap(self: "Server") -> None: + try: + await method(self) + except asyncio.CancelledError: + pass + except Exception: + _logger.exception("Unhandled exception, killing myself ...") + os.kill(os.getpid(), signal.SIGTERM) + return wrap + + +class Server: # pylint: disable=too-many-instance-attributes + def __init__( + self, + atx: Atx, + streamer: Streamer, + heartbeat: float, + atx_leds_poll: float, + video_shutdown_delay: float, + loop: asyncio.AbstractEventLoop, + ) -> None: + + self.__atx = atx + self.__streamer = streamer + self.__heartbeat = heartbeat + self.__video_shutdown_delay = video_shutdown_delay + self.__atx_leds_poll = atx_leds_poll + self.__loop = loop + + self.__sockets: Set[aiohttp.web.WebSocketResponse] = set() + self.__sockets_lock = asyncio.Lock() + + self.__system_tasks: List[asyncio.Task] = [] + + def run(self, host: str, port: int) -> None: + app = aiohttp.web.Application(loop=self.__loop) + app.router.add_get("/", self.__root_handler) + app.router.add_get("/ws", self.__ws_handler) + app.on_shutdown.append(self.__on_shutdown) + app.on_cleanup.append(self.__on_cleanup) + + self.__system_tasks.extend([ + self.__loop.create_task(self.__stream_controller()), + self.__loop.create_task(self.__poll_dead_sockets()), + self.__loop.create_task(self.__poll_atx_leds()), + ]) + + aiohttp.web.run_app( + app=app, + host=host, + port=port, + print=(lambda text: [_logger.info(line.strip()) for line in text.strip().splitlines()]), # type: ignore + ) + + async def __root_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + return aiohttp.web.Response(text="OK") + + async def __ws_handler(self, request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse: + ws = aiohttp.web.WebSocketResponse(heartbeat=self.__heartbeat) + await ws.prepare(request) + await self.__register_socket(ws) + async for msg in ws: + if msg.type == aiohttp.web.WSMsgType.TEXT: + retval = await self.__execute_command(msg.data) + if retval: + await ws.send_str(retval) + else: + break + return ws + + async def __on_shutdown(self, _: aiohttp.web.Application) -> None: + _logger.info("Cancelling system tasks ...") + for task in self.__system_tasks: + task.cancel() + await asyncio.gather(*self.__system_tasks) + + _logger.info("Disconnecting clients ...") + for ws in list(self.__sockets): + await self.__remove_socket(ws) + + async def __on_cleanup(self, _: aiohttp.web.Application) -> None: + if self.__streamer.is_running(): + await self.__streamer.stop() + + @_system_task + async def __stream_controller(self) -> None: + prev = 0 + shutdown_at = 0.0 + while True: + cur = len(self.__sockets) + if prev == 0 and cur > 0: + if not self.__streamer.is_running(): + await self.__streamer.start() + elif prev > 0 and cur == 0: + shutdown_at = time.time() + self.__video_shutdown_delay + elif prev == 0 and cur == 0 and time.time() > shutdown_at: + if self.__streamer.is_running(): + await self.__streamer.stop() + prev = cur + await asyncio.sleep(0.1) + + @_system_task + async def __poll_dead_sockets(self) -> None: + while True: + for ws in list(self.__sockets): + if ws.closed or not ws._req.transport: # pylint: disable=protected-access + await self.__remove_socket(ws) + await asyncio.sleep(0.1) + + @_system_task + async def __poll_atx_leds(self) -> None: + while True: + if self.__sockets: + await self.__broadcast("EVENT atx_leds %d %d" % (self.__atx.get_leds())) + await asyncio.sleep(self.__atx_leds_poll) + + async def __broadcast(self, msg: str) -> None: + await asyncio.gather(*[ + ws.send_str(msg) + for ws in list(self.__sockets) + if not ws.closed and ws._req.transport # pylint: disable=protected-access + ], return_exceptions=True) + + async def __execute_command(self, command: str) -> Optional[str]: + (command, args) = (command.strip().split(" ", maxsplit=1) + [""])[:2] + if command == "CLICK": + method = { + "power": self.__atx.click_power, + "power_long": self.__atx.click_power_long, + "reset": self.__atx.click_reset, + }.get(args) + if method: + await method() + return None + _logger.warning("Received an incorrect command: %r", command) + return "ERROR incorrect command" + + async def __register_socket(self, ws: aiohttp.web.WebSocketResponse) -> None: + async with self.__sockets_lock: + self.__sockets.add(ws) + _logger.info("Registered new client socket: remote=%s; id=%d; active=%d", + ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access + + async def __remove_socket(self, ws: aiohttp.web.WebSocketResponse) -> None: + async with self.__sockets_lock: + try: + self.__sockets.remove(ws) + _logger.info("Removed client socket: remote=%s; id=%d; active=%d", + ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access + await ws.close() + except Exception: + pass |