summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kvmd/kvmd/__init__.py17
-rw-r--r--kvmd/kvmd/streamer.py19
-rw-r--r--kvmd/requirements.txt2
-rw-r--r--kvmd/tox.ini8
-rwxr-xr-xkvmd/wscli.py53
5 files changed, 77 insertions, 22 deletions
diff --git a/kvmd/kvmd/__init__.py b/kvmd/kvmd/__init__.py
index 10848c5b..e259654c 100644
--- a/kvmd/kvmd/__init__.py
+++ b/kvmd/kvmd/__init__.py
@@ -60,9 +60,10 @@ class _Application:
vga_power=self.__config["video"]["pinout"]["vga"],
sync_delay=self.__config["video"]["sync_delay"],
mjpg_streamer=self.__config["video"]["mjpg_streamer"],
+ loop=self.__loop,
)
- self.__system_futures: List[asyncio.Future] = []
+ self.__system_tasks: List[asyncio.Task] = []
def run(self) -> None:
app = aiohttp.web.Application(loop=self.__loop)
@@ -71,10 +72,10 @@ class _Application:
app.on_shutdown.append(self.__on_shutdown)
app.on_cleanup.append(self.__on_cleanup)
- self.__system_futures.extend([
- asyncio.ensure_future(self.__poll_dead_sockets(), loop=self.__loop),
- asyncio.ensure_future(self.__poll_atx_leds(), loop=self.__loop),
- asyncio.ensure_future(self.__poll_streamer_events(), loop=self.__loop),
+ self.__system_tasks.extend([
+ self.__loop.create_task(self.__poll_dead_sockets()),
+ self.__loop.create_task(self.__poll_atx_leds()),
+ self.__loop.create_task(self.__poll_streamer_events()),
])
aiohttp.web.run_app(
@@ -109,9 +110,9 @@ class _Application:
logger = get_logger()
logger.info("Cancelling tasks ...")
- for future in self.__system_futures:
- future.cancel()
- await asyncio.gather(*self.__system_futures)
+ for task in self.__system_tasks:
+ task.cancel()
+ await asyncio.gather(*self.__system_tasks)
logger.info("Cleaning up GPIO ...")
GPIO.cleanup()
diff --git a/kvmd/kvmd/streamer.py b/kvmd/kvmd/streamer.py
index af2900fb..ab6e717e 100644
--- a/kvmd/kvmd/streamer.py
+++ b/kvmd/kvmd/streamer.py
@@ -11,13 +11,14 @@ from RPi import GPIO
# =====
-class Streamer:
+class Streamer: # pylint: disable=too-many-instance-attributes
def __init__(
self,
cap_power: int,
vga_power: int,
sync_delay: float,
mjpg_streamer: Dict,
+ loop: asyncio.AbstractEventLoop,
) -> None:
self.__cap_power = self.__set_output_pin(cap_power)
@@ -30,9 +31,11 @@ class Streamer:
" -o 'output_http.so -p -l %(host)s %(port)s'"
) % (mjpg_streamer)
+ self.__loop = loop
+
self.__lock = asyncio.Lock()
self.__events_queue: asyncio.Queue = asyncio.Queue()
- self.__proc_future: Optional[asyncio.Future] = None
+ self.__proc_task: Optional[asyncio.Task] = None
def __set_output_pin(self, pin: int) -> int:
GPIO.setup(pin, GPIO.OUT)
@@ -46,18 +49,18 @@ class Streamer:
async def start(self) -> None:
async with self.__lock:
get_logger().info("Starting mjpg_streamer ...")
- assert not self.__proc_future
+ assert not self.__proc_task
await self.__set_hw_enabled(True)
- self.__proc_future = asyncio.ensure_future(self.__process(), loop=asyncio.get_event_loop())
+ self.__proc_task = self.__loop.create_task(self.__process())
async def stop(self) -> None:
async with self.__lock:
get_logger().info("Stopping mjpg_streamer ...")
- if self.__proc_future:
- self.__proc_future.cancel()
- await asyncio.gather(self.__proc_future, return_exceptions=True)
+ if self.__proc_task:
+ self.__proc_task.cancel()
+ await asyncio.gather(self.__proc_task, return_exceptions=True)
await self.__set_hw_enabled(False)
- self.__proc_future = None
+ self.__proc_task = None
await self.__events_queue.put("mjpg_streamer stopped")
async def __set_hw_enabled(self, enabled: bool) -> None:
diff --git a/kvmd/requirements.txt b/kvmd/requirements.txt
index 95592a40..f7a1faff 100644
--- a/kvmd/requirements.txt
+++ b/kvmd/requirements.txt
@@ -2,5 +2,3 @@ RPi.GPIO
aiohttp
contextlog
pyyaml
-bumpversion
-tox
diff --git a/kvmd/tox.ini b/kvmd/tox.ini
index d627345f..69ab7011 100644
--- a/kvmd/tox.ini
+++ b/kvmd/tox.ini
@@ -6,26 +6,26 @@ skipsdist = True
basepython = python3.6
[testenv:flake8]
-commands = flake8 kvmd
+commands = flake8 kvmd wscli.py
deps =
flake8
flake8-double-quotes
-rdev_requirements.txt
[testenv:pylint]
-commands = pylint --output-format=colorized --reports=no kvmd
+commands = pylint --output-format=colorized --reports=no kvmd wscli.py
deps =
pylint
-rdev_requirements.txt
[testenv:mypy]
-commands = mypy kvmd
+commands = mypy kvmd wscli.py
deps =
mypy
-rdev_requirements.txt
[testenv:vulture]
-commands = vulture kvmd
+commands = vulture kvmd wscli.py
deps =
vulture
-rdev_requirements.txt
diff --git a/kvmd/wscli.py b/kvmd/wscli.py
new file mode 100755
index 00000000..4923029d
--- /dev/null
+++ b/kvmd/wscli.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python3
+
+
+import sys
+import signal
+import asyncio
+import argparse
+
+import aiohttp
+
+
+# =====
+async def _run_client(loop: asyncio.AbstractEventLoop, url: str) -> None:
+ def stdin_callback() -> None:
+ line = sys.stdin.buffer.readline().decode()
+ if line:
+ asyncio.ensure_future(ws.send_str(line), loop=loop)
+ else:
+ loop.stop()
+
+ loop.add_reader(sys.stdin.fileno(), stdin_callback)
+
+ async def dispatch() -> None:
+ while True:
+ msg = await ws.receive()
+ if msg.type == aiohttp.WSMsgType.TEXT:
+ print("Received:", msg.data.strip())
+ else:
+ if msg.type == aiohttp.WSMsgType.CLOSE:
+ await ws.close()
+ elif msg.type == aiohttp.WSMsgType.ERROR:
+ print("Error during receive:", ws.exception())
+ elif msg.type == aiohttp.WSMsgType.CLOSED:
+ pass
+ break
+
+ async with aiohttp.ClientSession().ws_connect(url) as ws:
+ await dispatch()
+
+
+def main() -> None:
+ parser = argparse.ArgumentParser()
+ parser.add_argument("-u", "--url", default="http://localhost:8080")
+ options = parser.parse_args()
+
+ loop = asyncio.get_event_loop()
+ loop.add_signal_handler(signal.SIGINT, loop.stop)
+ loop.create_task(_run_client(loop, options.url))
+ loop.run_forever()
+
+
+if __name__ == "__main__":
+ main()