summaryrefslogtreecommitdiff
path: root/kvmd
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2019-04-06 05:32:02 +0300
committerDevaev Maxim <[email protected]>2019-04-06 08:04:26 +0300
commit1d75b738a08c98a5d3d8ac3c685e77360f4c1267 (patch)
tree3aa89dc7fd0ab737e9332714a784e9d4dde0a362 /kvmd
parent73e04b71ed55a46c939f12548b31746617af2bca (diff)
validators, tests
Diffstat (limited to 'kvmd')
-rw-r--r--kvmd/apps/__init__.py197
-rw-r--r--kvmd/apps/cleanup/__init__.py9
-rw-r--r--kvmd/apps/htpasswd/__init__.py29
-rw-r--r--kvmd/apps/kvmd/__init__.py14
-rw-r--r--kvmd/apps/kvmd/hid.py2
-rw-r--r--kvmd/apps/kvmd/server.py106
-rw-r--r--kvmd/apps/kvmd/streamer.py8
-rw-r--r--kvmd/gpio.py8
-rw-r--r--kvmd/validators/__init__.py83
-rw-r--r--kvmd/validators/auth.py43
-rw-r--r--kvmd/validators/basic.py73
-rw-r--r--kvmd/validators/fs.py52
-rw-r--r--kvmd/validators/hw.py42
-rw-r--r--kvmd/validators/kvm.py48
-rw-r--r--kvmd/validators/net.py67
-rw-r--r--kvmd/yamlconf/__init__.py99
-rw-r--r--kvmd/yamlconf/dumper.py8
-rw-r--r--kvmd/yamlconf/loader.py11
18 files changed, 641 insertions, 258 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py
index d2007f7d..062e9bfe 100644
--- a/kvmd/apps/__init__.py
+++ b/kvmd/apps/__init__.py
@@ -29,14 +29,13 @@ import logging.config
from typing import Tuple
from typing import List
from typing import Dict
-from typing import Sequence
from typing import Optional
-from typing import Union
import pygments
import pygments.lexers.data
import pygments.formatters
+from ..yamlconf import ConfigError
from ..yamlconf import make_config
from ..yamlconf import Section
from ..yamlconf import Option
@@ -44,31 +43,59 @@ from ..yamlconf import build_raw_from_options
from ..yamlconf.dumper import make_config_dump
from ..yamlconf.loader import load_yaml_file
+from ..validators.basic import valid_bool
+from ..validators.basic import valid_number
+from ..validators.basic import valid_int_f1
+from ..validators.basic import valid_float_f01
+
+from ..validators.fs import valid_abs_path
+from ..validators.fs import valid_abs_path_exists
+from ..validators.fs import valid_unix_mode
+
+from ..validators.net import valid_ip_or_host
+from ..validators.net import valid_port
+
+from ..validators.auth import valid_auth_type
+
+from ..validators.kvm import valid_stream_quality
+from ..validators.kvm import valid_stream_fps
+
+from ..validators.hw import valid_tty_speed
+from ..validators.hw import valid_gpio_pin
+from ..validators.hw import valid_gpio_pin_optional
+
# =====
def init(
- prog: str=sys.argv[0],
+ prog: Optional[str]=None,
description: Optional[str]=None,
add_help: bool=True,
+ argv: Optional[List[str]]=None,
) -> Tuple[argparse.ArgumentParser, List[str], Section]:
- args_parser = argparse.ArgumentParser(prog=prog, description=description, add_help=add_help)
+ argv = (argv or sys.argv)
+ assert len(argv) > 0
+
+ args_parser = argparse.ArgumentParser(prog=(prog or argv[0]), description=description, add_help=add_help)
args_parser.add_argument("-c", "--config", dest="config_path", default="/etc/kvmd/main.yaml", metavar="<file>",
- help="Set config file path")
+ type=valid_abs_path_exists, help="Set config file path")
args_parser.add_argument("-o", "--set-options", dest="set_options", default=[], nargs="+",
help="Override config options list (like sec/sub/opt=value)")
args_parser.add_argument("-m", "--dump-config", dest="dump_config", action="store_true",
help="View current configuration (include all overrides)")
- (options, remaining) = args_parser.parse_known_args(sys.argv)
+ (options, remaining) = args_parser.parse_known_args(argv)
+ raw_config: Dict = {}
- options.config_path = os.path.expanduser(options.config_path)
- if os.path.exists(options.config_path):
+ if options.config_path:
+ options.config_path = os.path.expanduser(options.config_path)
raw_config = load_yaml_file(options.config_path)
- else:
- raw_config = {}
- _merge_dicts(raw_config, build_raw_from_options(options.set_options))
+
scheme = _get_config_scheme()
- config = make_config(raw_config, scheme)
+ try:
+ _merge_dicts(raw_config, build_raw_from_options(options.set_options))
+ config = make_config(raw_config, scheme)
+ except ConfigError as err:
+ raise SystemExit("Config error: " + str(err))
if options.dump_config:
dump = make_config_dump(config)
@@ -96,135 +123,93 @@ def _merge_dicts(dest: Dict, src: Dict) -> None:
dest[key] = src[key]
-def _as_pin(pin: int) -> int:
- if not isinstance(pin, int) or pin <= 0:
- raise ValueError("Invalid pin number")
- return pin
-
-
-def _as_optional_pin(pin: int) -> int:
- if not isinstance(pin, int) or pin < -1:
- raise ValueError("Invalid optional pin number")
- return pin
-
-
-def _as_path(path: str) -> str:
- if not isinstance(path, str):
- raise ValueError("Invalid path")
- path = str(path).strip()
- if not path:
- raise ValueError("Invalid path")
- return path
-
-
-def _as_optional_path(path: str) -> str:
- if not isinstance(path, str):
- raise ValueError("Invalid path")
- return str(path).strip()
-
-
-def _as_string_list(values: Union[str, Sequence]) -> List[str]:
- if isinstance(values, str):
- values = [values]
- return list(map(str, values))
-
-
-def _as_auth_type(auth_type: str) -> str:
- if not isinstance(auth_type, str):
- raise ValueError("Invalid auth type")
- auth_type = str(auth_type).strip()
- if auth_type not in ["basic"]:
- raise ValueError("Invalid auth type")
- return auth_type
-
-
def _get_config_scheme() -> Dict:
return {
"kvmd": {
"server": {
- "host": Option("localhost"),
- "port": Option(0),
- "unix": Option("", type=_as_optional_path, rename="unix_path"),
- "unix_rm": Option(False),
- "unix_mode": Option(0),
- "heartbeat": Option(3.0),
+ "host": Option("localhost", type=valid_ip_or_host),
+ "port": Option(0, type=valid_port),
+ "unix": Option("", type=valid_abs_path, only_if="!port", unpack_as="unix_path"),
+ "unix_rm": Option(False, type=valid_bool),
+ "unix_mode": Option(0, type=valid_unix_mode),
+ "heartbeat": Option(3.0, type=valid_float_f01),
"access_log_format": Option("[%P / %{X-Real-IP}i] '%r' => %s; size=%b ---"
" referer='%{Referer}i'; user_agent='%{User-Agent}i'"),
},
"auth": {
- "type": Option("basic", type=_as_auth_type, rename="auth_type"),
+ "type": Option("basic", type=valid_auth_type, unpack_as="auth_type"),
"basic": {
- "htpasswd": Option("/etc/kvmd/htpasswd", type=_as_path, rename="htpasswd_path"),
+ "htpasswd": Option("/etc/kvmd/htpasswd", type=valid_abs_path_exists, unpack_as="htpasswd_path"),
},
},
"info": {
- "meta": Option("/etc/kvmd/meta.yaml", type=_as_path, rename="meta_path"),
- "extras": Option("/usr/share/kvmd/extras", type=_as_path, rename="extras_path"),
+ "meta": Option("/etc/kvmd/meta.yaml", type=valid_abs_path_exists, unpack_as="meta_path"),
+ "extras": Option("/usr/share/kvmd/extras", type=valid_abs_path_exists, unpack_as="extras_path"),
},
"hid": {
- "reset_pin": Option(0, type=_as_pin),
- "reset_delay": Option(0.1),
-
- "device": Option("", type=_as_path, rename="device_path"),
- "speed": Option(115200),
- "read_timeout": Option(2.0),
- "read_retries": Option(10),
- "common_retries": Option(100),
- "retries_delay": Option(0.1),
- "noop": Option(False),
-
- "state_poll": Option(0.1),
+ "reset_pin": Option(-1, type=valid_gpio_pin),
+ "reset_delay": Option(0.1, type=valid_float_f01),
+
+ "device": Option("", type=valid_abs_path_exists, unpack_as="device_path"),
+ "speed": Option(115200, type=valid_tty_speed),
+ "read_timeout": Option(2.0, type=valid_float_f01),
+ "read_retries": Option(10, type=valid_int_f1),
+ "common_retries": Option(100, type=valid_int_f1),
+ "retries_delay": Option(0.1, type=valid_float_f01),
+ "noop": Option(False, type=valid_bool),
+
+ "state_poll": Option(0.1, type=valid_float_f01),
},
"atx": {
- "enabled": Option(True),
+ "enabled": Option(True, type=valid_bool),
- "power_led_pin": Option(-1, type=_as_optional_pin),
- "hdd_led_pin": Option(-1, type=_as_optional_pin),
- "power_switch_pin": Option(-1, type=_as_optional_pin),
- "reset_switch_pin": Option(-1, type=_as_optional_pin),
+ "power_led_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"),
+ "hdd_led_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"),
+ "power_switch_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"),
+ "reset_switch_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"),
- "click_delay": Option(0.1),
- "long_click_delay": Option(5.5),
+ "click_delay": Option(0.1, type=valid_float_f01),
+ "long_click_delay": Option(5.5, type=valid_float_f01),
- "state_poll": Option(0.1),
+ "state_poll": Option(0.1, type=valid_float_f01),
},
"msd": {
- "enabled": Option(True),
+ "enabled": Option(True, type=valid_bool),
- "target_pin": Option(-1, type=_as_optional_pin),
- "reset_pin": Option(-1, type=_as_optional_pin),
+ "target_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"),
+ "reset_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"),
- "device": Option("", type=_as_optional_path, rename="device_path"),
- "init_delay": Option(2.0),
- "reset_delay": Option(1.0),
- "write_meta": Option(True),
- "chunk_size": Option(65536),
+ "device": Option("", type=valid_abs_path, only_if="enabled", unpack_as="device_path"),
+ "init_delay": Option(2.0, type=valid_float_f01),
+ "reset_delay": Option(1.0, type=valid_float_f01),
+ "write_meta": Option(True, type=valid_bool),
+ "chunk_size": Option(65536, type=(lambda arg: valid_number(arg, min=1024))),
},
"streamer": {
- "cap_pin": Option(0, type=_as_optional_pin),
- "conv_pin": Option(0, type=_as_optional_pin),
+ "cap_pin": Option(-1, type=valid_gpio_pin_optional),
+ "conv_pin": Option(-1, type=valid_gpio_pin_optional),
- "sync_delay": Option(1.0),
- "init_delay": Option(1.0),
- "init_restart_after": Option(0.0),
- "shutdown_delay": Option(10.0),
- "state_poll": Option(1.0),
+ "sync_delay": Option(1.0, type=valid_float_f01),
+ "init_delay": Option(1.0, type=valid_float_f01),
+ "init_restart_after": Option(0.0, type=(lambda arg: valid_number(arg, min=0.0, type=float))),
+ "shutdown_delay": Option(10.0, type=valid_float_f01),
+ "state_poll": Option(1.0, type=valid_float_f01),
- "quality": Option(80),
- "desired_fps": Option(0),
+ "quality": Option(80, type=valid_stream_quality),
+ "desired_fps": Option(0, type=valid_stream_fps),
- "host": Option("localhost"),
- "port": Option(0),
- "unix": Option("", type=_as_optional_path, rename="unix_path"),
- "timeout": Option(2.0),
+ "host": Option("localhost", type=valid_ip_or_host),
+ "port": Option(0, type=valid_port),
+ "unix": Option("", type=valid_abs_path, only_if="!port", unpack_as="unix_path"),
+ "timeout": Option(2.0, type=valid_float_f01),
- "cmd": Option(["/bin/true"], type=_as_string_list),
+ "cmd": Option(["/bin/true"]), # TODO: Validator
},
},
diff --git a/kvmd/apps/cleanup/__init__.py b/kvmd/apps/cleanup/__init__.py
index b39559f6..39bd7a71 100644
--- a/kvmd/apps/cleanup/__init__.py
+++ b/kvmd/apps/cleanup/__init__.py
@@ -24,6 +24,9 @@ import os
import subprocess
import time
+from typing import List
+from typing import Optional
+
from ...logging import get_logger
from ... import gpio
@@ -32,8 +35,8 @@ from .. import init
# =====
-def main() -> None:
- config = init("kvmd-cleanup", description="Kill KVMD and clear resources")[2].kvmd
+def main(argv: Optional[List[str]]=None) -> None:
+ config = init("kvmd-cleanup", description="Kill KVMD and clear resources", argv=argv)[2].kvmd
logger = get_logger(0)
logger.info("Cleaning up ...")
@@ -47,7 +50,7 @@ def main() -> None:
("streamer_cap_pin", config.streamer.cap_pin),
("streamer_conv_pin", config.streamer.conv_pin),
]:
- if pin > 0:
+ if pin >= 0:
logger.info("Writing value=0 to pin=%d (%s)", pin, name)
gpio.set_output(pin, initial=False)
diff --git a/kvmd/apps/htpasswd/__init__.py b/kvmd/apps/htpasswd/__init__.py
index bc77c0e8..27f940b9 100644
--- a/kvmd/apps/htpasswd/__init__.py
+++ b/kvmd/apps/htpasswd/__init__.py
@@ -22,7 +22,6 @@
import sys
import os
-import re
import getpass
import tempfile
import contextlib
@@ -34,12 +33,16 @@ import passlib.apache
from ...yamlconf import Section
+from ...validators import ValidatorError
+from ...validators.auth import valid_user
+from ...validators.auth import valid_passwd
+
from .. import init
# =====
def _get_htpasswd_path(config: Section) -> str:
- if config.kvmd.auth.auth_type != "basic":
+ if config.kvmd.auth.type != "basic":
print("Warning: KVMD does not use basic auth", file=sys.stderr)
return config.kvmd.auth.basic.htpasswd
@@ -69,13 +72,6 @@ def _get_htpasswd_for_write(config: Section) -> Generator[passlib.apache.Htpassw
os.remove(tmp_path)
-def _valid_user(user: str) -> str:
- stripped = user.strip()
- if re.match(r"^[a-z_][a-z0-9_-]*$", stripped):
- return stripped
- raise SystemExit("Invalid user %r" % (user))
-
-
# ====
def _cmd_list(config: Section, _: argparse.Namespace) -> None:
for user in passlib.apache.HtpasswdFile(_get_htpasswd_path(config)).users():
@@ -85,10 +81,10 @@ def _cmd_list(config: Section, _: argparse.Namespace) -> None:
def _cmd_set(config: Section, options: argparse.Namespace) -> None:
with _get_htpasswd_for_write(config) as htpasswd:
if options.read_stdin:
- passwd = input()
+ passwd = valid_passwd(input())
else:
- passwd = getpass.getpass("Password: ", stream=sys.stderr)
- if getpass.getpass("Repeat: ", stream=sys.stderr) != passwd:
+ passwd = valid_passwd(getpass.getpass("Password: ", stream=sys.stderr))
+ if valid_passwd(getpass.getpass("Repeat: ", stream=sys.stderr)) != passwd:
raise SystemExit("Sorry, passwords do not match")
htpasswd.set_password(options.user, passwd)
@@ -113,13 +109,16 @@ def main() -> None:
cmd_list_parser.set_defaults(cmd=_cmd_list)
cmd_set_parser = subparsers.add_parser("set", help="Create user or change password")
- cmd_set_parser.add_argument("user", type=_valid_user)
+ cmd_set_parser.add_argument("user", type=valid_user)
cmd_set_parser.add_argument("-i", "--read-stdin", action="store_true", help="Read password from stdin")
cmd_set_parser.set_defaults(cmd=_cmd_set)
cmd_delete_parser = subparsers.add_parser("del", help="Delete user")
- cmd_delete_parser.add_argument("user", type=_valid_user)
+ cmd_delete_parser.add_argument("user", type=valid_user)
cmd_delete_parser.set_defaults(cmd=_cmd_delete)
options = parser.parse_args(argv[1:])
- options.cmd(config, options)
+ try:
+ options.cmd(config, options)
+ except ValidatorError as err:
+ raise SystemExit(str(err))
diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py
index 3f2431d7..1ab14fd6 100644
--- a/kvmd/apps/kvmd/__init__.py
+++ b/kvmd/apps/kvmd/__init__.py
@@ -45,15 +45,15 @@ def main() -> None:
# pylint: disable=protected-access
loop = asyncio.get_event_loop()
Server(
- auth_manager=AuthManager(**config.auth._unpack_renamed()),
- info_manager=InfoManager(loop=loop, **config.info._unpack_renamed()),
+ auth_manager=AuthManager(**config.auth._unpack()),
+ info_manager=InfoManager(loop=loop, **config.info._unpack()),
log_reader=LogReader(loop=loop),
- hid=Hid(**config.hid._unpack_renamed()),
- atx=Atx(**config.atx._unpack_renamed()),
- msd=MassStorageDevice(loop=loop, **config.msd._unpack_renamed()),
- streamer=Streamer(loop=loop, **config.streamer._unpack_renamed()),
+ hid=Hid(**config.hid._unpack()),
+ atx=Atx(**config.atx._unpack()),
+ msd=MassStorageDevice(loop=loop, **config.msd._unpack()),
+ streamer=Streamer(loop=loop, **config.streamer._unpack()),
loop=loop,
- ).run(**config.server._unpack_renamed())
+ ).run(**config.server._unpack())
get_logger().info("Bye-bye")
diff --git a/kvmd/apps/kvmd/hid.py b/kvmd/apps/kvmd/hid.py
index 22d4acf3..2b41eda0 100644
--- a/kvmd/apps/kvmd/hid.py
+++ b/kvmd/apps/kvmd/hid.py
@@ -46,7 +46,7 @@ from ... import gpio
# =====
def _get_keymap() -> Dict[str, int]:
- return yaml.load(pkgutil.get_data("kvmd", "data/keymap.yaml").decode()) # type: ignore
+ return yaml.safe_load(pkgutil.get_data("kvmd", "data/keymap.yaml").decode()) # type: ignore
_KEYMAP = _get_keymap()
diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py
index fc701c94..ae82015e 100644
--- a/kvmd/apps/kvmd/server.py
+++ b/kvmd/apps/kvmd/server.py
@@ -21,7 +21,6 @@
import os
-import re
import signal
import socket
import asyncio
@@ -36,7 +35,6 @@ from typing import Dict
from typing import Set
from typing import Callable
from typing import Optional
-from typing import Any
import aiohttp.web
import setproctitle
@@ -45,6 +43,18 @@ from ...logging import get_logger
from ...aioregion import RegionIsBusyError
+from ...validators import ValidatorError
+
+from ...validators.basic import valid_bool
+from ...validators.auth import valid_user
+from ...validators.auth import valid_passwd
+from ...validators.auth import valid_auth_token
+from ...validators.kvm import valid_atx_button
+from ...validators.kvm import valid_kvm_target
+from ...validators.kvm import valid_log_seek
+from ...validators.kvm import valid_stream_quality
+from ...validators.kvm import valid_stream_fps
+
from ... import __version__
from .auth import AuthManager
@@ -80,10 +90,6 @@ class HttpError(Exception):
pass
-class BadRequestError(HttpError):
- pass
-
-
class UnauthorizedError(HttpError):
pass
@@ -138,7 +144,7 @@ def _exposed(http_method: str, path: str, auth_required: bool=True) -> Callable:
if auth_required:
token = request.cookies.get(_COOKIE_AUTH_TOKEN, "")
if token:
- user = self._auth_manager.check(_valid_token(token))
+ user = self._auth_manager.check(valid_auth_token(token))
if not user:
raise ForbiddenError("Forbidden")
setattr(request, _ATTR_KVMD_USER, user)
@@ -149,7 +155,7 @@ def _exposed(http_method: str, path: str, auth_required: bool=True) -> Callable:
except RegionIsBusyError as err:
return _json_exception(err, 409)
- except (BadRequestError, AtxOperationError, MsdOperationError) as err:
+ except (ValidatorError, AtxOperationError, MsdOperationError) as err:
return _json_exception(err, 400)
except UnauthorizedError as err:
return _json_exception(err, 401)
@@ -178,51 +184,6 @@ def _system_task(method: Callable) -> Callable:
return wrap
-def _valid_user(user: Any) -> str:
- if isinstance(user, str):
- stripped = user.strip()
- if re.match(r"^[a-z_][a-z0-9_-]*$", stripped):
- return stripped
- raise BadRequestError("Invalid user characters %r" % (user))
-
-
-def _valid_passwd(passwd: Any) -> str:
- if isinstance(passwd, str):
- if re.match(r"[\x20-\x7e]*$", passwd):
- return passwd
- raise BadRequestError("Invalid password characters")
-
-
-def _valid_token(token: Optional[str]) -> str:
- if isinstance(token, str):
- token = token.strip().lower()
- if re.match(r"^[0-9a-f]{64}$", token):
- return token
- raise BadRequestError("Invalid auth token characters")
-
-
-def _valid_bool(name: str, flag: Optional[str]) -> bool:
- flag = str(flag).strip().lower()
- if flag in ["1", "true", "yes"]:
- return True
- elif flag in ["0", "false", "no"]:
- return False
- raise BadRequestError("Invalid param '%s'" % (name))
-
-
-def _valid_int(name: str, value: Optional[str], min_value: Optional[int]=None, max_value: Optional[int]=None) -> int:
- try:
- value_int = int(value) # type: ignore
- if (
- (min_value is not None and value_int < min_value)
- or (max_value is not None and value_int > max_value)
- ):
- raise ValueError()
- return value_int
- except Exception:
- raise BadRequestError("Invalid param %r" % (name))
-
-
class _Events(Enum):
INFO_STATE = "info_state"
HID_STATE = "hid_state"
@@ -337,8 +298,8 @@ class Server: # pylint: disable=too-many-instance-attributes
async def __auth_login_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
credentials = await request.post()
token = self._auth_manager.login(
- user=_valid_user(credentials.get("user", "")),
- passwd=_valid_passwd(credentials.get("passwd", "")),
+ user=valid_user(credentials.get("user", "")),
+ passwd=valid_passwd(credentials.get("passwd", "")),
)
if token:
return _json({}, set_cookies={_COOKIE_AUTH_TOKEN: token})
@@ -346,7 +307,7 @@ class Server: # pylint: disable=too-many-instance-attributes
@_exposed("POST", "/auth/logout")
async def __auth_logout_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
- token = _valid_token(request.cookies.get(_COOKIE_AUTH_TOKEN, ""))
+ token = valid_auth_token(request.cookies.get(_COOKIE_AUTH_TOKEN, ""))
self._auth_manager.logout(token)
return _json({})
@@ -362,8 +323,8 @@ class Server: # pylint: disable=too-many-instance-attributes
@_exposed("GET", "/log")
async def __log_handler(self, request: aiohttp.web.Request) -> aiohttp.web.StreamResponse:
- seek = _valid_int("seek", request.query.get("seek", "0"), 0)
- follow = _valid_bool("follow", request.query.get("follow", "false"))
+ seek = valid_log_seek(request.query.get("seek", "0"))
+ follow = valid_bool(request.query.get("follow", "false"))
response = aiohttp.web.StreamResponse(status=200, reason="OK", headers={"Content-Type": "text/plain"})
await response.prepare(request)
async for record in self.__log_reader.poll_log(seek, follow):
@@ -460,15 +421,12 @@ class Server: # pylint: disable=too-many-instance-attributes
@_exposed("POST", "/atx/click")
async def __atx_click_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
- button = request.query.get("button")
- clicker = {
+ button = valid_atx_button(request.query.get("button"))
+ await ({
"power": self.__atx.click_power,
"power_long": self.__atx.click_power_long,
"reset": self.__atx.click_reset,
- }.get(button)
- if not clicker:
- raise BadRequestError("Invalid param 'button'")
- await clicker()
+ }[button])()
return _json({"clicked": button})
# ===== MSD
@@ -479,13 +437,11 @@ class Server: # pylint: disable=too-many-instance-attributes
@_exposed("POST", "/msd/connect")
async def __msd_connect_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
- to = request.query.get("to")
- if to == "kvm":
- return _json(await self.__msd.connect_to_kvm())
- elif to == "server":
- return _json(await self.__msd.connect_to_pc())
- else:
- raise BadRequestError("Invalid param 'to'")
+ to = valid_kvm_target(request.query.get("to"))
+ return _json(await ({
+ "kvm": self.__msd.connect_to_kvm,
+ "server": self.__msd.connect_to_pc,
+ }[to])())
@_exposed("POST", "/msd/write")
async def __msd_write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
@@ -496,12 +452,12 @@ class Server: # pylint: disable=too-many-instance-attributes
async with self.__msd:
field = await reader.next()
if not field or field.name != "image_name":
- raise BadRequestError("Missing 'image_name' field")
+ raise ValidatorError("Missing 'image_name' field")
image_name = (await field.read()).decode("utf-8")[:256]
field = await reader.next()
if not field or field.name != "image_data":
- raise BadRequestError("Missing 'image_data' field")
+ raise ValidatorError("Missing 'image_data' field")
logger.info("Writing image %r to mass-storage device ...", image_name)
await self.__msd.write_image_info(image_name, False)
@@ -530,8 +486,8 @@ class Server: # pylint: disable=too-many-instance-attributes
@_exposed("POST", "/streamer/set_params")
async def __streamer_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
for (name, validator) in [
- ("quality", lambda arg: _valid_int("quality", arg, 1, 100)),
- ("desired_fps", lambda arg: _valid_int("desired_fps", arg, 0, 30)),
+ ("quality", valid_stream_quality),
+ ("desired_fps", valid_stream_fps),
]:
value = request.query.get(name)
if value:
diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py
index 71662959..771d5be3 100644
--- a/kvmd/apps/kvmd/streamer.py
+++ b/kvmd/apps/kvmd/streamer.py
@@ -63,8 +63,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes
loop: asyncio.AbstractEventLoop,
) -> None:
- self.__cap_pin = (gpio.set_output(cap_pin) if cap_pin > 0 else 0)
- self.__conv_pin = (gpio.set_output(conv_pin) if conv_pin > 0 else 0)
+ self.__cap_pin = (gpio.set_output(cap_pin) if cap_pin >= 0 else -1)
+ self.__conv_pin = (gpio.set_output(conv_pin) if conv_pin >= 0 else -1)
self.__sync_delay = sync_delay
self.__init_delay = init_delay
@@ -179,9 +179,9 @@ class Streamer: # pylint: disable=too-many-instance-attributes
async def __set_hw_enabled(self, enabled: bool) -> None:
# XXX: This sequence is very important to enable converter and cap board
- if self.__cap_pin > 0:
+ if self.__cap_pin >= 0:
gpio.write(self.__cap_pin, enabled)
- if self.__conv_pin > 0:
+ if self.__conv_pin >= 0:
if enabled:
await asyncio.sleep(self.__sync_delay)
gpio.write(self.__conv_pin, enabled)
diff --git a/kvmd/gpio.py b/kvmd/gpio.py
index 64a1a100..ef9d1c5e 100644
--- a/kvmd/gpio.py
+++ b/kvmd/gpio.py
@@ -43,22 +43,22 @@ def bcm() -> Generator[None, None, None]:
def set_output(pin: int, initial: bool=False) -> int:
- assert pin > 0, pin
+ assert pin >= 0, pin
GPIO.setup(pin, GPIO.OUT, initial=initial)
return pin
def set_input(pin: int) -> int:
- assert pin > 0, pin
+ assert pin >= 0, pin
GPIO.setup(pin, GPIO.IN)
return pin
def read(pin: int) -> bool:
- assert pin > 0, pin
+ assert pin >= 0, pin
return bool(GPIO.input(pin))
def write(pin: int, flag: bool) -> None:
- assert pin > 0, pin
+ assert pin >= 0, pin
GPIO.output(pin, flag)
diff --git a/kvmd/validators/__init__.py b/kvmd/validators/__init__.py
new file mode 100644
index 00000000..f4b0f1fa
--- /dev/null
+++ b/kvmd/validators/__init__.py
@@ -0,0 +1,83 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import re
+
+from typing import List
+from typing import Callable
+from typing import NoReturn
+from typing import Any
+
+
+# =====
+class ValidatorError(ValueError):
+ pass
+
+
+# =====
+def raise_error(arg: Any, name: str, hide: bool=False) -> NoReturn:
+ arg_str = " "
+ if not hide:
+ arg_str = (" %r " if isinstance(arg, (str, bytes)) else " '%s' ") % (arg)
+ raise ValidatorError("The argument" + arg_str + "is not a valid " + name)
+
+
+def check_not_none(arg: Any, name: str) -> Any:
+ if arg is None:
+ raise ValidatorError("Empty argument is not a valid %s" % (name))
+ return arg
+
+
+def check_not_none_string(arg: Any, name: str, strip: bool=True) -> str:
+ arg = str(check_not_none(arg, name))
+ if strip:
+ arg = arg.strip()
+ return arg
+
+
+def check_in_list(arg: Any, name: str, variants: List) -> Any:
+ if arg not in variants:
+ raise_error(arg, name)
+ return arg
+
+
+def check_string_in_list(arg: Any, name: str, variants: List[str], lower: bool=True) -> Any:
+ arg = check_not_none_string(arg, name)
+ if lower:
+ arg = arg.lower()
+ return check_in_list(arg, name, variants)
+
+
+def check_re_match(arg: Any, name: str, pattern: str, strip: bool=True, hide: bool=False) -> str:
+ arg = check_not_none_string(arg, name, strip=strip)
+ if re.match(pattern, arg, flags=re.MULTILINE) is None:
+ raise_error(arg, name, hide=hide)
+ return arg
+
+
+def check_any(arg: Any, name: str, validators: List[Callable[[Any], Any]]) -> Any:
+ for validator in validators:
+ try:
+ return validator(arg)
+ except Exception:
+ pass
+ raise_error(arg, name)
diff --git a/kvmd/validators/auth.py b/kvmd/validators/auth.py
new file mode 100644
index 00000000..5f6188e1
--- /dev/null
+++ b/kvmd/validators/auth.py
@@ -0,0 +1,43 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from typing import Any
+
+from . import check_string_in_list
+from . import check_re_match
+
+
+# =====
+def valid_user(arg: Any) -> str:
+ return check_re_match(arg, "username characters", r"^[a-z_][a-z0-9_-]*$")
+
+
+def valid_passwd(arg: Any) -> str:
+ return check_re_match(arg, "passwd characters", r"^[\x20-\x7e]*\Z$", strip=False, hide=True)
+
+
+def valid_auth_token(arg: Any) -> str:
+ return check_re_match(arg, "auth token", r"^[0-9a-f]{64}$", hide=True)
+
+
+def valid_auth_type(arg: Any) -> str:
+ return check_string_in_list(arg, "auth type", ["basic"])
diff --git a/kvmd/validators/basic.py b/kvmd/validators/basic.py
new file mode 100644
index 00000000..e4a2336e
--- /dev/null
+++ b/kvmd/validators/basic.py
@@ -0,0 +1,73 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from typing import Type
+from typing import Union
+from typing import Any
+
+from . import ValidatorError
+from . import raise_error
+from . import check_not_none_string
+from . import check_in_list
+
+
+# =====
+def valid_bool(arg: Any) -> bool:
+ true_args = ["1", "true", "yes"]
+ false_args = ["0", "false", "no"]
+
+ name = "bool (%r or %r)" % (true_args, false_args)
+
+ arg = check_not_none_string(arg, name).lower()
+ arg = check_in_list(arg, name, true_args + false_args)
+ return (arg in true_args)
+
+
+def valid_number(
+ arg: Any,
+ min: Union[int, float, None]=None, # pylint: disable=redefined-builtin
+ max: Union[int, float, None]=None, # pylint: disable=redefined-builtin
+ type: Union[Type[int], Type[float]]=int, # pylint: disable=redefined-builtin
+ name: str="",
+) -> Union[int, float]:
+
+ name = (name or type.__name__)
+
+ arg = check_not_none_string(arg, name)
+ try:
+ arg = type(arg)
+ except Exception:
+ raise_error(arg, name)
+
+ if min is not None and arg < min:
+ raise ValidatorError("The argument '%s' must be %s and greater or equial than %s" % (arg, name, min))
+ if max is not None and arg > max:
+ raise ValidatorError("The argument '%s' must be %s and lesser or equal then %s" % (arg, name, max))
+ return arg
+
+
+def valid_int_f1(arg: Any) -> int:
+ return int(valid_number(arg, min=1))
+
+
+def valid_float_f01(arg: Any) -> float:
+ return float(valid_number(arg, min=0.1, type=float))
diff --git a/kvmd/validators/fs.py b/kvmd/validators/fs.py
new file mode 100644
index 00000000..ed895818
--- /dev/null
+++ b/kvmd/validators/fs.py
@@ -0,0 +1,52 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import os
+
+from typing import Any
+
+from . import raise_error
+from . import check_not_none_string
+
+from .basic import valid_number
+
+
+# =====
+def valid_abs_path(arg: Any, exists: bool=False) -> str:
+ name = ("existent absolute path" if exists else "absolute path")
+
+ if len(str(arg).strip()) == 0:
+ arg = None
+ arg = check_not_none_string(arg, name)
+
+ arg = os.path.abspath(arg)
+ if exists and not os.access(arg, os.F_OK):
+ raise_error(arg, name)
+ return arg
+
+
+def valid_abs_path_exists(arg: Any) -> str:
+ return valid_abs_path(arg, exists=True)
+
+
+def valid_unix_mode(arg: Any) -> int:
+ return int(valid_number(arg, min=0, name="UNIX mode"))
diff --git a/kvmd/validators/hw.py b/kvmd/validators/hw.py
new file mode 100644
index 00000000..a68743ad
--- /dev/null
+++ b/kvmd/validators/hw.py
@@ -0,0 +1,42 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from typing import Any
+
+from . import check_in_list
+
+from .basic import valid_number
+
+
+# =====
+def valid_tty_speed(arg: Any) -> int:
+ name = "TTY speed"
+ arg = int(valid_number(arg, name=name))
+ return check_in_list(arg, name, [1200, 2400, 4800, 9600, 19200, 38400, 57600, 115200])
+
+
+def valid_gpio_pin(arg: Any) -> int:
+ return int(valid_number(arg, min=0, name="GPIO pin"))
+
+
+def valid_gpio_pin_optional(arg: Any) -> int:
+ return int(valid_number(arg, min=-1, name="optional GPIO pin"))
diff --git a/kvmd/validators/kvm.py b/kvmd/validators/kvm.py
new file mode 100644
index 00000000..034587ef
--- /dev/null
+++ b/kvmd/validators/kvm.py
@@ -0,0 +1,48 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+from typing import Any
+
+from . import check_string_in_list
+
+from .basic import valid_number
+
+
+# =====
+def valid_atx_button(arg: Any) -> str:
+ return check_string_in_list(arg, "ATX button", ["power", "power_long", "reset"])
+
+
+def valid_kvm_target(arg: Any) -> str:
+ return check_string_in_list(arg, "KVM target", ["kvm", "server"])
+
+
+def valid_log_seek(arg: Any) -> int:
+ return int(valid_number(arg, min=0, name="log seek"))
+
+
+def valid_stream_quality(arg: Any) -> int:
+ return int(valid_number(arg, min=1, max=100, name="stream quality"))
+
+
+def valid_stream_fps(arg: Any) -> int:
+ return int(valid_number(arg, min=0, max=30, name="stream FPS"))
diff --git a/kvmd/validators/net.py b/kvmd/validators/net.py
new file mode 100644
index 00000000..cad87f07
--- /dev/null
+++ b/kvmd/validators/net.py
@@ -0,0 +1,67 @@
+# ========================================================================== #
+# #
+# KVMD - The main Pi-KVM daemon. #
+# #
+# Copyright (C) 2018 Maxim Devaev <[email protected]> #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <https://www.gnu.org/licenses/>. #
+# #
+# ========================================================================== #
+
+
+import socket
+
+from typing import Any
+
+from . import check_not_none_string
+from . import check_re_match
+from . import check_any
+
+from .basic import valid_number
+
+
+# =====
+def valid_ip_or_host(arg: Any) -> str:
+ name = "IP address or RFC-1123 hostname"
+ return check_any(
+ arg=check_not_none_string(arg, name),
+ name=name,
+ validators=[
+ valid_ip,
+ valid_rfc_host,
+ ],
+ )
+
+
+def valid_ip(arg: Any) -> str:
+ name = "IP address"
+ return check_any(
+ arg=check_not_none_string(arg, name),
+ name=name,
+ validators=[
+ lambda arg: (arg, socket.inet_pton(socket.AF_INET, arg))[0],
+ lambda arg: (arg, socket.inet_pton(socket.AF_INET6, arg))[0],
+ ],
+ )
+
+
+def valid_rfc_host(arg: Any) -> str:
+ # http://stackoverflow.com/questions/106179/regular-expression-to-match-hostname-or-ip-address
+ pattern = r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*" \
+ r"([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$"
+ return check_re_match(arg, "RFC-1123 hostname", pattern)
+
+
+def valid_port(arg: Any) -> int:
+ return int(valid_number(arg, min=0, max=65535, name="TCP/UDP port"))
diff --git a/kvmd/yamlconf/__init__.py b/kvmd/yamlconf/__init__.py
index 4c15fee1..bfda9b78 100644
--- a/kvmd/yamlconf/__init__.py
+++ b/kvmd/yamlconf/__init__.py
@@ -31,14 +31,19 @@ from typing import Any
# =====
+class ConfigError(ValueError):
+ pass
+
+
+# =====
def build_raw_from_options(options: List[str]) -> Dict[str, Any]:
raw: Dict[str, Any] = {}
for option in options:
(key, value) = (option.split("=", 1) + [None])[:2] # type: ignore
if len(key.strip()) == 0:
- raise ValueError("Empty option key (required 'key=value' instead of '{}')".format(option))
+ raise ConfigError("Empty option key (required 'key=value' instead of %r)" % (option))
if value is None:
- raise ValueError("No value for key '{}'".format(key))
+ raise ConfigError("No value for key %r" % (key))
section = raw
subs = list(map(str.strip, key.split("/")))
@@ -56,7 +61,7 @@ def _parse_value(value: str) -> Any:
and value not in ["true", "false", "null"]
and not value.startswith(("{", "[", "\""))
):
- value = "\"{}\"".format(value)
+ value = "\"%s\"" % (value)
return json.loads(value)
@@ -66,33 +71,33 @@ class Section(dict):
dict.__init__(self)
self.__meta: Dict[str, Dict[str, Any]] = {}
- def _unpack_renamed(self, _section: Optional["Section"]=None) -> Dict[str, Any]:
+ def _unpack(self, _section: Optional["Section"]=None) -> Dict[str, Any]:
if _section is None:
_section = self
unpacked: Dict[str, Any] = {}
for (key, value) in _section.items():
if isinstance(value, Section):
- unpacked[key] = value._unpack_renamed() # pylint: disable=protected-access
+ unpacked[key] = value._unpack() # pylint: disable=protected-access
else: # Option
- unpacked[_section._get_rename(key)] = value # pylint: disable=protected-access
+ unpacked[_section._get_unpack_as(key)] = value # pylint: disable=protected-access
return unpacked
- def _set_meta(self, key: str, default: Any, help: str, rename: str) -> None: # pylint: disable=redefined-builtin
+ def _set_meta(self, key: str, default: Any, unpack_as: str, help: str) -> None: # pylint: disable=redefined-builtin
self.__meta[key] = {
"default": default,
- "help": help,
- "rename": rename,
+ "unpack_as": unpack_as,
+ "help": help,
}
def _get_default(self, key: str) -> Any:
return self.__meta[key]["default"]
+ def _get_unpack_as(self, key: str) -> str:
+ return (self.__meta[key]["unpack_as"] or key)
+
def _get_help(self, key: str) -> str:
return self.__meta[key]["help"]
- def _get_rename(self, key: str) -> str:
- return (self.__meta[key]["rename"] or key)
-
def __getattribute__(self, key: str) -> Any:
if key in self:
return self[key]
@@ -106,46 +111,74 @@ class Option:
def __init__(
self,
default: Any,
- help: str="", # pylint: disable=redefined-builtin
type: Optional[Callable[[Any], Any]]=None, # pylint: disable=redefined-builtin
- rename: str="",
+ only_if: str="",
+ unpack_as: str="",
+ help: str="", # pylint: disable=redefined-builtin
) -> None:
self.default = default
- self.help = help
self.type: Callable[[Any], Any] = (type or (self.__type(default) if default is not None else str)) # type: ignore
- self.rename = rename
+ self.only_if = only_if
+ self.unpack_as = unpack_as
+ self.help = help
def __repr__(self) -> str:
- return "<Option(default={self.default}, type={self.type}, help={self.help}, rename={self.rename})>".format(self=self)
+ return "<Option(default={0.default}, type={0.type}, only_if={0.only_if}, unpack_as={0.unpack_as})>".format(self)
# =====
def make_config(raw: Dict[str, Any], scheme: Dict[str, Any], _keys: Tuple[str, ...]=()) -> Section:
if not isinstance(raw, dict):
- raise ValueError("The node '{}' must be a dictionary".format("/".join(_keys) or "/"))
+ raise ConfigError("The node %r must be a dictionary" % ("/".join(_keys) or "/"))
config = Section()
- for (key, option) in scheme.items():
- full_key = _keys + (key,)
- full_name = "/".join(full_key)
-
- if isinstance(option, Option):
- value = raw.get(key, option.default)
- try:
- value = option.type(value)
- except Exception:
- raise ValueError("Invalid value '{value}' for key '{key}'".format(key=full_name, value=value))
+
+ def make_full_key(key: str) -> Tuple[str, ...]:
+ return _keys + (key,)
+
+ def make_full_name(key: str) -> str:
+ return "/".join(make_full_key(key))
+
+ def process_option(key: str, no_only_if: bool=False) -> Any:
+ if key not in config:
+ option: Option = scheme[key]
+ only_if = option.only_if
+ only_if_negative = option.only_if.startswith("!")
+ if only_if_negative:
+ only_if = only_if[1:]
+
+ if only_if and no_only_if: # pylint: disable=no-else-raise
+ # Перекрестный only_if запрещен
+ raise RuntimeError("Found only_if recursuon on key %r" % (make_full_name(key)))
+ elif only_if and (
+ (not only_if_negative and not process_option(only_if, no_only_if=True))
+ or (only_if_negative and process_option(only_if, no_only_if=True))
+ ):
+ # Если есть условие и оно ложно - ставим дефолт и не валидируем
+ value = option.default
+ else:
+ value = raw.get(key, option.default)
+ try:
+ value = option.type(value)
+ except ValueError as err:
+ raise ConfigError("Invalid value %r for key %r: %s" % (value, make_full_name(key), str(err)))
+
config[key] = value
config._set_meta( # pylint: disable=protected-access
key=key,
default=option.default,
+ unpack_as=option.unpack_as,
help=option.help,
- rename=option.rename,
)
- elif isinstance(option, dict):
- config[key] = make_config(raw.get(key, {}), option, full_key)
+ return config[key]
+
+ for key in scheme:
+ if isinstance(scheme[key], Option):
+ process_option(key)
+ elif isinstance(scheme[key], dict):
+ config[key] = make_config(raw.get(key, {}), scheme[key], make_full_key(key))
else:
- raise RuntimeError("Incorrect scheme definition for key '{}':"
- " the value is {}, not dict or Option()".format(full_name, type(option)))
+ raise RuntimeError("Incorrect scheme definition for key %r:"
+ " the value is %r, not dict() or Option()" % (make_full_name(key), type(scheme[key])))
return config
diff --git a/kvmd/yamlconf/dumper.py b/kvmd/yamlconf/dumper.py
index 517965fe..93bced3a 100644
--- a/kvmd/yamlconf/dumper.py
+++ b/kvmd/yamlconf/dumper.py
@@ -44,17 +44,17 @@ def _inner_make_dump(config: Section, _level: int=0) -> List[str]:
for (key, value) in sorted(config.items(), key=operator.itemgetter(0)):
indent = " " * _INDENT * _level
if isinstance(value, Section):
- lines.append("{}{}:".format(indent, key))
+ lines.append("%s%s:" % (indent, key))
lines += _inner_make_dump(value, _level + 1)
lines.append("")
else:
default = config._get_default(key) # pylint: disable=protected-access
comment = config._get_help(key) # pylint: disable=protected-access
if default == value:
- lines.append("{}{}: {} # {}".format(indent, key, _make_yaml(value, _level), comment))
+ lines.append("%s%s: %s # %s" % (indent, key, _make_yaml(value, _level), comment))
else:
- lines.append("{}# {}: {} # {}".format(indent, key, _make_yaml(default, _level), comment))
- lines.append("{}{}: {}".format(indent, key, _make_yaml(value, _level)))
+ lines.append("%s# %s: %s # %s" % (indent, key, _make_yaml(default, _level), comment))
+ lines.append("%s%s: %s" % (indent, key, _make_yaml(value, _level)))
return lines
diff --git a/kvmd/yamlconf/loader.py b/kvmd/yamlconf/loader.py
index ba82152c..c72a45dc 100644
--- a/kvmd/yamlconf/loader.py
+++ b/kvmd/yamlconf/loader.py
@@ -26,7 +26,6 @@ from typing import IO
from typing import Any
import yaml
-import yaml.loader
import yaml.nodes
@@ -37,17 +36,17 @@ def load_yaml_file(path: str) -> Any:
return yaml.load(yaml_file, _YamlLoader)
except Exception:
# Reraise internal exception as standard ValueError and show the incorrect file
- raise ValueError("Incorrect YAML syntax in file '{}'".format(path))
+ raise ValueError("Incorrect YAML syntax in file %r" % (path))
-class _YamlLoader(yaml.loader.Loader): # pylint: disable=too-many-ancestors
+class _YamlLoader(yaml.SafeLoader):
def __init__(self, yaml_file: IO) -> None:
- yaml.loader.Loader.__init__(self, yaml_file)
+ super().__init__(yaml_file)
self.__root = os.path.dirname(yaml_file.name)
def include(self, node: yaml.nodes.Node) -> str:
- path = os.path.join(self.__root, self.construct_scalar(node)) # pylint: disable=no-member
+ path = os.path.join(self.__root, self.construct_scalar(node))
return load_yaml_file(path)
-_YamlLoader.add_constructor("!include", _YamlLoader.include) # pylint: disable=no-member
+_YamlLoader.add_constructor("!include", _YamlLoader.include)