summaryrefslogtreecommitdiff
path: root/kvmd/apps
diff options
context:
space:
mode:
authorShantur Rathore <[email protected]>2021-09-20 01:22:48 +0100
committerGitHub <[email protected]>2021-09-20 03:22:48 +0300
commitf160fb561fa5bdcc7e35d35d16f66cc9150a0859 (patch)
treeb143c46ba4748d07da0b877d43152a7444a91c13 /kvmd/apps
parente38c65f18187163292c9e35957ac0b62efb04b19 (diff)
Implement macro recording for gpio (#65)
Diffstat (limited to 'kvmd/apps')
-rw-r--r--kvmd/apps/kvmd/api/ugpio.py27
1 files changed, 26 insertions, 1 deletions
diff --git a/kvmd/apps/kvmd/api/ugpio.py b/kvmd/apps/kvmd/api/ugpio.py
index 444ec83e..ddff00f7 100644
--- a/kvmd/apps/kvmd/api/ugpio.py
+++ b/kvmd/apps/kvmd/api/ugpio.py
@@ -19,9 +19,11 @@
# #
# ========================================================================== #
+from typing import Dict
from aiohttp.web import Request
from aiohttp.web import Response
+from aiohttp.web import WebSocketResponse
from ....validators.basic import valid_bool
from ....validators.basic import valid_float_f0
@@ -30,6 +32,7 @@ from ....validators.ugpio import valid_ugpio_channel
from ..ugpio import UserGpio
from ..http import exposed_http
+from ..http import exposed_ws
from ..http import make_json_response
@@ -38,7 +41,7 @@ class UserGpioApi:
def __init__(self, user_gpio: UserGpio) -> None:
self.__user_gpio = user_gpio
- # =====
+ # ===== Http
@exposed_http("GET", "/gpio")
async def __state_handler(self, _: Request) -> Response:
@@ -62,3 +65,25 @@ class UserGpioApi:
wait = valid_bool(request.query.get("wait", "0"))
await self.__user_gpio.pulse(channel, delay, wait)
return make_json_response()
+
+ # ===== Websocket
+
+ @exposed_ws("gpio_switch")
+ async def __ws_gpio_switch_handler(self, _: WebSocketResponse, event: Dict) -> None:
+ try:
+ channel = valid_ugpio_channel(event["channel"])
+ state = valid_bool(event["state"])
+ wait = valid_bool(event["wait"])
+ except Exception:
+ return
+ await self.__user_gpio.switch(channel, state, wait)
+
+ @exposed_ws("gpio_pulse")
+ async def __ws_gpio_pulse_handler(self, _: WebSocketResponse, event: Dict) -> None:
+ try:
+ channel = valid_ugpio_channel(event["channel"])
+ delay = valid_float_f0(event["delay"])
+ wait = valid_bool(event["wait"])
+ except Exception:
+ return
+ await self.__user_gpio.pulse(channel, delay, wait)