Reformat files with black

This commit is contained in:
Orsiris de Jong 2023-02-01 01:51:15 +01:00
parent 61fdecc837
commit 7829008886
8 changed files with 165 additions and 116 deletions

View file

@ -17,8 +17,8 @@ import argparse
import atexit
from command_runner import command_runner
ARCHES = ['x86', 'x64']
AUDIENCES = ['public', 'private']
ARCHES = ["x86", "x64"]
AUDIENCES = ["public", "private"]
# Insert parent dir as path se we get to use npbackup as package
sys.path.insert(0, os.path.normpath(os.path.join(os.path.dirname(__file__), "..")))
@ -39,6 +39,7 @@ import glob
del sys.path[0]
def check_private_build(audience):
private = False
try:
@ -64,31 +65,42 @@ def check_private_build(audience):
def move_audience_files(audience):
for dir in [os.path.join(BASEDIR, os.pardir, 'examples'), BASEDIR]:
if audience == 'private':
possible_non_used_path = '_NOUSE_private_'
guessed_files = glob.glob(os.path.join(dir, '{}*'.format(possible_non_used_path)))
for dir in [os.path.join(BASEDIR, os.pardir, "examples"), BASEDIR]:
if audience == "private":
possible_non_used_path = "_NOUSE_private_"
guessed_files = glob.glob(
os.path.join(dir, "{}*".format(possible_non_used_path))
)
for file in guessed_files:
os.rename(file, file.replace(possible_non_used_path, "_private_"))
elif audience == 'public':
possible_non_used_path = '_private_'
guessed_files = glob.glob(os.path.join(dir, '{}*'.format(possible_non_used_path)))
elif audience == "public":
possible_non_used_path = "_private_"
guessed_files = glob.glob(
os.path.join(dir, "{}*".format(possible_non_used_path))
)
for file in guessed_files:
os.rename(file, file.replace(possible_non_used_path, "_NOUSE{}".format(possible_non_used_path)))
os.rename(
file,
file.replace(
possible_non_used_path,
"_NOUSE{}".format(possible_non_used_path),
),
)
def get_conf_dist_file(audience):
if audience == 'private':
if audience == "private":
file = "_private_npbackup.conf.dist"
else:
file = "npbackup.conf.dist"
dist_conf_file_path = os.path.join(BASEDIR, os.pardir, "examples", file)
return dist_conf_file_path
return dist_conf_file_path
def have_nuitka_commercial():
try:
import nuitka.plugins.commercial
print("Running with nuitka commercial")
return True
except ImportError:
@ -106,7 +118,7 @@ def compile(arch, audience):
restic_executable = "restic"
platform = "linux"
PACKAGE_DIR = 'npbackup'
PACKAGE_DIR = "npbackup"
check_private_build(audience)
BUILDS_DIR = os.path.abspath(os.path.join(BASEDIR, os.pardir, "BUILDS"))
@ -124,7 +136,10 @@ def compile(arch, audience):
FILE_VERSION = _npbackup_version + ".0"
file_description = "{} P{}-{}{}".format(
FILE_DESCRIPTION, sys.version_info[1], arch, "priv" if audience == 'private' else ''
FILE_DESCRIPTION,
sys.version_info[1],
arch,
"priv" if audience == "private" else "",
)
restic_source_file = get_restic_internal_binary(arch)
@ -139,20 +154,22 @@ def compile(arch, audience):
license_dest_file = os.path.join(PACKAGE_DIR, os.path.basename(LICENSE_FILE))
icon_file = os.path.join(PACKAGE_DIR, 'npbackup_icon.ico')
icon_file = os.path.join(PACKAGE_DIR, "npbackup_icon.ico")
# Installer specific files, no need for a npbackup package directory here
program_executable_path = os.path.join(OUTPUT_DIR, program_executable)
dist_conf_file_source = get_conf_dist_file(audience)
dist_conf_file_dest = os.path.basename(dist_conf_file_source.replace('_private_', ''))
dist_conf_file_dest = os.path.basename(
dist_conf_file_source.replace("_private_", "")
)
excludes_dir = "excludes"
excludes_dir_source = os.path.join(BASEDIR, os.pardir, excludes_dir)
excludes_dir_dest = excludes_dir
NUITKA_OPTIONS = '--enable-plugin=data-hiding' if have_nuitka_commercial() else ''
NUITKA_OPTIONS = "--enable-plugin=data-hiding" if have_nuitka_commercial() else ""
EXE_OPTIONS = '--company-name="{}" --product-name="{}" --file-version="{}" --product-version="{}" --copyright="{}" --file-description="{}" --trademarks="{}"'.format(
COMPANY_NAME,
@ -199,7 +216,7 @@ def compile(arch, audience):
)
CMD = '{} -m nuitka --python-flag=no_docstrings --python-flag=-O {} {} --onefile --plugin-enable=tk-inter --include-data-file="{}"="{}" --include-data-file="{}"="{}" --include-data-dir="{}"="{}" --windows-icon-from-ico="{}" --windows-uac-admin --output-dir="{}" bin/NPBackupInstaller.py'.format(
PYTHON_EXECUTABLE,
NUITKA_OPTIONS,
NUITKA_OPTIONS,
EXE_OPTIONS,
program_executable_path,
program_executable,
@ -217,7 +234,7 @@ def compile(arch, audience):
errors = True
else:
## Create version file
with open(os.path.join(BUILDS_DIR, audience, 'VERSION'), 'w') as fh:
with open(os.path.join(BUILDS_DIR, audience, "VERSION"), "w") as fh:
fh.write(npbackup_version)
print("COMPILE ERRORS", errors)
@ -233,32 +250,32 @@ class ArchAction(argparse.Action):
class AudienceAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
if values not in AUDIENCES + ['all']:
if values not in AUDIENCES + ["all"]:
print("Got value:", values)
raise argparse.ArgumentError(self, "Not a valid audience")
setattr(namespace, self.dest, values)
setattr(namespace, self.dest, values)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog="npbackup compile.py", description="Compiler script for NPBackup"
prog="npbackup compile.py", description="Compiler script for NPBackup"
)
parser.add_argument(
"--arch",
type=str,
dest="arch",
default=None,
required=True,
action=ArchAction,
help="Target arch, x64 or x86",
"--arch",
type=str,
dest="arch",
default=None,
required=True,
action=ArchAction,
help="Target arch, x64 or x86",
)
parser.add_argument(
"--audience",
type=str,
dest="audience",
default='private',
default="private",
required=False,
help="Target audience, private or public",
)
@ -266,13 +283,16 @@ if __name__ == "__main__":
args = parser.parse_args()
# Make sure we get out dev environment back when compilation ends / fails
atexit.register(move_audience_files, 'private',)
atexit.register(
move_audience_files,
"private",
)
try:
if args.audience.lower() == 'all':
if args.audience.lower() == "all":
audiences = AUDIENCES
else:
audiences = [args.audience]
for audience in audiences:
move_audience_files(audience)
compile(arch=args.arch, audience=audience)

View file

@ -215,7 +215,9 @@ This is free software, and you are welcome to redistribute it under certain cond
)
parser.add_argument("--license", action="store_true", help=("Show license"))
parser.add_argument("--auto-upgrade", action="store_true", help=("Auto upgrade NPBackup"))
parser.add_argument(
"--auto-upgrade", action="store_true", help=("Auto upgrade NPBackup")
)
args = parser.parse_args()
if args.version:
@ -299,19 +301,23 @@ This is free software, and you are welcome to redistribute it under certain cond
# Try to perform an auto upgrade if needed
try:
auto_upgrade = config_dict['options']['auto_upgrade']
auto_upgrade = config_dict["options"]["auto_upgrade"]
except KeyError:
auto_upgrade = True
try:
auto_upgrade_interval = config_dict['options']['auto_upgrade_interval']
auto_upgrade_interval = config_dict["options"]["auto_upgrade_interval"]
except KeyError:
auto_upgrade_interval = 10
if (auto_upgrade and need_upgrade(auto_upgrade_interval)) or args.auto_upgrade:
try:
auto_upgrade_upgrade_url = config_dict['options']['auto_upgrade_server_url']
auto_upgrade_username = config_dict['options']['auto_upgrade_server_username']
auto_upgrade_password = config_dict['options']['auto_upgrade_server_password']
auto_upgrade_upgrade_url = config_dict["options"]["auto_upgrade_server_url"]
auto_upgrade_username = config_dict["options"][
"auto_upgrade_server_username"
]
auto_upgrade_password = config_dict["options"][
"auto_upgrade_server_password"
]
except KeyError as exc:
logger.error("Missing auto upgrade info: %s", exc)
else:
@ -319,7 +325,11 @@ This is free software, and you are welcome to redistribute it under certain cond
logger.info("Running user initiated auto upgrade")
else:
logger.info("Running program initiated auto upgrade")
result = auto_upgrader(upgrade_url=auto_upgrade_upgrade_url, username=auto_upgrade_username, password=auto_upgrade_password)
result = auto_upgrader(
upgrade_url=auto_upgrade_upgrade_url,
username=auto_upgrade_username,
password=auto_upgrade_password,
)
if args.auto_upgrade:
if result:
sys.exit(0)

View file

@ -41,7 +41,7 @@ def need_upgrade(upgrade_interval: int) -> bool:
Basic counter which allows an upgrade only every X times this is called so failed operations won't end in an endless upgrade loop
"""
# file counter, local, home, or temp if not available
return True # WIP
return True # WIP
def auto_upgrader(upgrade_url: str, username: str, password: str) -> bool:
@ -53,7 +53,9 @@ def auto_upgrader(upgrade_url: str, username: str, password: str) -> bool:
"""
is_nuitka = "__compiled__" in globals()
if not is_nuitka:
logger.info("Auto upgrade will only upgrade compiled verions. Please use 'pip install --upgrade npbackup' instead")
logger.info(
"Auto upgrade will only upgrade compiled verions. Please use 'pip install --upgrade npbackup' instead"
)
return True
logger.info("Upgrade server is %s", upgrade_url)
requestor = Requestor(upgrade_url, username, password)
@ -63,37 +65,45 @@ def auto_upgrader(upgrade_url: str, username: str, password: str) -> bool:
logger.error("Cannot reach upgrade server")
return False
try:
if not server_ident['app'] == 'npbackup.upgrader':
if not server_ident["app"] == "npbackup.upgrader":
logger.error("Current server is not a recognized NPBackup update server")
return False
except (KeyError, TypeError):
logger.error("Current server is not a NPBackup update server")
return False
result = requestor.data_model('current_version')
result = requestor.data_model("current_version")
try:
online_version = result['version']
online_version = result["version"]
except KeyError:
logger.error("Upgrade server failed to provide proper version info")
return False
else:
if online_version:
if version.parse(online_version) > version.parse(npbackup_version):
logger.info("Current version %s is older than online version %s", npbackup_version, online_version)
logger.info(
"Current version %s is older than online version %s",
npbackup_version,
online_version,
)
else:
logger.info("Current version %s is up-to-date (online version %s)", npbackup_version, online_version)
logger.info(
"Current version %s is up-to-date (online version %s)",
npbackup_version,
online_version,
)
return True
platform_and_arch = '{}/{}'.format(get_os(), os_arch()).lower()
platform_and_arch = "{}/{}".format(get_os(), os_arch()).lower()
file_info = requestor.data_model('upgrades', id_record=platform_and_arch)
file_info = requestor.data_model("upgrades", id_record=platform_and_arch)
try:
sha256sum = file_info['sha256sum']
sha256sum = file_info["sha256sum"]
except (KeyError, TypeError):
logger.error("Cannot get file description")
return False
file_data = requestor.requestor('upgrades/' + platform_and_arch + '/data', raw=True)
file_data = requestor.requestor("upgrades/" + platform_and_arch + "/data", raw=True)
if not file_data:
logger.error("Cannot get update file")
return False
@ -101,19 +111,24 @@ def auto_upgrader(upgrade_url: str, username: str, password: str) -> bool:
if sha256sum_data(file_data) != sha256sum:
logger.error("Invalid checksum, won't upgrade")
return False
executable = os.path.join(tempfile.gettempdir(), file_info['filename'])
with open(executable, 'wb') as fh:
executable = os.path.join(tempfile.gettempdir(), file_info["filename"])
with open(executable, "wb") as fh:
fh.write(file_data)
logger.info("Upgrade file written to %s", executable)
log_file = os.path.join(tempfile.gettempdir(), file_info['filename'] + '.log')
log_file = os.path.join(tempfile.gettempdir(), file_info["filename"] + ".log")
# Actual upgrade process
new_executable = os.path.join(CURRENT_DIR, os.path.basename(CURRENT_EXECUTABLE))
cmd = "del \"{}\"; move \"{}\" \"{}\"; del \"{}\" > {}".format(CURRENT_EXECUTABLE, executable, new_executable, executable, log_file)
logger.info("Launching upgrade. Current process will quit. Upgrade starts in %s seconds. Upgrade is done by OS logged in %s", UPGRADE_DEFER_TIME, log_file)
cmd = 'del "{}"; move "{}" "{}"; del "{}" > {}'.format(
CURRENT_EXECUTABLE, executable, new_executable, executable, log_file
)
logger.info(
"Launching upgrade. Current process will quit. Upgrade starts in %s seconds. Upgrade is done by OS logged in %s",
UPGRADE_DEFER_TIME,
log_file,
)
logger.debug(cmd)
deferred_command(cmd, defer_time=UPGRADE_DEFER_TIME)
return True

View file

@ -11,7 +11,7 @@ __build__ = "202303101"
__version__ = "1.0.0"
DEVEL=True
DEVEL = True
import sys
import os
@ -21,42 +21,44 @@ from ofunctions.logger_utils import logger_get_logger
config_dict = configuration.load_config()
try:
listen = config_dict['http_server']['listen']
listen = config_dict["http_server"]["listen"]
except KeyError:
listen = None
try:
port = config_dict['http_server']['port']
port = config_dict["http_server"]["port"]
except KeyError:
listen = None
if DEVEL:
import uvicorn as server
server_args = {
'workers': 1,
'log_level': "debug",
'reload': True,
'host': listen if listen else '0.0.0.0',
'port': port if port else 8080
"workers": 1,
"log_level": "debug",
"reload": True,
"host": listen if listen else "0.0.0.0",
"port": port if port else 8080,
}
else:
import gunicorn as server
server_args = {
'workers': 8,
'reload': False,
'host': listen if listen else '0.0.0.0',
'port': port if port else 8080
"workers": 8,
"reload": False,
"host": listen if listen else "0.0.0.0",
"port": port if port else 8080,
}
logger = logger_get_logger()
if __name__ == "__main__":
try:
server.run("upgrade_server.api:app", **server_args)
except KeyboardInterrupt as exc:
logger.error("Program interrupted by keyoard: {}".format(exc))
sys.exit(200)
except Exception as exc:
logger.error("Program interrupted by error: {}".format(exc))
logger.critical('Trace:', exc_info=True)
sys.exit(201)
logger.critical("Trace:", exc_info=True)
sys.exit(201)

View file

@ -25,19 +25,21 @@ config_dict = configuration.load_config()
logger = logging.getLogger()
#### Create app
#app = FastAPI() # standard FastAPI initialization
app = FastAPIOffline() # Offline FastAPI initialization, allows /docs to not use online CDN
# app = FastAPI() # standard FastAPI initialization
app = (
FastAPIOffline()
) # Offline FastAPI initialization, allows /docs to not use online CDN
security = HTTPBasic()
def get_current_username(credentials: HTTPBasicCredentials = Depends(security)):
current_username_bytes = credentials.username.encode("utf8")
correct_username_bytes = config_dict['http_server']['username'].encode('utf-8')
correct_username_bytes = config_dict["http_server"]["username"].encode("utf-8")
is_correct_username = secrets.compare_digest(
current_username_bytes, correct_username_bytes
)
current_password_bytes = credentials.password.encode("utf8")
correct_password_bytes = config_dict['http_server']['password'].encode('utf-8')
correct_password_bytes = config_dict["http_server"]["password"].encode("utf-8")
is_correct_password = secrets.compare_digest(
current_password_bytes, correct_password_bytes
)
@ -51,26 +53,21 @@ def get_current_username(credentials: HTTPBasicCredentials = Depends(security)):
@app.get("/")
async def api_root(auth = Depends(get_current_username)):
async def api_root(auth=Depends(get_current_username)):
if crud.is_enabled():
return {
"app": __appname__,
}
else:
return {
"app": "Currently under maintenance"
}
else:
return {"app": "Currently under maintenance"}
@app.get("/current_version", response_model=CurrentVersion, status_code=200)
async def current_version(auth = Depends(get_current_username)):
async def current_version(auth=Depends(get_current_username)):
try:
result = crud.get_current_version()
if not result:
raise HTTPException(
status_code=404,
detail="Not found"
)
raise HTTPException(status_code=404, detail="Not found")
return result
except HTTPException:
raise
@ -83,16 +80,13 @@ async def current_version(auth = Depends(get_current_username)):
@app.get("/upgrades/{platform}/{arch}", response_model=FileSend, status_code=200)
async def upgrades(platform: Platform, arch: Arch, auth = Depends(get_current_username)):
async def upgrades(platform: Platform, arch: Arch, auth=Depends(get_current_username)):
file = FileGet(platform=platform, arch=arch)
try:
result = crud.get_file(file)
if not result:
raise HTTPException(
status_code=404,
detail="Not found"
)
raise HTTPException(status_code=404, detail="Not found")
return result
except HTTPException:
raise
@ -103,20 +97,16 @@ async def upgrades(platform: Platform, arch: Arch, auth = Depends(get_current_us
detail="Cannot get file: {}".format(exc),
)
@app.get("/upgrades/{platform}/{arch}/data", status_code=200)
async def download(platform: Platform, arch: Arch, auth = Depends(get_current_username)):
async def download(platform: Platform, arch: Arch, auth=Depends(get_current_username)):
file = FileGet(platform=platform, arch=arch)
try:
result = crud.get_file(file, content=True)
if not result:
raise HTTPException(
status_code=404,
detail="Not found"
)
headers = {
"Content-Disposition": 'attachment; filename="npbackup"'
}
return Response(content=result, media_type="application/dat", headers=headers)
raise HTTPException(status_code=404, detail="Not found")
headers = {"Content-Disposition": 'attachment; filename="npbackup"'}
return Response(content=result, media_type="application/dat", headers=headers)
except HTTPException:
raise
except Exception as exc:
@ -125,4 +115,3 @@ async def download(platform: Platform, arch: Arch, auth = Depends(get_current_us
status_code=400,
detail="Cannot get file: {}".format(exc),
)

View file

@ -23,6 +23,7 @@ config_dict = configuration.load_config()
logger = getLogger(__intname__)
def sha256sum_data(data):
# type: (bytes) -> str
"""
@ -39,31 +40,42 @@ def is_enabled() -> bool:
def get_current_version() -> Optional[CurrentVersion]:
try:
path = os.path.join(config_dict['upgrades']['data_root'], 'VERSION')
path = os.path.join(config_dict["upgrades"]["data_root"], "VERSION")
print(path)
if os.path.isfile(path):
with open(path, 'r') as fh:
ver =fh.readline()
return(CurrentVersion(version=ver))
with open(path, "r") as fh:
ver = fh.readline()
return CurrentVersion(version=ver)
except OSError:
logger.error("Cannot get current version")
except Exception:
logger.error("Version seems to be bogus in VERSION file")
def get_file(file: FileGet, content: bool = False) -> Optional[Union[FileSend, bytes]]:
possible_filename = 'npbackup{}'.format(
'.exe' if file.platform.value == 'windows' else ''
possible_filename = "npbackup{}".format(
".exe" if file.platform.value == "windows" else ""
)
path = os.path.join(
config_dict["upgrades"]["data_root"],
file.platform.value,
file.arch.value,
possible_filename,
)
path = os.path.join(config_dict['upgrades']['data_root'], file.platform.value, file.arch.value, possible_filename)
logger.info("Searching for %s", path)
if not os.path.isfile(path):
return None
with open(path, 'rb') as fh:
with open(path, "rb") as fh:
bytes = fh.read()
if content:
return bytes
length = len(bytes)
sha256 = sha256sum_data(bytes)
file_send = FileSend(arch=file.arch.value, platform=file.platform.value, sha256sum=sha256, filename=possible_filename, file_length=length)
return file_send
file_send = FileSend(
arch=file.arch.value,
platform=file.platform.value,
sha256sum=sha256,
filename=possible_filename,
file_length=length,
)
return file_send

View file

@ -28,11 +28,12 @@ class FileBase(BaseModel):
arch: Arch
platform: Platform
class FileGet(FileBase):
pass
class FileSend(FileBase):
sha256sum: constr(min_length=64, max_length=64)
filename: str
file_length: int

View file

@ -14,4 +14,4 @@ from pydantic import BaseModel
class CurrentVersion(BaseModel):
version: str
version: str