WIP queue mgmt and thread mgmt

This commit is contained in:
Orsiris de Jong 2023-12-20 10:33:42 +01:00
parent afaa055806
commit bee0c0c840
2 changed files with 33 additions and 19 deletions

View file

@ -438,7 +438,7 @@ class NPBackupRunner:
# ACTUAL RUNNER FUNCTIONS #
###########################
@close_queues
#@close_queues
@exec_timer
def list(self) -> Optional[dict]:
if not self.is_ready:
@ -447,7 +447,7 @@ class NPBackupRunner:
snapshots = self.restic_runner.snapshots()
return snapshots
@close_queues
#@close_queues
@exec_timer
def find(self, path: str) -> bool:
if not self.is_ready:
@ -461,7 +461,7 @@ class NPBackupRunner:
return True
return False
@close_queues
#@close_queues
@exec_timer
def ls(self, snapshot: str) -> Optional[dict]:
if not self.is_ready:
@ -470,7 +470,7 @@ class NPBackupRunner:
result = self.restic_runner.ls(snapshot)
return result
@close_queues
#@close_queues
@exec_timer
def check_recent_backups(self) -> bool:
"""
@ -503,7 +503,7 @@ class NPBackupRunner:
logger.error("Cannot connect to repository or repository empty.")
return result, backup_tz
@close_queues
#@close_queues
@exec_timer
def backup(self, force: bool = False) -> bool:
"""
@ -661,7 +661,7 @@ class NPBackupRunner:
)
return result
@close_queues
#@close_queues
@exec_timer
def restore(self, snapshot: str, target: str, restore_includes: List[str]) -> bool:
if not self.is_ready:
@ -679,7 +679,7 @@ class NPBackupRunner:
)
return result
@close_queues
#@close_queues
@exec_timer
def forget(self, snapshot: str) -> bool:
if not self.is_ready:
@ -688,7 +688,7 @@ class NPBackupRunner:
result = self.restic_runner.forget(snapshot)
return result
@close_queues
#@close_queues
@exec_timer
def check(self, read_data: bool = True) -> bool:
if not self.is_ready:
@ -697,7 +697,7 @@ class NPBackupRunner:
result = self.restic_runner.check(read_data)
return result
@close_queues
#@close_queues
@exec_timer
def prune(self) -> bool:
if not self.is_ready:
@ -706,7 +706,7 @@ class NPBackupRunner:
result = self.restic_runner.prune()
return result
@close_queues
#@close_queues
@exec_timer
def repair(self, order: str) -> bool:
if not self.is_ready:
@ -715,14 +715,14 @@ class NPBackupRunner:
result = self.restic_runner.repair(order)
return result
@close_queues
#@close_queues
@exec_timer
def raw(self, command: str) -> bool:
logger.info("Running raw command: {}".format(command))
result = self.restic_runner.raw(command=command)
return result
@close_queues
#@close_queues
@exec_timer
def group_runner(
self, repo_list: list, operation: str, **kwargs
@ -730,10 +730,10 @@ class NPBackupRunner:
group_result = True
# Make sure we don't close the stdout/stderr queues when running multiple operations
kwargs = {
**kwargs,
**{'close_queues': False}
}
#kwargs = {
# **kwargs,
# **{'close_queues': False}
#}
for repo in repo_list:
self.write_logs(f"Running {operation} for repo {repo}")
@ -744,4 +744,6 @@ class NPBackupRunner:
self.write_logs(f"Operation {operation} failed for repo {repo}", error=True)
group_result = False
self.write_logs("Finished execution group operations")
from time import sleep
sleep(2)
return group_result

View file

@ -185,7 +185,11 @@ def operations_gui(full_config: dict) -> dict:
if event == '--MAX-PRUNE--':
operation = 'prune'
op_args = {}
thread = runner.group_runner(group_runner_repo_list, operation, **op_args)
@threaded
def _group_runner(group_runner_repo_list, operation, **op_args) -> Future:
return runner.group_runner(group_runner_repo_list, operation, **op_args)
thread = _group_runner(group_runner_repo_list, operation, **op_args)
read_stdout_queue = True
read_sterr_queue = True
@ -199,7 +203,8 @@ def operations_gui(full_config: dict) -> dict:
progress_window = sg.Window("Operation status", progress_layout)
event, values = progress_window.read(timeout=0.01)
while read_stdout_queue or read_sterr_queue:
#while read_stdout_queue or read_sterr_queue:
while not thread.done() and not thread.cancelled():
# Read stdout queue
try:
stdout_data = stdout_queue.get(timeout=0.01)
@ -222,7 +227,14 @@ def operations_gui(full_config: dict) -> dict:
else:
progress_window['-OPERATIONS-PROGRESS-STDERR-'].Update(f"{progress_window['-OPERATIONS-PROGRESS-STDERR-'].get()}\n{stderr_data}")
_, _ = progress_window.read()
# So we actually need to read the progress window for it to refresh...
_, _ = progress_window.read(.01)
# Keep the window open until user has done something
while True:
event, _ = progress_window.read()
if event in (sg.WIN_CLOSED, sg.WIN_X_EVENT, '--EXIT--'):
break
progress_window.close()
event = '---STATE-UPDATE---'