mirror of
https://github.com/netinvent/npbackup.git
synced 2024-11-10 17:15:28 +08:00
Reformat files with black
This commit is contained in:
parent
b9777de96b
commit
5abb8df778
6 changed files with 114 additions and 53 deletions
|
@ -234,14 +234,18 @@ empty_config_dict = {
|
|||
},
|
||||
}
|
||||
|
||||
|
||||
def convert_to_commented_map(
|
||||
source_dict,
|
||||
):
|
||||
if isinstance(source_dict, dict):
|
||||
return CommentedMap({k: convert_to_commented_map(v) for k, v in source_dict.items()})
|
||||
return CommentedMap(
|
||||
{k: convert_to_commented_map(v) for k, v in source_dict.items()}
|
||||
)
|
||||
else:
|
||||
return source_dict
|
||||
|
||||
|
||||
def get_default_config() -> dict:
|
||||
"""
|
||||
Returns a config dict as nested CommentedMaps (used by ruamel.yaml to keep comments intact)
|
||||
|
@ -483,12 +487,22 @@ def extract_permissions_from_full_config(full_config: dict) -> dict:
|
|||
repo_uri, permissions, manager_password = repo_uri
|
||||
# Overwrite existing permissions / password if it was set in repo_uri
|
||||
full_config.s(f"{object_type}.{object_name}.repo_uri", repo_uri)
|
||||
full_config.s(f"{object_type}.{object_name}.permissions", permissions)
|
||||
full_config.s(f"{object_type}.{object_name}.manager_password", manager_password)
|
||||
full_config.s(
|
||||
f"{object_type}.{object_name}.permissions", permissions
|
||||
)
|
||||
full_config.s(
|
||||
f"{object_type}.{object_name}.manager_password",
|
||||
manager_password,
|
||||
)
|
||||
else:
|
||||
logger.info(f"No extra information for {object_type} {object_name} found")
|
||||
logger.info(
|
||||
f"No extra information for {object_type} {object_name} found"
|
||||
)
|
||||
# If no permissions are set, we get to use default permissions
|
||||
full_config.s(f"{object_type}.{object_name}.permissions", empty_config_dict["repos"]["default"]["permissions"])
|
||||
full_config.s(
|
||||
f"{object_type}.{object_name}.permissions",
|
||||
empty_config_dict["repos"]["default"]["permissions"],
|
||||
)
|
||||
full_config.s(f"{object_type}.{object_name}.manager_password", None)
|
||||
return full_config
|
||||
|
||||
|
@ -503,12 +517,17 @@ def inject_permissions_into_full_config(full_config: dict) -> Tuple[bool, dict]:
|
|||
for object_type in ("repos", "groups"):
|
||||
for object_name in full_config.g(object_type).keys():
|
||||
repo_uri = full_config.g(f"{object_type}.{object_name}.repo_uri")
|
||||
manager_password = full_config.g(f"{object_type}.{object_name}.manager_password")
|
||||
manager_password = full_config.g(
|
||||
f"{object_type}.{object_name}.manager_password"
|
||||
)
|
||||
permissions = full_config.g(f"{object_type}.{object_name}.permissions")
|
||||
update_manager_password = full_config.g(f"{object_type}.{object_name}.update_manager_password")
|
||||
update_manager_password = full_config.g(
|
||||
f"{object_type}.{object_name}.update_manager_password"
|
||||
)
|
||||
if update_manager_password and manager_password:
|
||||
full_config.s(
|
||||
f"{object_type}.{object_name}.repo_uri", (repo_uri, permissions, manager_password)
|
||||
f"{object_type}.{object_name}.repo_uri",
|
||||
(repo_uri, permissions, manager_password),
|
||||
)
|
||||
full_config.s(f"{object_type}.{object_name}.is_protected", True)
|
||||
elif manager_password:
|
||||
|
@ -622,7 +641,9 @@ def get_repo_config(
|
|||
_config_inheritance.s(key, True)
|
||||
# Case where repo_config contains list but group info has single str
|
||||
elif (
|
||||
isinstance(_repo_config.g(key), list) and value is not None and value != ""
|
||||
isinstance(_repo_config.g(key), list)
|
||||
and value is not None
|
||||
and value != ""
|
||||
):
|
||||
merged_lists = _repo_config.g(key) + [value]
|
||||
|
||||
|
|
|
@ -599,7 +599,9 @@ def _main_gui(viewer_mode: bool):
|
|||
logger.info(f"Using configuration file {config_file}")
|
||||
full_config = npbackup.configuration.load_config(config_file)
|
||||
if not full_config:
|
||||
sg.PopupError(f"{_t('main_gui.config_error')} {config_file}", keep_on_top=True)
|
||||
sg.PopupError(
|
||||
f"{_t('main_gui.config_error')} {config_file}", keep_on_top=True
|
||||
)
|
||||
config_exists = False
|
||||
else:
|
||||
config_exists = True
|
||||
|
@ -993,7 +995,9 @@ def _main_gui(viewer_mode: bool):
|
|||
repo_uri = _repo_uri
|
||||
repo_list = _repo_list
|
||||
else:
|
||||
sg.PopupError(_t("main_gui.cannot_load_config_keep_current"), keep_on_top=True)
|
||||
sg.PopupError(
|
||||
_t("main_gui.cannot_load_config_keep_current"), keep_on_top=True
|
||||
)
|
||||
if not viewer_mode and not config_file and not full_config:
|
||||
window["-NO-CONFIG-"].Update(visible=True)
|
||||
elif not viewer_mode:
|
||||
|
|
|
@ -7,7 +7,7 @@ __intname__ = "npbackup.gui.config"
|
|||
__author__ = "Orsiris de Jong"
|
||||
__copyright__ = "Copyright (C) 2022-2024 NetInvent"
|
||||
__license__ = "GPL-3.0-only"
|
||||
__build__ = "2024061601"
|
||||
__build__ = "2024072301"
|
||||
|
||||
|
||||
from typing import List, Tuple
|
||||
|
@ -159,14 +159,20 @@ def config_gui(full_config: dict, config_file: str):
|
|||
full_config.s(f"{object_type}.{object_name}", CommentedMap())
|
||||
elif object_type == "groups":
|
||||
if full_config.g(f"{object_type}.{object_name}"):
|
||||
full_config.s(f"{object_type}.{object_name}", configuration.get_default_repo_config())
|
||||
full_config.s(
|
||||
f"{object_type}.{object_name}",
|
||||
configuration.get_default_repo_config(),
|
||||
)
|
||||
elif object_type == "groups":
|
||||
if full_config.g(f"{object_type}.{object_name}"):
|
||||
sg.PopupError(
|
||||
_t("config_gui.group_already_exists"), keep_on_top=True
|
||||
)
|
||||
continue
|
||||
full_config.s(f"groups.{object_name}", configuration.get_default_group_config())
|
||||
full_config.s(
|
||||
f"groups.{object_name}",
|
||||
configuration.get_default_group_config(),
|
||||
)
|
||||
else:
|
||||
raise ValueError("Bogus object type given")
|
||||
break
|
||||
|
@ -184,7 +190,9 @@ def config_gui(full_config: dict, config_file: str):
|
|||
update_object_gui(full_config, None, unencrypted=False)
|
||||
return full_config
|
||||
|
||||
def update_object_selector(object_name: str = None, object_type: str = None) -> None:
|
||||
def update_object_selector(
|
||||
object_name: str = None, object_type: str = None
|
||||
) -> None:
|
||||
object_list = get_objects()
|
||||
if not object_name or not object_type:
|
||||
object = object_list[0]
|
||||
|
@ -192,7 +200,7 @@ def config_gui(full_config: dict, config_file: str):
|
|||
object = f"{object_type.capitalize()}: {object_name}"
|
||||
print(object_list)
|
||||
print(object)
|
||||
|
||||
|
||||
window["-OBJECT-SELECT-"].Update(values=object_list)
|
||||
window["-OBJECT-SELECT-"].Update(value=object)
|
||||
|
||||
|
@ -239,7 +247,7 @@ def config_gui(full_config: dict, config_file: str):
|
|||
"prometheus.instance",
|
||||
"prometheus.http_username",
|
||||
"prometheus.http_password",
|
||||
"update_manager_password"
|
||||
"update_manager_password",
|
||||
) or key.startswith("prometheus.additional_labels"):
|
||||
return
|
||||
if key == "permissions":
|
||||
|
@ -437,7 +445,9 @@ def config_gui(full_config: dict, config_file: str):
|
|||
|
||||
_iter_over_config(object_config, root_key)
|
||||
|
||||
def update_object_gui(full_config: dict, object_name: str = None, unencrypted: bool = False):
|
||||
def update_object_gui(
|
||||
full_config: dict, object_name: str = None, unencrypted: bool = False
|
||||
):
|
||||
nonlocal backup_paths_tree
|
||||
nonlocal tags_tree
|
||||
nonlocal exclude_files_tree
|
||||
|
@ -448,7 +458,6 @@ def config_gui(full_config: dict, config_file: str):
|
|||
nonlocal env_variables_tree
|
||||
nonlocal encrypted_env_variables_tree
|
||||
|
||||
|
||||
# Load fist available repo or group if none given
|
||||
if not object_name:
|
||||
object_name = get_objects()[0]
|
||||
|
@ -706,7 +715,9 @@ def config_gui(full_config: dict, config_file: str):
|
|||
current_perm = permissions[-1]
|
||||
else:
|
||||
current_perm = combo_boxes["permissions"][current_perm]
|
||||
manager_password = full_config.g(f"{object_type}.{object_name}.manager_password")
|
||||
manager_password = full_config.g(
|
||||
f"{object_type}.{object_name}.manager_password"
|
||||
)
|
||||
|
||||
layout = [
|
||||
[
|
||||
|
@ -757,8 +768,13 @@ def config_gui(full_config: dict, config_file: str):
|
|||
combo_boxes["permissions"], values["permissions"]
|
||||
)
|
||||
full_config.s(f"{object_type}.{object_name}.permissions", permission)
|
||||
full_config.s(f"{object_type}.{object_name}.manager_password", values["-MANAGER-PASSWORD-"])
|
||||
full_config.s(f"{object_type}.{object_name}.update_manager_password", True)
|
||||
full_config.s(
|
||||
f"{object_type}.{object_name}.manager_password",
|
||||
values["-MANAGER-PASSWORD-"],
|
||||
)
|
||||
full_config.s(
|
||||
f"{object_type}.{object_name}.update_manager_password", True
|
||||
)
|
||||
break
|
||||
window.close()
|
||||
return full_config
|
||||
|
@ -1239,17 +1255,14 @@ def config_gui(full_config: dict, config_file: str):
|
|||
sg.Input(key="repo_opts.repo_password_command", size=(95, 1)),
|
||||
],
|
||||
[
|
||||
|
||||
sg.Text(_t("config_gui.current_permissions"), size=(40, 1)),
|
||||
sg.Text("Default", key="current_permissions", size=(25, 1))
|
||||
sg.Text("Default", key="current_permissions", size=(25, 1)),
|
||||
],
|
||||
[
|
||||
sg.Text(_t("config_gui.manager_password_set"), size=(40, 1)),
|
||||
sg.Text(_t("generic.no"), key="manager_password_set", size=(25, 1))
|
||||
],
|
||||
[
|
||||
sg.Button(_t("config_gui.set_permissions"), key="--SET-PERMISSIONS--")
|
||||
sg.Text(_t("generic.no"), key="manager_password_set", size=(25, 1)),
|
||||
],
|
||||
[sg.Button(_t("config_gui.set_permissions"), key="--SET-PERMISSIONS--")],
|
||||
[
|
||||
sg.Text(_t("config_gui.repo_group"), size=(40, 1)),
|
||||
sg.Combo(
|
||||
|
@ -1949,8 +1962,14 @@ Google Cloud storage: GOOGLE_PROJECT_ID GOOGLE_APPLICATION_CREDENTIALS\n\
|
|||
)
|
||||
if not manager_password or ask_manager_password(manager_password):
|
||||
# We need to update full_config with current GUI values before using modifying it
|
||||
full_config = update_config_dict(full_config, current_object_type, current_object_name, values)
|
||||
full_config = set_permissions(full_config, object_type=object_type, object_name=values["-OBJECT-SELECT-"])
|
||||
full_config = update_config_dict(
|
||||
full_config, current_object_type, current_object_name, values
|
||||
)
|
||||
full_config = set_permissions(
|
||||
full_config,
|
||||
object_type=object_type,
|
||||
object_name=values["-OBJECT-SELECT-"],
|
||||
)
|
||||
update_object_gui(full_config, values["-OBJECT-SELECT-"])
|
||||
continue
|
||||
if event in (
|
||||
|
@ -2079,7 +2098,8 @@ Google Cloud storage: GOOGLE_PROJECT_ID GOOGLE_APPLICATION_CREDENTIALS\n\
|
|||
INHERITED_FOLDER_ICON,
|
||||
):
|
||||
sg.PopupError(
|
||||
_t("config_gui.cannot_remove_group_inherited_settings"), keep_on_top=True
|
||||
_t("config_gui.cannot_remove_group_inherited_settings"),
|
||||
keep_on_top=True,
|
||||
)
|
||||
continue
|
||||
tree.delete(key)
|
||||
|
@ -2087,7 +2107,9 @@ Google Cloud storage: GOOGLE_PROJECT_ID GOOGLE_APPLICATION_CREDENTIALS\n\
|
|||
continue
|
||||
if event == "--ACCEPT--":
|
||||
if object_type != "groups" and not values["repo_uri"]:
|
||||
sg.PopupError(_t("config_gui.repo_uri_cannot_be_empty"), keep_on_top=True)
|
||||
sg.PopupError(
|
||||
_t("config_gui.repo_uri_cannot_be_empty"), keep_on_top=True
|
||||
)
|
||||
continue
|
||||
full_config = update_config_dict(
|
||||
full_config, current_object_type, current_object_name, values
|
||||
|
@ -2107,12 +2129,16 @@ Google Cloud storage: GOOGLE_PROJECT_ID GOOGLE_APPLICATION_CREDENTIALS\n\
|
|||
# NPF-SEC-00009
|
||||
env_manager_password = os.environ.get("NPBACKUP_MANAGER_PASSWORD", None)
|
||||
if not manager_password:
|
||||
sg.PopupError(_t("config_gui.no_manager_password_defined"), keep_on_top=True)
|
||||
sg.PopupError(
|
||||
_t("config_gui.no_manager_password_defined"), keep_on_top=True
|
||||
)
|
||||
continue
|
||||
if (
|
||||
env_manager_password and env_manager_password == manager_password
|
||||
) or ask_manager_password(manager_password):
|
||||
update_object_gui(full_config, values["-OBJECT-SELECT-"], unencrypted=True)
|
||||
update_object_gui(
|
||||
full_config, values["-OBJECT-SELECT-"], unencrypted=True
|
||||
)
|
||||
update_global_gui(full_config, unencrypted=True)
|
||||
continue
|
||||
if event in ("create_interval_task", "create_daily_task"):
|
||||
|
@ -2126,10 +2152,14 @@ Google Cloud storage: GOOGLE_PROJECT_ID GOOGLE_APPLICATION_CREDENTIALS\n\
|
|||
if result:
|
||||
sg.Popup(_t("config_gui.scheduled_task_creation_success"))
|
||||
else:
|
||||
sg.PopupError(_t("config_gui.scheduled_task_creation_failure"), keep_on_top=True)
|
||||
sg.PopupError(
|
||||
_t("config_gui.scheduled_task_creation_failure"),
|
||||
keep_on_top=True,
|
||||
)
|
||||
except ValueError as exc:
|
||||
sg.PopupError(
|
||||
_t("config_gui.scheduled_task_creation_failure") + f": {exc}", keep_on_top=True
|
||||
_t("config_gui.scheduled_task_creation_failure") + f": {exc}",
|
||||
keep_on_top=True,
|
||||
)
|
||||
continue
|
||||
window.close()
|
||||
|
|
|
@ -259,9 +259,9 @@ def gui_thread_runner(
|
|||
if stdout_data is None:
|
||||
logger.debug("gui_thread_runner got stdout queue close signal")
|
||||
read_stdout_queue = False
|
||||
#progress_window["-OPERATIONS-PROGRESS-STDOUT-"].Update(
|
||||
# progress_window["-OPERATIONS-PROGRESS-STDOUT-"].Update(
|
||||
# "\n", append=True
|
||||
#)
|
||||
# )
|
||||
else:
|
||||
stdout_data = stdout_data.strip("\r\n")
|
||||
progress_window["-OPERATIONS-PROGRESS-STDOUT-"].Update(
|
||||
|
|
|
@ -13,7 +13,12 @@ __build__ = "2024061601"
|
|||
import os
|
||||
from logging import getLogger
|
||||
import npbackup.gui.PySimpleGUI as sg
|
||||
from npbackup.configuration import get_repo_config, get_group_list, get_repos_by_group, get_manager_password
|
||||
from npbackup.configuration import (
|
||||
get_repo_config,
|
||||
get_group_list,
|
||||
get_repos_by_group,
|
||||
get_manager_password,
|
||||
)
|
||||
from npbackup.core.i18n_helper import _t
|
||||
from npbackup.gui.helpers import get_anon_repo_uri, gui_thread_runner
|
||||
from resources.customization import (
|
||||
|
@ -91,9 +96,7 @@ def operations_gui(full_config: dict) -> dict:
|
|||
repo_list = []
|
||||
for group_index in values["-GROUP_LIST-"]:
|
||||
group_name = group_list[group_index]
|
||||
repo_list += get_repos_by_group(
|
||||
full_config, group_name
|
||||
)
|
||||
repo_list += get_repos_by_group(full_config, group_name)
|
||||
result = repo_list
|
||||
break
|
||||
if event == "--APPLY_TO_ALL--":
|
||||
|
@ -199,7 +202,7 @@ def operations_gui(full_config: dict) -> dict:
|
|||
)
|
||||
]
|
||||
]
|
||||
|
||||
|
||||
right_click_menu = ["", [_t("config_gui.show_decrypted")]]
|
||||
|
||||
window = sg.Window(
|
||||
|
@ -237,18 +240,20 @@ def operations_gui(full_config: dict) -> dict:
|
|||
if not object_name:
|
||||
sg.PopupError(_t("operations_gui.no_repo_selected"), keep_on_top=True)
|
||||
continue
|
||||
manager_password = get_manager_password(
|
||||
full_config, object_name
|
||||
)
|
||||
manager_password = get_manager_password(full_config, object_name)
|
||||
# NPF-SEC-00009
|
||||
env_manager_password = os.environ.get("NPBACKUP_MANAGER_PASSWORD", None)
|
||||
if not manager_password:
|
||||
sg.PopupError(_t("config_gui.no_manager_password_defined"), keep_on_top=True)
|
||||
sg.PopupError(
|
||||
_t("config_gui.no_manager_password_defined"), keep_on_top=True
|
||||
)
|
||||
continue
|
||||
if (
|
||||
env_manager_password and env_manager_password == manager_password
|
||||
) or ask_manager_password(manager_password):
|
||||
complete_repo_list = gui_update_state(window, full_config, unencrypted=object_name)
|
||||
complete_repo_list = gui_update_state(
|
||||
window, full_config, unencrypted=object_name
|
||||
)
|
||||
continue
|
||||
if event in (
|
||||
"--QUICK-CHECK--",
|
||||
|
@ -272,9 +277,7 @@ def operations_gui(full_config: dict) -> dict:
|
|||
if not repos:
|
||||
continue
|
||||
for repo_name in repos:
|
||||
repo_config, __annotations__ = get_repo_config(
|
||||
full_config, repo_name
|
||||
)
|
||||
repo_config, __annotations__ = get_repo_config(full_config, repo_name)
|
||||
repo_config_list.append(repo_config)
|
||||
operation = None
|
||||
op_args = None
|
||||
|
|
|
@ -824,10 +824,13 @@ class ResticRunner:
|
|||
if exclude_file:
|
||||
if os.path.isfile(exclude_file):
|
||||
cmd += f' --{case_ignore_param}exclude-file "{exclude_file}"'
|
||||
elif os.path.isfile(os.path.join(CURRENT_DIR, os.path.basename(exclude_file))):
|
||||
elif os.path.isfile(
|
||||
os.path.join(CURRENT_DIR, os.path.basename(exclude_file))
|
||||
):
|
||||
cmd += f' --{case_ignore_param}exclude-file "{os.path.join(CURRENT_DIR, os.path.basename(exclude_file))}"'
|
||||
self.write_logs(
|
||||
f"Expanding exclude file path to {CURRENT_DIR}", level="info"
|
||||
f"Expanding exclude file path to {CURRENT_DIR}",
|
||||
level="info",
|
||||
)
|
||||
else:
|
||||
self.write_logs(
|
||||
|
|
Loading…
Reference in a new issue