Reformat files with black

This commit is contained in:
deajan 2024-10-01 00:32:12 +02:00
parent b0ee7112fc
commit 7fca974d9a
3 changed files with 65 additions and 45 deletions

View file

@ -1422,13 +1422,20 @@ class NPBackupRunner:
prune_result = self.prune(**kwargs)
result = prune_result
else:
self.write_logs("Forget failed. Won't continue housekeeping on repo", level="error")
self.write_logs(
"Forget failed. Won't continue housekeeping on repo",
level="error",
)
result = forget_result
else:
self.write_logs("Check failed. Won't continue housekeeping on repo", level="error")
self.write_logs(
"Check failed. Won't continue housekeeping on repo", level="error"
)
result = check_result
else:
self.write_logs("Unlock failed. Won't continue housekeeping in repo", level="error")
self.write_logs(
"Unlock failed. Won't continue housekeeping in repo", level="error"
)
result = unlock_result
return self.convert_to_json_output(result)

View file

@ -36,6 +36,7 @@ logger = getLogger(__intname__)
gui_object = namedtuple("GuiOjbect", ["type", "name", "group", "backend", "uri"])
def gui_update_state(window, full_config: dict, unencrypted: str = None) -> list:
repo_and_group_list = []
try:
@ -49,7 +50,9 @@ def gui_update_state(window, full_config: dict, unencrypted: str = None) -> list
repo_group = repo_config.g("repo_group")
if not unencrypted and unencrypted != repo_name:
repo_uri = ENCRYPTED_DATA_PLACEHOLDER
repo_and_group_list.append(gui_object("repo", repo_name, repo_group, backend_type, repo_uri))
repo_and_group_list.append(
gui_object("repo", repo_name, repo_group, backend_type, repo_uri)
)
else:
logger.warning("Incomplete URI/password for repo {}".format(repo_name))
for group_name in get_group_list(full_config):
@ -66,7 +69,6 @@ def task_scheduler(repos: list):
"""
task = namedtuple("Tasks", ["task", "hour", "minute", "day", "month", "weekday"])
def _get_current_tasks():
"""
mock tasks
@ -75,7 +77,7 @@ def task_scheduler(repos: list):
task("housekeeping", 0, 0, "*", "*", "*"),
task("check", 0, 0, "*", "*", "*"),
]
def _update_task_list(window):
tasks = _get_current_tasks()
task_list = []
@ -83,7 +85,6 @@ def task_scheduler(repos: list):
task_list.append(task)
window["-TASKS-"].update(values=task_list)
actions = [
"housekeeping",
"check",
@ -111,11 +112,18 @@ def task_scheduler(repos: list):
sg.Text(_t("operations_gui.select_action")),
],
[
sg.Combo(values=actions, default_value="housekeeping", key="-ACTION-", size=(20, 1)),
]
sg.Combo(
values=actions,
default_value="housekeeping",
key="-ACTION-",
size=(20, 1),
),
],
]
window = sg.Window(layout=layout, title=_t("operations_gui.task_scheduler"), finalize=True)
window = sg.Window(
layout=layout, title=_t("operations_gui.task_scheduler"), finalize=True
)
_update_task_list(window)
while True:
event, values = window.read()
@ -124,7 +132,6 @@ def task_scheduler(repos: list):
break
def operations_gui(full_config: dict) -> dict:
"""
Operate on one or multiple repositories, or groups
@ -132,7 +139,10 @@ def operations_gui(full_config: dict) -> dict:
def _get_repo_list(selected_rows):
if not values["repo-and-group-list"]:
if sg.popup_yes_no(_t("operations_gui.no_repo_selected"), keep_on_top=True) == "No":
if (
sg.popup_yes_no(_t("operations_gui.no_repo_selected"), keep_on_top=True)
== "No"
):
return False
repos = get_repo_list(full_config)
else:
@ -197,7 +207,7 @@ def operations_gui(full_config: dict) -> dict:
_t("operations_gui.task_scheduler"),
key="--TASK-SCHEDULER--",
size=(45, 1),
)
),
],
[
sg.Button(
@ -257,7 +267,10 @@ def operations_gui(full_config: dict) -> dict:
],
[
sg.Button(
_t("operations_gui.unlock"), key="--UNLOCK--", size=(45, 1), visible=False
_t("operations_gui.unlock"),
key="--UNLOCK--",
size=(45, 1),
visible=False,
),
sg.Button(
_t("operations_gui.recover"),

View file

@ -24,19 +24,12 @@ from npbackup.__version__ import IS_COMPILED
logger = getLogger()
def scheduled_task_exists(
config_file: str,
type: str,
object: str
):
def scheduled_task_exists(config_file: str, type: str, object: str):
if os.name == "nt":
return _scheduled_task_exists_windows(
config_file, type, object
)
return _scheduled_task_exists_windows(config_file, type, object)
else:
return _scheduled_task_exists_unix(
config_file, type, object
)
return _scheduled_task_exists_unix(config_file, type, object)
def get_scheduled_tasks(
config_file: str,
@ -47,9 +40,11 @@ def get_scheduled_tasks(
return _get_scheduled_tasks_windows(config_file, type, repo_names)
else:
return _get_scheduled_tasks_unix(config_file, type, repo_names)
def _get_scheduled_tasks_windows(config_file: str, type: str, repo_names: List[str] = None):
def _get_scheduled_tasks_windows(
config_file: str, type: str, repo_names: List[str] = None
):
task_name = _get_scheduled_task_name_windows(config_file, type, repo_names)
exit_code, output = command_runner(
'schtasks /QUERY /TN "{}" /FO LIST'.format(task_name),
@ -62,12 +57,7 @@ def _get_scheduled_tasks_windows(config_file: str, type: str, repo_names: List[s
return output
def _scheduled_task_exists_unix(
config_file: str,
type: str,
object_args: str
) -> bool:
def _scheduled_task_exists_unix(config_file: str, type: str, object_args: str) -> bool:
cron_file = "/etc/cron.d/npbackup"
try:
with open(cron_file, "r", encoding="utf-8") as file_handle:
@ -151,11 +141,25 @@ def create_scheduled_task(
if os.name == "nt":
return create_scheduled_task_windows(
config_file, type, CURRENT_EXECUTABLE, subject, object_args, interval_minutes, hour, minute
config_file,
type,
CURRENT_EXECUTABLE,
subject,
object_args,
interval_minutes,
hour,
minute,
)
else:
return create_scheduled_task_unix(
config_file, type, CURRENT_EXECUTABLE, subject, object_args, interval_minutes, hour, minute
config_file,
type,
CURRENT_EXECUTABLE,
subject,
object_args,
interval_minutes,
hour,
minute,
)
@ -225,17 +229,11 @@ def create_scheduled_task_unix(
return True
def _get_scheduled_task_name_windows(
config_file: str,
type: str,
subject: str
) -> str:
return f"{PROGRAM_NAME} - {type.capitalize()} {config_file} {subject}"
def _get_scheduled_task_name_windows(config_file: str, type: str, subject: str) -> str:
return f"{PROGRAM_NAME} - {type.capitalize()} {config_file} {subject}"
def _scheduled_task_exists_windows(
task_name
) -> bool:
def _scheduled_task_exists_windows(task_name) -> bool:
exit_code, _ = command_runner(
'schtasks /TN "{}"'.format(task_name),
windows_no_window=True,
@ -278,7 +276,9 @@ def create_scheduled_task_windows(
<Enabled>true</Enabled>
</TimeTrigger>"""
elif hour is not None and minute is not None:
task_args = f'{task_args}-c "{config_file}" --{type} --force --run-as-cli{object_args}'
task_args = (
f'{task_args}-c "{config_file}" --{type} --force --run-as-cli{object_args}'
)
start_date = (
datetime.datetime.now()
.replace(microsecond=0, hour=hour, minute=minute, second=0)