diff options
author | Devaev Maxim <[email protected]> | 2019-04-06 05:32:02 +0300 |
---|---|---|
committer | Devaev Maxim <[email protected]> | 2019-04-06 08:04:26 +0300 |
commit | 1d75b738a08c98a5d3d8ac3c685e77360f4c1267 (patch) | |
tree | 3aa89dc7fd0ab737e9332714a784e9d4dde0a362 /kvmd | |
parent | 73e04b71ed55a46c939f12548b31746617af2bca (diff) |
validators, tests
Diffstat (limited to 'kvmd')
-rw-r--r-- | kvmd/apps/__init__.py | 197 | ||||
-rw-r--r-- | kvmd/apps/cleanup/__init__.py | 9 | ||||
-rw-r--r-- | kvmd/apps/htpasswd/__init__.py | 29 | ||||
-rw-r--r-- | kvmd/apps/kvmd/__init__.py | 14 | ||||
-rw-r--r-- | kvmd/apps/kvmd/hid.py | 2 | ||||
-rw-r--r-- | kvmd/apps/kvmd/server.py | 106 | ||||
-rw-r--r-- | kvmd/apps/kvmd/streamer.py | 8 | ||||
-rw-r--r-- | kvmd/gpio.py | 8 | ||||
-rw-r--r-- | kvmd/validators/__init__.py | 83 | ||||
-rw-r--r-- | kvmd/validators/auth.py | 43 | ||||
-rw-r--r-- | kvmd/validators/basic.py | 73 | ||||
-rw-r--r-- | kvmd/validators/fs.py | 52 | ||||
-rw-r--r-- | kvmd/validators/hw.py | 42 | ||||
-rw-r--r-- | kvmd/validators/kvm.py | 48 | ||||
-rw-r--r-- | kvmd/validators/net.py | 67 | ||||
-rw-r--r-- | kvmd/yamlconf/__init__.py | 99 | ||||
-rw-r--r-- | kvmd/yamlconf/dumper.py | 8 | ||||
-rw-r--r-- | kvmd/yamlconf/loader.py | 11 |
18 files changed, 641 insertions, 258 deletions
diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index d2007f7d..062e9bfe 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -29,14 +29,13 @@ import logging.config from typing import Tuple from typing import List from typing import Dict -from typing import Sequence from typing import Optional -from typing import Union import pygments import pygments.lexers.data import pygments.formatters +from ..yamlconf import ConfigError from ..yamlconf import make_config from ..yamlconf import Section from ..yamlconf import Option @@ -44,31 +43,59 @@ from ..yamlconf import build_raw_from_options from ..yamlconf.dumper import make_config_dump from ..yamlconf.loader import load_yaml_file +from ..validators.basic import valid_bool +from ..validators.basic import valid_number +from ..validators.basic import valid_int_f1 +from ..validators.basic import valid_float_f01 + +from ..validators.fs import valid_abs_path +from ..validators.fs import valid_abs_path_exists +from ..validators.fs import valid_unix_mode + +from ..validators.net import valid_ip_or_host +from ..validators.net import valid_port + +from ..validators.auth import valid_auth_type + +from ..validators.kvm import valid_stream_quality +from ..validators.kvm import valid_stream_fps + +from ..validators.hw import valid_tty_speed +from ..validators.hw import valid_gpio_pin +from ..validators.hw import valid_gpio_pin_optional + # ===== def init( - prog: str=sys.argv[0], + prog: Optional[str]=None, description: Optional[str]=None, add_help: bool=True, + argv: Optional[List[str]]=None, ) -> Tuple[argparse.ArgumentParser, List[str], Section]: - args_parser = argparse.ArgumentParser(prog=prog, description=description, add_help=add_help) + argv = (argv or sys.argv) + assert len(argv) > 0 + + args_parser = argparse.ArgumentParser(prog=(prog or argv[0]), description=description, add_help=add_help) args_parser.add_argument("-c", "--config", dest="config_path", default="/etc/kvmd/main.yaml", metavar="<file>", - help="Set config file path") + type=valid_abs_path_exists, help="Set config file path") args_parser.add_argument("-o", "--set-options", dest="set_options", default=[], nargs="+", help="Override config options list (like sec/sub/opt=value)") args_parser.add_argument("-m", "--dump-config", dest="dump_config", action="store_true", help="View current configuration (include all overrides)") - (options, remaining) = args_parser.parse_known_args(sys.argv) + (options, remaining) = args_parser.parse_known_args(argv) + raw_config: Dict = {} - options.config_path = os.path.expanduser(options.config_path) - if os.path.exists(options.config_path): + if options.config_path: + options.config_path = os.path.expanduser(options.config_path) raw_config = load_yaml_file(options.config_path) - else: - raw_config = {} - _merge_dicts(raw_config, build_raw_from_options(options.set_options)) + scheme = _get_config_scheme() - config = make_config(raw_config, scheme) + try: + _merge_dicts(raw_config, build_raw_from_options(options.set_options)) + config = make_config(raw_config, scheme) + except ConfigError as err: + raise SystemExit("Config error: " + str(err)) if options.dump_config: dump = make_config_dump(config) @@ -96,135 +123,93 @@ def _merge_dicts(dest: Dict, src: Dict) -> None: dest[key] = src[key] -def _as_pin(pin: int) -> int: - if not isinstance(pin, int) or pin <= 0: - raise ValueError("Invalid pin number") - return pin - - -def _as_optional_pin(pin: int) -> int: - if not isinstance(pin, int) or pin < -1: - raise ValueError("Invalid optional pin number") - return pin - - -def _as_path(path: str) -> str: - if not isinstance(path, str): - raise ValueError("Invalid path") - path = str(path).strip() - if not path: - raise ValueError("Invalid path") - return path - - -def _as_optional_path(path: str) -> str: - if not isinstance(path, str): - raise ValueError("Invalid path") - return str(path).strip() - - -def _as_string_list(values: Union[str, Sequence]) -> List[str]: - if isinstance(values, str): - values = [values] - return list(map(str, values)) - - -def _as_auth_type(auth_type: str) -> str: - if not isinstance(auth_type, str): - raise ValueError("Invalid auth type") - auth_type = str(auth_type).strip() - if auth_type not in ["basic"]: - raise ValueError("Invalid auth type") - return auth_type - - def _get_config_scheme() -> Dict: return { "kvmd": { "server": { - "host": Option("localhost"), - "port": Option(0), - "unix": Option("", type=_as_optional_path, rename="unix_path"), - "unix_rm": Option(False), - "unix_mode": Option(0), - "heartbeat": Option(3.0), + "host": Option("localhost", type=valid_ip_or_host), + "port": Option(0, type=valid_port), + "unix": Option("", type=valid_abs_path, only_if="!port", unpack_as="unix_path"), + "unix_rm": Option(False, type=valid_bool), + "unix_mode": Option(0, type=valid_unix_mode), + "heartbeat": Option(3.0, type=valid_float_f01), "access_log_format": Option("[%P / %{X-Real-IP}i] '%r' => %s; size=%b ---" " referer='%{Referer}i'; user_agent='%{User-Agent}i'"), }, "auth": { - "type": Option("basic", type=_as_auth_type, rename="auth_type"), + "type": Option("basic", type=valid_auth_type, unpack_as="auth_type"), "basic": { - "htpasswd": Option("/etc/kvmd/htpasswd", type=_as_path, rename="htpasswd_path"), + "htpasswd": Option("/etc/kvmd/htpasswd", type=valid_abs_path_exists, unpack_as="htpasswd_path"), }, }, "info": { - "meta": Option("/etc/kvmd/meta.yaml", type=_as_path, rename="meta_path"), - "extras": Option("/usr/share/kvmd/extras", type=_as_path, rename="extras_path"), + "meta": Option("/etc/kvmd/meta.yaml", type=valid_abs_path_exists, unpack_as="meta_path"), + "extras": Option("/usr/share/kvmd/extras", type=valid_abs_path_exists, unpack_as="extras_path"), }, "hid": { - "reset_pin": Option(0, type=_as_pin), - "reset_delay": Option(0.1), - - "device": Option("", type=_as_path, rename="device_path"), - "speed": Option(115200), - "read_timeout": Option(2.0), - "read_retries": Option(10), - "common_retries": Option(100), - "retries_delay": Option(0.1), - "noop": Option(False), - - "state_poll": Option(0.1), + "reset_pin": Option(-1, type=valid_gpio_pin), + "reset_delay": Option(0.1, type=valid_float_f01), + + "device": Option("", type=valid_abs_path_exists, unpack_as="device_path"), + "speed": Option(115200, type=valid_tty_speed), + "read_timeout": Option(2.0, type=valid_float_f01), + "read_retries": Option(10, type=valid_int_f1), + "common_retries": Option(100, type=valid_int_f1), + "retries_delay": Option(0.1, type=valid_float_f01), + "noop": Option(False, type=valid_bool), + + "state_poll": Option(0.1, type=valid_float_f01), }, "atx": { - "enabled": Option(True), + "enabled": Option(True, type=valid_bool), - "power_led_pin": Option(-1, type=_as_optional_pin), - "hdd_led_pin": Option(-1, type=_as_optional_pin), - "power_switch_pin": Option(-1, type=_as_optional_pin), - "reset_switch_pin": Option(-1, type=_as_optional_pin), + "power_led_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"), + "hdd_led_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"), + "power_switch_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"), + "reset_switch_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"), - "click_delay": Option(0.1), - "long_click_delay": Option(5.5), + "click_delay": Option(0.1, type=valid_float_f01), + "long_click_delay": Option(5.5, type=valid_float_f01), - "state_poll": Option(0.1), + "state_poll": Option(0.1, type=valid_float_f01), }, "msd": { - "enabled": Option(True), + "enabled": Option(True, type=valid_bool), - "target_pin": Option(-1, type=_as_optional_pin), - "reset_pin": Option(-1, type=_as_optional_pin), + "target_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"), + "reset_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"), - "device": Option("", type=_as_optional_path, rename="device_path"), - "init_delay": Option(2.0), - "reset_delay": Option(1.0), - "write_meta": Option(True), - "chunk_size": Option(65536), + "device": Option("", type=valid_abs_path, only_if="enabled", unpack_as="device_path"), + "init_delay": Option(2.0, type=valid_float_f01), + "reset_delay": Option(1.0, type=valid_float_f01), + "write_meta": Option(True, type=valid_bool), + "chunk_size": Option(65536, type=(lambda arg: valid_number(arg, min=1024))), }, "streamer": { - "cap_pin": Option(0, type=_as_optional_pin), - "conv_pin": Option(0, type=_as_optional_pin), + "cap_pin": Option(-1, type=valid_gpio_pin_optional), + "conv_pin": Option(-1, type=valid_gpio_pin_optional), - "sync_delay": Option(1.0), - "init_delay": Option(1.0), - "init_restart_after": Option(0.0), - "shutdown_delay": Option(10.0), - "state_poll": Option(1.0), + "sync_delay": Option(1.0, type=valid_float_f01), + "init_delay": Option(1.0, type=valid_float_f01), + "init_restart_after": Option(0.0, type=(lambda arg: valid_number(arg, min=0.0, type=float))), + "shutdown_delay": Option(10.0, type=valid_float_f01), + "state_poll": Option(1.0, type=valid_float_f01), - "quality": Option(80), - "desired_fps": Option(0), + "quality": Option(80, type=valid_stream_quality), + "desired_fps": Option(0, type=valid_stream_fps), - "host": Option("localhost"), - "port": Option(0), - "unix": Option("", type=_as_optional_path, rename="unix_path"), - "timeout": Option(2.0), + "host": Option("localhost", type=valid_ip_or_host), + "port": Option(0, type=valid_port), + "unix": Option("", type=valid_abs_path, only_if="!port", unpack_as="unix_path"), + "timeout": Option(2.0, type=valid_float_f01), - "cmd": Option(["/bin/true"], type=_as_string_list), + "cmd": Option(["/bin/true"]), # TODO: Validator }, }, diff --git a/kvmd/apps/cleanup/__init__.py b/kvmd/apps/cleanup/__init__.py index b39559f6..39bd7a71 100644 --- a/kvmd/apps/cleanup/__init__.py +++ b/kvmd/apps/cleanup/__init__.py @@ -24,6 +24,9 @@ import os import subprocess import time +from typing import List +from typing import Optional + from ...logging import get_logger from ... import gpio @@ -32,8 +35,8 @@ from .. import init # ===== -def main() -> None: - config = init("kvmd-cleanup", description="Kill KVMD and clear resources")[2].kvmd +def main(argv: Optional[List[str]]=None) -> None: + config = init("kvmd-cleanup", description="Kill KVMD and clear resources", argv=argv)[2].kvmd logger = get_logger(0) logger.info("Cleaning up ...") @@ -47,7 +50,7 @@ def main() -> None: ("streamer_cap_pin", config.streamer.cap_pin), ("streamer_conv_pin", config.streamer.conv_pin), ]: - if pin > 0: + if pin >= 0: logger.info("Writing value=0 to pin=%d (%s)", pin, name) gpio.set_output(pin, initial=False) diff --git a/kvmd/apps/htpasswd/__init__.py b/kvmd/apps/htpasswd/__init__.py index bc77c0e8..27f940b9 100644 --- a/kvmd/apps/htpasswd/__init__.py +++ b/kvmd/apps/htpasswd/__init__.py @@ -22,7 +22,6 @@ import sys import os -import re import getpass import tempfile import contextlib @@ -34,12 +33,16 @@ import passlib.apache from ...yamlconf import Section +from ...validators import ValidatorError +from ...validators.auth import valid_user +from ...validators.auth import valid_passwd + from .. import init # ===== def _get_htpasswd_path(config: Section) -> str: - if config.kvmd.auth.auth_type != "basic": + if config.kvmd.auth.type != "basic": print("Warning: KVMD does not use basic auth", file=sys.stderr) return config.kvmd.auth.basic.htpasswd @@ -69,13 +72,6 @@ def _get_htpasswd_for_write(config: Section) -> Generator[passlib.apache.Htpassw os.remove(tmp_path) -def _valid_user(user: str) -> str: - stripped = user.strip() - if re.match(r"^[a-z_][a-z0-9_-]*$", stripped): - return stripped - raise SystemExit("Invalid user %r" % (user)) - - # ==== def _cmd_list(config: Section, _: argparse.Namespace) -> None: for user in passlib.apache.HtpasswdFile(_get_htpasswd_path(config)).users(): @@ -85,10 +81,10 @@ def _cmd_list(config: Section, _: argparse.Namespace) -> None: def _cmd_set(config: Section, options: argparse.Namespace) -> None: with _get_htpasswd_for_write(config) as htpasswd: if options.read_stdin: - passwd = input() + passwd = valid_passwd(input()) else: - passwd = getpass.getpass("Password: ", stream=sys.stderr) - if getpass.getpass("Repeat: ", stream=sys.stderr) != passwd: + passwd = valid_passwd(getpass.getpass("Password: ", stream=sys.stderr)) + if valid_passwd(getpass.getpass("Repeat: ", stream=sys.stderr)) != passwd: raise SystemExit("Sorry, passwords do not match") htpasswd.set_password(options.user, passwd) @@ -113,13 +109,16 @@ def main() -> None: cmd_list_parser.set_defaults(cmd=_cmd_list) cmd_set_parser = subparsers.add_parser("set", help="Create user or change password") - cmd_set_parser.add_argument("user", type=_valid_user) + cmd_set_parser.add_argument("user", type=valid_user) cmd_set_parser.add_argument("-i", "--read-stdin", action="store_true", help="Read password from stdin") cmd_set_parser.set_defaults(cmd=_cmd_set) cmd_delete_parser = subparsers.add_parser("del", help="Delete user") - cmd_delete_parser.add_argument("user", type=_valid_user) + cmd_delete_parser.add_argument("user", type=valid_user) cmd_delete_parser.set_defaults(cmd=_cmd_delete) options = parser.parse_args(argv[1:]) - options.cmd(config, options) + try: + options.cmd(config, options) + except ValidatorError as err: + raise SystemExit(str(err)) diff --git a/kvmd/apps/kvmd/__init__.py b/kvmd/apps/kvmd/__init__.py index 3f2431d7..1ab14fd6 100644 --- a/kvmd/apps/kvmd/__init__.py +++ b/kvmd/apps/kvmd/__init__.py @@ -45,15 +45,15 @@ def main() -> None: # pylint: disable=protected-access loop = asyncio.get_event_loop() Server( - auth_manager=AuthManager(**config.auth._unpack_renamed()), - info_manager=InfoManager(loop=loop, **config.info._unpack_renamed()), + auth_manager=AuthManager(**config.auth._unpack()), + info_manager=InfoManager(loop=loop, **config.info._unpack()), log_reader=LogReader(loop=loop), - hid=Hid(**config.hid._unpack_renamed()), - atx=Atx(**config.atx._unpack_renamed()), - msd=MassStorageDevice(loop=loop, **config.msd._unpack_renamed()), - streamer=Streamer(loop=loop, **config.streamer._unpack_renamed()), + hid=Hid(**config.hid._unpack()), + atx=Atx(**config.atx._unpack()), + msd=MassStorageDevice(loop=loop, **config.msd._unpack()), + streamer=Streamer(loop=loop, **config.streamer._unpack()), loop=loop, - ).run(**config.server._unpack_renamed()) + ).run(**config.server._unpack()) get_logger().info("Bye-bye") diff --git a/kvmd/apps/kvmd/hid.py b/kvmd/apps/kvmd/hid.py index 22d4acf3..2b41eda0 100644 --- a/kvmd/apps/kvmd/hid.py +++ b/kvmd/apps/kvmd/hid.py @@ -46,7 +46,7 @@ from ... import gpio # ===== def _get_keymap() -> Dict[str, int]: - return yaml.load(pkgutil.get_data("kvmd", "data/keymap.yaml").decode()) # type: ignore + return yaml.safe_load(pkgutil.get_data("kvmd", "data/keymap.yaml").decode()) # type: ignore _KEYMAP = _get_keymap() diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index fc701c94..ae82015e 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -21,7 +21,6 @@ import os -import re import signal import socket import asyncio @@ -36,7 +35,6 @@ from typing import Dict from typing import Set from typing import Callable from typing import Optional -from typing import Any import aiohttp.web import setproctitle @@ -45,6 +43,18 @@ from ...logging import get_logger from ...aioregion import RegionIsBusyError +from ...validators import ValidatorError + +from ...validators.basic import valid_bool +from ...validators.auth import valid_user +from ...validators.auth import valid_passwd +from ...validators.auth import valid_auth_token +from ...validators.kvm import valid_atx_button +from ...validators.kvm import valid_kvm_target +from ...validators.kvm import valid_log_seek +from ...validators.kvm import valid_stream_quality +from ...validators.kvm import valid_stream_fps + from ... import __version__ from .auth import AuthManager @@ -80,10 +90,6 @@ class HttpError(Exception): pass -class BadRequestError(HttpError): - pass - - class UnauthorizedError(HttpError): pass @@ -138,7 +144,7 @@ def _exposed(http_method: str, path: str, auth_required: bool=True) -> Callable: if auth_required: token = request.cookies.get(_COOKIE_AUTH_TOKEN, "") if token: - user = self._auth_manager.check(_valid_token(token)) + user = self._auth_manager.check(valid_auth_token(token)) if not user: raise ForbiddenError("Forbidden") setattr(request, _ATTR_KVMD_USER, user) @@ -149,7 +155,7 @@ def _exposed(http_method: str, path: str, auth_required: bool=True) -> Callable: except RegionIsBusyError as err: return _json_exception(err, 409) - except (BadRequestError, AtxOperationError, MsdOperationError) as err: + except (ValidatorError, AtxOperationError, MsdOperationError) as err: return _json_exception(err, 400) except UnauthorizedError as err: return _json_exception(err, 401) @@ -178,51 +184,6 @@ def _system_task(method: Callable) -> Callable: return wrap -def _valid_user(user: Any) -> str: - if isinstance(user, str): - stripped = user.strip() - if re.match(r"^[a-z_][a-z0-9_-]*$", stripped): - return stripped - raise BadRequestError("Invalid user characters %r" % (user)) - - -def _valid_passwd(passwd: Any) -> str: - if isinstance(passwd, str): - if re.match(r"[\x20-\x7e]*$", passwd): - return passwd - raise BadRequestError("Invalid password characters") - - -def _valid_token(token: Optional[str]) -> str: - if isinstance(token, str): - token = token.strip().lower() - if re.match(r"^[0-9a-f]{64}$", token): - return token - raise BadRequestError("Invalid auth token characters") - - -def _valid_bool(name: str, flag: Optional[str]) -> bool: - flag = str(flag).strip().lower() - if flag in ["1", "true", "yes"]: - return True - elif flag in ["0", "false", "no"]: - return False - raise BadRequestError("Invalid param '%s'" % (name)) - - -def _valid_int(name: str, value: Optional[str], min_value: Optional[int]=None, max_value: Optional[int]=None) -> int: - try: - value_int = int(value) # type: ignore - if ( - (min_value is not None and value_int < min_value) - or (max_value is not None and value_int > max_value) - ): - raise ValueError() - return value_int - except Exception: - raise BadRequestError("Invalid param %r" % (name)) - - class _Events(Enum): INFO_STATE = "info_state" HID_STATE = "hid_state" @@ -337,8 +298,8 @@ class Server: # pylint: disable=too-many-instance-attributes async def __auth_login_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: credentials = await request.post() token = self._auth_manager.login( - user=_valid_user(credentials.get("user", "")), - passwd=_valid_passwd(credentials.get("passwd", "")), + user=valid_user(credentials.get("user", "")), + passwd=valid_passwd(credentials.get("passwd", "")), ) if token: return _json({}, set_cookies={_COOKIE_AUTH_TOKEN: token}) @@ -346,7 +307,7 @@ class Server: # pylint: disable=too-many-instance-attributes @_exposed("POST", "/auth/logout") async def __auth_logout_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: - token = _valid_token(request.cookies.get(_COOKIE_AUTH_TOKEN, "")) + token = valid_auth_token(request.cookies.get(_COOKIE_AUTH_TOKEN, "")) self._auth_manager.logout(token) return _json({}) @@ -362,8 +323,8 @@ class Server: # pylint: disable=too-many-instance-attributes @_exposed("GET", "/log") async def __log_handler(self, request: aiohttp.web.Request) -> aiohttp.web.StreamResponse: - seek = _valid_int("seek", request.query.get("seek", "0"), 0) - follow = _valid_bool("follow", request.query.get("follow", "false")) + seek = valid_log_seek(request.query.get("seek", "0")) + follow = valid_bool(request.query.get("follow", "false")) response = aiohttp.web.StreamResponse(status=200, reason="OK", headers={"Content-Type": "text/plain"}) await response.prepare(request) async for record in self.__log_reader.poll_log(seek, follow): @@ -460,15 +421,12 @@ class Server: # pylint: disable=too-many-instance-attributes @_exposed("POST", "/atx/click") async def __atx_click_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: - button = request.query.get("button") - clicker = { + button = valid_atx_button(request.query.get("button")) + await ({ "power": self.__atx.click_power, "power_long": self.__atx.click_power_long, "reset": self.__atx.click_reset, - }.get(button) - if not clicker: - raise BadRequestError("Invalid param 'button'") - await clicker() + }[button])() return _json({"clicked": button}) # ===== MSD @@ -479,13 +437,11 @@ class Server: # pylint: disable=too-many-instance-attributes @_exposed("POST", "/msd/connect") async def __msd_connect_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: - to = request.query.get("to") - if to == "kvm": - return _json(await self.__msd.connect_to_kvm()) - elif to == "server": - return _json(await self.__msd.connect_to_pc()) - else: - raise BadRequestError("Invalid param 'to'") + to = valid_kvm_target(request.query.get("to")) + return _json(await ({ + "kvm": self.__msd.connect_to_kvm, + "server": self.__msd.connect_to_pc, + }[to])()) @_exposed("POST", "/msd/write") async def __msd_write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: @@ -496,12 +452,12 @@ class Server: # pylint: disable=too-many-instance-attributes async with self.__msd: field = await reader.next() if not field or field.name != "image_name": - raise BadRequestError("Missing 'image_name' field") + raise ValidatorError("Missing 'image_name' field") image_name = (await field.read()).decode("utf-8")[:256] field = await reader.next() if not field or field.name != "image_data": - raise BadRequestError("Missing 'image_data' field") + raise ValidatorError("Missing 'image_data' field") logger.info("Writing image %r to mass-storage device ...", image_name) await self.__msd.write_image_info(image_name, False) @@ -530,8 +486,8 @@ class Server: # pylint: disable=too-many-instance-attributes @_exposed("POST", "/streamer/set_params") async def __streamer_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: for (name, validator) in [ - ("quality", lambda arg: _valid_int("quality", arg, 1, 100)), - ("desired_fps", lambda arg: _valid_int("desired_fps", arg, 0, 30)), + ("quality", valid_stream_quality), + ("desired_fps", valid_stream_fps), ]: value = request.query.get(name) if value: diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py index 71662959..771d5be3 100644 --- a/kvmd/apps/kvmd/streamer.py +++ b/kvmd/apps/kvmd/streamer.py @@ -63,8 +63,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes loop: asyncio.AbstractEventLoop, ) -> None: - self.__cap_pin = (gpio.set_output(cap_pin) if cap_pin > 0 else 0) - self.__conv_pin = (gpio.set_output(conv_pin) if conv_pin > 0 else 0) + self.__cap_pin = (gpio.set_output(cap_pin) if cap_pin >= 0 else -1) + self.__conv_pin = (gpio.set_output(conv_pin) if conv_pin >= 0 else -1) self.__sync_delay = sync_delay self.__init_delay = init_delay @@ -179,9 +179,9 @@ class Streamer: # pylint: disable=too-many-instance-attributes async def __set_hw_enabled(self, enabled: bool) -> None: # XXX: This sequence is very important to enable converter and cap board - if self.__cap_pin > 0: + if self.__cap_pin >= 0: gpio.write(self.__cap_pin, enabled) - if self.__conv_pin > 0: + if self.__conv_pin >= 0: if enabled: await asyncio.sleep(self.__sync_delay) gpio.write(self.__conv_pin, enabled) diff --git a/kvmd/gpio.py b/kvmd/gpio.py index 64a1a100..ef9d1c5e 100644 --- a/kvmd/gpio.py +++ b/kvmd/gpio.py @@ -43,22 +43,22 @@ def bcm() -> Generator[None, None, None]: def set_output(pin: int, initial: bool=False) -> int: - assert pin > 0, pin + assert pin >= 0, pin GPIO.setup(pin, GPIO.OUT, initial=initial) return pin def set_input(pin: int) -> int: - assert pin > 0, pin + assert pin >= 0, pin GPIO.setup(pin, GPIO.IN) return pin def read(pin: int) -> bool: - assert pin > 0, pin + assert pin >= 0, pin return bool(GPIO.input(pin)) def write(pin: int, flag: bool) -> None: - assert pin > 0, pin + assert pin >= 0, pin GPIO.output(pin, flag) diff --git a/kvmd/validators/__init__.py b/kvmd/validators/__init__.py new file mode 100644 index 00000000..f4b0f1fa --- /dev/null +++ b/kvmd/validators/__init__.py @@ -0,0 +1,83 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import re + +from typing import List +from typing import Callable +from typing import NoReturn +from typing import Any + + +# ===== +class ValidatorError(ValueError): + pass + + +# ===== +def raise_error(arg: Any, name: str, hide: bool=False) -> NoReturn: + arg_str = " " + if not hide: + arg_str = (" %r " if isinstance(arg, (str, bytes)) else " '%s' ") % (arg) + raise ValidatorError("The argument" + arg_str + "is not a valid " + name) + + +def check_not_none(arg: Any, name: str) -> Any: + if arg is None: + raise ValidatorError("Empty argument is not a valid %s" % (name)) + return arg + + +def check_not_none_string(arg: Any, name: str, strip: bool=True) -> str: + arg = str(check_not_none(arg, name)) + if strip: + arg = arg.strip() + return arg + + +def check_in_list(arg: Any, name: str, variants: List) -> Any: + if arg not in variants: + raise_error(arg, name) + return arg + + +def check_string_in_list(arg: Any, name: str, variants: List[str], lower: bool=True) -> Any: + arg = check_not_none_string(arg, name) + if lower: + arg = arg.lower() + return check_in_list(arg, name, variants) + + +def check_re_match(arg: Any, name: str, pattern: str, strip: bool=True, hide: bool=False) -> str: + arg = check_not_none_string(arg, name, strip=strip) + if re.match(pattern, arg, flags=re.MULTILINE) is None: + raise_error(arg, name, hide=hide) + return arg + + +def check_any(arg: Any, name: str, validators: List[Callable[[Any], Any]]) -> Any: + for validator in validators: + try: + return validator(arg) + except Exception: + pass + raise_error(arg, name) diff --git a/kvmd/validators/auth.py b/kvmd/validators/auth.py new file mode 100644 index 00000000..5f6188e1 --- /dev/null +++ b/kvmd/validators/auth.py @@ -0,0 +1,43 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from typing import Any + +from . import check_string_in_list +from . import check_re_match + + +# ===== +def valid_user(arg: Any) -> str: + return check_re_match(arg, "username characters", r"^[a-z_][a-z0-9_-]*$") + + +def valid_passwd(arg: Any) -> str: + return check_re_match(arg, "passwd characters", r"^[\x20-\x7e]*\Z$", strip=False, hide=True) + + +def valid_auth_token(arg: Any) -> str: + return check_re_match(arg, "auth token", r"^[0-9a-f]{64}$", hide=True) + + +def valid_auth_type(arg: Any) -> str: + return check_string_in_list(arg, "auth type", ["basic"]) diff --git a/kvmd/validators/basic.py b/kvmd/validators/basic.py new file mode 100644 index 00000000..e4a2336e --- /dev/null +++ b/kvmd/validators/basic.py @@ -0,0 +1,73 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from typing import Type +from typing import Union +from typing import Any + +from . import ValidatorError +from . import raise_error +from . import check_not_none_string +from . import check_in_list + + +# ===== +def valid_bool(arg: Any) -> bool: + true_args = ["1", "true", "yes"] + false_args = ["0", "false", "no"] + + name = "bool (%r or %r)" % (true_args, false_args) + + arg = check_not_none_string(arg, name).lower() + arg = check_in_list(arg, name, true_args + false_args) + return (arg in true_args) + + +def valid_number( + arg: Any, + min: Union[int, float, None]=None, # pylint: disable=redefined-builtin + max: Union[int, float, None]=None, # pylint: disable=redefined-builtin + type: Union[Type[int], Type[float]]=int, # pylint: disable=redefined-builtin + name: str="", +) -> Union[int, float]: + + name = (name or type.__name__) + + arg = check_not_none_string(arg, name) + try: + arg = type(arg) + except Exception: + raise_error(arg, name) + + if min is not None and arg < min: + raise ValidatorError("The argument '%s' must be %s and greater or equial than %s" % (arg, name, min)) + if max is not None and arg > max: + raise ValidatorError("The argument '%s' must be %s and lesser or equal then %s" % (arg, name, max)) + return arg + + +def valid_int_f1(arg: Any) -> int: + return int(valid_number(arg, min=1)) + + +def valid_float_f01(arg: Any) -> float: + return float(valid_number(arg, min=0.1, type=float)) diff --git a/kvmd/validators/fs.py b/kvmd/validators/fs.py new file mode 100644 index 00000000..ed895818 --- /dev/null +++ b/kvmd/validators/fs.py @@ -0,0 +1,52 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import os + +from typing import Any + +from . import raise_error +from . import check_not_none_string + +from .basic import valid_number + + +# ===== +def valid_abs_path(arg: Any, exists: bool=False) -> str: + name = ("existent absolute path" if exists else "absolute path") + + if len(str(arg).strip()) == 0: + arg = None + arg = check_not_none_string(arg, name) + + arg = os.path.abspath(arg) + if exists and not os.access(arg, os.F_OK): + raise_error(arg, name) + return arg + + +def valid_abs_path_exists(arg: Any) -> str: + return valid_abs_path(arg, exists=True) + + +def valid_unix_mode(arg: Any) -> int: + return int(valid_number(arg, min=0, name="UNIX mode")) diff --git a/kvmd/validators/hw.py b/kvmd/validators/hw.py new file mode 100644 index 00000000..a68743ad --- /dev/null +++ b/kvmd/validators/hw.py @@ -0,0 +1,42 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from typing import Any + +from . import check_in_list + +from .basic import valid_number + + +# ===== +def valid_tty_speed(arg: Any) -> int: + name = "TTY speed" + arg = int(valid_number(arg, name=name)) + return check_in_list(arg, name, [1200, 2400, 4800, 9600, 19200, 38400, 57600, 115200]) + + +def valid_gpio_pin(arg: Any) -> int: + return int(valid_number(arg, min=0, name="GPIO pin")) + + +def valid_gpio_pin_optional(arg: Any) -> int: + return int(valid_number(arg, min=-1, name="optional GPIO pin")) diff --git a/kvmd/validators/kvm.py b/kvmd/validators/kvm.py new file mode 100644 index 00000000..034587ef --- /dev/null +++ b/kvmd/validators/kvm.py @@ -0,0 +1,48 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +from typing import Any + +from . import check_string_in_list + +from .basic import valid_number + + +# ===== +def valid_atx_button(arg: Any) -> str: + return check_string_in_list(arg, "ATX button", ["power", "power_long", "reset"]) + + +def valid_kvm_target(arg: Any) -> str: + return check_string_in_list(arg, "KVM target", ["kvm", "server"]) + + +def valid_log_seek(arg: Any) -> int: + return int(valid_number(arg, min=0, name="log seek")) + + +def valid_stream_quality(arg: Any) -> int: + return int(valid_number(arg, min=1, max=100, name="stream quality")) + + +def valid_stream_fps(arg: Any) -> int: + return int(valid_number(arg, min=0, max=30, name="stream FPS")) diff --git a/kvmd/validators/net.py b/kvmd/validators/net.py new file mode 100644 index 00000000..cad87f07 --- /dev/null +++ b/kvmd/validators/net.py @@ -0,0 +1,67 @@ +# ========================================================================== # +# # +# KVMD - The main Pi-KVM daemon. # +# # +# Copyright (C) 2018 Maxim Devaev <[email protected]> # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see <https://www.gnu.org/licenses/>. # +# # +# ========================================================================== # + + +import socket + +from typing import Any + +from . import check_not_none_string +from . import check_re_match +from . import check_any + +from .basic import valid_number + + +# ===== +def valid_ip_or_host(arg: Any) -> str: + name = "IP address or RFC-1123 hostname" + return check_any( + arg=check_not_none_string(arg, name), + name=name, + validators=[ + valid_ip, + valid_rfc_host, + ], + ) + + +def valid_ip(arg: Any) -> str: + name = "IP address" + return check_any( + arg=check_not_none_string(arg, name), + name=name, + validators=[ + lambda arg: (arg, socket.inet_pton(socket.AF_INET, arg))[0], + lambda arg: (arg, socket.inet_pton(socket.AF_INET6, arg))[0], + ], + ) + + +def valid_rfc_host(arg: Any) -> str: + # http://stackoverflow.com/questions/106179/regular-expression-to-match-hostname-or-ip-address + pattern = r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*" \ + r"([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$" + return check_re_match(arg, "RFC-1123 hostname", pattern) + + +def valid_port(arg: Any) -> int: + return int(valid_number(arg, min=0, max=65535, name="TCP/UDP port")) diff --git a/kvmd/yamlconf/__init__.py b/kvmd/yamlconf/__init__.py index 4c15fee1..bfda9b78 100644 --- a/kvmd/yamlconf/__init__.py +++ b/kvmd/yamlconf/__init__.py @@ -31,14 +31,19 @@ from typing import Any # ===== +class ConfigError(ValueError): + pass + + +# ===== def build_raw_from_options(options: List[str]) -> Dict[str, Any]: raw: Dict[str, Any] = {} for option in options: (key, value) = (option.split("=", 1) + [None])[:2] # type: ignore if len(key.strip()) == 0: - raise ValueError("Empty option key (required 'key=value' instead of '{}')".format(option)) + raise ConfigError("Empty option key (required 'key=value' instead of %r)" % (option)) if value is None: - raise ValueError("No value for key '{}'".format(key)) + raise ConfigError("No value for key %r" % (key)) section = raw subs = list(map(str.strip, key.split("/"))) @@ -56,7 +61,7 @@ def _parse_value(value: str) -> Any: and value not in ["true", "false", "null"] and not value.startswith(("{", "[", "\"")) ): - value = "\"{}\"".format(value) + value = "\"%s\"" % (value) return json.loads(value) @@ -66,33 +71,33 @@ class Section(dict): dict.__init__(self) self.__meta: Dict[str, Dict[str, Any]] = {} - def _unpack_renamed(self, _section: Optional["Section"]=None) -> Dict[str, Any]: + def _unpack(self, _section: Optional["Section"]=None) -> Dict[str, Any]: if _section is None: _section = self unpacked: Dict[str, Any] = {} for (key, value) in _section.items(): if isinstance(value, Section): - unpacked[key] = value._unpack_renamed() # pylint: disable=protected-access + unpacked[key] = value._unpack() # pylint: disable=protected-access else: # Option - unpacked[_section._get_rename(key)] = value # pylint: disable=protected-access + unpacked[_section._get_unpack_as(key)] = value # pylint: disable=protected-access return unpacked - def _set_meta(self, key: str, default: Any, help: str, rename: str) -> None: # pylint: disable=redefined-builtin + def _set_meta(self, key: str, default: Any, unpack_as: str, help: str) -> None: # pylint: disable=redefined-builtin self.__meta[key] = { "default": default, - "help": help, - "rename": rename, + "unpack_as": unpack_as, + "help": help, } def _get_default(self, key: str) -> Any: return self.__meta[key]["default"] + def _get_unpack_as(self, key: str) -> str: + return (self.__meta[key]["unpack_as"] or key) + def _get_help(self, key: str) -> str: return self.__meta[key]["help"] - def _get_rename(self, key: str) -> str: - return (self.__meta[key]["rename"] or key) - def __getattribute__(self, key: str) -> Any: if key in self: return self[key] @@ -106,46 +111,74 @@ class Option: def __init__( self, default: Any, - help: str="", # pylint: disable=redefined-builtin type: Optional[Callable[[Any], Any]]=None, # pylint: disable=redefined-builtin - rename: str="", + only_if: str="", + unpack_as: str="", + help: str="", # pylint: disable=redefined-builtin ) -> None: self.default = default - self.help = help self.type: Callable[[Any], Any] = (type or (self.__type(default) if default is not None else str)) # type: ignore - self.rename = rename + self.only_if = only_if + self.unpack_as = unpack_as + self.help = help def __repr__(self) -> str: - return "<Option(default={self.default}, type={self.type}, help={self.help}, rename={self.rename})>".format(self=self) + return "<Option(default={0.default}, type={0.type}, only_if={0.only_if}, unpack_as={0.unpack_as})>".format(self) # ===== def make_config(raw: Dict[str, Any], scheme: Dict[str, Any], _keys: Tuple[str, ...]=()) -> Section: if not isinstance(raw, dict): - raise ValueError("The node '{}' must be a dictionary".format("/".join(_keys) or "/")) + raise ConfigError("The node %r must be a dictionary" % ("/".join(_keys) or "/")) config = Section() - for (key, option) in scheme.items(): - full_key = _keys + (key,) - full_name = "/".join(full_key) - - if isinstance(option, Option): - value = raw.get(key, option.default) - try: - value = option.type(value) - except Exception: - raise ValueError("Invalid value '{value}' for key '{key}'".format(key=full_name, value=value)) + + def make_full_key(key: str) -> Tuple[str, ...]: + return _keys + (key,) + + def make_full_name(key: str) -> str: + return "/".join(make_full_key(key)) + + def process_option(key: str, no_only_if: bool=False) -> Any: + if key not in config: + option: Option = scheme[key] + only_if = option.only_if + only_if_negative = option.only_if.startswith("!") + if only_if_negative: + only_if = only_if[1:] + + if only_if and no_only_if: # pylint: disable=no-else-raise + # Перекрестный only_if запрещен + raise RuntimeError("Found only_if recursuon on key %r" % (make_full_name(key))) + elif only_if and ( + (not only_if_negative and not process_option(only_if, no_only_if=True)) + or (only_if_negative and process_option(only_if, no_only_if=True)) + ): + # Если есть условие и оно ложно - ставим дефолт и не валидируем + value = option.default + else: + value = raw.get(key, option.default) + try: + value = option.type(value) + except ValueError as err: + raise ConfigError("Invalid value %r for key %r: %s" % (value, make_full_name(key), str(err))) + config[key] = value config._set_meta( # pylint: disable=protected-access key=key, default=option.default, + unpack_as=option.unpack_as, help=option.help, - rename=option.rename, ) - elif isinstance(option, dict): - config[key] = make_config(raw.get(key, {}), option, full_key) + return config[key] + + for key in scheme: + if isinstance(scheme[key], Option): + process_option(key) + elif isinstance(scheme[key], dict): + config[key] = make_config(raw.get(key, {}), scheme[key], make_full_key(key)) else: - raise RuntimeError("Incorrect scheme definition for key '{}':" - " the value is {}, not dict or Option()".format(full_name, type(option))) + raise RuntimeError("Incorrect scheme definition for key %r:" + " the value is %r, not dict() or Option()" % (make_full_name(key), type(scheme[key]))) return config diff --git a/kvmd/yamlconf/dumper.py b/kvmd/yamlconf/dumper.py index 517965fe..93bced3a 100644 --- a/kvmd/yamlconf/dumper.py +++ b/kvmd/yamlconf/dumper.py @@ -44,17 +44,17 @@ def _inner_make_dump(config: Section, _level: int=0) -> List[str]: for (key, value) in sorted(config.items(), key=operator.itemgetter(0)): indent = " " * _INDENT * _level if isinstance(value, Section): - lines.append("{}{}:".format(indent, key)) + lines.append("%s%s:" % (indent, key)) lines += _inner_make_dump(value, _level + 1) lines.append("") else: default = config._get_default(key) # pylint: disable=protected-access comment = config._get_help(key) # pylint: disable=protected-access if default == value: - lines.append("{}{}: {} # {}".format(indent, key, _make_yaml(value, _level), comment)) + lines.append("%s%s: %s # %s" % (indent, key, _make_yaml(value, _level), comment)) else: - lines.append("{}# {}: {} # {}".format(indent, key, _make_yaml(default, _level), comment)) - lines.append("{}{}: {}".format(indent, key, _make_yaml(value, _level))) + lines.append("%s# %s: %s # %s" % (indent, key, _make_yaml(default, _level), comment)) + lines.append("%s%s: %s" % (indent, key, _make_yaml(value, _level))) return lines diff --git a/kvmd/yamlconf/loader.py b/kvmd/yamlconf/loader.py index ba82152c..c72a45dc 100644 --- a/kvmd/yamlconf/loader.py +++ b/kvmd/yamlconf/loader.py @@ -26,7 +26,6 @@ from typing import IO from typing import Any import yaml -import yaml.loader import yaml.nodes @@ -37,17 +36,17 @@ def load_yaml_file(path: str) -> Any: return yaml.load(yaml_file, _YamlLoader) except Exception: # Reraise internal exception as standard ValueError and show the incorrect file - raise ValueError("Incorrect YAML syntax in file '{}'".format(path)) + raise ValueError("Incorrect YAML syntax in file %r" % (path)) -class _YamlLoader(yaml.loader.Loader): # pylint: disable=too-many-ancestors +class _YamlLoader(yaml.SafeLoader): def __init__(self, yaml_file: IO) -> None: - yaml.loader.Loader.__init__(self, yaml_file) + super().__init__(yaml_file) self.__root = os.path.dirname(yaml_file.name) def include(self, node: yaml.nodes.Node) -> str: - path = os.path.join(self.__root, self.construct_scalar(node)) # pylint: disable=no-member + path = os.path.join(self.__root, self.construct_scalar(node)) return load_yaml_file(path) -_YamlLoader.add_constructor("!include", _YamlLoader.include) # pylint: disable=no-member +_YamlLoader.add_constructor("!include", _YamlLoader.include) |