summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2020-03-01 00:37:25 +0300
committerDevaev Maxim <[email protected]>2020-03-01 02:31:06 +0300
commitcae9ad9a2191a10eeab8371601a24aaec8957dd8 (patch)
treea82506212ffba44f5ea51b8e0c084d51a428f6ae /kvmd
parent75d9b858d73bf3ed31597e554b27520e3e31f72e (diff)
removed busyloop from stream controller
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/aiomulti.py1
-rw-r--r--kvmd/aiotools.py15
-rw-r--r--kvmd/apps/kvmd/server.py7
3 files changed, 22 insertions, 1 deletions
diff --git a/kvmd/aiomulti.py b/kvmd/aiomulti.py
index 18d112f1..04eb45e1 100644
--- a/kvmd/aiomulti.py
+++ b/kvmd/aiomulti.py
@@ -56,6 +56,7 @@ class AioProcessNotifier:
return False
+# =====
class AioSharedFlags:
def __init__(
self,
diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py
index 76798e5c..993209d1 100644
--- a/kvmd/aiotools.py
+++ b/kvmd/aiotools.py
@@ -22,6 +22,7 @@
import os
import asyncio
+import asyncio.queues
import functools
import contextlib
import types
@@ -158,3 +159,17 @@ class AioExclusiveRegion:
_tb: types.TracebackType,
) -> None:
self.exit()
+
+
+# =====
+class AioNotifier:
+ def __init__(self) -> None:
+ self.__queue: asyncio.queues.Queue = asyncio.Queue()
+
+ async def notify(self) -> None:
+ await self.__queue.put(None)
+
+ async def wait(self) -> None:
+ await self.__queue.get()
+ while not self.__queue.empty():
+ await self.__queue.get()
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
index 11d37afb..2d70d4b6 100644
--- a/kvmd/apps/kvmd/server.py
+++ b/kvmd/apps/kvmd/server.py
@@ -148,6 +148,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
self.__system_tasks: List[asyncio.Task] = []
+ self.__streamer_notifier = aiotools.AioNotifier()
self.__reset_streamer = False
self.__new_streamer_params: Dict = {}
@@ -210,11 +211,13 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
value = request.query.get(name)
if value:
self.__new_streamer_params[name] = validator(value)
+ await self.__streamer_notifier.notify()
return make_json_response()
@exposed_http("POST", "/streamer/reset")
async def __streamer_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
self.__reset_streamer = True
+ await self.__streamer_notifier.notify()
return make_json_response()
# ===== WEBSOCKET
@@ -384,6 +387,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
self.__sockets.add(ws)
remote: Optional[str] = (ws._req.remote if ws._req is not None else None) # pylint: disable=protected-access
get_logger().info("Registered new client socket: remote=%s; id=%d; active=%d", remote, id(ws), len(self.__sockets))
+ await self.__streamer_notifier.notify()
async def __remove_socket(self, ws: aiohttp.web.WebSocketResponse) -> None:
async with self.__sockets_lock:
@@ -397,6 +401,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
raise
except Exception:
pass
+ await self.__streamer_notifier.notify()
# ===== SYSTEM TASKS
@@ -420,7 +425,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
self.__reset_streamer = False
prev = cur
- await asyncio.sleep(0.1)
+ await self.__streamer_notifier.wait()
async def __poll_state(self, event_type: _Events, poller: AsyncGenerator[Dict, None]) -> None:
async for state in poller: