diff options
-rw-r--r-- | kvmd/apps/__init__.py | 34 | ||||
-rw-r--r-- | kvmd/apps/cleanup/__init__.py | 6 | ||||
-rw-r--r-- | kvmd/apps/kvmd/api/export.py | 4 | ||||
-rw-r--r-- | kvmd/apps/kvmd/ugpio.py | 7 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/__init__.py | 13 | ||||
-rw-r--r-- | kvmd/plugins/ugpio/hidrelay.py | 10 | ||||
-rw-r--r-- | kvmd/validators/kvm.py | 4 | ||||
-rw-r--r-- | testenv/tests/validators/test_kvm.py | 6 |
8 files changed, 58 insertions, 26 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index c03fc48b..a1bdb916 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -22,6 +22,7 @@ import sys import os +import functools import argparse import logging import logging.config @@ -43,6 +44,8 @@ from ..plugins.auth import get_auth_service_class from ..plugins.hid import get_hid_class from ..plugins.atx import get_atx_class from ..plugins.msd import get_msd_class + +from ..plugins.ugpio import UserGpioModes from ..plugins.ugpio import get_ugpio_driver_class from ..yamlconf import ConfigError @@ -181,29 +184,40 @@ def _patch_dynamic( # pylint: disable=too-many-locals rebuild = True if load_gpio: - drivers: Set[str] = set() + driver: str + drivers: Dict[str, Set[str]] = {} # Name to modes for (driver, params) in { # type: ignore "__gpio__": {}, **tools.rget(raw_config, "kvmd", "gpio", "drivers"), }.items(): with manual_validated(driver, "kvmd", "gpio", "drivers", "<key>"): driver = valid_ugpio_driver(driver) + driver_type = valid_stripped_string_not_empty(params.get("type", "gpio")) + driver_class = get_ugpio_driver_class(driver_type) + drivers[driver] = driver_class.get_modes() scheme["kvmd"]["gpio"]["drivers"][driver] = { "type": Option(driver_type, type=valid_stripped_string_not_empty), - **get_ugpio_driver_class(driver_type).get_plugin_options() + **driver_class.get_plugin_options() } - drivers.add(driver) - for (channel, params) in tools.rget(raw_config, "kvmd", "gpio", "scheme").items(): - with manual_validated(channel, "kvmd", "gpio", "scheme", "<key>"): + path = ("kvmd", "gpio", "scheme") + for (channel, params) in tools.rget(raw_config, *path).items(): + with manual_validated(channel, *path, "<key>"): channel = valid_ugpio_channel(channel) - with manual_validated(params.get("mode", ""), "kvmd", "gpio", "scheme", channel, "mode"): - mode = valid_ugpio_mode(params.get("mode", "")) + + driver = params.get("driver", "__gpio__") + with manual_validated(driver, *path, channel, "driver"): + driver = valid_ugpio_driver(driver, set(drivers)) + + mode: str = params.get("mode", "") + with manual_validated(mode, *path, channel, "mode"): + mode = valid_ugpio_mode(mode, drivers[driver]) + scheme["kvmd"]["gpio"]["scheme"][channel] = { - "driver": Option("__gpio__", type=(lambda arg: valid_ugpio_driver(arg, drivers))), + "driver": Option("__gpio__", type=functools.partial(valid_ugpio_driver, variants=set(drivers))), "pin": Option(-1, type=valid_gpio_pin), - "mode": Option("", type=valid_ugpio_mode), + "mode": Option("", type=functools.partial(valid_ugpio_mode, variants=drivers[driver])), "inverted": Option(False, type=valid_bool), **({ "busy_delay": Option(0.2, type=valid_float_f01), @@ -214,7 +228,7 @@ def _patch_dynamic( # pylint: disable=too-many-locals "min_delay": Option(0.1, type=valid_float_f01), "max_delay": Option(0.1, type=valid_float_f01), }, - } if mode == "output" else {}) + } if mode == UserGpioModes.OUTPUT else {}) } rebuild = True diff --git a/kvmd/apps/cleanup/__init__.py b/kvmd/apps/cleanup/__init__.py index bb9b3b71..708e3399 100644 --- a/kvmd/apps/cleanup/__init__.py +++ b/kvmd/apps/cleanup/__init__.py @@ -60,12 +60,6 @@ def _clear_gpio(config: Section) -> None: ("streamer/cap", config.streamer.cap_pin), ("streamer/conv", config.streamer.conv_pin), - - # *([ - # (f"gpio/{channel}", params.pin) - # for (channel, params) in config.gpio.scheme.items() - # if params.mode == "output" - # ]), ]: if pin >= 0: logger.info("Writing 0 to GPIO pin=%d (%s)", pin, name) diff --git a/kvmd/apps/kvmd/api/export.py b/kvmd/apps/kvmd/api/export.py index b659b5fe..7b911828 100644 --- a/kvmd/apps/kvmd/api/export.py +++ b/kvmd/apps/kvmd/api/export.py @@ -32,6 +32,8 @@ from .... import tools from ....plugins.atx import BaseAtx +from ....plugins.ugpio import UserGpioModes + from ..info import InfoManager from ..ugpio import UserGpio @@ -59,7 +61,7 @@ class ExportApi: self.__append_prometheus_rows(rows, atx_state["enabled"], "pikvm_atx_enabled") self.__append_prometheus_rows(rows, atx_state["leds"]["power"], "pikvm_atx_power") - for mode in ["input", "output"]: + for mode in sorted(UserGpioModes.ALL): for (channel, ch_state) in gpio_state[f"{mode}s"].items(): for key in ["online", "state"]: self.__append_prometheus_rows(rows, ch_state["state"], f"pikvm_gpio_{mode}_{key}_{channel}") diff --git a/kvmd/apps/kvmd/ugpio.py b/kvmd/apps/kvmd/ugpio.py index 3f62d286..06c1a6e5 100644 --- a/kvmd/apps/kvmd/ugpio.py +++ b/kvmd/apps/kvmd/ugpio.py @@ -34,6 +34,7 @@ from ...logging import get_logger from ...plugins.ugpio import GpioError from ...plugins.ugpio import GpioOperationError from ...plugins.ugpio import GpioDriverOfflineError +from ...plugins.ugpio import UserGpioModes from ...plugins.ugpio import BaseUserGpioDriver from ...plugins.ugpio import get_ugpio_driver_class @@ -250,7 +251,7 @@ class UserGpio: for (channel, ch_config) in tools.sorted_kvs(config.scheme): driver = self.__drivers[ch_config.driver] - if ch_config.mode == "input": + if ch_config.mode == UserGpioModes.INPUT: self.__inputs[channel] = _GpioInput(channel, ch_config, driver) else: # output: self.__outputs[channel] = _GpioOutput(channel, ch_config, driver, self.__notifier) @@ -331,12 +332,12 @@ class UserGpio: if parts: if parts[0] in self.__inputs: items.append({ - "type": "input", + "type": UserGpioModes.INPUT, "channel": parts[0], }) elif parts[0] in self.__outputs: items.append({ - "type": "output", + "type": UserGpioModes.OUTPUT, "channel": parts[0], "text": (parts[1] if len(parts) > 1 else "Click"), }) diff --git a/kvmd/plugins/ugpio/__init__.py b/kvmd/plugins/ugpio/__init__.py index 1e7725e8..9ed48a5f 100644 --- a/kvmd/plugins/ugpio/__init__.py +++ b/kvmd/plugins/ugpio/__init__.py @@ -20,6 +20,7 @@ # ========================================================================== # +from typing import Set from typing import Type from typing import Optional from typing import Any @@ -47,6 +48,14 @@ class GpioDriverOfflineError(GpioOperationError): # ===== +class UserGpioModes: + INPUT = "input" + OUTPUT = "output" + + ALL = set([INPUT, OUTPUT]) + + +# ===== class BaseUserGpioDriver(BasePlugin): def __init__( # pylint: disable=super-init-not-called self, @@ -61,6 +70,10 @@ class BaseUserGpioDriver(BasePlugin): def get_instance_id(self) -> str: return self._instance_name + @classmethod + def get_modes(cls) -> Set[str]: + return set(UserGpioModes.ALL) + def register_input(self, pin: int) -> None: raise NotImplementedError diff --git a/kvmd/plugins/ugpio/hidrelay.py b/kvmd/plugins/ugpio/hidrelay.py index 85d2f808..b7f88cb8 100644 --- a/kvmd/plugins/ugpio/hidrelay.py +++ b/kvmd/plugins/ugpio/hidrelay.py @@ -24,6 +24,7 @@ import asyncio import contextlib from typing import Dict +from typing import Set from typing import Optional import hid @@ -39,6 +40,7 @@ from ...validators.basic import valid_float_f01 from ...validators.os import valid_abs_path from . import GpioDriverOfflineError +from . import UserGpioModes from . import BaseUserGpioDriver @@ -73,8 +75,12 @@ class Plugin(BaseUserGpioDriver): "state_poll": Option(5.0, type=valid_float_f01), } + @classmethod + def get_modes(cls) -> Set[str]: + return set([UserGpioModes.OUTPUT]) + def register_input(self, pin: int) -> None: - _ = pin + raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}") def register_output(self, pin: int, initial: Optional[bool]) -> None: self.__initials[pin] = initial @@ -153,7 +159,7 @@ class Plugin(BaseUserGpioDriver): def __check_pin(self, pin: int) -> bool: ok = (0 <= pin <= 7) if not ok: - get_logger(0).warning("Unsupported pin for %s on %s: %d", self, self.__device_path, pin) + get_logger(0).warning("Unsupported pin=%d for %s on %s", pin, self, self.__device_path) return ok @contextlib.contextmanager diff --git a/kvmd/validators/kvm.py b/kvmd/validators/kvm.py index 5a1fe1ae..676d5539 100644 --- a/kvmd/validators/kvm.py +++ b/kvmd/validators/kvm.py @@ -116,8 +116,8 @@ def valid_ugpio_channel(arg: Any) -> str: return check_len(check_re_match(arg, name, r"^[a-zA-Z_][a-zA-Z0-9_-]*$"), name, 255) -def valid_ugpio_mode(arg: Any) -> str: - return check_string_in_list(arg, "GPIO mode", ["input", "output"]) +def valid_ugpio_mode(arg: Any, variants: Set[str]) -> str: + return check_string_in_list(arg, "GPIO driver's pin mode", variants) def valid_ugpio_view_table(arg: Any) -> List[List[str]]: diff --git a/testenv/tests/validators/test_kvm.py b/testenv/tests/validators/test_kvm.py index 8e9029ab..2f6df4cd 100644 --- a/testenv/tests/validators/test_kvm.py +++ b/testenv/tests/validators/test_kvm.py @@ -44,6 +44,8 @@ from kvmd.validators.kvm import valid_ugpio_channel from kvmd.validators.kvm import valid_ugpio_mode from kvmd.validators.kvm import valid_ugpio_view_table +from kvmd.plugins.ugpio import UserGpioModes + # ===== @pytest.mark.parametrize("arg", ["ON ", "OFF ", "OFF_HARD ", "RESET_HARD "]) @@ -254,13 +256,13 @@ def test_fail__valid_ugpio_driver_variants(arg: Any) -> None: # ===== @pytest.mark.parametrize("arg", ["Input ", " OUTPUT "]) def test_ok__valid_ugpio_mode(arg: Any) -> None: - assert valid_ugpio_mode(arg) == arg.strip().lower() + assert valid_ugpio_mode(arg, UserGpioModes.ALL) == arg.strip().lower() @pytest.mark.parametrize("arg", ["test", "", None]) def test_fail__valid_ugpio_mode(arg: Any) -> None: with pytest.raises(ValidatorError): - print(valid_ugpio_mode(arg)) + print(valid_ugpio_mode(arg, UserGpioModes.ALL)) # ===== |