Enforce permissions

This commit is contained in:
deajan 2024-04-11 01:31:33 +02:00
parent b436dc2aad
commit 6279849336
4 changed files with 28 additions and 23 deletions

View file

@ -31,7 +31,6 @@
! - Post-execution script can now be force run on error / exit
! - Script result now has prometheus metrics
!- NTP server
!-permissions
## Features
- New viewer mode allowing to browse/restore restic repositories without any NPBackup configuation
@ -45,7 +44,8 @@
!- Implemented scheduled task creator for Windows & Unix
!(simple list of tasks, actions, stop on error)
- Implemented repo quick check / full check / repair index / repair snapshots / unlock / forget / prune / dump / stats commands
! Added permissions management
- Added per repo permission management
- Repos now have backup, restore and full privileges, allowing to restrict access for end users
- Added snapshot tag to snapshot list on main window
- Split npbackup into separate CLI and GUI
- Status window has been refactored so GUI now has full stdout / stderr returns from runner and backend

View file

@ -28,7 +28,7 @@ Viewer mode permissions are set to "restore".
# NPF-SEC-00006: Never inject permissions if some are already present
Since v2.3.0, we insert permissions directly into the encrypted repo URI.
Since v3.0.0, we insert permissions directly into the encrypted repo URI.
Hence, update permissions should only happen in two cases:
- CLI: Recreate repo_uri entry and add permission field from YAML file
- GUI: Enter permission password to update permissions

View file

@ -7,8 +7,8 @@ __intname__ = "npbackup.configuration"
__author__ = "Orsiris de Jong"
__copyright__ = "Copyright (C) 2022-2024 NetInvent"
__license__ = "GPL-3.0-only"
__build__ = "2024020201"
__version__ = "2.0.0 for npbackup 3.0.0+"
__build__ = "2024041101"
__version__ = "npbackup 3.0.0+"
MIN_CONF_VERSION = 3.0
MAX_CONF_VERSION = 3.0
@ -422,13 +422,18 @@ def extract_permissions_from_full_config(full_config: dict) -> dict:
"""
for repo in full_config.g("repos").keys():
repo_uri = full_config.g(f"repos.{repo}.repo_uri")
if isinstance(repo_uri, tuple):
# Extract permissions and manager password from repo_uri if set as string
if "," in repo_uri:
repo_uri = [item.strip() for item in repo_uri.split(",")]
if isinstance(repo_uri, tuple) or isinstance(repo_uri, list):
repo_uri, permissions, manager_password = repo_uri
# Overwrite existing permissions / password if it was set in repo_uri
full_config.s(f"repos.{repo}.repo_uri", repo_uri)
full_config.s(f"repos.{repo}.permissions", permissions)
full_config.s(f"repos.{repo}.manager_password", manager_password)
full_config.s(f"repos.{repo}.__current_manager_password", manager_password)
else:
logger.info(f"No extra information for repo {repo} found")
return full_config
@ -439,7 +444,6 @@ def inject_permissions_into_full_config(full_config: dict) -> Tuple[bool, dict]:
NPF-SEC-00006: Never inject permissions if some are already present unless current manager password equals initial one
"""
updated_full_config = False
for repo in full_config.g("repos").keys():
repo_uri = full_config.g(f"repos.{repo}.repo_uri")
manager_password = full_config.g(f"repos.{repo}.manager_password")
@ -452,7 +456,6 @@ def inject_permissions_into_full_config(full_config: dict) -> Tuple[bool, dict]:
__current_manager_password and manager_password
):
if __current_manager_password == manager_password:
updated_full_config = True
full_config.s(
f"repos.{repo}.repo_uri", (repo_uri, permissions, manager_password)
)
@ -467,7 +470,7 @@ def inject_permissions_into_full_config(full_config: dict) -> Tuple[bool, dict]:
) # Don't keep decrypted manager password
full_config.d(f"repos.{repo}.permissions")
full_config.d(f"repos.{repo}.manager_password")
return updated_full_config, full_config
return full_config
def get_manager_password(full_config: dict, repo_name: str) -> str:
@ -725,13 +728,7 @@ def load_config(config_file: Path) -> Optional[dict]:
config_file_is_updated = True
logger.info("Handling random variables in configuration files")
# Inject permissions into conf file if needed
is_modified, full_config = inject_permissions_into_full_config(full_config)
if is_modified:
config_file_is_updated = True
logger.info("Handling permissions in configuration file")
# Extract permissions / password from repo
# Extract permissions / password from repo if set
full_config = extract_permissions_from_full_config(full_config)
# save config file if needed
@ -744,7 +741,7 @@ def load_config(config_file: Path) -> Optional[dict]:
def save_config(config_file: Path, full_config: dict) -> bool:
try:
with open(config_file, "w", encoding="utf-8") as file_handle:
_, full_config = inject_permissions_into_full_config(full_config)
full_config = inject_permissions_into_full_config(full_config)
if not is_encrypted(full_config):
full_config = crypt_config(
@ -756,6 +753,8 @@ def save_config(config_file: Path, full_config: dict) -> bool:
full_config = crypt_config(
full_config, AES_KEY, ENCRYPTED_OPTIONS, operation="decrypt"
)
# We also need to extract permissions again
full_config = extract_permissions_from_full_config(full_config)
return True
except OSError:
logger.critical(f"Cannot save configuration file to {config_file}")

View file

@ -351,6 +351,13 @@ class NPBackupRunner:
def has_permission(fn: Callable):
"""
Decorator that checks permissions before running functions
Possible permissions are:
- backup: Backup and list backups
- restore: Backup, restore and list snapshots
- full: Full permissions
Only one permission can be set per repo
"""
@wraps(fn)
@ -383,12 +390,11 @@ class NPBackupRunner:
operation = fn.__name__
current_permissions = self.repo_config.g("permissions")
self.write_logs(
f"Permissions required for operation \'{operation}\' are {required_permissions[operation]}, current permissions are {current_permissions}",
level="info",
)
has_permissions = True # TODO: enforce permissions
if not has_permissions:
if not current_permissions in required_permissions[operation]:
self.write_logs(
f"Permissions required for operation \'{operation}\' are {required_permissions[operation]}, current permissions are {current_permissions}",
level="critical",
)
raise PermissionError
except (IndexError, KeyError, PermissionError):
self.write_logs("You don't have sufficient permissions", level="error")