Reformat files with black

This commit is contained in:
deajan 2024-04-21 16:06:47 +02:00
parent 21ec56ea55
commit 2380dcb1e4
5 changed files with 59 additions and 46 deletions

View file

@ -82,7 +82,7 @@ This is free software, and you are welcome to redistribute it under certain cond
type=str,
default=None,
required=False,
help="Comme separated list of groups to work with. Can accept special name '__all__' to work with all repositories."
help="Comme separated list of groups to work with. Can accept special name '__all__' to work with all repositories.",
)
parser.add_argument("-b", "--backup", action="store_true", help="Run a backup")
parser.add_argument(
@ -237,28 +237,28 @@ This is free software, and you are welcome to redistribute it under certain cond
"--show-config",
action="store_true",
required=False,
help="Show full inherited configuration for current repo"
help="Show full inherited configuration for current repo",
)
parser.add_argument(
"--manager-password",
type=str,
default=None,
required=False,
help="Optional manager password when showing config"
help="Optional manager password when showing config",
)
parser.add_argument(
"--external-backend-binary",
type=str,
default=None,
required=False,
help="Full path to alternative external backend binary"
help="Full path to alternative external backend binary",
)
parser.add_argument(
"--group-operation",
type=str,
default=None,
required=False,
help="Launch an operation on a group of repositories given by --repo-group"
help="Launch an operation on a group of repositories given by --repo-group",
)
args = parser.parse_args()
@ -317,7 +317,7 @@ This is free software, and you are welcome to redistribute it under certain cond
msg = "Cannot obtain repo config"
json_error_logging(False, msg, "critical")
sys.exit(71)
if not args.group_operation:
repo_name = None
if not args.repo_name:
@ -329,7 +329,7 @@ This is free software, and you are welcome to redistribute it under certain cond
sys.exit(72)
else:
repo_config = None
binary = None
if args.external_backend_binary:
binary = args.external_backend_binary
@ -337,7 +337,7 @@ This is free software, and you are welcome to redistribute it under certain cond
msg = f"External backend binary {binary} cannot be found."
json_error_logging(False, msg, "critical")
sys.exit(73)
if args.show_config:
# NPF-SEC-00009
# Load an anonymous version of the repo config
@ -348,11 +348,13 @@ This is free software, and you are welcome to redistribute it under certain cond
if __current_manager_password == args.manager_password:
show_encrypted = True
else:
# NPF-SEC
sleep(2) # Sleep to avoid brute force attacks
# NPF-SEC
sleep(2) # Sleep to avoid brute force attacks
logger.error("Wrong manager password")
sys.exit(74)
repo_config = npbackup.configuration.get_anonymous_repo_config(repo_config, show_encrypted=show_encrypted)
repo_config = npbackup.configuration.get_anonymous_repo_config(
repo_config, show_encrypted=show_encrypted
)
print(json.dumps(repo_config, indent=4))
sys.exit(0)
@ -366,7 +368,9 @@ This is free software, and you are welcome to redistribute it under certain cond
except KeyError:
auto_upgrade_interval = 10
if (auto_upgrade and upgrade_runner.need_upgrade(auto_upgrade_interval)) or args.auto_upgrade:
if (
auto_upgrade and upgrade_runner.need_upgrade(auto_upgrade_interval)
) or args.auto_upgrade:
if args.auto_upgrade:
logger.info("Running user initiated auto upgrade")
else:
@ -406,7 +410,9 @@ This is free software, and you are welcome to redistribute it under certain cond
cli_args["op_args"] = {"force": args.force}
elif args.restore or args.group_operation == "restore":
if args.restore_includes:
restore_includes = [include.strip() for include in args.restore_includes.split(',')]
restore_includes = [
include.strip() for include in args.restore_includes.split(",")
]
else:
restore_includes = None
cli_args["operation"] = "restore"
@ -465,13 +471,15 @@ This is free software, and you are welcome to redistribute it under certain cond
repo_config_list = []
if args.group_operation:
if args.repo_group:
groups = [group.strip() for group in args.repo_group.split(',')]
groups = [group.strip() for group in args.repo_group.split(",")]
for group in groups:
repos = npbackup.configuration.get_repos_by_group(full_config, group)
elif args.repo_name:
repos = [repo.strip() for repo in args.repo_name.split(',')]
repos = [repo.strip() for repo in args.repo_name.split(",")]
else:
logger.critical("No repository names or groups have been provided for group operation. Please use --repo-group or --repo-name")
logger.critical(
"No repository names or groups have been provided for group operation. Please use --repo-group or --repo-name"
)
sys.exit(74)
for repo in repos:
repo_config, _ = npbackup.configuration.get_repo_config(full_config, repo)
@ -483,7 +491,7 @@ This is free software, and you are welcome to redistribute it under certain cond
cli_args["op_args"] = {
"repo_config_list": repo_config_list,
"operation": args.group_operation,
**cli_args["op_args"]
**cli_args["op_args"],
}
if cli_args["operation"]:

View file

@ -791,17 +791,17 @@ def get_repos_by_group(full_config: dict, group: str) -> List[str]:
if full_config:
for repo in list(full_config.g("repos").keys()):
if (
(full_config.g(f"repos.{repo}.repo_group") == group or group == "__all__")
and group not in repo_list
):
full_config.g(f"repos.{repo}.repo_group") == group or group == "__all__"
) and group not in repo_list:
repo_list.append(repo)
return repo_list
def get_anonymous_repo_config(repo_config: dict, show_encrypted: bool = False) -> dict:
"""
Replace each encrypted value with
Replace each encrypted value with
"""
def _get_anonymous_repo_config(key: str, value: Any) -> Any:
if key_should_be_encrypted(key, ENCRYPTED_OPTIONS):
if isinstance(value, list):
@ -810,7 +810,7 @@ def get_anonymous_repo_config(repo_config: dict, show_encrypted: bool = False) -
else:
value = "__(o_O)__"
return value
# NPF-SEC-00008: Don't show manager password / sensible data with --show-config
repo_config.pop("manager_password", None)
repo_config.pop("__current_manager_password", None)
@ -821,4 +821,4 @@ def get_anonymous_repo_config(repo_config: dict, show_encrypted: bool = False) -
_get_anonymous_repo_config,
callable_wants_key=True,
callable_wants_root_key=True,
)
)

View file

@ -242,10 +242,7 @@ class NPBackupRunner:
@binary.setter
def binary(self, value):
if (
not isinstance(value, str)
or not os.path.isfile(value)
):
if not isinstance(value, str) or not os.path.isfile(value):
raise ValueError("Backend binary {value} is not readable")
self._binary = value
@ -417,7 +414,10 @@ class NPBackupRunner:
operation = fn.__name__
current_permissions = self.repo_config.g("permissions")
if current_permissions and not current_permissions in required_permissions[operation]:
if (
current_permissions
and not current_permissions in required_permissions[operation]
):
self.write_logs(
f"Required permissions for operation '{operation}' must be in {required_permissions[operation]}, current permission is [{current_permissions}]",
level="critical",
@ -1330,7 +1330,8 @@ class NPBackupRunner:
)
else:
self.write_logs(
f"Operation {operation} failed for repo {repo_name}", level="error"
f"Operation {operation} failed for repo {repo_name}",
level="error",
)
if not result:
group_result = False

View file

@ -64,7 +64,9 @@ sg.theme(PYSIMPLEGUI_THEME)
sg.SetOptions(icon=OEM_ICON)
def about_gui(version_string: str, full_config: dict = None, auto_upgrade_result: bool = False) -> None:
def about_gui(
version_string: str, full_config: dict = None, auto_upgrade_result: bool = False
) -> None:
if auto_upgrade_result:
new_version = [
sg.Button(
@ -449,13 +451,15 @@ def _main_gui(viewer_mode: bool):
if full_config and full_config.g("global_options.auto_upgrade_server_url"):
auto_upgrade_result = upgrade_runner.check_new_version(full_config)
if auto_upgrade_result:
r = sg.Popup(_t("config_gui.auto_upgrade_launch"), custom_text=(_t("generic.yes"), _t("generic.no")))
r = sg.Popup(
_t("config_gui.auto_upgrade_launch"),
custom_text=(_t("generic.yes"), _t("generic.no")),
)
if r == _t("generic.yes"):
result = upgrade_runner.run_upgrade(full_config)
if not result:
sg.Popup(_t("config_gui.auto_upgrade_failed"))
def select_config_file(config_file: str = None) -> None:
"""
Option to select a configuration file

View file

@ -286,14 +286,14 @@ def config_gui(full_config: dict, config_file: str):
tree = prometheus_labels_tree
for val in value:
if object_type != "group" and inherited[val]:
icon = INHERITED_TREE_ICON
else:
icon = TREE_ICON
tree.insert("", val, val, val, icon=icon)
if object_type != "group" and inherited[val]:
icon = INHERITED_TREE_ICON
else:
icon = TREE_ICON
tree.insert("", val, val, val, icon=icon)
window[key].Update(values=tree)
return
if key in ("env.env_variables", "env.encrypted_env_variables"):
if key == "env.env_variables":
tree = env_variables_tree
@ -308,7 +308,7 @@ def config_gui(full_config: dict, config_file: str):
tree.insert("", skey, skey, values=[val], icon=icon)
window[key].Update(values=tree)
return
# Update units into separate value and unit combobox
if key in (
"backup_opts.minimum_backup_size_error",
@ -465,7 +465,6 @@ def config_gui(full_config: dict, config_file: str):
else:
object_group = None
# We need to patch values since sg.Tree() only returns selected data from TreeData()
# Hence we'll fill values with a list or a dict depending on our TreeData data structure
# @PysimpleGUI: there should be a get_all_values() method or something
@ -496,7 +495,6 @@ def config_gui(full_config: dict, config_file: str):
if key and node.values:
values[tree_data_key][key] = node.values[0]
# Special treatment for env.encrypted_env_variables since they might contain an ENCRYPTED_DATA_PLACEHOLDER
# We need to update the placeholder to the actual value if exists
for k, v in values["env.encrypted_env_variables"].items():
@ -585,10 +583,10 @@ def config_gui(full_config: dict, config_file: str):
# Finally, update the config dictionary
# Debug WIP
#if object_type == "group":
#print(f"UPDATING {active_object_key} curr={current_value} new={value}")
#else:
#print(f"UPDATING {active_object_key} curr={current_value} inherited={inherited} new={value}")
# if object_type == "group":
# print(f"UPDATING {active_object_key} curr={current_value} new={value}")
# else:
# print(f"UPDATING {active_object_key} curr={current_value} inherited={inherited} new={value}")
full_config.s(active_object_key, value)
return full_config
@ -1126,7 +1124,9 @@ def config_gui(full_config: dict, config_file: str):
[sg.Button(_t("config_gui.set_permissions"), key="--SET-PERMISSIONS--")],
[
sg.Text(_t("config_gui.repo_group"), size=(40, 1)),
sg.Combo(values=configuration.get_group_list(full_config), key="repo_group"),
sg.Combo(
values=configuration.get_group_list(full_config), key="repo_group"
),
],
[
sg.Text(