WIP Rework cli interface

This commit is contained in:
Orsiris de Jong 2023-12-28 00:32:42 +01:00
parent 2894e57ad5
commit ca62614f8b
4 changed files with 332 additions and 457 deletions

View file

@ -8,51 +8,135 @@ __intname__ = "npbackup.cli_interface"
import os
import sys
from pathlib import Path
import atexit
from argparse import ArgumentParser
from datetime import datetime
import tempfile
import pidfile
import ofunctions.logger_utils
from ofunctions.platform import python_arch
from ofunctions.process import kill_childs
from npbackup.path_helper import CURRENT_DIR
from npbackup.configuration import IS_PRIV_BUILD
from npbackup.customization import (
LICENSE_TEXT,
LICENSE_FILE,
)
from npbackup.interface_entrypoint import entrypoint
from npbackup.__version__ import __intname__ as intname, __version__, __build__, __copyright__, __description__
import npbackup.configuration
from npbackup.runner_interface import entrypoint
from npbackup.__version__ import version_string
from npbackup.__debug__ import _DEBUG
from npbackup.common import execution_logs
from npbackup.core.i18n_helper import _t
if os.name == "nt":
from npbackup.windows.task import create_scheduled_task
# Nuitka compat, see https://stackoverflow.com/a/74540217
try:
# pylint: disable=W0611 (unused-import)
from charset_normalizer import md__mypyc # noqa
except ImportError:
pass
_DEBUG = False
_VERBOSE = False
LOG_FILE = os.path.join(CURRENT_DIR, "{}.log".format(__intname__))
PID_FILE = os.path.join(tempfile.gettempdir(), "{}.pid".format(__intname__))
logger = ofunctions.logger_utils.logger_get_logger(LOG_FILE)
logger = ofunctions.logger_utils.logger_get_logger(LOG_FILE, debug=_DEBUG)
def cli_interface():
global _DEBUG
global _VERBOSE
global CONFIG_FILE
parser = ArgumentParser(
prog=f"{__description__}",
prog=f"{__intname__}",
description="""Portable Network Backup Client\n
This program is distributed under the GNU General Public License and comes with ABSOLUTELY NO WARRANTY.\n
This is free software, and you are welcome to redistribute it under certain conditions; Please type --license for more info.""",
)
parser.add_argument("-b", "--backup", action="store_true", help="Run a backup")
parser.add_argument(
"--check", action="store_true", help="Check if a recent backup exists"
"-r",
"--restore",
type=str,
default=None,
required=False,
help="Restore to path given by --restore",
)
parser.add_argument("-l", "--list", action="store_true", help="Show current snapshots")
parser.add_argument(
"--ls",
type=str,
default=None,
required=False,
help='Show content given snapshot. Use "latest" for most recent snapshot.',
)
parser.add_argument(
"--find",
type=str,
default=None,
required=False,
help="Find full path of given file / directory",
)
parser.add_argument(
"--forget",
type=str,
default=None,
required=False,
help='Forget given snapshot, or specify \"policy\" to apply retention policy',
)
parser.add_argument(
"--quick-check",
action="store_true",
help="Quick check repository"
)
parser.add_argument(
"--full-check",
action="store_true",
help="Full check repository"
)
parser.add_argument(
"--prune",
action="store_true",
help="Prune data in repository"
)
parser.add_argument(
"--prune-max",
action="store_true",
help="Prune data in repository reclaiming maximum space"
)
parser.add_argument(
"--unlock",
action="store_true",
help="Unlock repository"
)
parser.add_argument(
"--repair-index",
action="store_true",
help="Repair repo index"
)
parser.add_argument(
"--repair-snapshots",
action="store_true",
help="Repair repo snapshots"
)
parser.add_argument(
"--raw",
type=str,
default=None,
required=False,
help='Run raw command against backend.',
)
parser.add_argument("-b", "--backup", action="store_true", help="Run a backup")
parser.add_argument(
"--force",
"--has-recent-backup", action="store_true", help="Check if a recent backup exists"
)
parser.add_argument(
"-f", "--force",
action="store_true",
default=False,
help="Force running a backup regardless of existing backups",
)
parser.add_argument(
"-c",
"--config-file",
@ -62,46 +146,14 @@ This is free software, and you are welcome to redistribute it under certain cond
required=False,
help="Path to alternative configuration file",
)
parser.add_argument(
"--repo-name",
dest="repo_name",
type=str,
default="default",
required=False,
help="Name of the repository to work with. Defaults to 'default'"
help="Name of the repository to work with. Defaults to 'default'",
)
parser.add_argument(
"-l", "--list", action="store_true", help="Show current snapshots"
)
parser.add_argument(
"--ls",
type=str,
default=None,
required=False,
help='Show content given snapshot. Use "latest" for most recent snapshot.',
)
parser.add_argument(
"-f",
"--find",
type=str,
default=None,
required=False,
help="Find full path of given file / directory",
)
parser.add_argument(
"-r",
"--restore",
type=str,
default=None,
required=False,
help="Restore to path given by --restore",
)
parser.add_argument(
"--restore-include",
type=str,
@ -109,7 +161,6 @@ This is free software, and you are welcome to redistribute it under certain cond
required=False,
help="Restore only paths within include path",
)
parser.add_argument(
"--restore-from-snapshot",
type=str,
@ -117,14 +168,6 @@ This is free software, and you are welcome to redistribute it under certain cond
required=False,
help="Choose which snapshot to restore from. Defaults to latest",
)
parser.add_argument(
"--forget", type=str, default=None, required=False, help="Forget snapshot"
)
parser.add_argument(
"--raw", type=str, default=None, required=False, help="Raw commands"
)
parser.add_argument(
"-v", "--verbose", action="store_true", help="Show verbose output"
)
@ -132,13 +175,11 @@ This is free software, and you are welcome to redistribute it under certain cond
parser.add_argument(
"-V", "--version", action="store_true", help="Show program version"
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Run operations in test mode (no actual modifications",
)
parser.add_argument(
"--create-scheduled-task",
type=str,
@ -146,27 +187,12 @@ This is free software, and you are welcome to redistribute it under certain cond
required=False,
help="Create task that runs every n minutes on Windows",
)
parser.add_argument("--license", action="store_true", help="Show license")
parser.add_argument(
"--auto-upgrade", action="store_true", help="Auto upgrade NPBackup"
)
parser.add_argument(
"--upgrade-conf",
action="store_true",
help="Add new configuration elements after upgrade",
)
args = parser.parse_args()
version_string = "{} v{}{}{}-{} {} - {}".format(
intname,
__version__,
"-PRIV" if IS_PRIV_BUILD else "",
"-P{}".format(sys.version_info[1]),
python_arch(),
__build__,
__copyright__,
)
if args.version:
print(version_string)
sys.exit(0)
@ -180,8 +206,7 @@ This is free software, and you are welcome to redistribute it under certain cond
print(LICENSE_TEXT)
sys.exit(0)
if args.debug or os.environ.get("_DEBUG", "False").capitalize() == "True":
_DEBUG = True
if args.debug or _DEBUG:
logger.setLevel(ofunctions.logger_utils.logging.DEBUG)
if args.verbose:
@ -189,15 +214,136 @@ This is free software, and you are welcome to redistribute it under certain cond
if args.config_file:
if not os.path.isfile(args.config_file):
logger.critical("Given file {} cannot be read.".format(args.config_file))
logger.critical(f"Config file {args.config_file} cannot be read.")
sys.exit(70)
CONFIG_FILE = args.config_file
else:
config_file = Path(f"{CURRENT_DIR}/npbackup.conf")
if config_file.exists:
CONFIG_FILE = config_file
else:
logger.critical("Cannot run without configuration file.")
sys.exit(70)
full_config = npbackup.configuration.load_config(CONFIG_FILE)
if full_config:
repo_config, _ = npbackup.configuration.get_repo_config(full_config, args.repo_name)
else:
logger.critical("Cannot obtain repo config")
sys.exit(71)
if not repo_config:
message = _t("config_gui.no_config_available")
logger.critical(message)
sys.exit(72)
# Prepare program run
cli_args = {
"repo_config": repo_config,
"verbose": args.verbose,
"dry_run": args.dry_run,
"debug": args.debug,
"operation": None,
"op_args": {}
}
if args.backup:
cli_args["operation"] = "backup"
cli_args["op_args"] = {
"force": args.force
}
elif args.restore:
cli_args["operation"] = "restore"
cli_args["op_args"] = {
"snapshot": args.snapshot,
"target": args.restore,
"restore_include": args.restore_include
}
elif args.list:
cli_args["operation"] = "list"
elif args.ls:
cli_args["operation"] = "ls"
cli_args["op_args"] = {
"snapshot": args.snapshot
}
elif args.find:
cli_args["operation"] = "find"
cli_args["op_args"] = {
"snapshot": args.snapshot,
"path": args.find
}
elif args.forget:
cli_args["operation"] = "forget"
if args.forget == "policy":
cli_args["op_args"] = {
"use_policy": True
}
else:
cli_args["op_args"] = {
"snapshots": args.forget
}
elif args.quick_check:
cli_args["operation"] = "check"
elif args.full_check:
cli_args["operation"] = "check"
cli_args["op_args"] = {
"read_data": True
}
elif args.prune:
cli_args["operation"] = "prune"
elif args.prune_max:
cli_args["operation"] = "prune"
cli_args["op_args"] = {
"max": True
}
elif args.unlock:
cli_args["operation"] = "unlock"
elif args.repair_index:
cli_args["operation"] = "repair"
cli_args["op_args"] = {
"subject": "index"
}
elif args.repair_snapshots:
cli_args["operation"] = "repair"
cli_args["op_args"] = {
"subject": "snapshots"
}
elif args.raw:
cli_args["operation"] = "raw"
cli_args["op_args"] = {
"command": args.raw
}
locking_operations = ["backup", "repair", "forget", "prune", "raw", "unlock"]
# Program entry
entrypoint()
if cli_args["operation"] in locking_operations:
try:
with pidfile.PIDFile(PID_FILE):
entrypoint(**cli_args)
except pidfile.AlreadyRunningError:
logger.critical("Backup process already running. Will not continue.")
# EXIT_CODE 21 = current backup process already running
sys.exit(21)
else:
entrypoint(**cli_args)
def main():
# Make sure we log execution time and error state at the end of the program
atexit.register(
execution_logs,
datetime.utcnow(),
)
# kill_childs normally would not be necessary, but let's just be foolproof here (kills restic subprocess in all cases)
atexit.register(
kill_childs,
os.getpid(),
)
try:
cli_interface()
sys.exit(logger.get_worst_logger_level())
except KeyboardInterrupt as exc:
logger.error("Program interrupted by keyboard. {}".format(exc))
logger.info("Trace:", exc_info=True)

View file

@ -1,377 +0,0 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# This file is part of npbackup
__intname__ = "npbackup"
__author__ = "Orsiris de Jong"
__site__ = "https://www.netperfect.fr/npbackup"
__description__ = "NetPerfect Backup Client"
__copyright__ = "Copyright (C) 2022-2023 NetInvent"
__license__ = "GPL-3.0-only"
__build__ = "2023083101"
__version__ = "2.3.0-dev"
import os
import sys
import atexit
import dateutil.parser
from datetime import datetime
import tempfile
import pidfile
import ofunctions.logger_utils
from ofunctions.platform import python_arch
from ofunctions.process import kill_childs
from npbackup.customization import (
LICENSE_TEXT,
LICENSE_FILE,
)
from npbackup import configuration
from npbackup.core.runner import NPBackupRunner
from npbackup.core.i18n_helper import _t
from npbackup.path_helper import CURRENT_DIR, CURRENT_EXECUTABLE
from npbackup.core.nuitka_helper import IS_COMPILED
from npbackup.upgrade_client.upgrader import need_upgrade
from npbackup.core.upgrade_runner import run_upgrade
if os.name == "nt":
from npbackup.windows.task import create_scheduled_task
# Nuitka compat, see https://stackoverflow.com/a/74540217
try:
# pylint: disable=W0611 (unused-import)
from charset_normalizer import md__mypyc # noqa
except ImportError:
pass
_DEBUG = False
_VERBOSE = False
LOG_FILE = os.path.join(CURRENT_DIR, "{}.log".format(__intname__))
CONFIG_FILE = os.path.join(CURRENT_DIR, "{}.conf".format(__intname__))
PID_FILE = os.path.join(tempfile.gettempdir(), "{}.pid".format(__intname__))
logger = ofunctions.logger_utils.logger_get_logger(LOG_FILE)
def execution_logs(start_time: datetime) -> None:
"""
Try to know if logger.warning or worse has been called
logger._cache contains a dict of values like {10: boolean, 20: boolean, 30: boolean, 40: boolean, 50: boolean}
where
10 = debug, 20 = info, 30 = warning, 40 = error, 50 = critical
so "if 30 in logger._cache" checks if warning has been triggered
ATTENTION: logger._cache does only contain cache of current main, not modules, deprecated in favor of
ofunctions.logger_utils.ContextFilterWorstLevel
ATTENTION: For ofunctions.logger_utils.ContextFilterWorstLevel will only check current logger instance
So using logger = getLogger("anotherinstance") will create a separate instance from the one we can inspect
Makes sense ;)
"""
end_time = datetime.utcnow()
logger_worst_level = 0
for flt in logger.filters:
if isinstance(flt, ofunctions.logger_utils.ContextFilterWorstLevel):
logger_worst_level = flt.worst_level
log_level_reached = "success"
try:
if logger_worst_level >= 40:
log_level_reached = "errors"
elif logger_worst_level >= 30:
log_level_reached = "warnings"
except AttributeError as exc:
logger.error("Cannot get worst log level reached: {}".format(exc))
logger.info(
"ExecTime = {}, finished, state is: {}.".format(
end_time - start_time, log_level_reached
)
)
# using sys.exit(code) in a atexit function will swallow the exitcode and render 0
def interface():
version_string = "{} v{}{}{}-{} {} - {}".format(
__intname__,
__version__,
"-PRIV" if configuration.IS_PRIV_BUILD else "",
"-P{}".format(sys.version_info[1]),
python_arch(),
__build__,
__copyright__,
)
if args.version:
print(version_string)
sys.exit(0)
logger.info(version_string)
if args.license:
try:
with open(LICENSE_FILE, "r", encoding="utf-8") as file_handle:
print(file_handle.read())
except OSError:
print(LICENSE_TEXT)
sys.exit(0)
if args.debug or os.environ.get("_DEBUG", "False").capitalize() == "True":
_DEBUG = True
logger.setLevel(ofunctions.logger_utils.logging.DEBUG)
if args.verbose:
_VERBOSE = True
# Make sure we log execution time and error state at the end of the program
if args.backup or args.restore or args.find or args.list or args.check:
atexit.register(
execution_logs,
datetime.utcnow(),
)
if args.config_file:
if not os.path.isfile(args.config_file):
logger.critical("Given file {} cannot be read.".format(args.config_file))
CONFIG_FILE = args.config_file
# Program entry
if Globvars.GUI and (args.config_gui or args.operations_gui):
try:
config = configuration.load_config(CONFIG_FILE)
if not config:
logger.error("Cannot load config file")
sys.exit(24)
except FileNotFoundError:
logger.warning(
'No configuration file found. Please use --config-file "path" to specify one or put a config file into current directory. Will create fresh config file in current directory.'
)
config = configuration.empty_config_dict
if args.config_gui:
config = config_gui(config, CONFIG_FILE)
if args.operations_gui:
config = operations_gui(config, CONFIG_FILE)
sys.exit(0)
if args.create_scheduled_task:
try:
result = create_scheduled_task(
executable_path=CURRENT_EXECUTABLE,
interval_minutes=int(args.create_scheduled_task),
)
if result:
sys.exit(0)
else:
sys.exit(22)
except ValueError:
sys.exit(23)
try:
config = configuration.load_config(CONFIG_FILE)
repo_config = configuration.get_repo_config(config_dict, args.repo_name)
except FileNotFoundError:
config = None
if not config:
message = _t("config_gui.no_config_available")
logger.error(message)
if config_dict is None and Globvars.GUI:
config_dict = configuration.empty_config_dict
# If no arguments are passed, assume we are launching the GUI
if len(sys.argv) == 1:
try:
result = sg.Popup(
"{}\n\n{}".format(message, _t("config_gui.create_new_config")),
custom_text=(_t("generic._yes"), _t("generic._no")),
keep_on_top=True,
)
if result == _t("generic._yes"):
config_dict = config_gui(config_dict, CONFIG_FILE)
sg.Popup(_t("config_gui.saved_initial_config"))
else:
logger.error("No configuration created via GUI")
sys.exit(7)
except _tkinter.TclError as exc:
logger.info(
'Tkinter error: "{}". Is this a headless server ?'.format(exc)
)
parser.print_help(sys.stderr)
sys.exit(1)
sys.exit(7)
elif not config_dict:
if len(sys.argv) == 1 and Globvars.GUI:
sg.Popup(_t("config_gui.bogus_config_file", config_file=CONFIG_FILE))
sys.exit(7)
if args.upgrade_conf:
# Whatever we need to add here for future releases
# Eg:
logger.info("Upgrading configuration file to version %s", __version__)
try:
config_dict["identity"]
except KeyError:
# Create new section identity, as per upgrade 2.2.0rc2
config_dict["identity"] = {"machine_id": "${HOSTNAME}"}
configuration.save_config(CONFIG_FILE, config_dict)
sys.exit(0)
# Try to perform an auto upgrade if needed
try:
auto_upgrade = config_dict["options"]["auto_upgrade"]
except KeyError:
auto_upgrade = True
try:
auto_upgrade_interval = config_dict["options"]["interval"]
except KeyError:
auto_upgrade_interval = 10
if (auto_upgrade and need_upgrade(auto_upgrade_interval)) or args.auto_upgrade:
if args.auto_upgrade:
logger.info("Running user initiated auto upgrade")
else:
logger.info("Running program initiated auto upgrade")
result = run_upgrade(config_dict)
if result:
sys.exit(0)
elif args.auto_upgrade:
sys.exit(23)
dry_run = False
if args.dry_run:
dry_run = True
npbackup_runner = NPBackupRunner(config_dict=config_dict)
npbackup_runner.dry_run = dry_run
npbackup_runner.verbose = _VERBOSE
if not npbackup_runner.backend_version:
logger.critical("No backend available. Cannot continue")
sys.exit(25)
logger.info("Backend: {}".format(npbackup_runner.backend_version))
if args.check:
if npbackup_runner.check_recent_backups():
sys.exit(0)
else:
sys.exit(2)
if args.list:
result = npbackup_runner.list()
if result:
for snapshot in result:
try:
tags = snapshot["tags"]
except KeyError:
tags = None
logger.info(
"ID: {} Hostname: {}, Username: {}, Tags: {}, source: {}, time: {}".format(
snapshot["short_id"],
snapshot["hostname"],
snapshot["username"],
tags,
snapshot["paths"],
dateutil.parser.parse(snapshot["time"]),
)
)
sys.exit(0)
else:
sys.exit(2)
if args.ls:
result = npbackup_runner.ls(snapshot=args.ls)
if result:
logger.info("Snapshot content:")
for entry in result:
logger.info(entry)
sys.exit(0)
else:
logger.error("Snapshot could not be listed.")
sys.exit(2)
if args.find:
result = npbackup_runner.find(path=args.find)
if result:
sys.exit(0)
else:
sys.exit(2)
try:
with pidfile.PIDFile(PID_FILE):
if args.backup:
result = npbackup_runner.backup(force=args.force)
if result:
logger.info("Backup finished.")
sys.exit(0)
else:
logger.error("Backup operation failed.")
sys.exit(2)
if args.restore:
result = npbackup_runner.restore(
snapshot=args.restore_from_snapshot,
target=args.restore,
restore_includes=args.restore_include,
)
if result:
sys.exit(0)
else:
sys.exit(2)
if args.forget:
result = npbackup_runner.forget(snapshot=args.forget)
if result:
sys.exit(0)
else:
sys.exit(2)
if args.raw:
result = npbackup_runner.raw(command=args.raw)
if result:
sys.exit(0)
else:
sys.exit(2)
except pidfile.AlreadyRunningError:
logger.warning("Backup process already running. Will not continue.")
# EXIT_CODE 21 = current backup process already running
sys.exit(21)
def main():
try:
# kill_childs normally would not be necessary, but let's just be foolproof here (kills restic subprocess in all cases)
atexit.register(
kill_childs,
os.getpid(),
)
interface()
except KeyboardInterrupt as exc:
logger.error("Program interrupted by keyboard. {}".format(exc))
logger.info("Trace:", exc_info=True)
# EXIT_CODE 200 = keyboard interrupt
sys.exit(200)
except Exception as exc:
logger.error("Program interrupted by error. {}".format(exc))
logger.info("Trace:", exc_info=True)
# EXIT_CODE 201 = Non handled exception
sys.exit(201)
if __name__ == "__main__":
main()
def entrypoint():
pass

View file

@ -0,0 +1,106 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# This file is part of npbackup
__intname__ = "npbackup.runner_interface"
__author__ = "Orsiris de Jong"
__site__ = "https://www.netperfect.fr/npbackup"
__description__ = "NetPerfect Backup Client"
__copyright__ = "Copyright (C) 2022-2023 NetInvent"
__license__ = "GPL-3.0-only"
__build__ = "2023122801"
import os
from logging import getLogger
from npbackup.core.runner import NPBackupRunner
logger = getLogger()
def entrypoint(*args, **kwargs):
npbackup_runner = NPBackupRunner()
npbackup_runner.repo_config = kwargs.pop("repo_config")
npbackup_runner.dry_run = kwargs.pop("dry_run")
npbackup_runner.verbose = kwargs.pop("verbose")
result = npbackup_runner.__getattribute__(kwargs.pop("operation"))(kwargs.pop("op_args"), __no_threads=True)
def auto_upgrade(full_config: dict):
pass
"""
def interface():
# Program entry
if args.create_scheduled_task:
try:
result = create_scheduled_task(
executable_path=CURRENT_EXECUTABLE,
interval_minutes=int(args.create_scheduled_task),
)
if result:
sys.exit(0)
else:
sys.exit(22)
except ValueError:
sys.exit(23)
if args.upgrade_conf:
# Whatever we need to add here for future releases
# Eg:
logger.info("Upgrading configuration file to version %s", __version__)
try:
config_dict["identity"]
except KeyError:
# Create new section identity, as per upgrade 2.2.0rc2
config_dict["identity"] = {"machine_id": "${HOSTNAME}"}
configuration.save_config(CONFIG_FILE, config_dict)
sys.exit(0)
# Try to perform an auto upgrade if needed
try:
auto_upgrade = config_dict["options"]["auto_upgrade"]
except KeyError:
auto_upgrade = True
try:
auto_upgrade_interval = config_dict["options"]["interval"]
except KeyError:
auto_upgrade_interval = 10
if (auto_upgrade and need_upgrade(auto_upgrade_interval)) or args.auto_upgrade:
if args.auto_upgrade:
logger.info("Running user initiated auto upgrade")
else:
logger.info("Running program initiated auto upgrade")
result = run_upgrade(full_config)
if result:
sys.exit(0)
elif args.auto_upgrade:
sys.exit(23)
if args.list:
result = npbackup_runner.list()
if result:
for snapshot in result:
try:
tags = snapshot["tags"]
except KeyError:
tags = None
logger.info(
"ID: {} Hostname: {}, Username: {}, Tags: {}, source: {}, time: {}".format(
snapshot["short_id"],
snapshot["hostname"],
snapshot["username"],
tags,
snapshot["paths"],
dateutil.parser.parse(snapshot["time"]),
)
)
sys.exit(0)
else:
sys.exit(2)
"""