summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2020-09-10 09:31:05 +0300
committerDevaev Maxim <[email protected]>2020-09-10 09:31:05 +0300
commit9cb5582565aa2301a238ca382a32dc80106f6c15 (patch)
treeb0e06d9cb15c88f72a8a42e0c02a97a7b0965663
parent967afb2d9a0f4a94026ec612801b6291db5c6a72 (diff)
only available gpio modes
-rw-r--r--kvmd/apps/__init__.py34
-rw-r--r--kvmd/apps/cleanup/__init__.py6
-rw-r--r--kvmd/apps/kvmd/api/export.py4
-rw-r--r--kvmd/apps/kvmd/ugpio.py7
-rw-r--r--kvmd/plugins/ugpio/__init__.py13
-rw-r--r--kvmd/plugins/ugpio/hidrelay.py10
-rw-r--r--kvmd/validators/kvm.py4
-rw-r--r--testenv/tests/validators/test_kvm.py6
8 files changed, 58 insertions, 26 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py
index c03fc48b..a1bdb916 100644
--- a/kvmd/apps/__init__.py
+++ b/kvmd/apps/__init__.py
@@ -22,6 +22,7 @@
import sys
import os
+import functools
import argparse
import logging
import logging.config
@@ -43,6 +44,8 @@ from ..plugins.auth import get_auth_service_class
from ..plugins.hid import get_hid_class
from ..plugins.atx import get_atx_class
from ..plugins.msd import get_msd_class
+
+from ..plugins.ugpio import UserGpioModes
from ..plugins.ugpio import get_ugpio_driver_class
from ..yamlconf import ConfigError
@@ -181,29 +184,40 @@ def _patch_dynamic( # pylint: disable=too-many-locals
rebuild = True
if load_gpio:
- drivers: Set[str] = set()
+ driver: str
+ drivers: Dict[str, Set[str]] = {} # Name to modes
for (driver, params) in { # type: ignore
"__gpio__": {},
**tools.rget(raw_config, "kvmd", "gpio", "drivers"),
}.items():
with manual_validated(driver, "kvmd", "gpio", "drivers", "<key>"):
driver = valid_ugpio_driver(driver)
+
driver_type = valid_stripped_string_not_empty(params.get("type", "gpio"))
+ driver_class = get_ugpio_driver_class(driver_type)
+ drivers[driver] = driver_class.get_modes()
scheme["kvmd"]["gpio"]["drivers"][driver] = {
"type": Option(driver_type, type=valid_stripped_string_not_empty),
- **get_ugpio_driver_class(driver_type).get_plugin_options()
+ **driver_class.get_plugin_options()
}
- drivers.add(driver)
- for (channel, params) in tools.rget(raw_config, "kvmd", "gpio", "scheme").items():
- with manual_validated(channel, "kvmd", "gpio", "scheme", "<key>"):
+ path = ("kvmd", "gpio", "scheme")
+ for (channel, params) in tools.rget(raw_config, *path).items():
+ with manual_validated(channel, *path, "<key>"):
channel = valid_ugpio_channel(channel)
- with manual_validated(params.get("mode", ""), "kvmd", "gpio", "scheme", channel, "mode"):
- mode = valid_ugpio_mode(params.get("mode", ""))
+
+ driver = params.get("driver", "__gpio__")
+ with manual_validated(driver, *path, channel, "driver"):
+ driver = valid_ugpio_driver(driver, set(drivers))
+
+ mode: str = params.get("mode", "")
+ with manual_validated(mode, *path, channel, "mode"):
+ mode = valid_ugpio_mode(mode, drivers[driver])
+
scheme["kvmd"]["gpio"]["scheme"][channel] = {
- "driver": Option("__gpio__", type=(lambda arg: valid_ugpio_driver(arg, drivers))),
+ "driver": Option("__gpio__", type=functools.partial(valid_ugpio_driver, variants=set(drivers))),
"pin": Option(-1, type=valid_gpio_pin),
- "mode": Option("", type=valid_ugpio_mode),
+ "mode": Option("", type=functools.partial(valid_ugpio_mode, variants=drivers[driver])),
"inverted": Option(False, type=valid_bool),
**({
"busy_delay": Option(0.2, type=valid_float_f01),
@@ -214,7 +228,7 @@ def _patch_dynamic( # pylint: disable=too-many-locals
"min_delay": Option(0.1, type=valid_float_f01),
"max_delay": Option(0.1, type=valid_float_f01),
},
- } if mode == "output" else {})
+ } if mode == UserGpioModes.OUTPUT else {})
}
rebuild = True
diff --git a/kvmd/apps/cleanup/__init__.py b/kvmd/apps/cleanup/__init__.py
index bb9b3b71..708e3399 100644
--- a/kvmd/apps/cleanup/__init__.py
+++ b/kvmd/apps/cleanup/__init__.py
@@ -60,12 +60,6 @@ def _clear_gpio(config: Section) -> None:
("streamer/cap", config.streamer.cap_pin),
("streamer/conv", config.streamer.conv_pin),
-
- # *([
- # (f"gpio/{channel}", params.pin)
- # for (channel, params) in config.gpio.scheme.items()
- # if params.mode == "output"
- # ]),
]:
if pin >= 0:
logger.info("Writing 0 to GPIO pin=%d (%s)", pin, name)
diff --git a/kvmd/apps/kvmd/api/export.py b/kvmd/apps/kvmd/api/export.py
index b659b5fe..7b911828 100644
--- a/kvmd/apps/kvmd/api/export.py
+++ b/kvmd/apps/kvmd/api/export.py
@@ -32,6 +32,8 @@ from .... import tools
from ....plugins.atx import BaseAtx
+from ....plugins.ugpio import UserGpioModes
+
from ..info import InfoManager
from ..ugpio import UserGpio
@@ -59,7 +61,7 @@ class ExportApi:
self.__append_prometheus_rows(rows, atx_state["enabled"], "pikvm_atx_enabled")
self.__append_prometheus_rows(rows, atx_state["leds"]["power"], "pikvm_atx_power")
- for mode in ["input", "output"]:
+ for mode in sorted(UserGpioModes.ALL):
for (channel, ch_state) in gpio_state[f"{mode}s"].items():
for key in ["online", "state"]:
self.__append_prometheus_rows(rows, ch_state["state"], f"pikvm_gpio_{mode}_{key}_{channel}")
diff --git a/kvmd/apps/kvmd/ugpio.py b/kvmd/apps/kvmd/ugpio.py
index 3f62d286..06c1a6e5 100644
--- a/kvmd/apps/kvmd/ugpio.py
+++ b/kvmd/apps/kvmd/ugpio.py
@@ -34,6 +34,7 @@ from ...logging import get_logger
from ...plugins.ugpio import GpioError
from ...plugins.ugpio import GpioOperationError
from ...plugins.ugpio import GpioDriverOfflineError
+from ...plugins.ugpio import UserGpioModes
from ...plugins.ugpio import BaseUserGpioDriver
from ...plugins.ugpio import get_ugpio_driver_class
@@ -250,7 +251,7 @@ class UserGpio:
for (channel, ch_config) in tools.sorted_kvs(config.scheme):
driver = self.__drivers[ch_config.driver]
- if ch_config.mode == "input":
+ if ch_config.mode == UserGpioModes.INPUT:
self.__inputs[channel] = _GpioInput(channel, ch_config, driver)
else: # output:
self.__outputs[channel] = _GpioOutput(channel, ch_config, driver, self.__notifier)
@@ -331,12 +332,12 @@ class UserGpio:
if parts:
if parts[0] in self.__inputs:
items.append({
- "type": "input",
+ "type": UserGpioModes.INPUT,
"channel": parts[0],
})
elif parts[0] in self.__outputs:
items.append({
- "type": "output",
+ "type": UserGpioModes.OUTPUT,
"channel": parts[0],
"text": (parts[1] if len(parts) > 1 else "Click"),
})
diff --git a/kvmd/plugins/ugpio/__init__.py b/kvmd/plugins/ugpio/__init__.py
index 1e7725e8..9ed48a5f 100644
--- a/kvmd/plugins/ugpio/__init__.py
+++ b/kvmd/plugins/ugpio/__init__.py
@@ -20,6 +20,7 @@
# ========================================================================== #
+from typing import Set
from typing import Type
from typing import Optional
from typing import Any
@@ -47,6 +48,14 @@ class GpioDriverOfflineError(GpioOperationError):
# =====
+class UserGpioModes:
+ INPUT = "input"
+ OUTPUT = "output"
+
+ ALL = set([INPUT, OUTPUT])
+
+
+# =====
class BaseUserGpioDriver(BasePlugin):
def __init__( # pylint: disable=super-init-not-called
self,
@@ -61,6 +70,10 @@ class BaseUserGpioDriver(BasePlugin):
def get_instance_id(self) -> str:
return self._instance_name
+ @classmethod
+ def get_modes(cls) -> Set[str]:
+ return set(UserGpioModes.ALL)
+
def register_input(self, pin: int) -> None:
raise NotImplementedError
diff --git a/kvmd/plugins/ugpio/hidrelay.py b/kvmd/plugins/ugpio/hidrelay.py
index 85d2f808..b7f88cb8 100644
--- a/kvmd/plugins/ugpio/hidrelay.py
+++ b/kvmd/plugins/ugpio/hidrelay.py
@@ -24,6 +24,7 @@ import asyncio
import contextlib
from typing import Dict
+from typing import Set
from typing import Optional
import hid
@@ -39,6 +40,7 @@ from ...validators.basic import valid_float_f01
from ...validators.os import valid_abs_path
from . import GpioDriverOfflineError
+from . import UserGpioModes
from . import BaseUserGpioDriver
@@ -73,8 +75,12 @@ class Plugin(BaseUserGpioDriver):
"state_poll": Option(5.0, type=valid_float_f01),
}
+ @classmethod
+ def get_modes(cls) -> Set[str]:
+ return set([UserGpioModes.OUTPUT])
+
def register_input(self, pin: int) -> None:
- _ = pin
+ raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}")
def register_output(self, pin: int, initial: Optional[bool]) -> None:
self.__initials[pin] = initial
@@ -153,7 +159,7 @@ class Plugin(BaseUserGpioDriver):
def __check_pin(self, pin: int) -> bool:
ok = (0 <= pin <= 7)
if not ok:
- get_logger(0).warning("Unsupported pin for %s on %s: %d", self, self.__device_path, pin)
+ get_logger(0).warning("Unsupported pin=%d for %s on %s", pin, self, self.__device_path)
return ok
@contextlib.contextmanager
diff --git a/kvmd/validators/kvm.py b/kvmd/validators/kvm.py
index 5a1fe1ae..676d5539 100644
--- a/kvmd/validators/kvm.py
+++ b/kvmd/validators/kvm.py
@@ -116,8 +116,8 @@ def valid_ugpio_channel(arg: Any) -> str:
return check_len(check_re_match(arg, name, r"^[a-zA-Z_][a-zA-Z0-9_-]*$"), name, 255)
-def valid_ugpio_mode(arg: Any) -> str:
- return check_string_in_list(arg, "GPIO mode", ["input", "output"])
+def valid_ugpio_mode(arg: Any, variants: Set[str]) -> str:
+ return check_string_in_list(arg, "GPIO driver's pin mode", variants)
def valid_ugpio_view_table(arg: Any) -> List[List[str]]:
diff --git a/testenv/tests/validators/test_kvm.py b/testenv/tests/validators/test_kvm.py
index 8e9029ab..2f6df4cd 100644
--- a/testenv/tests/validators/test_kvm.py
+++ b/testenv/tests/validators/test_kvm.py
@@ -44,6 +44,8 @@ from kvmd.validators.kvm import valid_ugpio_channel
from kvmd.validators.kvm import valid_ugpio_mode
from kvmd.validators.kvm import valid_ugpio_view_table
+from kvmd.plugins.ugpio import UserGpioModes
+
# =====
@pytest.mark.parametrize("arg", ["ON ", "OFF ", "OFF_HARD ", "RESET_HARD "])
@@ -254,13 +256,13 @@ def test_fail__valid_ugpio_driver_variants(arg: Any) -> None:
# =====
@pytest.mark.parametrize("arg", ["Input ", " OUTPUT "])
def test_ok__valid_ugpio_mode(arg: Any) -> None:
- assert valid_ugpio_mode(arg) == arg.strip().lower()
+ assert valid_ugpio_mode(arg, UserGpioModes.ALL) == arg.strip().lower()
@pytest.mark.parametrize("arg", ["test", "", None])
def test_fail__valid_ugpio_mode(arg: Any) -> None:
with pytest.raises(ValidatorError):
- print(valid_ugpio_mode(arg))
+ print(valid_ugpio_mode(arg, UserGpioModes.ALL))
# =====