Refactored config to allow full group inheritance

This commit is contained in:
Orsiris de Jong 2023-12-15 13:09:41 +01:00
parent ccd3339d1b
commit 65d4b4fb60

View file

@ -19,11 +19,13 @@ from copy import deepcopy
from pathlib import Path
from ruamel.yaml import YAML
from ruamel.yaml.compat import ordereddict
from ruamel.yaml.comments import CommentedMap
from logging import getLogger
import re
import platform
from cryptidy import symmetric_encryption as enc
from ofunctions.random import random_string
from ofunctions.misc import replace_in_iterable
from npbackup.customization import ID_STRING
@ -168,7 +170,7 @@ empty_config_dict = {
},
"env": {
"env_variables": [],
"env_encrypted_variables": []
"encrypted_env_variables": []
},
},
"identity": {
@ -195,61 +197,6 @@ empty_config_dict = {
}
def iter_over_keys(d: dict, fn: Callable) -> dict:
"""
Execute value=fn(value) on any key in a nested env
"""
if isinstance(d, dict):
for key, value in d.items():
if isinstance(value, dict):
d[key] = iter_over_keys(value, fn)
else:
d[key] = fn(key, d[key])
return d
# TODO: use ofunctions.misc
def replace_in_iterable(
src: Union[dict, list], original: Union[str, Callable], replacement: Any = None, callable_wants_key: bool = False
):
"""
Recursive replace data in a struct
Replaces every instance of string original with string replacement in a list/dict
If original is a callable function, it will replace every instance of original with callable(original)
If original is a callable function and callable_wants_key == True,
it will replace every instance of original with callable(key, original) for dicts
and with callable(original) for any other data type
"""
def _replace_in_iterable(key, _src):
if isinstance(_src, dict) or isinstance(_src, list):
_src = replace_in_iterable(_src, original, replacement, callable_wants_key)
elif isinstance(original, Callable):
if callable_wants_key:
_src = original(key, _src)
else:
_src = original(_src)
elif isinstance(_src, str) and isinstance(replacement, str):
_src = _src.replace(original, replacement)
else:
_src = replacement
return _src
if isinstance(src, dict):
for key, value in src.items():
src[key] = _replace_in_iterable(key, value)
elif isinstance(src, list):
result = []
for entry in src:
result.append(_replace_in_iterable(None, entry))
src = result
else:
src = _replace_in_iterable(None, src)
return src
def crypt_config(full_config: dict, aes_key: str, encrypted_options: List[str], operation: str):
try:
def _crypt_config(key: str, value: Any) -> Any:
@ -358,37 +305,60 @@ def get_repo_config(full_config: dict, repo_name: str = 'default', eval_variable
and a dict containing the repo interitance status
"""
repo_config = ordereddict()
config_inheritance = ordereddict()
def inherit_group_settings(repo_config: dict, group_config: dict) -> Tuple[dict, dict]:
"""
iter over group settings, update repo_config, and produce an identical version of repo_config
called config_inheritance, where every value is replaced with a boolean which states inheritance status
"""
# WIP # TODO: cannot use local repo config, need to make a deepcopy first ?
if isinstance(group_config, dict):
for key, value in group_config.items():
if isinstance(value, dict):
repo_config[key] = inherit_group_settings(repo_config.g(key), group_config.g(key))
elif isinstance(value, list) and isinstance(repo_config.g(key), list):
merged_lists = repo_config.g(key) + group_config.g(key)
repo_config.s(key, merged_lists)
config_inheritance.s(key, True)
else:
if not repo_config or not repo_config.g(key):
repo_config = group_config
config_inheritance.s(key, True)
_repo_config = deepcopy(repo_config)
_group_config = deepcopy(group_config)
_config_inheritance = deepcopy(repo_config)
def _inherit_group_settings(_repo_config: dict, _group_config: dict, _config_inheritance: dict) -> Tuple[dict, dict]:
if isinstance(_group_config, dict):
if not _repo_config:
# Initialize blank if not set
_repo_config = CommentedMap()
_config_inheritance = CommentedMap()
for key, value in _group_config.items():
if isinstance(value, dict):
__repo_config, __config_inheritance = _inherit_group_settings(_repo_config.g(key), _group_config.g(key), _config_inheritance.g(key))
_repo_config.s(key, __repo_config)
_config_inheritance.s(key, __config_inheritance)
elif isinstance(value, list):
# TODO: Lists containing dicts won't be updated in repo_config here
# we need to have
# for elt in list:
# recurse into elt if elt is dict
if isinstance(_repo_config.g(key), list):
merged_lists = _repo_config.g(key) + _group_config.g(key)
else:
merged_lists = _group_config.g(key)
_repo_config.s(key, merged_lists)
_config_inheritance.s(key, True)
else:
config_inheritance.s(key, False)
return repo_config, config_inheritance
# Tricky part
# repo_config may already contain a struct
if not _repo_config:
_repo_config = CommentedMap()
_config_inheritance = CommentedMap()
if not _repo_config.g(key):
_repo_config.s(key, value)
_config_inheritance.s(key, True)
else:
_config_inheritance.s(key, False)
return _repo_config, _config_inheritance
return _inherit_group_settings(_repo_config, _group_config, _config_inheritance)
try:
repo_config = deepcopy(full_config.g(f'repos.{repo_name}'))
# Let's make a copy of config since it's a "pointer object"
config_inheritance = replace_in_iterable(deepcopy(full_config.g(f'repos.{repo_name}')), False)
repo_config = deepcopy(full_config.g(f'repos.{repo_name}'))
except KeyError:
logger.error(f"No repo with name {repo_name} found in config")
return None
@ -398,30 +368,7 @@ def get_repo_config(full_config: dict, repo_name: str = 'default', eval_variable
except KeyError:
logger.warning(f"Repo {repo_name} has no group")
else:
inherit_group_settings(repo_config, group_config)
"""
sections = full_config.g(f'groups.{repo_group}')
if sections:
for section in sections:
# TODO: ordereddict.g() returns None when key doesn't exist instead of KeyError
# So we need this horrible hack
try:
if not repo_config.g(section):
repo_config.s(section, {})
config_inheritance.s(section, {})
except KeyError:
repo_config.s(section, {})
config_inheritance.s(section, {})
sub_sections = full_config.g(f'groups.{repo_group}.{section}')
if sub_sections:
for entries in sub_sections:
# Do not overwrite repo values already present
if not repo_config.g(f'{section}.{entries}'):
repo_config.s(f'{section}.{entries}', full_config.g(f'groups.{repo_group}.{section}.{entries}'))
config_inheritance.s(f'{section}.{entries}', True)
else:
config_inheritance.s(f'{section}.{entries}', False)
"""
repo_config, config_inheritance = inherit_group_settings(repo_config, group_config)
if eval_variables:
repo_config = evaluate_variables(repo_config, full_config)