WIP: permissions

This commit is contained in:
Orsiris de Jong 2024-01-22 09:39:14 +01:00
parent 1ef8887b8b
commit d63b93ea6d

View file

@ -7,7 +7,7 @@ __intname__ = "npbackup.configuration"
__author__ = "Orsiris de Jong"
__copyright__ = "Copyright (C) 2022-2024 NetInvent"
__license__ = "GPL-3.0-only"
__build__ = "2023122901"
__build__ = "2024010501"
__version__ = "2.0.0 for npbackup 2.3.0+"
MIN_CONF_VERSION = 2.3
@ -91,12 +91,16 @@ def d(self, path, sep="."):
Deletion for dot notation in a dict/OrderedDict
d.d('my.array.keys')
"""
data = self
keys = path.split(sep)
lastkey = keys[-1]
for key in keys[:-1]:
data = data[key]
data.pop(lastkey)
try:
data = self
keys = path.split(sep)
lastkey = keys[-1]
for key in keys[:-1]:
data = data[key]
data.pop(lastkey)
except KeyError:
# We don't care deleting non existent keys ^^
pass
ordereddict.g = g
@ -380,35 +384,49 @@ def evaluate_variables(repo_config: dict, full_config: dict) -> dict:
return repo_config
def extract_permissions_from_repo_config(repo_config: dict) -> dict:
def extract_permissions_from_full_config(full_config: dict) -> dict:
"""
Extract permissions and manager password from repo_uri tuple
repo_config objects in memory are always "expanded"
This function is in order to expand when loading config
"""
repo_uri = repo_config.g("repo_uri")
if isinstance(repo_uri, tuple):
repo_uri, permissions, manager_password = repo_uri
repo_config.s("permissions", permissions)
repo_config.s("manager_password", manager_password)
return repo_config
for repo in full_config.g("repos").keys():
repo_uri = full_config.g(f"repos.{repo}.repo_uri")
if isinstance(repo_uri, tuple):
repo_uri, permissions, manager_password = repo_uri
# Overwrite existing permissions / password if it was set in repo_uri
full_config.s(f"repos.{repo}.repo_uri", repo_uri)
full_config.s(f"repos.{repo}.permissions", permissions)
full_config.s(f"repos.{repo}.manager_password", manager_password)
full_config.s(f"repos.{repo}.__saved_manager_password", manager_password)
return full_config
def inject_permissions_into_repo_config(repo_config: dict) -> dict:
def inject_permissions_into_full_config(full_config: dict) -> Tuple[bool, dict]:
"""
Make sure repo_uri is a tuple containing permissions and manager password
This function is used before saving config
NPF-SEC-00006: Never inject permissions if some are already present
NPF-SEC-00006: Never inject permissions if some are already present unless current manager password equals initial one
"""
repo_uri = repo_config.g("repo_uri")
permissions = repo_config.g("permissions")
manager_password = repo_config.g("manager_password")
repo_config.s(repo_uri, (repo_uri, permissions, manager_password))
repo_config.d("repo_uri")
repo_config.d("permissions")
repo_config.d("manager_password")
return repo_config
updated_full_config = False
for repo in full_config.g("repos").keys():
repo_uri = full_config.g(f"repos.{repo}.repo_uri")
manager_password = full_config.g(f"repos.{repo}.manager_password")
permissions = full_config.g(f"repos.{repo}.permissions")
__saved_manager_password = full_config.g(f"repos.{repo}.__saved_manager_password")
if __saved_manager_password and manager_password and __saved_manager_password == manager_password:
updated_full_config = True
full_config.s(f"repos.{repo}.repo_uri", (repo_uri, permissions, manager_password))
full_config.s(f"repos.{repo}.is_protected", True)
else:
logger.info(f"Permissions are already set for repo {repo}. Will not update them unless manager password is given")
full_config.d(f"repos.{repo}.__saved_manager_password") # Don't keep decrypted manager password
full_config.d(f"repos.{repo}.permissions")
full_config.d(f"repos.{repo}.manager_password")
return updated_full_config, full_config
def get_manager_password(full_config: dict, repo_name: str) -> str:
@ -508,7 +526,6 @@ def get_repo_config(
if eval_variables:
repo_config = evaluate_variables(repo_config, full_config)
repo_config = extract_permissions_from_repo_config(repo_config)
return repo_config, config_inheritance
@ -593,6 +610,15 @@ def load_config(config_file: Path) -> Optional[dict]:
config_file_is_updated = True
logger.info("Handling random variables in configuration files")
# Inject permissions into conf file if needed
is_modified, full_config = inject_permissions_into_full_config(full_config)
if is_modified:
config_file_is_updated = True
logger.info("Handling permissions in configuration file")
# Extract permissions / password from repo
full_config = extract_permissions_from_full_config(full_config)
# save config file if needed
if config_file_is_updated:
logger.info("Updating config file")
@ -603,6 +629,8 @@ def load_config(config_file: Path) -> Optional[dict]:
def save_config(config_file: Path, full_config: dict) -> bool:
try:
with open(config_file, "w", encoding="utf-8") as file_handle:
_, full_config = inject_permissions_into_full_config(full_config)
if not is_encrypted(full_config):
full_config = crypt_config(
full_config, AES_KEY, ENCRYPTED_OPTIONS, operation="encrypt"