diff options
-rw-r--r-- | kvmd/apps/__init__.py | 7 | ||||
-rw-r--r-- | kvmd/validators/auth.py | 7 | ||||
-rw-r--r-- | kvmd/validators/basic.py | 29 | ||||
-rw-r--r-- | kvmd/validators/fs.py | 19 | ||||
-rw-r--r-- | tests/test_app_cleanup.py | 7 | ||||
-rw-r--r-- | tests/test_validators_auth.py | 23 | ||||
-rw-r--r-- | tests/test_validators_basic.py | 37 | ||||
-rw-r--r-- | tests/test_validators_fs.py | 27 |
8 files changed, 147 insertions, 9 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index 77a9644b..3a653d11 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -51,9 +51,12 @@ from ..validators.basic import valid_number from ..validators.basic import valid_int_f1 from ..validators.basic import valid_float_f01 +from ..validators.auth import valid_users_list + from ..validators.fs import valid_abs_path from ..validators.fs import valid_abs_path_exists from ..validators.fs import valid_unix_mode +from ..validators.fs import valid_command from ..validators.net import valid_ip_or_host from ..validators.net import valid_port @@ -150,7 +153,7 @@ def _get_config_scheme() -> Dict: }, "auth": { - "internal_users": Option([]), + "internal_users": Option([], type=valid_users_list), "internal_type": Option("htpasswd"), "external_type": Option(""), # "internal": {}, @@ -222,7 +225,7 @@ def _get_config_scheme() -> Dict: "unix": Option("", type=valid_abs_path, only_if="!port", unpack_as="unix_path"), "timeout": Option(2.0, type=valid_float_f01), - "cmd": Option(["/bin/true"]), # TODO: Validator + "cmd": Option(["/bin/true"], type=valid_command), }, }, diff --git a/kvmd/validators/auth.py b/kvmd/validators/auth.py index 7e110162..535ba445 100644 --- a/kvmd/validators/auth.py +++ b/kvmd/validators/auth.py @@ -20,8 +20,11 @@ # ========================================================================== # +from typing import List from typing import Any +from .basic import valid_string_list + from . import check_re_match @@ -30,6 +33,10 @@ def valid_user(arg: Any) -> str: return check_re_match(arg, "username characters", r"^[a-z_][a-z0-9_-]*$") +def valid_users_list(arg: Any) -> List[str]: + return valid_string_list(arg, subval=valid_user, name="users list") + + def valid_passwd(arg: Any) -> str: return check_re_match(arg, "passwd characters", r"^[\x20-\x7e]*\Z$", strip=False, hide=True) diff --git a/kvmd/validators/basic.py b/kvmd/validators/basic.py index e4a2336e..62f9b36d 100644 --- a/kvmd/validators/basic.py +++ b/kvmd/validators/basic.py @@ -20,7 +20,12 @@ # ========================================================================== # +import re + +from typing import List from typing import Type +from typing import Callable +from typing import Optional from typing import Union from typing import Any @@ -71,3 +76,27 @@ def valid_int_f1(arg: Any) -> int: def valid_float_f01(arg: Any) -> float: return float(valid_number(arg, min=0.1, type=float)) + + +def valid_string_list( + arg: Any, + delim: str=r"[,\t ]+", + subval: Optional[Callable[[Any], Any]]=None, + name: str="", +) -> List[str]: + + if not name: + name = "string list" + + if subval is None: + subval = (lambda item: check_not_none_string(item, name + " item")) + + if not isinstance(arg, (list, tuple)): + arg = check_not_none_string(arg, name) + arg = list(filter(None, re.split(delim, arg))) + if subval is not None: + try: + arg = list(map(subval, arg)) + except Exception: + raise ValidatorError("Failed sub-validator on one of the item of %r" % (arg)) + return arg diff --git a/kvmd/validators/fs.py b/kvmd/validators/fs.py index ed895818..505df8e2 100644 --- a/kvmd/validators/fs.py +++ b/kvmd/validators/fs.py @@ -22,17 +22,20 @@ import os +from typing import List from typing import Any from . import raise_error from . import check_not_none_string from .basic import valid_number +from .basic import valid_string_list # ===== -def valid_abs_path(arg: Any, exists: bool=False) -> str: - name = ("existent absolute path" if exists else "absolute path") +def valid_abs_path(arg: Any, exists: bool=False, name: str="") -> str: + if not name: + name = ("existent absolute path" if exists else "absolute path") if len(str(arg).strip()) == 0: arg = None @@ -44,9 +47,17 @@ def valid_abs_path(arg: Any, exists: bool=False) -> str: return arg -def valid_abs_path_exists(arg: Any) -> str: - return valid_abs_path(arg, exists=True) +def valid_abs_path_exists(arg: Any, name: str="") -> str: + return valid_abs_path(arg, exists=True, name=name) def valid_unix_mode(arg: Any) -> int: return int(valid_number(arg, min=0, name="UNIX mode")) + + +def valid_command(arg: Any) -> List[str]: + cmd = valid_string_list(arg, delim=r"[,\t]+", name="command") + if len(cmd) == 0: + raise_error(arg, "command") + cmd[0] = valid_abs_path_exists(cmd[0], name="command entry point") + return cmd diff --git a/tests/test_app_cleanup.py b/tests/test_app_cleanup.py index d0f1f009..3e2e4c72 100644 --- a/tests/test_app_cleanup.py +++ b/tests/test_app_cleanup.py @@ -35,7 +35,8 @@ from kvmd.apps.cleanup import main def test_ok(tmpdir) -> None: # type: ignore queue: multiprocessing.queues.Queue = multiprocessing.Queue() - ustreamer_fake_name = "ustr-" + secrets.token_hex(3) + ustreamer_tmp_path = os.path.abspath(str(tmpdir.join("ustr-" + secrets.token_hex(3)))) + os.symlink("/usr/bin/ustreamer", ustreamer_tmp_path) ustreamer_sock_path = os.path.abspath(str(tmpdir.join("ustreamer-fake.sock"))) open(ustreamer_sock_path, "w").close() @@ -43,7 +44,7 @@ def test_ok(tmpdir) -> None: # type: ignore open(kvmd_sock_path, "w").close() def ustreamer_fake() -> None: - setproctitle.setproctitle(ustreamer_fake_name) + setproctitle.setproctitle(os.path.basename(ustreamer_tmp_path)) queue.put(True) while True: time.sleep(1) @@ -60,7 +61,7 @@ def test_ok(tmpdir) -> None: # type: ignore "kvmd/server/unix=" + kvmd_sock_path, "kvmd/streamer/port=0", "kvmd/streamer/unix=" + ustreamer_sock_path, - "kvmd/streamer/cmd=[\"%s\"]" % (ustreamer_fake_name), + "kvmd/streamer/cmd=" + ustreamer_tmp_path, ]) assert not os.path.exists(ustreamer_sock_path) diff --git a/tests/test_validators_auth.py b/tests/test_validators_auth.py index cec7efa7..f3cdf939 100644 --- a/tests/test_validators_auth.py +++ b/tests/test_validators_auth.py @@ -20,12 +20,14 @@ # ========================================================================== # +from typing import List from typing import Any import pytest from kvmd.validators import ValidatorError from kvmd.validators.auth import valid_user +from kvmd.validators.auth import valid_users_list from kvmd.validators.auth import valid_passwd from kvmd.validators.auth import valid_auth_token @@ -59,6 +61,27 @@ def test_fail__valid_user(arg: Any) -> None: # ===== [email protected]("arg, retval", [ + ("foo, bar, ", ["foo", "bar"]), + ("foo bar", ["foo", "bar"]), + (["foo", "bar"], ["foo", "bar"]), + ("", []), + (" ", []), + (", ", []), + (", foo, ", ["foo"]), + ([], []), +]) +def test_ok__valid_users_list(arg: Any, retval: List) -> None: + assert valid_users_list(arg) == retval + + [email protected]("arg", [None, [None], [""], [" "], ["user,"]]) +def test_fail__valid_users_list(arg: Any) -> None: # pylint: disable=invalid-name + with pytest.raises(ValidatorError): + print(valid_users_list(arg)) + + +# ===== @pytest.mark.parametrize("arg", [ "glados", "test", diff --git a/tests/test_validators_basic.py b/tests/test_validators_basic.py index 0231e29a..96feb666 100644 --- a/tests/test_validators_basic.py +++ b/tests/test_validators_basic.py @@ -20,6 +20,7 @@ # ========================================================================== # +from typing import List from typing import Any import pytest @@ -29,6 +30,7 @@ from kvmd.validators.basic import valid_bool from kvmd.validators.basic import valid_number from kvmd.validators.basic import valid_int_f1 from kvmd.validators.basic import valid_float_f01 +from kvmd.validators.basic import valid_string_list # ===== @@ -105,3 +107,38 @@ def test_ok__valid_float_f01(arg: Any) -> None: def test_fail__valid_float_f01(arg: Any) -> None: with pytest.raises(ValidatorError): print(valid_float_f01(arg)) + + +# ===== [email protected]("arg, retval", [ + ("a, b, c", ["a", "b", "c"]), + ("a b c", ["a", "b", "c"]), + (["a", "b", "c"], ["a", "b", "c"]), + ("", []), + (" ", []), + (", ", []), + (", a, ", ["a"]), + ([], []), +]) +def test_ok__valid_string_list(arg: Any, retval: List) -> None: + assert valid_string_list(arg) == retval + + [email protected]("arg, retval", [ + ("1, 2, 3", [1, 2, 3]), + ("1 2 3", [1, 2, 3]), + ([1, 2, 3], [1, 2, 3]), + ("", []), + (" ", []), + (", ", []), + (", 1, ", [1]), + ([], []), +]) +def test_ok__valid_string_list__subval(arg: Any, retval: List) -> None: # pylint: disable=invalid-name + assert valid_string_list(arg, subval=int) == retval + + [email protected]("arg", [None, [None]]) +def test_fail__valid_string_list(arg: Any) -> None: # pylint: disable=invalid-name + with pytest.raises(ValidatorError): + print(valid_string_list(arg)) diff --git a/tests/test_validators_fs.py b/tests/test_validators_fs.py index 854d025c..c2f13393 100644 --- a/tests/test_validators_fs.py +++ b/tests/test_validators_fs.py @@ -22,6 +22,7 @@ import os +from typing import List from typing import Any import pytest @@ -30,6 +31,7 @@ from kvmd.validators import ValidatorError from kvmd.validators.fs import valid_abs_path from kvmd.validators.fs import valid_abs_path_exists from kvmd.validators.fs import valid_unix_mode +from kvmd.validators.fs import valid_command # ===== @@ -89,3 +91,28 @@ def test_ok__valid_unix_mode(arg: Any) -> None: def test_fail__valid_unix_mode(arg: Any) -> None: with pytest.raises(ValidatorError): print(valid_unix_mode(arg)) + + +# ===== [email protected]("arg, retval", [ + (["/bin/true"], ["/bin/true"]), + (["/bin/true", 1, 2, 3], ["/bin/true", "1", "2", "3"]), + ("/bin/true, 1, 2, 3,", ["/bin/true", "1", "2", "3"]), + ("/bin/true", ["/bin/true"]), +]) +def test_ok__valid_command(arg: Any, retval: List[str]) -> None: + assert valid_command(arg) == retval + + [email protected]("arg", [ + ["/bin/blahblahblah"], + ["/bin/blahblahblah", 1, 2, 3], + [" "], + [], + "/bin/blahblahblah, 1, 2, 3,", + "/bin/blahblahblah", + " ", +]) +def test_fail__valid_command(arg: Any) -> None: + with pytest.raises(ValidatorError): + print(valid_command(arg)) |