WIP: Full cli-gui refactoring

This commit is contained in:
Orsiris de Jong 2023-12-11 00:37:34 +01:00
parent f698cf0f06
commit e00d9cabf1
5 changed files with 423 additions and 357 deletions

View file

@ -1,16 +1,13 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
#
# This file is part of npbackup, and is really just a binary shortcut to launch npbackup.main
# This file is part of npbackup, and is really just a binary shortcut to launch npbackup.__main__
import os
import sys
sys.path.insert(0, os.path.normpath(os.path.join(os.path.dirname(__file__), "..")))
from npbackup.globvars import Globvars
g = Globvars
g.GUI = False
from npbackup.__main__ import main
del sys.path[0]

View file

@ -1,19 +1,16 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
#
# This file is part of npbackup, and is really just a binary shortcut to launch npbackup.main
# This file is part of npbackup, and is really just a binary shortcut to launch npbackup.gui.__main__
import os
import sys
sys.path.insert(0, os.path.normpath(os.path.join(os.path.dirname(__file__), "..")))
from npbackup.globvars import Globvars
g = Globvars
g.GUI = True
from npbackup.__main__ import main
from npbackup.gui.__main__ import main_gui
del sys.path[0]
if __name__ == "__main__":
main()
main_gui()

View file

@ -3,131 +3,38 @@
#
# This file is part of npbackup
__intname__ = "npbackup"
__author__ = "Orsiris de Jong"
__site__ = "https://www.netperfect.fr/npbackup"
__description__ = "NetPerfect Backup Client"
__copyright__ = "Copyright (C) 2022-2023 NetInvent"
__license__ = "GPL-3.0-only"
__build__ = "2023083101"
__version__ = "2.3.0-dev"
__intname__ = "npbackup.cli_interface"
import os
import sys
import atexit
from argparse import ArgumentParser
import dateutil.parser
from datetime import datetime
import tempfile
import pidfile
import ofunctions.logger_utils
from ofunctions.platform import python_arch
from ofunctions.process import kill_childs
from npbackup.globvars import Globvars
# This is needed so we get no GUI version messages
if Globvars.GUI:
try:
import PySimpleGUI as sg
import _tkinter
except ImportError as exc:
if not IS_COMPILED:
print(str(exc))
else:
print("Missing packages in binary.")
sys.exit(1)
from npbackup.customization import (
PYSIMPLEGUI_THEME,
OEM_ICON,
)
from npbackup.path_helper import CURRENT_DIR
from npbackup.configuration import IS_PRIV_BUILD
from npbackup.customization import (
LICENSE_TEXT,
LICENSE_FILE,
)
from npbackup import configuration
from npbackup.core.runner import NPBackupRunner
from npbackup.core.i18n_helper import _t
from npbackup.path_helper import CURRENT_DIR, CURRENT_EXECUTABLE
from npbackup.core.nuitka_helper import IS_COMPILED
from npbackup.upgrade_client.upgrader import need_upgrade
from npbackup.core.upgrade_runner import run_upgrade
if Globvars.GUI:
from npbackup.gui.config import config_gui
from npbackup.gui.operations import operations_gui
from npbackup.gui.main import main_gui
sg.theme(PYSIMPLEGUI_THEME)
sg.SetOptions(icon=OEM_ICON)
if os.name == "nt":
from npbackup.windows.task import create_scheduled_task
# Nuitka compat, see https://stackoverflow.com/a/74540217
try:
# pylint: disable=W0611 (unused-import)
from charset_normalizer import md__mypyc # noqa
except ImportError:
pass
from npbackup.interface_entrypoint import entrypoint
from npbackup.__version__ import __intname__ as intname, __version__, __build__, __copyright__, __description__
_DEBUG = False
_VERBOSE = False
LOG_FILE = os.path.join(CURRENT_DIR, "{}.log".format(__intname__))
CONFIG_FILE = os.path.join(CURRENT_DIR, "{}.conf".format(__intname__))
PID_FILE = os.path.join(tempfile.gettempdir(), "{}.pid".format(__intname__))
logger = ofunctions.logger_utils.logger_get_logger(LOG_FILE)
def execution_logs(start_time: datetime) -> None:
"""
Try to know if logger.warning or worse has been called
logger._cache contains a dict of values like {10: boolean, 20: boolean, 30: boolean, 40: boolean, 50: boolean}
where
10 = debug, 20 = info, 30 = warning, 40 = error, 50 = critical
so "if 30 in logger._cache" checks if warning has been triggered
ATTENTION: logger._cache does only contain cache of current main, not modules, deprecated in favor of
ofunctions.logger_utils.ContextFilterWorstLevel
ATTENTION: For ofunctions.logger_utils.ContextFilterWorstLevel will only check current logger instance
So using logger = getLogger("anotherinstance") will create a separate instance from the one we can inspect
Makes sense ;)
"""
end_time = datetime.utcnow()
logger_worst_level = 0
for flt in logger.filters:
if isinstance(flt, ofunctions.logger_utils.ContextFilterWorstLevel):
logger_worst_level = flt.worst_level
log_level_reached = "success"
try:
if logger_worst_level >= 40:
log_level_reached = "errors"
elif logger_worst_level >= 30:
log_level_reached = "warnings"
except AttributeError as exc:
logger.error("Cannot get worst log level reached: {}".format(exc))
logger.info(
"ExecTime = {}, finished, state is: {}.".format(
end_time - start_time, log_level_reached
)
)
# using sys.exit(code) in a atexit function will swallow the exitcode and render 0
def interface():
def cli_interface():
global _DEBUG
global _VERBOSE
global CONFIG_FILE
parser = ArgumentParser(
prog="{} {} - {}".format(__description__, __copyright__, __site__),
prog=f"{__description__}",
description="""Portable Network Backup Client\n
This program is distributed under the GNU General Public License and comes with ABSOLUTELY NO WARRANTY.\n
This is free software, and you are welcome to redistribute it under certain conditions; Please type --license for more info.""",
@ -156,17 +63,14 @@ This is free software, and you are welcome to redistribute it under certain cond
help="Path to alternative configuration file",
)
if Globvars.GUI:
parser.add_argument(
"--config-gui",
action="store_true",
default=False,
help="Show configuration GUI",
)
parser.add_argument(
"--operations-gui", action="store_true", help="Show operations GUI"
)
parser.add_argument(
"--repo-name",
dest="repo_name",
type=str,
default="default",
required=False,
help="Name of the repository to work with. Defaults to 'default'"
)
parser.add_argument(
"-l", "--list", action="store_true", help="Show current snapshots"
@ -254,11 +158,10 @@ This is free software, and you are welcome to redistribute it under certain cond
)
args = parser.parse_args()
version_string = "{} v{}{}{}-{} {} - {}".format(
__intname__,
intname,
__version__,
"-PRIV" if configuration.IS_PRIV_BUILD else "",
"-PRIV" if IS_PRIV_BUILD else "",
"-P{}".format(sys.version_info[1]),
python_arch(),
__build__,
@ -284,246 +187,17 @@ This is free software, and you are welcome to redistribute it under certain cond
if args.verbose:
_VERBOSE = True
# Make sure we log execution time and error state at the end of the program
if args.backup or args.restore or args.find or args.list or args.check:
atexit.register(
execution_logs,
datetime.utcnow(),
)
if args.config_file:
if not os.path.isfile(args.config_file):
logger.critical("Given file {} cannot be read.".format(args.config_file))
CONFIG_FILE = args.config_file
# Program entry
if Globvars.GUI and (args.config_gui or args.operations_gui):
try:
config_dict = configuration.load_config(CONFIG_FILE)
if not config_dict:
logger.error("Cannot load config file")
sys.exit(24)
except FileNotFoundError:
logger.warning(
'No configuration file found. Please use --config-file "path" to specify one or put a config file into current directory. Will create fresh config file in current directory.'
)
config_dict = configuration.empty_config_dict
if args.config_gui:
config_dict = config_gui(config_dict, CONFIG_FILE)
if args.operations_gui:
config_dict = operations_gui(config_dict, CONFIG_FILE)
sys.exit(0)
if args.create_scheduled_task:
try:
result = create_scheduled_task(
executable_path=CURRENT_EXECUTABLE,
interval_minutes=int(args.create_scheduled_task),
)
if result:
sys.exit(0)
else:
sys.exit(22)
except ValueError:
sys.exit(23)
try:
config_dict = configuration.load_config(CONFIG_FILE)
except FileNotFoundError:
config_dict = None
if not config_dict:
message = _t("config_gui.no_config_available")
logger.error(message)
if config_dict is None and Globvars.GUI:
config_dict = configuration.empty_config_dict
# If no arguments are passed, assume we are launching the GUI
if len(sys.argv) == 1:
try:
result = sg.Popup(
"{}\n\n{}".format(message, _t("config_gui.create_new_config")),
custom_text=(_t("generic._yes"), _t("generic._no")),
keep_on_top=True,
)
if result == _t("generic._yes"):
config_dict = config_gui(config_dict, CONFIG_FILE)
sg.Popup(_t("config_gui.saved_initial_config"))
else:
logger.error("No configuration created via GUI")
sys.exit(7)
except _tkinter.TclError as exc:
logger.info(
'Tkinter error: "{}". Is this a headless server ?'.format(exc)
)
parser.print_help(sys.stderr)
sys.exit(1)
sys.exit(7)
elif not config_dict:
if len(sys.argv) == 1 and Globvars.GUI:
sg.Popup(_t("config_gui.bogus_config_file", config_file=CONFIG_FILE))
sys.exit(7)
if args.upgrade_conf:
# Whatever we need to add here for future releases
# Eg:
logger.info("Upgrading configuration file to version %s", __version__)
try:
config_dict["identity"]
except KeyError:
# Create new section identity, as per upgrade 2.2.0rc2
config_dict["identity"] = {"machine_id": "${HOSTNAME}"}
configuration.save_config(CONFIG_FILE, config_dict)
sys.exit(0)
# Try to perform an auto upgrade if needed
try:
auto_upgrade = config_dict["options"]["auto_upgrade"]
except KeyError:
auto_upgrade = True
try:
auto_upgrade_interval = config_dict["options"]["interval"]
except KeyError:
auto_upgrade_interval = 10
if (auto_upgrade and need_upgrade(auto_upgrade_interval)) or args.auto_upgrade:
if args.auto_upgrade:
logger.info("Running user initiated auto upgrade")
else:
logger.info("Running program initiated auto upgrade")
result = run_upgrade(config_dict)
if result:
sys.exit(0)
elif args.auto_upgrade:
sys.exit(23)
dry_run = False
if args.dry_run:
dry_run = True
npbackup_runner = NPBackupRunner(config_dict=config_dict)
npbackup_runner.dry_run = dry_run
npbackup_runner.verbose = _VERBOSE
if not npbackup_runner.backend_version:
logger.critical("No backend available. Cannot continue")
sys.exit(25)
logger.info("Backend: {}".format(npbackup_runner.backend_version))
if args.check:
if npbackup_runner.check_recent_backups():
sys.exit(0)
else:
sys.exit(2)
if args.list:
result = npbackup_runner.list()
if result:
for snapshot in result:
try:
tags = snapshot["tags"]
except KeyError:
tags = None
logger.info(
"ID: {} Hostname: {}, Username: {}, Tags: {}, source: {}, time: {}".format(
snapshot["short_id"],
snapshot["hostname"],
snapshot["username"],
tags,
snapshot["paths"],
dateutil.parser.parse(snapshot["time"]),
)
)
sys.exit(0)
else:
sys.exit(2)
if args.ls:
result = npbackup_runner.ls(snapshot=args.ls)
if result:
logger.info("Snapshot content:")
for entry in result:
logger.info(entry)
sys.exit(0)
else:
logger.error("Snapshot could not be listed.")
sys.exit(2)
if args.find:
result = npbackup_runner.find(path=args.find)
if result:
sys.exit(0)
else:
sys.exit(2)
try:
with pidfile.PIDFile(PID_FILE):
if args.backup:
result = npbackup_runner.backup(force=args.force)
if result:
logger.info("Backup finished.")
sys.exit(0)
else:
logger.error("Backup operation failed.")
sys.exit(2)
if args.restore:
result = npbackup_runner.restore(
snapshot=args.restore_from_snapshot,
target=args.restore,
restore_includes=args.restore_include,
)
if result:
sys.exit(0)
else:
sys.exit(2)
if args.forget:
result = npbackup_runner.forget(snapshot=args.forget)
if result:
sys.exit(0)
else:
sys.exit(2)
if args.raw:
result = npbackup_runner.raw(command=args.raw)
if result:
sys.exit(0)
else:
sys.exit(2)
except pidfile.AlreadyRunningError:
logger.warning("Backup process already running. Will not continue.")
# EXIT_CODE 21 = current backup process already running
sys.exit(21)
if Globvars.GUI:
try:
with pidfile.PIDFile(PID_FILE):
try:
main_gui(config_dict, CONFIG_FILE, version_string)
except _tkinter.TclError as exc:
logger.info(
'Tkinter error: "{}". Is this a headless server ?'.format(exc)
)
parser.print_help(sys.stderr)
sys.exit(1)
except pidfile.AlreadyRunningError:
logger.warning("Backup GUI already running. Will not continue")
# EXIT_CODE 21 = current backup process already running
sys.exit(21)
else:
parser.print_help(sys.stderr)
entrypoint()
def main():
try:
# kill_childs normally would not be necessary, but let's just be foolproof here (kills restic subprocess in all cases)
atexit.register(
kill_childs,
os.getpid(),
)
interface()
cli_interface()
except KeyboardInterrupt as exc:
logger.error("Program interrupted by keyboard. {}".format(exc))
logger.info("Trace:", exc_info=True)

View file

@ -7,12 +7,13 @@ __intname__ = "npbackup.gui.main"
__author__ = "Orsiris de Jong"
__copyright__ = "Copyright (C) 2022-2023 NetInvent"
__license__ = "GPL-3.0-only"
__build__ = "2023083101"
__build__ = "2023121001"
from typing import List, Optional, Tuple
import sys
import os
from pathlib import Path
from logging import getLogger
import re
from datetime import datetime
@ -20,6 +21,7 @@ import dateutil
import queue
from time import sleep
import PySimpleGUI as sg
import _tkinter
from ofunctions.threading import threaded, Future
from threading import Thread
from ofunctions.misc import BytesConverter
@ -43,7 +45,18 @@ from npbackup.gui.helpers import get_anon_repo_uri
from npbackup.core.runner import NPBackupRunner
from npbackup.core.i18n_helper import _t
from npbackup.core.upgrade_runner import run_upgrade, check_new_version
from npbackup.interface_entrypoint import entrypoint
from npbackup.__version__ import __intname__ as intname, __version__, __build__, __copyright__
from npbackup.gui.config import config_gui
from npbackup.gui.operations import operations_gui
from npbackup.customization import (
PYSIMPLEGUI_THEME,
OEM_ICON,
)
sg.theme(PYSIMPLEGUI_THEME)
sg.SetOptions(icon=OEM_ICON)
logger = getLogger()
@ -517,7 +530,7 @@ def _gui_backup(config_dict, stdout) -> Future:
return result
def main_gui(config_dict: dict, config_file: str, version_string: str):
def _main_gui():
backup_destination = _t("main_gui.local_folder")
backend_type, repo_uri = get_anon_repo_uri(config_dict["repo"]["repository"])
@ -713,3 +726,11 @@ def main_gui(config_dict: dict, config_file: str, version_string: str):
_gui_update_state(window, current_state, backup_tz, snapshot_list)
if current_state is None:
sg.Popup(_t("main_gui.cannot_get_repo_status"))
def main_gui():
try:
_main_gui()
except _tkinter.TclError as exc:
logger.critical(f'Tkinter error: "{exc}". Is this a headless server ?')
sys.exit(250)

View file

@ -0,0 +1,377 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# This file is part of npbackup
__intname__ = "npbackup"
__author__ = "Orsiris de Jong"
__site__ = "https://www.netperfect.fr/npbackup"
__description__ = "NetPerfect Backup Client"
__copyright__ = "Copyright (C) 2022-2023 NetInvent"
__license__ = "GPL-3.0-only"
__build__ = "2023083101"
__version__ = "2.3.0-dev"
import os
import sys
import atexit
import dateutil.parser
from datetime import datetime
import tempfile
import pidfile
import ofunctions.logger_utils
from ofunctions.platform import python_arch
from ofunctions.process import kill_childs
from npbackup.customization import (
LICENSE_TEXT,
LICENSE_FILE,
)
from npbackup import configuration
from npbackup.core.runner import NPBackupRunner
from npbackup.core.i18n_helper import _t
from npbackup.path_helper import CURRENT_DIR, CURRENT_EXECUTABLE
from npbackup.core.nuitka_helper import IS_COMPILED
from npbackup.upgrade_client.upgrader import need_upgrade
from npbackup.core.upgrade_runner import run_upgrade
if os.name == "nt":
from npbackup.windows.task import create_scheduled_task
# Nuitka compat, see https://stackoverflow.com/a/74540217
try:
# pylint: disable=W0611 (unused-import)
from charset_normalizer import md__mypyc # noqa
except ImportError:
pass
_DEBUG = False
_VERBOSE = False
LOG_FILE = os.path.join(CURRENT_DIR, "{}.log".format(__intname__))
CONFIG_FILE = os.path.join(CURRENT_DIR, "{}.conf".format(__intname__))
PID_FILE = os.path.join(tempfile.gettempdir(), "{}.pid".format(__intname__))
logger = ofunctions.logger_utils.logger_get_logger(LOG_FILE)
def execution_logs(start_time: datetime) -> None:
"""
Try to know if logger.warning or worse has been called
logger._cache contains a dict of values like {10: boolean, 20: boolean, 30: boolean, 40: boolean, 50: boolean}
where
10 = debug, 20 = info, 30 = warning, 40 = error, 50 = critical
so "if 30 in logger._cache" checks if warning has been triggered
ATTENTION: logger._cache does only contain cache of current main, not modules, deprecated in favor of
ofunctions.logger_utils.ContextFilterWorstLevel
ATTENTION: For ofunctions.logger_utils.ContextFilterWorstLevel will only check current logger instance
So using logger = getLogger("anotherinstance") will create a separate instance from the one we can inspect
Makes sense ;)
"""
end_time = datetime.utcnow()
logger_worst_level = 0
for flt in logger.filters:
if isinstance(flt, ofunctions.logger_utils.ContextFilterWorstLevel):
logger_worst_level = flt.worst_level
log_level_reached = "success"
try:
if logger_worst_level >= 40:
log_level_reached = "errors"
elif logger_worst_level >= 30:
log_level_reached = "warnings"
except AttributeError as exc:
logger.error("Cannot get worst log level reached: {}".format(exc))
logger.info(
"ExecTime = {}, finished, state is: {}.".format(
end_time - start_time, log_level_reached
)
)
# using sys.exit(code) in a atexit function will swallow the exitcode and render 0
def interface():
version_string = "{} v{}{}{}-{} {} - {}".format(
__intname__,
__version__,
"-PRIV" if configuration.IS_PRIV_BUILD else "",
"-P{}".format(sys.version_info[1]),
python_arch(),
__build__,
__copyright__,
)
if args.version:
print(version_string)
sys.exit(0)
logger.info(version_string)
if args.license:
try:
with open(LICENSE_FILE, "r", encoding="utf-8") as file_handle:
print(file_handle.read())
except OSError:
print(LICENSE_TEXT)
sys.exit(0)
if args.debug or os.environ.get("_DEBUG", "False").capitalize() == "True":
_DEBUG = True
logger.setLevel(ofunctions.logger_utils.logging.DEBUG)
if args.verbose:
_VERBOSE = True
# Make sure we log execution time and error state at the end of the program
if args.backup or args.restore or args.find or args.list or args.check:
atexit.register(
execution_logs,
datetime.utcnow(),
)
if args.config_file:
if not os.path.isfile(args.config_file):
logger.critical("Given file {} cannot be read.".format(args.config_file))
CONFIG_FILE = args.config_file
# Program entry
if Globvars.GUI and (args.config_gui or args.operations_gui):
try:
config = configuration.load_config(CONFIG_FILE)
if not config:
logger.error("Cannot load config file")
sys.exit(24)
except FileNotFoundError:
logger.warning(
'No configuration file found. Please use --config-file "path" to specify one or put a config file into current directory. Will create fresh config file in current directory.'
)
config = configuration.empty_config_dict
if args.config_gui:
config = config_gui(config, CONFIG_FILE)
if args.operations_gui:
config = operations_gui(config, CONFIG_FILE)
sys.exit(0)
if args.create_scheduled_task:
try:
result = create_scheduled_task(
executable_path=CURRENT_EXECUTABLE,
interval_minutes=int(args.create_scheduled_task),
)
if result:
sys.exit(0)
else:
sys.exit(22)
except ValueError:
sys.exit(23)
try:
config = configuration.load_config(CONFIG_FILE)
repo_config = configuration.get_repo_config(config_dict, args.repo_name)
except FileNotFoundError:
config = None
if not config:
message = _t("config_gui.no_config_available")
logger.error(message)
if config_dict is None and Globvars.GUI:
config_dict = configuration.empty_config_dict
# If no arguments are passed, assume we are launching the GUI
if len(sys.argv) == 1:
try:
result = sg.Popup(
"{}\n\n{}".format(message, _t("config_gui.create_new_config")),
custom_text=(_t("generic._yes"), _t("generic._no")),
keep_on_top=True,
)
if result == _t("generic._yes"):
config_dict = config_gui(config_dict, CONFIG_FILE)
sg.Popup(_t("config_gui.saved_initial_config"))
else:
logger.error("No configuration created via GUI")
sys.exit(7)
except _tkinter.TclError as exc:
logger.info(
'Tkinter error: "{}". Is this a headless server ?'.format(exc)
)
parser.print_help(sys.stderr)
sys.exit(1)
sys.exit(7)
elif not config_dict:
if len(sys.argv) == 1 and Globvars.GUI:
sg.Popup(_t("config_gui.bogus_config_file", config_file=CONFIG_FILE))
sys.exit(7)
if args.upgrade_conf:
# Whatever we need to add here for future releases
# Eg:
logger.info("Upgrading configuration file to version %s", __version__)
try:
config_dict["identity"]
except KeyError:
# Create new section identity, as per upgrade 2.2.0rc2
config_dict["identity"] = {"machine_id": "${HOSTNAME}"}
configuration.save_config(CONFIG_FILE, config_dict)
sys.exit(0)
# Try to perform an auto upgrade if needed
try:
auto_upgrade = config_dict["options"]["auto_upgrade"]
except KeyError:
auto_upgrade = True
try:
auto_upgrade_interval = config_dict["options"]["interval"]
except KeyError:
auto_upgrade_interval = 10
if (auto_upgrade and need_upgrade(auto_upgrade_interval)) or args.auto_upgrade:
if args.auto_upgrade:
logger.info("Running user initiated auto upgrade")
else:
logger.info("Running program initiated auto upgrade")
result = run_upgrade(config_dict)
if result:
sys.exit(0)
elif args.auto_upgrade:
sys.exit(23)
dry_run = False
if args.dry_run:
dry_run = True
npbackup_runner = NPBackupRunner(config_dict=config_dict)
npbackup_runner.dry_run = dry_run
npbackup_runner.verbose = _VERBOSE
if not npbackup_runner.backend_version:
logger.critical("No backend available. Cannot continue")
sys.exit(25)
logger.info("Backend: {}".format(npbackup_runner.backend_version))
if args.check:
if npbackup_runner.check_recent_backups():
sys.exit(0)
else:
sys.exit(2)
if args.list:
result = npbackup_runner.list()
if result:
for snapshot in result:
try:
tags = snapshot["tags"]
except KeyError:
tags = None
logger.info(
"ID: {} Hostname: {}, Username: {}, Tags: {}, source: {}, time: {}".format(
snapshot["short_id"],
snapshot["hostname"],
snapshot["username"],
tags,
snapshot["paths"],
dateutil.parser.parse(snapshot["time"]),
)
)
sys.exit(0)
else:
sys.exit(2)
if args.ls:
result = npbackup_runner.ls(snapshot=args.ls)
if result:
logger.info("Snapshot content:")
for entry in result:
logger.info(entry)
sys.exit(0)
else:
logger.error("Snapshot could not be listed.")
sys.exit(2)
if args.find:
result = npbackup_runner.find(path=args.find)
if result:
sys.exit(0)
else:
sys.exit(2)
try:
with pidfile.PIDFile(PID_FILE):
if args.backup:
result = npbackup_runner.backup(force=args.force)
if result:
logger.info("Backup finished.")
sys.exit(0)
else:
logger.error("Backup operation failed.")
sys.exit(2)
if args.restore:
result = npbackup_runner.restore(
snapshot=args.restore_from_snapshot,
target=args.restore,
restore_includes=args.restore_include,
)
if result:
sys.exit(0)
else:
sys.exit(2)
if args.forget:
result = npbackup_runner.forget(snapshot=args.forget)
if result:
sys.exit(0)
else:
sys.exit(2)
if args.raw:
result = npbackup_runner.raw(command=args.raw)
if result:
sys.exit(0)
else:
sys.exit(2)
except pidfile.AlreadyRunningError:
logger.warning("Backup process already running. Will not continue.")
# EXIT_CODE 21 = current backup process already running
sys.exit(21)
def main():
try:
# kill_childs normally would not be necessary, but let's just be foolproof here (kills restic subprocess in all cases)
atexit.register(
kill_childs,
os.getpid(),
)
interface()
except KeyboardInterrupt as exc:
logger.error("Program interrupted by keyboard. {}".format(exc))
logger.info("Trace:", exc_info=True)
# EXIT_CODE 200 = keyboard interrupt
sys.exit(200)
except Exception as exc:
logger.error("Program interrupted by error. {}".format(exc))
logger.info("Trace:", exc_info=True)
# EXIT_CODE 201 = Non handled exception
sys.exit(201)
if __name__ == "__main__":
main()
def entrypoint():
pass