diff options
author | Devaev Maxim <[email protected]> | 2018-07-02 21:17:19 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2018-07-02 21:17:19 +0300 |
commit | 0582398521610e3c8dc0a43d7b30c1eb1b9dd705 (patch) | |
tree | b49dbf58f5f421051c909bd0d26ef3c8b5b8f800 /kvmd | |
parent | 87f8cb350b0301da99cb36760027980a8d899e4e (diff) |
better api, refactoring
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/kvmd/__init__.py | 26 | ||||
-rw-r--r-- | kvmd/kvmd/atx.py | 14 | ||||
-rw-r--r-- | kvmd/kvmd/msd.py | 10 | ||||
-rw-r--r-- | kvmd/kvmd/server.py | 125 | ||||
-rw-r--r-- | kvmd/kvmd/streamer.py | 16 |
5 files changed, 120 insertions, 71 deletions
diff --git a/kvmd/kvmd/__init__.py b/kvmd/kvmd/__init__.py index 8b950212..a8c80097 100644 --- a/kvmd/kvmd/__init__.py +++ b/kvmd/kvmd/__init__.py @@ -25,12 +25,12 @@ def main() -> None: ) atx = Atx( - power_led=int(config["atx"]["leds"]["pinout"]["power"]), - hdd_led=int(config["atx"]["leds"]["pinout"]["hdd"]), - power_switch=int(config["atx"]["switches"]["pinout"]["power"]), - reset_switch=int(config["atx"]["switches"]["pinout"]["reset"]), - click_delay=float(config["atx"]["switches"]["click_delay"]), - long_click_delay=float(config["atx"]["switches"]["long_click_delay"]), + power_led=int(config["atx"]["pinout"]["power_led"]), + hdd_led=int(config["atx"]["pinout"]["hdd_led"]), + power_switch=int(config["atx"]["pinout"]["power_switch"]), + reset_switch=int(config["atx"]["pinout"]["reset_switch"]), + click_delay=float(config["atx"]["click_delay"]), + long_click_delay=float(config["atx"]["long_click_delay"]), ) msd = MassStorageDevice( @@ -40,10 +40,12 @@ def main() -> None: ) streamer = Streamer( - cap_power=int(config["video"]["pinout"]["cap"]), - conv_power=int(config["video"]["pinout"]["conv"]), - sync_delay=float(config["video"]["sync_delay"]), - cmd=list(map(str, config["video"]["cmd"])), + cap_power=int(config["streamer"]["pinout"]["cap"]), + conv_power=int(config["streamer"]["pinout"]["conv"]), + sync_delay=float(config["streamer"]["sync_delay"]), + width=int(config["streamer"]["size"]["width"]), + height=int(config["streamer"]["size"]["height"]), + cmd=list(map(str, config["streamer"]["cmd"])), loop=loop, ) @@ -53,8 +55,8 @@ def main() -> None: msd=msd, streamer=streamer, heartbeat=float(config["server"]["heartbeat"]), - atx_leds_poll=float(config["atx"]["leds"]["poll"]), - video_shutdown_delay=float(config["video"]["shutdown_delay"]), + atx_state_poll=float(config["atx"]["state_poll"]), + streamer_shutdown_delay=float(config["streamer"]["shutdown_delay"]), msd_chunk_size=int(config["msd"]["chunk_size"]), loop=loop, ).run( diff --git a/kvmd/kvmd/atx.py b/kvmd/kvmd/atx.py index 686531a1..cbf8b34b 100644 --- a/kvmd/kvmd/atx.py +++ b/kvmd/kvmd/atx.py @@ -1,6 +1,6 @@ import asyncio -from typing import Tuple +from typing import Dict from .logging import get_logger @@ -29,11 +29,13 @@ class Atx: self.__lock = asyncio.Lock() - def get_leds(self) -> Tuple[bool, bool]: - return ( - not gpio.read(self.__power_led), - not gpio.read(self.__hdd_led), - ) + def get_state(self) -> Dict: + return { + "leds": { + "power": (not gpio.read(self.__power_led)), + "hdd": (not gpio.read(self.__hdd_led)), + }, + } async def click_power(self) -> None: if (await self.__click(self.__power_switch, self.__click_delay)): diff --git a/kvmd/kvmd/msd.py b/kvmd/kvmd/msd.py index 58a4d8ae..84dbf50f 100644 --- a/kvmd/kvmd/msd.py +++ b/kvmd/kvmd/msd.py @@ -118,8 +118,12 @@ class MassStorageDevice: get_logger().info("Using bind %r as mass-storage device", self._bind) try: loop.run_until_complete(self.connect_to_kvm(no_delay=True)) - except Exception: - get_logger().exception("Mass-storage device is not operational") + except Exception as err: + if isinstance(err, MassStorageError): + log = get_logger().warning + else: + log = get_logger().exception + log("Mass-storage device is not operational: %s", err) self._bind = "" else: get_logger().warning("Missing bind; mass-storage device is not operational") @@ -133,7 +137,7 @@ class MassStorageDevice: await asyncio.sleep(self.__init_delay) path = locate_by_bind(self._bind) if not path: - raise RuntimeError("Can't locate device by bind %r" % (self._bind)) + raise MassStorageError("Can't locate device by bind %r" % (self._bind)) self.__device_info = explore_device(path) get_logger().info("Mass-storage device switched to KVM: %s", self.__device_info) diff --git a/kvmd/kvmd/server.py b/kvmd/kvmd/server.py index 0a9e848a..1d9ed066 100644 --- a/kvmd/kvmd/server.py +++ b/kvmd/kvmd/server.py @@ -1,9 +1,11 @@ import os import signal import asyncio +import json import time from typing import List +from typing import Dict from typing import Set from typing import Callable from typing import Optional @@ -38,19 +40,29 @@ def _system_task(method: Callable) -> Callable: def _exceptions_as_400(msg: str, exceptions: List[Type[Exception]]) -> Callable: def make_wrapper(method: Callable) -> Callable: - async def wrap(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse: + async def wrap(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.Response: try: return (await method(self, request)) except tuple(exceptions) as err: # pylint: disable=catching-non-exception get_logger().exception(msg) return aiohttp.web.json_response({ - "error": type(err).__name__, - "error_msg": str(err), + "ok": False, + "result": { + "error": type(err).__name__, + "error_msg": str(err), + }, }, status=400) return wrap return make_wrapper +def _json_200(result: Optional[Dict]=None) -> aiohttp.web.Response: + return aiohttp.web.json_response({ + "ok": True, + "result": (result or {}), + }) + + class Server: # pylint: disable=too-many-instance-attributes def __init__( self, @@ -59,8 +71,8 @@ class Server: # pylint: disable=too-many-instance-attributes msd: MassStorageDevice, streamer: Streamer, heartbeat: float, - atx_leds_poll: float, - video_shutdown_delay: float, + atx_state_poll: float, + streamer_shutdown_delay: float, msd_chunk_size: int, loop: asyncio.AbstractEventLoop, ) -> None: @@ -70,8 +82,8 @@ class Server: # pylint: disable=too-many-instance-attributes self.__msd = msd self.__streamer = streamer self.__heartbeat = heartbeat - self.__video_shutdown_delay = video_shutdown_delay - self.__atx_leds_poll = atx_leds_poll + self.__streamer_shutdown_delay = streamer_shutdown_delay + self.__atx_state_poll = atx_state_poll self.__msd_chunk_size = msd_chunk_size self.__loop = loop @@ -80,17 +92,25 @@ class Server: # pylint: disable=too-many-instance-attributes self.__system_tasks: List[asyncio.Task] = [] - self.__restart_video = False + self.__reset_streamer = False def run(self, host: str, port: int) -> None: self.__keyboard.start() app = aiohttp.web.Application(loop=self.__loop) - app.router.add_get("/", self.__root_handler) + app.router.add_get("/ws", self.__ws_handler) + + app.router.add_get("/atx", self.__atx_state_handler) + app.router.add_post("/atx/click", self.__atx_click_handler) + app.router.add_get("/msd", self.__msd_state_handler) app.router.add_post("/msd/connect", self.__msd_connect_handler) app.router.add_post("/msd/write", self.__msd_write_handler) + + app.router.add_get("/streamer", self.__streamer_state_handler) + app.router.add_post("/streamer/reset", self.__streamer_reset_handler) + app.on_shutdown.append(self.__on_shutdown) app.on_cleanup.append(self.__on_cleanup) @@ -98,44 +118,55 @@ class Server: # pylint: disable=too-many-instance-attributes self.__loop.create_task(self.__keyboard_watchdog()), self.__loop.create_task(self.__stream_controller()), self.__loop.create_task(self.__poll_dead_sockets()), - self.__loop.create_task(self.__poll_atx_leds()), + self.__loop.create_task(self.__poll_atx_state()), ]) aiohttp.web.run_app(app, host=host, port=port, print=self.__run_app_print) - async def __root_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - return aiohttp.web.Response(text="OK") - async def __ws_handler(self, request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse: ws = aiohttp.web.WebSocketResponse(heartbeat=self.__heartbeat) await ws.prepare(request) await self.__register_socket(ws) async for msg in ws: if msg.type == aiohttp.web.WSMsgType.TEXT: - retval = await self.__execute_command(msg.data) - if retval: - await ws.send_str(retval) + await ws.send_str(json.dumps({"msg_type": "echo", "msg": msg.data})) else: break return ws + async def __atx_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + return _json_200(self.__atx.get_state()) + + @_exceptions_as_400("Click error", [RuntimeError]) + async def __atx_click_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: + button = request.query.get("button") + if button == "power": + await self.__atx.click_power() + elif button == "power_long": + await self.__atx.click_power_long() + elif button == "reset": + await self.__atx.click_reset() + else: + raise RuntimeError("Missing or invalid 'button=%s'" % (button)) + return _json_200({"clicked": button}) + async def __msd_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: - return aiohttp.web.json_response(self.__msd.get_state()) + return _json_200(self.__msd.get_state()) @_exceptions_as_400("Mass-storage error", [MassStorageError, RuntimeError]) async def __msd_connect_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: to = request.query.get("to") if to == "kvm": await self.__msd.connect_to_kvm() - await self.__broadcast("EVENT msd connected_to_kvm") + await self.__broadcast_event("msd_state", state="connected_to_kvm") # type: ignore elif to == "pc": await self.__msd.connect_to_pc() - await self.__broadcast("EVENT msd connected_to_pc") + await self.__broadcast_event("msd_state", state="connected_to_pc") # type: ignore else: raise RuntimeError("Missing or invalid 'to=%s'" % (to)) - return aiohttp.web.json_response(self.__msd.get_state()) + return _json_200(self.__msd.get_state()) - @_exceptions_as_400("Can't write image to mass-storage device", [MassStorageError, RuntimeError, OSError]) + @_exceptions_as_400("Can't write data to mass-storage device", [MassStorageError, RuntimeError, OSError]) async def __msd_write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: logger = get_logger(0) reader = await request.multipart() @@ -146,18 +177,25 @@ class Server: # pylint: disable=too-many-instance-attributes raise RuntimeError("Missing 'data' field") async with self.__msd: - await self.__broadcast("EVENT msd busy") + await self.__broadcast_event("msd_state", state="busy") # type: ignore logger.info("Writing image to mass-storage device ...") while True: chunk = await field.read_chunk(self.__msd_chunk_size) if not chunk: break writed = await self.__msd.write(chunk) - await self.__broadcast("EVENT msd free") + await self.__broadcast_event("msd_state", state="free") # type: ignore finally: if writed != 0: logger.info("Writed %d bytes to mass-storage device", writed) - return aiohttp.web.json_response({"writed": writed}) + return _json_200({"writed": writed}) + + async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + return _json_200(self.__streamer.get_state()) + + async def __streamer_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: + self.__reset_streamer = True + return _json_200() def __run_app_print(self, text: str) -> None: logger = get_logger() @@ -198,16 +236,16 @@ class Server: # pylint: disable=too-many-instance-attributes if not self.__streamer.is_running(): await self.__streamer.start() elif prev > 0 and cur == 0: - shutdown_at = time.time() + self.__video_shutdown_delay + shutdown_at = time.time() + self.__streamer_shutdown_delay elif prev == 0 and cur == 0 and time.time() > shutdown_at: if self.__streamer.is_running(): await self.__streamer.stop() - if self.__restart_video: + if self.__reset_streamer: if self.__streamer.is_running(): await self.__streamer.stop() await self.__streamer.start() - self.__restart_video = False + self.__reset_streamer = False prev = cur await asyncio.sleep(0.1) @@ -221,36 +259,25 @@ class Server: # pylint: disable=too-many-instance-attributes await asyncio.sleep(0.1) @_system_task - async def __poll_atx_leds(self) -> None: + async def __poll_atx_state(self) -> None: while True: if self.__sockets: - await self.__broadcast("EVENT atx_leds %d %d" % (self.__atx.get_leds())) - await asyncio.sleep(self.__atx_leds_poll) + await self.__broadcast_event("atx_state", **self.__atx.get_state()) + await asyncio.sleep(self.__atx_state_poll) - async def __broadcast(self, msg: str) -> None: + async def __broadcast_event(self, event: str, **kwargs: Dict) -> None: await asyncio.gather(*[ - ws.send_str(msg) + ws.send_str(json.dumps({ + "msg_type": "event", + "msg": { + "event": event, + "event_attrs": kwargs, + }, + })) for ws in list(self.__sockets) if not ws.closed and ws._req.transport # pylint: disable=protected-access ], return_exceptions=True) - async def __execute_command(self, command: str) -> Optional[str]: - (command, args) = (command.strip().split(" ", maxsplit=1) + [""])[:2] - if command == "CLICK": - method = { - "power": self.__atx.click_power, - "power_long": self.__atx.click_power_long, - "reset": self.__atx.click_reset, - }.get(args) - if method: - await method() - return None - elif command == "RESTART_VIDEO": - self.__restart_video = True - return None - get_logger().warning("Received an incorrect command: %r", command) - return "ERROR incorrect command" - async def __register_socket(self, ws: aiohttp.web.WebSocketResponse) -> None: async with self.__sockets_lock: self.__sockets.add(ws) diff --git a/kvmd/kvmd/streamer.py b/kvmd/kvmd/streamer.py index 72f818cb..089d79e1 100644 --- a/kvmd/kvmd/streamer.py +++ b/kvmd/kvmd/streamer.py @@ -2,6 +2,7 @@ import asyncio import asyncio.subprocess from typing import List +from typing import Dict from typing import Optional from .logging import get_logger @@ -16,6 +17,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes cap_power: int, conv_power: int, sync_delay: float, + width: int, + height: int, cmd: List[str], loop: asyncio.AbstractEventLoop, ) -> None: @@ -25,7 +28,9 @@ class Streamer: # pylint: disable=too-many-instance-attributes self.__cap_power = gpio.set_output(cap_power) self.__conv_power = (gpio.set_output(conv_power) if conv_power > 0 else conv_power) self.__sync_delay = sync_delay - self.__cmd = cmd + self.__width = width + self.__height = height + self.__cmd = [part.format(width=width, height=height) for part in cmd] self.__loop = loop @@ -48,6 +53,15 @@ class Streamer: # pylint: disable=too-many-instance-attributes def is_running(self) -> bool: return bool(self.__proc_task) + def get_state(self) -> Dict: + return { + "is_running": self.is_running(), + "size": { + "width": self.__width, + "height": self.__height, + }, + } + async def cleanup(self) -> None: if self.is_running(): await self.stop() |