npbackup/npbackup/gui/operations.py
2024-06-21 03:28:13 +02:00

337 lines
12 KiB
Python

#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# This file is part of npbackup
__intname__ = "npbackup.gui.operations"
__author__ = "Orsiris de Jong"
__copyright__ = "Copyright (C) 2023-2024 NetInvent"
__license__ = "GPL-3.0-only"
__build__ = "2024061601"
import os
from logging import getLogger
import npbackup.gui.PySimpleGUI as sg
from npbackup.configuration import get_repo_config, get_group_list, get_repos_by_group, get_manager_password
from npbackup.core.i18n_helper import _t
from npbackup.gui.helpers import get_anon_repo_uri, gui_thread_runner
from resources.customization import (
OEM_STRING,
OEM_LOGO,
)
from npbackup.gui.config import ENCRYPTED_DATA_PLACEHOLDER, ask_manager_password
logger = getLogger(__intname__)
def gui_update_state(window, full_config: dict, unencrypted: str = None) -> list:
repo_list = []
try:
for repo_name in full_config.g("repos"):
repo_config, _ = get_repo_config(full_config, repo_name)
if repo_config.g(f"repo_uri") and (
repo_config.g(f"repo_opts.repo_password")
or repo_config.g(f"repo_opts.repo_password_command")
):
backend_type, repo_uri = get_anon_repo_uri(repo_config.g(f"repo_uri"))
repo_group = repo_config.g("repo_group")
if not unencrypted and unencrypted != repo_name:
repo_uri = ENCRYPTED_DATA_PLACEHOLDER
repo_list.append([repo_name, repo_group, backend_type, repo_uri])
else:
logger.warning("Incomplete operations repo {}".format(repo_name))
except KeyError:
logger.info("No operations repos configured")
window["repo-list"].update(repo_list)
return repo_list
def operations_gui(full_config: dict) -> dict:
"""
Operate on one or multiple repositories
"""
def _select_groups():
group_list = get_group_list(full_config)
selector_layout = [
[
sg.Table(
values=group_list,
headings=["Group Name"],
key="-GROUP_LIST-",
auto_size_columns=True,
justification="left",
expand_x=True,
expand_y=True,
)
],
[
sg.Push(),
sg.Button(_t("generic.cancel"), key="--CANCEL--"),
sg.Button(
_t("operations_gui.apply_to_selected_groups"),
key="--SELECTED_GROUPS--",
),
sg.Button(_t("operations_gui.apply_to_all"), key="--APPLY_TO_ALL--"),
],
]
select_group_window = sg.Window("Group", selector_layout)
while True:
event, values = select_group_window.read()
if event in (sg.WIN_CLOSED, sg.WIN_X_EVENT, "--CANCEL--"):
result = []
break
if event == "--SELECTED_GROUPS--":
if not values["-GROUP_LIST-"]:
sg.Popup(_t("operations_gui.no_groups_selected"))
continue
repo_list = []
for group_index in values["-GROUP_LIST-"]:
group_name = group_list[group_index]
repo_list += get_repos_by_group(
full_config, group_name
)
result = repo_list
break
if event == "--APPLY_TO_ALL--":
result = []
for value in complete_repo_list:
result.append(value[0])
break
select_group_window.close()
return result
# This is a stupid hack to make sure uri column is large enough
headings = [
"Name ",
"Group ",
"Backend",
"URI ",
]
layout = [
[
sg.Column(
[
[
sg.Column(
[[sg.Image(data=OEM_LOGO)]],
vertical_alignment="top",
),
sg.Column(
[
[sg.Text(OEM_STRING, font="Arial 14")],
],
justification="C",
element_justification="C",
vertical_alignment="top",
),
],
[sg.Text(_t("operations_gui.configured_repositories"))],
[
sg.Table(
values=[[]],
headings=headings,
key="repo-list",
auto_size_columns=True,
justification="left",
),
],
[
sg.Button(
_t("operations_gui.quick_check"),
key="--QUICK-CHECK--",
size=(45, 1),
),
sg.Button(
_t("operations_gui.full_check"),
key="--FULL-CHECK--",
size=(45, 1),
),
],
[
sg.Button(
_t("operations_gui.repair_index"),
key="--REPAIR-INDEX--",
size=(45, 1),
),
sg.Button(
_t("operations_gui.repair_snapshots"),
key="--REPAIR-SNAPSHOTS--",
size=(45, 1),
),
],
[
sg.Button(
_t("operations_gui.unlock"), key="--UNLOCK--", size=(45, 1)
),
sg.Button(
_t("operations_gui.forget_using_retention_policy"),
key="--FORGET--",
size=(45, 1),
),
],
[
sg.Button(
_t("operations_gui.standard_prune"),
key="--STANDARD-PRUNE--",
size=(45, 1),
),
sg.Button(
_t("operations_gui.max_prune"),
key="--MAX-PRUNE--",
size=(45, 1),
),
],
[
sg.Button(
_t("operations_gui.stats"),
key="--STATS--",
size=(45, 1),
)
],
[sg.Button(_t("generic.quit"), key="--EXIT--")],
],
element_justification="C",
)
]
]
right_click_menu = ["", [_t("config_gui.show_decrypted")]]
window = sg.Window(
"Configuration",
layout,
# size=(600, 600),
text_justification="C",
auto_size_text=True,
auto_size_buttons=True,
no_titlebar=False,
grab_anywhere=True,
keep_on_top=False,
alpha_channel=1.0,
default_button_element_size=(20, 1),
right_click_menu=right_click_menu,
finalize=True,
)
complete_repo_list = gui_update_state(window, full_config)
# Auto reisze table to window size
window["repo-list"].expand(True, True)
while True:
event, values = window.read()
if event in (sg.WIN_CLOSED, sg.WIN_X_EVENT, "--EXIT--"):
break
if event == _t("config_gui.show_decrypted"):
try:
object_name = complete_repo_list[values["repo-list"][0]][0]
except Exception as exc:
logger.debug("Trace:", exc_info=True)
object_name = None
if not object_name:
sg.PopupError(_t("operations_gui.no_repo_selected"), keep_on_top=True)
continue
manager_password = get_manager_password(
full_config, object_name
)
# NPF-SEC-00009
env_manager_password = os.environ.get("NPBACKUP_MANAGER_PASSWORD", None)
if not manager_password:
sg.PopupError(_t("config_gui.no_manager_password_defined"), keep_on_top=True)
continue
if (
env_manager_password and env_manager_password == manager_password
) or ask_manager_password(manager_password):
complete_repo_list = gui_update_state(window, full_config, unencrypted=object_name)
continue
if event in (
"--QUICK-CHECK--",
"--FULL-CHECK--",
"--REPAIR-INDEX--",
"--REPAIR-SNAPSHOTS--",
"--UNLOCK--",
"--FORGET--",
"--STANDARD-PRUNE--",
"--MAX-PRUNE--",
"--STATS--",
):
if not values["repo-list"]:
repos = _select_groups()
else:
repos = []
for value in values["repo-list"]:
repos.append(complete_repo_list[value][0])
repo_config_list = []
if not repos:
continue
for repo_name in repos:
repo_config, __annotations__ = get_repo_config(
full_config, repo_name
)
repo_config_list.append(repo_config)
operation = None
op_args = None
gui_msg = None
if event == "--FORGET--":
operation = "forget"
op_args = {"use_policy": True}
gui_msg = _t("operations_gui.forget_using_retention_policy")
if event == "--QUICK-CHECK--":
operation = "check"
op_args = {"read_data": False}
gui_msg = _t("operations_gui.quick_check")
if event == "--FULL-CHECK--":
operation = "check"
op_args = {"read_data": True}
gui_msg = _t("operations_gui.full_check")
if event == "--UNLOCK--":
operation = "unlock"
op_args = {}
gui_msg = _t("operations_gui.unlock")
if event == "--REPAIR-INDEX--":
operation = "repair"
op_args = {"subject": "index"}
gui_msg = _t("operations_gui.repair_index")
if event == "--REPAIR-SNAPSHOTS--":
operation = "repair"
op_args = {"subject": "snapshots"}
gui_msg = _t("operations_gui.repair_snapshots")
if event == "--STANDARD-PRUNE--":
operation = "prune"
op_args = {}
gui_msg = _t("operations_gui.standard_prune")
if event == "--MAX-PRUNE--":
operation = "prune"
op_args = {"max": True}
gui_msg = _t("operations_gui.max_prune")
if event == "--STATS--":
operation = "stats"
op_args = {}
gui_msg = _t("operations_gui.stats")
if operation:
gui_thread_runner(
None,
"group_runner",
operation=operation,
repo_config_list=repo_config_list,
__autoclose=False,
__compact=False,
__gui_msg=gui_msg,
**op_args,
)
else:
logger.error(f"Bogus operation: {operation}")
event = "---STATE-UPDATE---"
if event == "---STATE-UPDATE---":
complete_repo_list = gui_update_state(window, full_config)
window.close()
return full_config