Refactor NPBackupRunner to return stdout/stderr to GUI

This commit is contained in:
Orsiris de Jong 2023-12-19 20:32:14 +01:00
parent e65e91c7a6
commit afaa055806
6 changed files with 173 additions and 23 deletions

View file

@ -16,7 +16,9 @@ import logging
import queue
from datetime import datetime, timedelta
from functools import wraps
import queue
from command_runner import command_runner
from ofunctions.threading import threaded
from ofunctions.platform import os_arch
from npbackup.restic_metrics import restic_output_2_metrics, upload_metrics
from npbackup.restic_wrapper import ResticRunner
@ -114,6 +116,9 @@ class NPBackupRunner:
# This can lead to a problem when the config file can be written by users other than npbackup
def __init__(self, repo_config: Optional[dict] = None):
self._stdout = None
self._stderr = None
if repo_config:
self.repo_config = repo_config
@ -176,6 +181,22 @@ class NPBackupRunner:
self._stdout = value
self.apply_config_to_restic_runner()
@property
def stderr(self):
return self._stderr
@stderr.setter
def stderr(self, value):
if (
not isinstance(value, str)
and not isinstance(value, int)
and not isinstance(value, Callable)
and not isinstance(value, queue.Queue)
):
raise ValueError("Bogus stdout parameter given: {}".format(value))
self._stderr = value
self.apply_config_to_restic_runner()
@property
def has_binary(self) -> bool:
if self.is_ready:
@ -207,6 +228,30 @@ class NPBackupRunner:
return result
return wrapper
def write_logs(self, msg: str, error: bool=False):
logger.info(msg)
if error:
if self.stderr:
self.stderr.put(msg)
else:
if self.stdout:
self.stdout.put(msg)
def close_queues(fn: Callable):
"""
Function that sends None to both stdout and stderr queues so GUI gets proper results
"""
def wrapper(self, *args, **kwargs):
close_queues = kwargs.pop("close_queues", True)
result = fn(self, *args, **kwargs)
if close_queues:
if self.stdout:
self.stdout.put(None)
if self.stderr:
self.stderr.put(None)
return result
return wrapper
def create_restic_runner(self) -> None:
can_run = True
@ -267,6 +312,9 @@ class NPBackupRunner:
binary_search_paths=[BASEDIR, CURRENT_DIR],
)
self.restic_runner.stdout = self.stdout
self.restic_runner.stderr = self.stderr
if self.restic_runner.binary is None:
# Let's try to load our internal binary for dev purposes
arch = os_arch()
@ -381,8 +429,16 @@ class NPBackupRunner:
self.minimum_backup_age = 1440
self.restic_runner.verbose = self.verbose
self.restic_runner.stdout = self.stdout
# TODO
#self.restic_runner.stdout = self.stdout
#self.restic_runner.stderr = self.stderr
###########################
# ACTUAL RUNNER FUNCTIONS #
###########################
@close_queues
@exec_timer
def list(self) -> Optional[dict]:
if not self.is_ready:
@ -391,6 +447,7 @@ class NPBackupRunner:
snapshots = self.restic_runner.snapshots()
return snapshots
@close_queues
@exec_timer
def find(self, path: str) -> bool:
if not self.is_ready:
@ -404,6 +461,7 @@ class NPBackupRunner:
return True
return False
@close_queues
@exec_timer
def ls(self, snapshot: str) -> Optional[dict]:
if not self.is_ready:
@ -412,6 +470,7 @@ class NPBackupRunner:
result = self.restic_runner.ls(snapshot)
return result
@close_queues
@exec_timer
def check_recent_backups(self) -> bool:
"""
@ -444,6 +503,7 @@ class NPBackupRunner:
logger.error("Cannot connect to repository or repository empty.")
return result, backup_tz
@close_queues
@exec_timer
def backup(self, force: bool = False) -> bool:
"""
@ -601,10 +661,16 @@ class NPBackupRunner:
)
return result
@close_queues
@exec_timer
def restore(self, snapshot: str, target: str, restore_includes: List[str]) -> bool:
if not self.is_ready:
return False
if not self.repo_config.g("permissions") in ['restore', 'full']:
msg = "You don't have permissions to restore this repo"
self.output_queue.put(msg)
logger.critical(msg)
return False
logger.info("Launching restore to {}".format(target))
result = self.restic_runner.restore(
snapshot=snapshot,
@ -613,6 +679,7 @@ class NPBackupRunner:
)
return result
@close_queues
@exec_timer
def forget(self, snapshot: str) -> bool:
if not self.is_ready:
@ -621,14 +688,16 @@ class NPBackupRunner:
result = self.restic_runner.forget(snapshot)
return result
@close_queues
@exec_timer
def check(self, read_data: bool = True) -> bool:
if not self.is_ready:
return False
logger.info("Checking repository")
self.write_logs("Checking repository")
result = self.restic_runner.check(read_data)
return result
@close_queues
@exec_timer
def prune(self) -> bool:
if not self.is_ready:
@ -637,6 +706,7 @@ class NPBackupRunner:
result = self.restic_runner.prune()
return result
@close_queues
@exec_timer
def repair(self, order: str) -> bool:
if not self.is_ready:
@ -645,15 +715,33 @@ class NPBackupRunner:
result = self.restic_runner.repair(order)
return result
@close_queues
@exec_timer
def raw(self, command: str) -> bool:
logger.info("Running raw command: {}".format(command))
result = self.restic_runner.raw(command=command)
return result
@close_queues
@exec_timer
def group_runner(
self, repo_list: list, operation: str, result_queue: Optional[queue.Queue]
self, repo_list: list, operation: str, **kwargs
) -> bool:
group_result = True
# Make sure we don't close the stdout/stderr queues when running multiple operations
kwargs = {
**kwargs,
**{'close_queues': False}
}
for repo in repo_list:
print(f"Running {operation} for repo {repo}")
print("run to the hills")
self.write_logs(f"Running {operation} for repo {repo}")
result = self.__getattribute__(operation)(**kwargs)
if result:
self.write_logs(f"Finished {operation} for repo {repo}")
else:
self.write_logs(f"Operation {operation} failed for repo {repo}", error=True)
group_result = False
self.write_logs("Finished execution group operations")
return group_result

View file

@ -47,7 +47,6 @@ from npbackup.core.runner import NPBackupRunner
from npbackup.core.i18n_helper import _t
from npbackup.core.upgrade_runner import run_upgrade, check_new_version
from npbackup.path_helper import CURRENT_DIR
from npbackup.interface_entrypoint import entrypoint
from npbackup.__version__ import version_string
from npbackup.__debug__ import _DEBUG
from npbackup.gui.config import config_gui
@ -535,12 +534,13 @@ def restore_window(
@threaded
def _gui_backup(repo_config, stdout) -> Future:
def _gui_backup(repo_config, stdout, stderr) -> Future:
runner = NPBackupRunner(repo_config=repo_config)
runner.verbose = (
True # We must use verbose so we get progress output from ResticRunner
)
runner.stdout = stdout
runner.stderr = stderr
result = runner.backup(
force=True,
) # Since we run manually, force backup regardless of recent backup state
@ -729,11 +729,12 @@ def _main_gui():
# We need to read that window at least once fopr it to exist
progress_window.read(timeout=1)
stdout = queue.Queue()
stderr = queue.Queue()
# let's use a mutable so the backup thread can modify it
# We get a thread result, hence pylint will complain the thread isn't a tuple
# pylint: disable=E1101 (no-member)
thread = _gui_backup(repo_config=repo_config, stdout=stdout)
thread = _gui_backup(repo_config=repo_config, stdout=stdout, stderr=stderr)
while not thread.done() and not thread.cancelled():
try:
stdout_line = stdout.get(timeout=0.01)

View file

@ -7,7 +7,7 @@ __intname__ = "npbackup.gui.operations"
__author__ = "Orsiris de Jong"
__copyright__ = "Copyright (C) 2023 NetInvent"
__license__ = "GPL-3.0-only"
__build__ = "2023083101"
__build__ = "2023121901"
from typing import Tuple
@ -138,6 +138,10 @@ def operations_gui(full_config: dict) -> dict:
# Auto reisze table to window size
window["repo-list"].expand(True, True)
# Create queues for getting runner data
stdout_queue = queue.Queue()
stderr_queue = queue.Queue()
while True:
event, values = window.read(timeout=60000)
@ -160,23 +164,67 @@ def operations_gui(full_config: dict) -> dict:
repos = complete_repo_list
else:
repos = values["repo-list"]
result_queue = queue.Queue()
runner = NPBackupRunner()
print(repos)
runner.stdout = stdout_queue
runner.stderr = stderr_queue
group_runner_repo_list = [repo_name for backend_type, repo_name in repos]
if event == '--FORGET--':
operation = 'forget'
op_args = {}
if event == '--QUICK-CHECK--':
operation = 'quick_check'
operation = 'check'
op_args = {'read_data': False}
if event == '--FULL-CHECK--':
operation = 'full_check'
operation = 'check'
op_args = {'read_data': True}
if event == '--STANDARD-PRUNE--':
operation = 'standard_prune'
operation = 'prune'
op_args = {}
if event == '--MAX-PRUNE--':
operation = 'max_prune'
runner.group_runner(group_runner_repo_list, operation, result_queue)
operation = 'prune'
op_args = {}
thread = runner.group_runner(group_runner_repo_list, operation, **op_args)
read_stdout_queue = True
read_sterr_queue = True
progress_layout = [
[sg.Text(_t("operations_gui.last_message"))],
[sg.Multiline(key='-OPERATIONS-PROGRESS-STDOUT-', size=(40, 10))],
[sg.Text(_t("operations_gui.error_messages"))],
[sg.Multiline(key='-OPERATIONS-PROGRESS-STDERR-', size=(40, 10))],
[sg.Button(_t("generic.close"), key="--EXIT--")]
]
progress_window = sg.Window("Operation status", progress_layout)
event, values = progress_window.read(timeout=0.01)
while read_stdout_queue or read_sterr_queue:
# Read stdout queue
try:
stdout_data = stdout_queue.get(timeout=0.01)
except queue.Empty:
pass
else:
if stdout_data is None:
read_stdout_queue = False
else:
progress_window['-OPERATIONS-PROGRESS-STDOUT-'].Update(stdout_data)
# Read stderr queue
try:
stderr_data = stderr_queue.get(timeout=0.01)
except queue.Empty:
pass
else:
if stderr_data is None:
read_sterr_queue = False
else:
progress_window['-OPERATIONS-PROGRESS-STDERR-'].Update(f"{progress_window['-OPERATIONS-PROGRESS-STDERR-'].get()}\n{stderr_data}")
_, _ = progress_window.read()
progress_window.close()
event = '---STATE-UPDATE---'
if event == "---STATE-UPDATE---":
complete_repo_list = gui_update_state(window, full_config)

View file

@ -35,6 +35,7 @@ class ResticRunner:
repository: str,
password: str,
binary_search_paths: List[str] = None,
) -> None:
self.repository = str(repository).strip()
self.password = str(password).strip()
@ -77,7 +78,8 @@ class ResticRunner:
None # Function which will make executor abort if result is True
)
self._executor_finished = False # Internal value to check whether executor is done, accessed via self.executor_finished property
self._stdout = None # Optional outputs when command is run as thread
self._stdout = None # Optional outputs when running GUI, to get interactive output
self._stderr = None
def on_exit(self) -> bool:
self._executor_finished = True
@ -145,6 +147,14 @@ class ResticRunner:
def stdout(self, value: Optional[Union[int, str, Callable, queue.Queue]]):
self._stdout = value
@property
def stderr(self) -> Optional[Union[int, str, Callable, queue.Queue]]:
return self._stderr
@stdout.setter
def stderr(self, value: Optional[Union[int, str, Callable, queue.Queue]]):
self._stderr = value
@property
def verbose(self) -> bool:
return self._verbose
@ -191,7 +201,7 @@ class ResticRunner:
cmd: str,
errors_allowed: bool = False,
timeout: int = None,
live_stream=False,
live_stream=False, # TODO remove live stream since everything is live
) -> Tuple[bool, str]:
"""
Executes restic with given command
@ -220,6 +230,7 @@ class ResticRunner:
live_output=self.verbose,
valid_exit_codes=errors_allowed,
stdout=self._stdout,
stderr=self._stderr,
stop_on=self.stop_on,
on_exit=self.on_exit,
method="poller",

View file

@ -7,9 +7,10 @@ en:
options: Options
create: Create
change: Change
close: Close
_yes: Yes
_no: No
yes: Yes
no: No
seconds: seconds
minutes: minutes

View file

@ -7,9 +7,10 @@ fr:
options: Options
create: Créer
change: Changer
close: Fermer
_yes: Oui
_no: Non
yes: Oui
no: Non
seconds: secondes
minutes: minutes