From 65d4b4fb603b3648ab4fb6065a8b56eab6b217af Mon Sep 17 00:00:00 2001 From: Orsiris de Jong Date: Fri, 15 Dec 2023 13:09:41 +0100 Subject: [PATCH] Refactored config to allow full group inheritance --- npbackup/configuration.py | 151 +++++++++++++------------------------- 1 file changed, 49 insertions(+), 102 deletions(-) diff --git a/npbackup/configuration.py b/npbackup/configuration.py index 049aae2..74449bb 100644 --- a/npbackup/configuration.py +++ b/npbackup/configuration.py @@ -19,11 +19,13 @@ from copy import deepcopy from pathlib import Path from ruamel.yaml import YAML from ruamel.yaml.compat import ordereddict +from ruamel.yaml.comments import CommentedMap from logging import getLogger import re import platform from cryptidy import symmetric_encryption as enc from ofunctions.random import random_string +from ofunctions.misc import replace_in_iterable from npbackup.customization import ID_STRING @@ -168,7 +170,7 @@ empty_config_dict = { }, "env": { "env_variables": [], - "env_encrypted_variables": [] + "encrypted_env_variables": [] }, }, "identity": { @@ -195,61 +197,6 @@ empty_config_dict = { } -def iter_over_keys(d: dict, fn: Callable) -> dict: - """ - Execute value=fn(value) on any key in a nested env - """ - if isinstance(d, dict): - for key, value in d.items(): - if isinstance(value, dict): - d[key] = iter_over_keys(value, fn) - else: - d[key] = fn(key, d[key]) - return d - - -# TODO: use ofunctions.misc -def replace_in_iterable( - src: Union[dict, list], original: Union[str, Callable], replacement: Any = None, callable_wants_key: bool = False -): - """ - Recursive replace data in a struct - - Replaces every instance of string original with string replacement in a list/dict - - If original is a callable function, it will replace every instance of original with callable(original) - If original is a callable function and callable_wants_key == True, - it will replace every instance of original with callable(key, original) for dicts - and with callable(original) for any other data type - """ - - def _replace_in_iterable(key, _src): - if isinstance(_src, dict) or isinstance(_src, list): - _src = replace_in_iterable(_src, original, replacement, callable_wants_key) - elif isinstance(original, Callable): - if callable_wants_key: - _src = original(key, _src) - else: - _src = original(_src) - elif isinstance(_src, str) and isinstance(replacement, str): - _src = _src.replace(original, replacement) - else: - _src = replacement - return _src - - if isinstance(src, dict): - for key, value in src.items(): - src[key] = _replace_in_iterable(key, value) - elif isinstance(src, list): - result = [] - for entry in src: - result.append(_replace_in_iterable(None, entry)) - src = result - else: - src = _replace_in_iterable(None, src) - return src - - def crypt_config(full_config: dict, aes_key: str, encrypted_options: List[str], operation: str): try: def _crypt_config(key: str, value: Any) -> Any: @@ -358,37 +305,60 @@ def get_repo_config(full_config: dict, repo_name: str = 'default', eval_variable and a dict containing the repo interitance status """ - repo_config = ordereddict() - config_inheritance = ordereddict() - def inherit_group_settings(repo_config: dict, group_config: dict) -> Tuple[dict, dict]: """ iter over group settings, update repo_config, and produce an identical version of repo_config called config_inheritance, where every value is replaced with a boolean which states inheritance status """ - - # WIP # TODO: cannot use local repo config, need to make a deepcopy first ? - - if isinstance(group_config, dict): - for key, value in group_config.items(): - if isinstance(value, dict): - repo_config[key] = inherit_group_settings(repo_config.g(key), group_config.g(key)) - elif isinstance(value, list) and isinstance(repo_config.g(key), list): - merged_lists = repo_config.g(key) + group_config.g(key) - repo_config.s(key, merged_lists) - config_inheritance.s(key, True) - else: - if not repo_config or not repo_config.g(key): - repo_config = group_config - config_inheritance.s(key, True) + _repo_config = deepcopy(repo_config) + _group_config = deepcopy(group_config) + _config_inheritance = deepcopy(repo_config) + + def _inherit_group_settings(_repo_config: dict, _group_config: dict, _config_inheritance: dict) -> Tuple[dict, dict]: + if isinstance(_group_config, dict): + if not _repo_config: + # Initialize blank if not set + _repo_config = CommentedMap() + _config_inheritance = CommentedMap() + for key, value in _group_config.items(): + if isinstance(value, dict): + __repo_config, __config_inheritance = _inherit_group_settings(_repo_config.g(key), _group_config.g(key), _config_inheritance.g(key)) + _repo_config.s(key, __repo_config) + _config_inheritance.s(key, __config_inheritance) + elif isinstance(value, list): + # TODO: Lists containing dicts won't be updated in repo_config here + # we need to have + # for elt in list: + # recurse into elt if elt is dict + if isinstance(_repo_config.g(key), list): + merged_lists = _repo_config.g(key) + _group_config.g(key) + else: + merged_lists = _group_config.g(key) + _repo_config.s(key, merged_lists) + _config_inheritance.s(key, True) else: - config_inheritance.s(key, False) - return repo_config, config_inheritance + # Tricky part + # repo_config may already contain a struct + if not _repo_config: + _repo_config = CommentedMap() + _config_inheritance = CommentedMap() + if not _repo_config.g(key): + _repo_config.s(key, value) + _config_inheritance.s(key, True) + else: + _config_inheritance.s(key, False) + + + + + + + return _repo_config, _config_inheritance + return _inherit_group_settings(_repo_config, _group_config, _config_inheritance) try: - repo_config = deepcopy(full_config.g(f'repos.{repo_name}')) # Let's make a copy of config since it's a "pointer object" - config_inheritance = replace_in_iterable(deepcopy(full_config.g(f'repos.{repo_name}')), False) + repo_config = deepcopy(full_config.g(f'repos.{repo_name}')) except KeyError: logger.error(f"No repo with name {repo_name} found in config") return None @@ -398,30 +368,7 @@ def get_repo_config(full_config: dict, repo_name: str = 'default', eval_variable except KeyError: logger.warning(f"Repo {repo_name} has no group") else: - inherit_group_settings(repo_config, group_config) - """ - sections = full_config.g(f'groups.{repo_group}') - if sections: - for section in sections: - # TODO: ordereddict.g() returns None when key doesn't exist instead of KeyError - # So we need this horrible hack - try: - if not repo_config.g(section): - repo_config.s(section, {}) - config_inheritance.s(section, {}) - except KeyError: - repo_config.s(section, {}) - config_inheritance.s(section, {}) - sub_sections = full_config.g(f'groups.{repo_group}.{section}') - if sub_sections: - for entries in sub_sections: - # Do not overwrite repo values already present - if not repo_config.g(f'{section}.{entries}'): - repo_config.s(f'{section}.{entries}', full_config.g(f'groups.{repo_group}.{section}.{entries}')) - config_inheritance.s(f'{section}.{entries}', True) - else: - config_inheritance.s(f'{section}.{entries}', False) - """ + repo_config, config_inheritance = inherit_group_settings(repo_config, group_config) if eval_variables: repo_config = evaluate_variables(repo_config, full_config)