diff options
author | Devaev Maxim <[email protected]> | 2018-11-28 23:41:03 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2018-11-28 23:41:03 +0300 |
commit | 5f7834724a48f28539a66b481772201f00147b31 (patch) | |
tree | 759b5b71ddea26302093f09e88c514436cfeaee4 | |
parent | 5407f983c87cb5fa0e123c3a2a9775ad93b1ab65 (diff) |
refactoring
-rw-r--r-- | kvmd/apps/kvmd/msd.py | 68 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 64 |
2 files changed, 73 insertions, 59 deletions
diff --git a/kvmd/apps/kvmd/msd.py b/kvmd/apps/kvmd/msd.py index 2cdb9050..79295109 100644 --- a/kvmd/apps/kvmd/msd.py +++ b/kvmd/apps/kvmd/msd.py @@ -1,12 +1,14 @@ import os import struct import asyncio +import asyncio.queues import types from typing import Dict from typing import NamedTuple from typing import Callable from typing import Type +from typing import AsyncGenerator from typing import Optional from typing import Any @@ -192,6 +194,8 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes self.__device_file: Optional[aiofiles.base.AiofilesContextManager] = None self.__written = 0 + self.__state_queue: asyncio.queues.Queue = asyncio.Queue() + logger = get_logger(0) if self._device_path: logger.info("Using %r as mass-storage device", self._device_path) @@ -208,8 +212,35 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes else: logger.warning("Mass-storage device is not operational") + def get_state(self) -> Dict: + info = (self.__saved_device_info._asdict() if self.__saved_device_info else None) + if info: + info["hw"] = (info["hw"]._asdict() if info["hw"] else None) + info["image"] = (info["image"]._asdict() if info["image"] else None) + + connected_to: Optional[str] = None + if self._device_path: + connected_to = ("kvm" if self.__device_info else "server") + + return { + "in_operate": bool(self._device_path), + "connected_to": connected_to, + "busy": bool(self.__device_file), + "written": self.__written, + "info": info, + } + + async def poll_state(self) -> AsyncGenerator[Dict, None]: + while True: + yield (await self.__state_queue.get()) + + async def cleanup(self) -> None: + await self.__close_device_file() + gpio.write(self.__target, False) + gpio.write(self.__reset, False) + @_msd_operated - async def connect_to_kvm(self, no_delay: bool=False) -> None: + async def connect_to_kvm(self, no_delay: bool=False) -> Dict: with self.__region: if self.__device_info: raise MsdAlreadyConnectedToKvmError() @@ -217,46 +248,31 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes if not no_delay: await asyncio.sleep(self.__init_delay) await self.__load_device_info() + state = self.get_state() + await self.__state_queue.put(state) get_logger().info("Mass-storage device switched to KVM: %s", self.__device_info) + return state @_msd_operated - async def connect_to_pc(self) -> None: + async def connect_to_pc(self) -> Dict: with self.__region: if not self.__device_info: raise MsdAlreadyConnectedToPcError() gpio.write(self.__target, True) self.__device_info = None + state = self.get_state() + await self.__state_queue.put(state) get_logger().info("Mass-storage device switched to Server") + return state @_msd_operated async def reset(self) -> None: with self.__region: + get_logger().info("Mass-storage device reset") gpio.write(self.__reset, True) await asyncio.sleep(self.__reset_delay) gpio.write(self.__reset, False) - - def get_state(self) -> Dict: - info = (self.__saved_device_info._asdict() if self.__saved_device_info else None) - if info: - info["hw"] = (info["hw"]._asdict() if info["hw"] else None) - info["image"] = (info["image"]._asdict() if info["image"] else None) - - connected_to: Optional[str] = None - if self._device_path: - connected_to = ("kvm" if self.__device_info else "server") - - return { - "in_operate": bool(self._device_path), - "connected_to": connected_to, - "busy": bool(self.__device_file), - "written": self.__written, - "info": info, - } - - async def cleanup(self) -> None: - await self.__close_device_file() - gpio.write(self.__target, False) - gpio.write(self.__reset, False) + await self.__state_queue.put(self.get_state()) @_msd_operated async def __aenter__(self) -> "MassStorageDevice": @@ -268,6 +284,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes self.__written = 0 return self finally: + await self.__state_queue.put(self.get_state()) self.__region.exit() async def write_image_info(self, name: str, complete: bool) -> None: @@ -296,6 +313,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes try: await self.__close_device_file() finally: + await self.__state_queue.put(self.get_state()) self.__region.exit() async def __write_to_device_file(self, data: bytes) -> None: diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index af9e6765..7b3192ae 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -183,6 +183,7 @@ class Server: # pylint: disable=too-many-instance-attributes self.__loop.create_task(self.__stream_controller()), self.__loop.create_task(self.__poll_dead_sockets()), self.__loop.create_task(self.__poll_atx_state()), + self.__loop.create_task(self.__poll_msd_state()), self.__loop.create_task(self.__poll_streamer_state()), ]) @@ -303,9 +304,7 @@ class Server: # pylint: disable=too-many-instance-attributes }.get(button) if not clicker: raise BadRequest("Invalid param 'button'") - await self.__broadcast_event(_Events.ATX_STATE, self.__atx.get_state()) await clicker() - await self.__broadcast_event(_Events.ATX_STATE, self.__atx.get_state()) return _json({"clicked": button}) # ===== MSD @@ -317,16 +316,11 @@ class Server: # pylint: disable=too-many-instance-attributes async def __msd_connect_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: to = request.query.get("to") if to == "kvm": - await self.__msd.connect_to_kvm() - state = self.__msd.get_state() - await self.__broadcast_event(_Events.MSD_STATE, state) + return _json(await self.__msd.connect_to_kvm()) elif to == "server": - await self.__msd.connect_to_pc() - state = self.__msd.get_state() - await self.__broadcast_event(_Events.MSD_STATE, state) + return _json(await self.__msd.connect_to_pc()) else: raise BadRequest("Invalid param 'to'") - return _json(state) @_wrap_exceptions_for_web("Can't write data to mass-storage device") async def __msd_write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: @@ -334,17 +328,16 @@ class Server: # pylint: disable=too-many-instance-attributes reader = await request.multipart() written = 0 try: - field = await reader.next() - if not field or field.name != "image_name": - raise BadRequest("Missing 'image_name' field") - image_name = (await field.read()).decode("utf-8")[:256] + async with self.__msd: + field = await reader.next() + if not field or field.name != "image_name": + raise BadRequest("Missing 'image_name' field") + image_name = (await field.read()).decode("utf-8")[:256] - field = await reader.next() - if not field or field.name != "image_data": - raise BadRequest("Missing 'image_data' field") + field = await reader.next() + if not field or field.name != "image_data": + raise BadRequest("Missing 'image_data' field") - async with self.__msd: - await self.__broadcast_event(_Events.MSD_STATE, self.__msd.get_state()) logger.info("Writing image %r to mass-storage device ...", image_name) await self.__msd.write_image_info(image_name, False) while True: @@ -354,7 +347,6 @@ class Server: # pylint: disable=too-many-instance-attributes written = await self.__msd.write_image_chunk(chunk) await self.__msd.write_image_info(image_name, True) finally: - await self.__broadcast_event(_Events.MSD_STATE, self.__msd.get_state()) if written != 0: logger.info("Written %d bytes to mass-storage device", written) return _json({"written": written}) @@ -450,27 +442,31 @@ class Server: # pylint: disable=too-many-instance-attributes @_system_task async def __poll_atx_state(self) -> None: async for state in self.__atx.poll_state(): - if self.__sockets: - await self.__broadcast_event(_Events.ATX_STATE, state) + await self.__broadcast_event(_Events.ATX_STATE, state) + + @_system_task + async def __poll_msd_state(self) -> None: + async for state in self.__msd.poll_state(): + await self.__broadcast_event(_Events.MSD_STATE, state) @_system_task async def __poll_streamer_state(self) -> None: async for state in self.__streamer.poll_state(): - if self.__sockets: - await self.__broadcast_event(_Events.STREAMER_STATE, state) + await self.__broadcast_event(_Events.STREAMER_STATE, state) async def __broadcast_event(self, event_type: _Events, event_attrs: Dict) -> None: - await asyncio.gather(*[ - ws.send_str(json.dumps({ - "msg_type": "event", - "msg": { - "event": event_type.value, - "event_attrs": event_attrs, - }, - })) - for ws in list(self.__sockets) - if not ws.closed and ws._req.transport # pylint: disable=protected-access - ], return_exceptions=True) + if self.__sockets: + await asyncio.gather(*[ + ws.send_str(json.dumps({ + "msg_type": "event", + "msg": { + "event": event_type.value, + "event_attrs": event_attrs, + }, + })) + for ws in list(self.__sockets) + if not ws.closed and ws._req.transport # pylint: disable=protected-access + ], return_exceptions=True) async def __register_socket(self, ws: aiohttp.web.WebSocketResponse) -> None: async with self.__sockets_lock: |