diff options
author | Devaev Maxim <[email protected]> | 2020-08-31 09:01:40 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2020-08-31 09:01:40 +0300 |
commit | 77826689446750021a8051b86343739e2766a368 (patch) | |
tree | 87af438fe0c6c90e7c676129fd3786acd56ed441 | |
parent | 9feb3531506706c358e2701a0df1f7cd3b7684c3 (diff) |
gpio view and refactoring
-rw-r--r-- | kvmd/apps/__init__.py | 14 | ||||
-rw-r--r-- | kvmd/apps/kvmd/api/ugpio.py | 7 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 1 | ||||
-rw-r--r-- | kvmd/apps/kvmd/ugpio.py | 5 | ||||
-rw-r--r-- | kvmd/validators/hw.py | 9 | ||||
-rw-r--r-- | kvmd/validators/kvm.py | 18 |
6 files changed, 39 insertions, 15 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index 4e395a35..28fbfc75 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -78,10 +78,11 @@ from ..validators.kvm import valid_stream_fps from ..validators.kvm import valid_stream_resolution from ..validators.kvm import valid_hid_key from ..validators.kvm import valid_hid_mouse_move +from ..validators.kvm import valid_ugpio_mode +from ..validators.kvm import valid_ugpio_view_table from ..validators.hw import valid_gpio_pin from ..validators.hw import valid_gpio_pin_optional -from ..validators.hw import valid_gpio_mode from ..validators.hw import valid_otg_gadget from ..validators.hw import valid_otg_id @@ -175,13 +176,13 @@ def _patch_dynamic( # pylint: disable=too-many-locals if load_gpio: for (channel, params) in raw_config.get("kvmd", {}).get("gpio", {}).get("scheme", {}).items(): try: - mode = valid_gpio_mode(params.get("mode", "")) + mode = valid_ugpio_mode(params.get("mode", "")) except Exception: pass finally: ch_scheme: Dict = { "pin": Option(-1, type=valid_gpio_pin), - "mode": Option("", type=valid_gpio_mode), + "mode": Option("", type=valid_ugpio_mode), "inverted": Option(False, type=valid_bool), } if mode == "output": @@ -325,6 +326,13 @@ def _get_config_scheme() -> Dict: "gpio": { "state_poll": Option(0.1, type=valid_float_f01), "scheme": {}, # Dymanic content + "view": { + "header": { + "title": Option("Switches"), + "leds": Option([], type=valid_string_list), + }, + "table": Option([], type=valid_ugpio_view_table), + }, }, }, diff --git a/kvmd/apps/kvmd/api/ugpio.py b/kvmd/apps/kvmd/api/ugpio.py index c4852593..69ac6646 100644 --- a/kvmd/apps/kvmd/api/ugpio.py +++ b/kvmd/apps/kvmd/api/ugpio.py @@ -26,7 +26,7 @@ from aiohttp.web import Response from ....validators.basic import valid_bool from ....validators.basic import valid_float_f0 -from ....validators.hw import valid_gpio_channel +from ....validators.kvm import valid_ugpio_channel from ..ugpio import UserGpio @@ -45,19 +45,20 @@ class UserGpioApi: async def __state_handler(self, _: Request) -> Response: return make_json_response({ "scheme": (await self.__user_gpio.get_scheme()), + "view": (await self.__user_gpio.get_view()), "state": (await self.__user_gpio.get_state()), }) @exposed_http("POST", "/gpio/switch") async def __switch_handler(self, request: Request) -> Response: - channel = valid_gpio_channel(request.query.get("channel")) + channel = valid_ugpio_channel(request.query.get("channel")) state = valid_bool(request.query.get("state")) done = await self.__user_gpio.switch(channel, state) return make_json_response({"done": done}) @exposed_http("POST", "/gpio/pulse") async def __pulse_handler(self, request: Request) -> Response: - channel = valid_gpio_channel(request.query.get("channel")) + channel = valid_ugpio_channel(request.query.get("channel")) delay = valid_float_f0(request.query.get("delay", "0")) await self.__user_gpio.pulse(channel, delay) return make_json_response() diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 40fd263a..5208f25d 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -244,6 +244,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins await self.__register_ws_client(client) try: await self.__broadcast_event("gpio_scheme_state", await self.__user_gpio.get_scheme()) + await self.__broadcast_event("gpio_view_state", await self.__user_gpio.get_view()) await asyncio.gather(*[ self.__broadcast_event(component.event_type, await component.get_state()) for component in self.__components diff --git a/kvmd/apps/kvmd/ugpio.py b/kvmd/apps/kvmd/ugpio.py index cc4d9685..97556aba 100644 --- a/kvmd/apps/kvmd/ugpio.py +++ b/kvmd/apps/kvmd/ugpio.py @@ -167,6 +167,8 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes # ===== class UserGpio: def __init__(self, config: Section) -> None: + self.__view = config.view + self.__state_notifier = aiotools.AioNotifier() self.__reader = gpio.BatchReader( pins=[gpio.set_input(ch_config.pin) for ch_config in config.scheme.values()], @@ -189,6 +191,9 @@ class UserGpio: "outputs": {channel: gout.get_scheme() for (channel, gout) in self.__outputs.items()}, } + async def get_view(self) -> Dict: + return self.__view + async def get_state(self) -> Dict: return { "inputs": {channel: gin.get_state() for (channel, gin) in self.__inputs.items()}, diff --git a/kvmd/validators/hw.py b/kvmd/validators/hw.py index e37eeda8..82d1221a 100644 --- a/kvmd/validators/hw.py +++ b/kvmd/validators/hw.py @@ -23,7 +23,6 @@ from typing import Any from . import check_in_list -from . import check_string_in_list from . import check_re_match from .basic import valid_number @@ -44,14 +43,6 @@ def valid_gpio_pin_optional(arg: Any) -> int: return int(valid_number(arg, min=-1, name="optional GPIO pin")) -def valid_gpio_mode(arg: Any) -> str: - return check_string_in_list(arg, "GPIO mode", ["input", "output"]) - - -def valid_gpio_channel(arg: Any) -> str: - return check_re_match(arg, "GPIO channel", r"^[a-zA-Z_][a-zA-Z0-9_-]*$")[:255] - - def valid_otg_gadget(arg: Any) -> str: return check_re_match(arg, "OTG gadget name", r"^[a-z_][a-z0-9_-]*$")[:255] diff --git a/kvmd/validators/kvm.py b/kvmd/validators/kvm.py index 140c282c..7ff96055 100644 --- a/kvmd/validators/kvm.py +++ b/kvmd/validators/kvm.py @@ -20,12 +20,14 @@ # ========================================================================== # +from typing import List from typing import Any from ..keyboard.mappings import KEYMAP from . import raise_error from . import check_string_in_list +from . import check_re_match from .basic import valid_stripped_string_not_empty from .basic import valid_number @@ -86,3 +88,19 @@ def valid_hid_mouse_button(arg: Any) -> str: def valid_hid_mouse_wheel(arg: Any) -> int: arg = valid_number(arg, name="HID mouse wheel") return min(max(-127, arg), 127) + + +# ===== +def valid_ugpio_mode(arg: Any) -> str: + return check_string_in_list(arg, "GPIO mode", ["input", "output"]) + + +def valid_ugpio_channel(arg: Any) -> str: + return check_re_match(arg, "GPIO channel", r"^[a-zA-Z_][a-zA-Z0-9_-]*$")[:255] + + +def valid_ugpio_view_table(arg: Any) -> List[List[str]]: + try: + return list(map(str, list(arg))) # type: ignore + except Exception: + raise_error("<skipped>", "GPIO view table") |