summaryrefslogtreecommitdiff
path: root/kvmd/yamlconf/__init__.py
diff options
context:
space:
mode:
authorDevaev Maxim <[email protected]>2019-04-06 05:32:02 +0300
committerDevaev Maxim <[email protected]>2019-04-06 08:04:26 +0300
commit1d75b738a08c98a5d3d8ac3c685e77360f4c1267 (patch)
tree3aa89dc7fd0ab737e9332714a784e9d4dde0a362 /kvmd/yamlconf/__init__.py
parent73e04b71ed55a46c939f12548b31746617af2bca (diff)
validators, tests
Diffstat (limited to 'kvmd/yamlconf/__init__.py')
-rw-r--r--kvmd/yamlconf/__init__.py99
1 files changed, 66 insertions, 33 deletions
diff --git a/kvmd/yamlconf/__init__.py b/kvmd/yamlconf/__init__.py
index 4c15fee1..bfda9b78 100644
--- a/kvmd/yamlconf/__init__.py
+++ b/kvmd/yamlconf/__init__.py
@@ -31,14 +31,19 @@ from typing import Any
# =====
+class ConfigError(ValueError):
+ pass
+
+
+# =====
def build_raw_from_options(options: List[str]) -> Dict[str, Any]:
raw: Dict[str, Any] = {}
for option in options:
(key, value) = (option.split("=", 1) + [None])[:2] # type: ignore
if len(key.strip()) == 0:
- raise ValueError("Empty option key (required 'key=value' instead of '{}')".format(option))
+ raise ConfigError("Empty option key (required 'key=value' instead of %r)" % (option))
if value is None:
- raise ValueError("No value for key '{}'".format(key))
+ raise ConfigError("No value for key %r" % (key))
section = raw
subs = list(map(str.strip, key.split("/")))
@@ -56,7 +61,7 @@ def _parse_value(value: str) -> Any:
and value not in ["true", "false", "null"]
and not value.startswith(("{", "[", "\""))
):
- value = "\"{}\"".format(value)
+ value = "\"%s\"" % (value)
return json.loads(value)
@@ -66,33 +71,33 @@ class Section(dict):
dict.__init__(self)
self.__meta: Dict[str, Dict[str, Any]] = {}
- def _unpack_renamed(self, _section: Optional["Section"]=None) -> Dict[str, Any]:
+ def _unpack(self, _section: Optional["Section"]=None) -> Dict[str, Any]:
if _section is None:
_section = self
unpacked: Dict[str, Any] = {}
for (key, value) in _section.items():
if isinstance(value, Section):
- unpacked[key] = value._unpack_renamed() # pylint: disable=protected-access
+ unpacked[key] = value._unpack() # pylint: disable=protected-access
else: # Option
- unpacked[_section._get_rename(key)] = value # pylint: disable=protected-access
+ unpacked[_section._get_unpack_as(key)] = value # pylint: disable=protected-access
return unpacked
- def _set_meta(self, key: str, default: Any, help: str, rename: str) -> None: # pylint: disable=redefined-builtin
+ def _set_meta(self, key: str, default: Any, unpack_as: str, help: str) -> None: # pylint: disable=redefined-builtin
self.__meta[key] = {
"default": default,
- "help": help,
- "rename": rename,
+ "unpack_as": unpack_as,
+ "help": help,
}
def _get_default(self, key: str) -> Any:
return self.__meta[key]["default"]
+ def _get_unpack_as(self, key: str) -> str:
+ return (self.__meta[key]["unpack_as"] or key)
+
def _get_help(self, key: str) -> str:
return self.__meta[key]["help"]
- def _get_rename(self, key: str) -> str:
- return (self.__meta[key]["rename"] or key)
-
def __getattribute__(self, key: str) -> Any:
if key in self:
return self[key]
@@ -106,46 +111,74 @@ class Option:
def __init__(
self,
default: Any,
- help: str="", # pylint: disable=redefined-builtin
type: Optional[Callable[[Any], Any]]=None, # pylint: disable=redefined-builtin
- rename: str="",
+ only_if: str="",
+ unpack_as: str="",
+ help: str="", # pylint: disable=redefined-builtin
) -> None:
self.default = default
- self.help = help
self.type: Callable[[Any], Any] = (type or (self.__type(default) if default is not None else str)) # type: ignore
- self.rename = rename
+ self.only_if = only_if
+ self.unpack_as = unpack_as
+ self.help = help
def __repr__(self) -> str:
- return "<Option(default={self.default}, type={self.type}, help={self.help}, rename={self.rename})>".format(self=self)
+ return "<Option(default={0.default}, type={0.type}, only_if={0.only_if}, unpack_as={0.unpack_as})>".format(self)
# =====
def make_config(raw: Dict[str, Any], scheme: Dict[str, Any], _keys: Tuple[str, ...]=()) -> Section:
if not isinstance(raw, dict):
- raise ValueError("The node '{}' must be a dictionary".format("/".join(_keys) or "/"))
+ raise ConfigError("The node %r must be a dictionary" % ("/".join(_keys) or "/"))
config = Section()
- for (key, option) in scheme.items():
- full_key = _keys + (key,)
- full_name = "/".join(full_key)
-
- if isinstance(option, Option):
- value = raw.get(key, option.default)
- try:
- value = option.type(value)
- except Exception:
- raise ValueError("Invalid value '{value}' for key '{key}'".format(key=full_name, value=value))
+
+ def make_full_key(key: str) -> Tuple[str, ...]:
+ return _keys + (key,)
+
+ def make_full_name(key: str) -> str:
+ return "/".join(make_full_key(key))
+
+ def process_option(key: str, no_only_if: bool=False) -> Any:
+ if key not in config:
+ option: Option = scheme[key]
+ only_if = option.only_if
+ only_if_negative = option.only_if.startswith("!")
+ if only_if_negative:
+ only_if = only_if[1:]
+
+ if only_if and no_only_if: # pylint: disable=no-else-raise
+ # Перекрестный only_if запрещен
+ raise RuntimeError("Found only_if recursuon on key %r" % (make_full_name(key)))
+ elif only_if and (
+ (not only_if_negative and not process_option(only_if, no_only_if=True))
+ or (only_if_negative and process_option(only_if, no_only_if=True))
+ ):
+ # Если есть условие и оно ложно - ставим дефолт и не валидируем
+ value = option.default
+ else:
+ value = raw.get(key, option.default)
+ try:
+ value = option.type(value)
+ except ValueError as err:
+ raise ConfigError("Invalid value %r for key %r: %s" % (value, make_full_name(key), str(err)))
+
config[key] = value
config._set_meta( # pylint: disable=protected-access
key=key,
default=option.default,
+ unpack_as=option.unpack_as,
help=option.help,
- rename=option.rename,
)
- elif isinstance(option, dict):
- config[key] = make_config(raw.get(key, {}), option, full_key)
+ return config[key]
+
+ for key in scheme:
+ if isinstance(scheme[key], Option):
+ process_option(key)
+ elif isinstance(scheme[key], dict):
+ config[key] = make_config(raw.get(key, {}), scheme[key], make_full_key(key))
else:
- raise RuntimeError("Incorrect scheme definition for key '{}':"
- " the value is {}, not dict or Option()".format(full_name, type(option)))
+ raise RuntimeError("Incorrect scheme definition for key %r:"
+ " the value is %r, not dict() or Option()" % (make_full_name(key), type(scheme[key])))
return config