From 78290088860a74d0a65dfd20d190ccc0f55fa99a Mon Sep 17 00:00:00 2001 From: Orsiris de Jong Date: Wed, 1 Feb 2023 01:51:15 +0100 Subject: [PATCH] Reformat files with black --- bin/compile.py | 86 ++++++++++++------- npbackup/__main__.py | 24 ++++-- npbackup/upgrade_client/upgrader.py | 53 ++++++++---- upgrade_server/upgrade_server.py | 32 +++---- upgrade_server/upgrade_server/api.py | 47 ++++------ upgrade_server/upgrade_server/crud.py | 34 +++++--- upgrade_server/upgrade_server/models/files.py | 3 +- upgrade_server/upgrade_server/models/oper.py | 2 +- 8 files changed, 165 insertions(+), 116 deletions(-) diff --git a/bin/compile.py b/bin/compile.py index f50832e..34b50d9 100644 --- a/bin/compile.py +++ b/bin/compile.py @@ -17,8 +17,8 @@ import argparse import atexit from command_runner import command_runner -ARCHES = ['x86', 'x64'] -AUDIENCES = ['public', 'private'] +ARCHES = ["x86", "x64"] +AUDIENCES = ["public", "private"] # Insert parent dir as path se we get to use npbackup as package sys.path.insert(0, os.path.normpath(os.path.join(os.path.dirname(__file__), ".."))) @@ -39,6 +39,7 @@ import glob del sys.path[0] + def check_private_build(audience): private = False try: @@ -64,31 +65,42 @@ def check_private_build(audience): def move_audience_files(audience): - for dir in [os.path.join(BASEDIR, os.pardir, 'examples'), BASEDIR]: - if audience == 'private': - possible_non_used_path = '_NOUSE_private_' - guessed_files = glob.glob(os.path.join(dir, '{}*'.format(possible_non_used_path))) + for dir in [os.path.join(BASEDIR, os.pardir, "examples"), BASEDIR]: + if audience == "private": + possible_non_used_path = "_NOUSE_private_" + guessed_files = glob.glob( + os.path.join(dir, "{}*".format(possible_non_used_path)) + ) for file in guessed_files: os.rename(file, file.replace(possible_non_used_path, "_private_")) - elif audience == 'public': - possible_non_used_path = '_private_' - guessed_files = glob.glob(os.path.join(dir, '{}*'.format(possible_non_used_path))) + elif audience == "public": + possible_non_used_path = "_private_" + guessed_files = glob.glob( + os.path.join(dir, "{}*".format(possible_non_used_path)) + ) for file in guessed_files: - os.rename(file, file.replace(possible_non_used_path, "_NOUSE{}".format(possible_non_used_path))) + os.rename( + file, + file.replace( + possible_non_used_path, + "_NOUSE{}".format(possible_non_used_path), + ), + ) def get_conf_dist_file(audience): - if audience == 'private': + if audience == "private": file = "_private_npbackup.conf.dist" else: file = "npbackup.conf.dist" dist_conf_file_path = os.path.join(BASEDIR, os.pardir, "examples", file) - return dist_conf_file_path + return dist_conf_file_path def have_nuitka_commercial(): try: import nuitka.plugins.commercial + print("Running with nuitka commercial") return True except ImportError: @@ -106,7 +118,7 @@ def compile(arch, audience): restic_executable = "restic" platform = "linux" - PACKAGE_DIR = 'npbackup' + PACKAGE_DIR = "npbackup" check_private_build(audience) BUILDS_DIR = os.path.abspath(os.path.join(BASEDIR, os.pardir, "BUILDS")) @@ -124,7 +136,10 @@ def compile(arch, audience): FILE_VERSION = _npbackup_version + ".0" file_description = "{} P{}-{}{}".format( - FILE_DESCRIPTION, sys.version_info[1], arch, "priv" if audience == 'private' else '' + FILE_DESCRIPTION, + sys.version_info[1], + arch, + "priv" if audience == "private" else "", ) restic_source_file = get_restic_internal_binary(arch) @@ -139,20 +154,22 @@ def compile(arch, audience): license_dest_file = os.path.join(PACKAGE_DIR, os.path.basename(LICENSE_FILE)) - icon_file = os.path.join(PACKAGE_DIR, 'npbackup_icon.ico') + icon_file = os.path.join(PACKAGE_DIR, "npbackup_icon.ico") # Installer specific files, no need for a npbackup package directory here program_executable_path = os.path.join(OUTPUT_DIR, program_executable) dist_conf_file_source = get_conf_dist_file(audience) - dist_conf_file_dest = os.path.basename(dist_conf_file_source.replace('_private_', '')) + dist_conf_file_dest = os.path.basename( + dist_conf_file_source.replace("_private_", "") + ) excludes_dir = "excludes" excludes_dir_source = os.path.join(BASEDIR, os.pardir, excludes_dir) excludes_dir_dest = excludes_dir - NUITKA_OPTIONS = '--enable-plugin=data-hiding' if have_nuitka_commercial() else '' + NUITKA_OPTIONS = "--enable-plugin=data-hiding" if have_nuitka_commercial() else "" EXE_OPTIONS = '--company-name="{}" --product-name="{}" --file-version="{}" --product-version="{}" --copyright="{}" --file-description="{}" --trademarks="{}"'.format( COMPANY_NAME, @@ -199,7 +216,7 @@ def compile(arch, audience): ) CMD = '{} -m nuitka --python-flag=no_docstrings --python-flag=-O {} {} --onefile --plugin-enable=tk-inter --include-data-file="{}"="{}" --include-data-file="{}"="{}" --include-data-dir="{}"="{}" --windows-icon-from-ico="{}" --windows-uac-admin --output-dir="{}" bin/NPBackupInstaller.py'.format( PYTHON_EXECUTABLE, - NUITKA_OPTIONS, + NUITKA_OPTIONS, EXE_OPTIONS, program_executable_path, program_executable, @@ -217,7 +234,7 @@ def compile(arch, audience): errors = True else: ## Create version file - with open(os.path.join(BUILDS_DIR, audience, 'VERSION'), 'w') as fh: + with open(os.path.join(BUILDS_DIR, audience, "VERSION"), "w") as fh: fh.write(npbackup_version) print("COMPILE ERRORS", errors) @@ -233,32 +250,32 @@ class ArchAction(argparse.Action): class AudienceAction(argparse.Action): def __call__(self, parser, namespace, values, option_string=None): - if values not in AUDIENCES + ['all']: + if values not in AUDIENCES + ["all"]: print("Got value:", values) raise argparse.ArgumentError(self, "Not a valid audience") - setattr(namespace, self.dest, values) + setattr(namespace, self.dest, values) if __name__ == "__main__": parser = argparse.ArgumentParser( - prog="npbackup compile.py", description="Compiler script for NPBackup" + prog="npbackup compile.py", description="Compiler script for NPBackup" ) parser.add_argument( - "--arch", - type=str, - dest="arch", - default=None, - required=True, - action=ArchAction, - help="Target arch, x64 or x86", + "--arch", + type=str, + dest="arch", + default=None, + required=True, + action=ArchAction, + help="Target arch, x64 or x86", ) parser.add_argument( "--audience", type=str, dest="audience", - default='private', + default="private", required=False, help="Target audience, private or public", ) @@ -266,13 +283,16 @@ if __name__ == "__main__": args = parser.parse_args() # Make sure we get out dev environment back when compilation ends / fails - atexit.register(move_audience_files, 'private',) + atexit.register( + move_audience_files, + "private", + ) try: - if args.audience.lower() == 'all': + if args.audience.lower() == "all": audiences = AUDIENCES else: audiences = [args.audience] - + for audience in audiences: move_audience_files(audience) compile(arch=args.arch, audience=audience) diff --git a/npbackup/__main__.py b/npbackup/__main__.py index 7484800..64d1bd5 100644 --- a/npbackup/__main__.py +++ b/npbackup/__main__.py @@ -215,7 +215,9 @@ This is free software, and you are welcome to redistribute it under certain cond ) parser.add_argument("--license", action="store_true", help=("Show license")) - parser.add_argument("--auto-upgrade", action="store_true", help=("Auto upgrade NPBackup")) + parser.add_argument( + "--auto-upgrade", action="store_true", help=("Auto upgrade NPBackup") + ) args = parser.parse_args() if args.version: @@ -299,19 +301,23 @@ This is free software, and you are welcome to redistribute it under certain cond # Try to perform an auto upgrade if needed try: - auto_upgrade = config_dict['options']['auto_upgrade'] + auto_upgrade = config_dict["options"]["auto_upgrade"] except KeyError: auto_upgrade = True try: - auto_upgrade_interval = config_dict['options']['auto_upgrade_interval'] + auto_upgrade_interval = config_dict["options"]["auto_upgrade_interval"] except KeyError: auto_upgrade_interval = 10 if (auto_upgrade and need_upgrade(auto_upgrade_interval)) or args.auto_upgrade: try: - auto_upgrade_upgrade_url = config_dict['options']['auto_upgrade_server_url'] - auto_upgrade_username = config_dict['options']['auto_upgrade_server_username'] - auto_upgrade_password = config_dict['options']['auto_upgrade_server_password'] + auto_upgrade_upgrade_url = config_dict["options"]["auto_upgrade_server_url"] + auto_upgrade_username = config_dict["options"][ + "auto_upgrade_server_username" + ] + auto_upgrade_password = config_dict["options"][ + "auto_upgrade_server_password" + ] except KeyError as exc: logger.error("Missing auto upgrade info: %s", exc) else: @@ -319,7 +325,11 @@ This is free software, and you are welcome to redistribute it under certain cond logger.info("Running user initiated auto upgrade") else: logger.info("Running program initiated auto upgrade") - result = auto_upgrader(upgrade_url=auto_upgrade_upgrade_url, username=auto_upgrade_username, password=auto_upgrade_password) + result = auto_upgrader( + upgrade_url=auto_upgrade_upgrade_url, + username=auto_upgrade_username, + password=auto_upgrade_password, + ) if args.auto_upgrade: if result: sys.exit(0) diff --git a/npbackup/upgrade_client/upgrader.py b/npbackup/upgrade_client/upgrader.py index b44c7e6..c9460b6 100644 --- a/npbackup/upgrade_client/upgrader.py +++ b/npbackup/upgrade_client/upgrader.py @@ -41,7 +41,7 @@ def need_upgrade(upgrade_interval: int) -> bool: Basic counter which allows an upgrade only every X times this is called so failed operations won't end in an endless upgrade loop """ # file counter, local, home, or temp if not available - return True # WIP + return True # WIP def auto_upgrader(upgrade_url: str, username: str, password: str) -> bool: @@ -53,7 +53,9 @@ def auto_upgrader(upgrade_url: str, username: str, password: str) -> bool: """ is_nuitka = "__compiled__" in globals() if not is_nuitka: - logger.info("Auto upgrade will only upgrade compiled verions. Please use 'pip install --upgrade npbackup' instead") + logger.info( + "Auto upgrade will only upgrade compiled verions. Please use 'pip install --upgrade npbackup' instead" + ) return True logger.info("Upgrade server is %s", upgrade_url) requestor = Requestor(upgrade_url, username, password) @@ -63,37 +65,45 @@ def auto_upgrader(upgrade_url: str, username: str, password: str) -> bool: logger.error("Cannot reach upgrade server") return False try: - if not server_ident['app'] == 'npbackup.upgrader': + if not server_ident["app"] == "npbackup.upgrader": logger.error("Current server is not a recognized NPBackup update server") return False except (KeyError, TypeError): logger.error("Current server is not a NPBackup update server") return False - result = requestor.data_model('current_version') + result = requestor.data_model("current_version") try: - online_version = result['version'] + online_version = result["version"] except KeyError: logger.error("Upgrade server failed to provide proper version info") return False else: if online_version: if version.parse(online_version) > version.parse(npbackup_version): - logger.info("Current version %s is older than online version %s", npbackup_version, online_version) + logger.info( + "Current version %s is older than online version %s", + npbackup_version, + online_version, + ) else: - logger.info("Current version %s is up-to-date (online version %s)", npbackup_version, online_version) + logger.info( + "Current version %s is up-to-date (online version %s)", + npbackup_version, + online_version, + ) return True - platform_and_arch = '{}/{}'.format(get_os(), os_arch()).lower() + platform_and_arch = "{}/{}".format(get_os(), os_arch()).lower() - file_info = requestor.data_model('upgrades', id_record=platform_and_arch) + file_info = requestor.data_model("upgrades", id_record=platform_and_arch) try: - sha256sum = file_info['sha256sum'] + sha256sum = file_info["sha256sum"] except (KeyError, TypeError): logger.error("Cannot get file description") return False - - file_data = requestor.requestor('upgrades/' + platform_and_arch + '/data', raw=True) + + file_data = requestor.requestor("upgrades/" + platform_and_arch + "/data", raw=True) if not file_data: logger.error("Cannot get update file") return False @@ -101,19 +111,24 @@ def auto_upgrader(upgrade_url: str, username: str, password: str) -> bool: if sha256sum_data(file_data) != sha256sum: logger.error("Invalid checksum, won't upgrade") return False - - executable = os.path.join(tempfile.gettempdir(), file_info['filename']) - with open(executable, 'wb') as fh: + + executable = os.path.join(tempfile.gettempdir(), file_info["filename"]) + with open(executable, "wb") as fh: fh.write(file_data) logger.info("Upgrade file written to %s", executable) - log_file = os.path.join(tempfile.gettempdir(), file_info['filename'] + '.log') + log_file = os.path.join(tempfile.gettempdir(), file_info["filename"] + ".log") # Actual upgrade process new_executable = os.path.join(CURRENT_DIR, os.path.basename(CURRENT_EXECUTABLE)) - cmd = "del \"{}\"; move \"{}\" \"{}\"; del \"{}\" > {}".format(CURRENT_EXECUTABLE, executable, new_executable, executable, log_file) - logger.info("Launching upgrade. Current process will quit. Upgrade starts in %s seconds. Upgrade is done by OS logged in %s", UPGRADE_DEFER_TIME, log_file) + cmd = 'del "{}"; move "{}" "{}"; del "{}" > {}'.format( + CURRENT_EXECUTABLE, executable, new_executable, executable, log_file + ) + logger.info( + "Launching upgrade. Current process will quit. Upgrade starts in %s seconds. Upgrade is done by OS logged in %s", + UPGRADE_DEFER_TIME, + log_file, + ) logger.debug(cmd) deferred_command(cmd, defer_time=UPGRADE_DEFER_TIME) return True - diff --git a/upgrade_server/upgrade_server.py b/upgrade_server/upgrade_server.py index 143db60..a0698b6 100644 --- a/upgrade_server/upgrade_server.py +++ b/upgrade_server/upgrade_server.py @@ -11,7 +11,7 @@ __build__ = "202303101" __version__ = "1.0.0" -DEVEL=True +DEVEL = True import sys import os @@ -21,42 +21,44 @@ from ofunctions.logger_utils import logger_get_logger config_dict = configuration.load_config() try: - listen = config_dict['http_server']['listen'] + listen = config_dict["http_server"]["listen"] except KeyError: listen = None try: - port = config_dict['http_server']['port'] + port = config_dict["http_server"]["port"] except KeyError: listen = None if DEVEL: import uvicorn as server + server_args = { - 'workers': 1, - 'log_level': "debug", - 'reload': True, - 'host': listen if listen else '0.0.0.0', - 'port': port if port else 8080 + "workers": 1, + "log_level": "debug", + "reload": True, + "host": listen if listen else "0.0.0.0", + "port": port if port else 8080, } else: import gunicorn as server + server_args = { - 'workers': 8, - 'reload': False, - 'host': listen if listen else '0.0.0.0', - 'port': port if port else 8080 + "workers": 8, + "reload": False, + "host": listen if listen else "0.0.0.0", + "port": port if port else 8080, } logger = logger_get_logger() if __name__ == "__main__": try: - + server.run("upgrade_server.api:app", **server_args) except KeyboardInterrupt as exc: logger.error("Program interrupted by keyoard: {}".format(exc)) sys.exit(200) except Exception as exc: logger.error("Program interrupted by error: {}".format(exc)) - logger.critical('Trace:', exc_info=True) - sys.exit(201) \ No newline at end of file + logger.critical("Trace:", exc_info=True) + sys.exit(201) diff --git a/upgrade_server/upgrade_server/api.py b/upgrade_server/upgrade_server/api.py index 175f73c..05bc0e1 100644 --- a/upgrade_server/upgrade_server/api.py +++ b/upgrade_server/upgrade_server/api.py @@ -25,19 +25,21 @@ config_dict = configuration.load_config() logger = logging.getLogger() #### Create app -#app = FastAPI() # standard FastAPI initialization -app = FastAPIOffline() # Offline FastAPI initialization, allows /docs to not use online CDN +# app = FastAPI() # standard FastAPI initialization +app = ( + FastAPIOffline() +) # Offline FastAPI initialization, allows /docs to not use online CDN security = HTTPBasic() def get_current_username(credentials: HTTPBasicCredentials = Depends(security)): current_username_bytes = credentials.username.encode("utf8") - correct_username_bytes = config_dict['http_server']['username'].encode('utf-8') + correct_username_bytes = config_dict["http_server"]["username"].encode("utf-8") is_correct_username = secrets.compare_digest( current_username_bytes, correct_username_bytes ) current_password_bytes = credentials.password.encode("utf8") - correct_password_bytes = config_dict['http_server']['password'].encode('utf-8') + correct_password_bytes = config_dict["http_server"]["password"].encode("utf-8") is_correct_password = secrets.compare_digest( current_password_bytes, correct_password_bytes ) @@ -51,26 +53,21 @@ def get_current_username(credentials: HTTPBasicCredentials = Depends(security)): @app.get("/") -async def api_root(auth = Depends(get_current_username)): +async def api_root(auth=Depends(get_current_username)): if crud.is_enabled(): return { "app": __appname__, - } - else: - return { - "app": "Currently under maintenance" } + else: + return {"app": "Currently under maintenance"} @app.get("/current_version", response_model=CurrentVersion, status_code=200) -async def current_version(auth = Depends(get_current_username)): +async def current_version(auth=Depends(get_current_username)): try: result = crud.get_current_version() if not result: - raise HTTPException( - status_code=404, - detail="Not found" - ) + raise HTTPException(status_code=404, detail="Not found") return result except HTTPException: raise @@ -83,16 +80,13 @@ async def current_version(auth = Depends(get_current_username)): @app.get("/upgrades/{platform}/{arch}", response_model=FileSend, status_code=200) -async def upgrades(platform: Platform, arch: Arch, auth = Depends(get_current_username)): +async def upgrades(platform: Platform, arch: Arch, auth=Depends(get_current_username)): file = FileGet(platform=platform, arch=arch) try: result = crud.get_file(file) if not result: - raise HTTPException( - status_code=404, - detail="Not found" - ) + raise HTTPException(status_code=404, detail="Not found") return result except HTTPException: raise @@ -103,20 +97,16 @@ async def upgrades(platform: Platform, arch: Arch, auth = Depends(get_current_us detail="Cannot get file: {}".format(exc), ) + @app.get("/upgrades/{platform}/{arch}/data", status_code=200) -async def download(platform: Platform, arch: Arch, auth = Depends(get_current_username)): +async def download(platform: Platform, arch: Arch, auth=Depends(get_current_username)): file = FileGet(platform=platform, arch=arch) try: result = crud.get_file(file, content=True) if not result: - raise HTTPException( - status_code=404, - detail="Not found" - ) - headers = { - "Content-Disposition": 'attachment; filename="npbackup"' - } - return Response(content=result, media_type="application/dat", headers=headers) + raise HTTPException(status_code=404, detail="Not found") + headers = {"Content-Disposition": 'attachment; filename="npbackup"'} + return Response(content=result, media_type="application/dat", headers=headers) except HTTPException: raise except Exception as exc: @@ -125,4 +115,3 @@ async def download(platform: Platform, arch: Arch, auth = Depends(get_current_us status_code=400, detail="Cannot get file: {}".format(exc), ) - diff --git a/upgrade_server/upgrade_server/crud.py b/upgrade_server/upgrade_server/crud.py index 6814018..266c1d5 100644 --- a/upgrade_server/upgrade_server/crud.py +++ b/upgrade_server/upgrade_server/crud.py @@ -23,6 +23,7 @@ config_dict = configuration.load_config() logger = getLogger(__intname__) + def sha256sum_data(data): # type: (bytes) -> str """ @@ -39,31 +40,42 @@ def is_enabled() -> bool: def get_current_version() -> Optional[CurrentVersion]: try: - path = os.path.join(config_dict['upgrades']['data_root'], 'VERSION') + path = os.path.join(config_dict["upgrades"]["data_root"], "VERSION") print(path) if os.path.isfile(path): - with open(path, 'r') as fh: - ver =fh.readline() - return(CurrentVersion(version=ver)) + with open(path, "r") as fh: + ver = fh.readline() + return CurrentVersion(version=ver) except OSError: logger.error("Cannot get current version") except Exception: logger.error("Version seems to be bogus in VERSION file") - + def get_file(file: FileGet, content: bool = False) -> Optional[Union[FileSend, bytes]]: - possible_filename = 'npbackup{}'.format( - '.exe' if file.platform.value == 'windows' else '' + possible_filename = "npbackup{}".format( + ".exe" if file.platform.value == "windows" else "" + ) + path = os.path.join( + config_dict["upgrades"]["data_root"], + file.platform.value, + file.arch.value, + possible_filename, ) - path = os.path.join(config_dict['upgrades']['data_root'], file.platform.value, file.arch.value, possible_filename) logger.info("Searching for %s", path) if not os.path.isfile(path): return None - with open(path, 'rb') as fh: + with open(path, "rb") as fh: bytes = fh.read() if content: return bytes length = len(bytes) sha256 = sha256sum_data(bytes) - file_send = FileSend(arch=file.arch.value, platform=file.platform.value, sha256sum=sha256, filename=possible_filename, file_length=length) - return file_send \ No newline at end of file + file_send = FileSend( + arch=file.arch.value, + platform=file.platform.value, + sha256sum=sha256, + filename=possible_filename, + file_length=length, + ) + return file_send diff --git a/upgrade_server/upgrade_server/models/files.py b/upgrade_server/upgrade_server/models/files.py index f040e27..46229fb 100644 --- a/upgrade_server/upgrade_server/models/files.py +++ b/upgrade_server/upgrade_server/models/files.py @@ -28,11 +28,12 @@ class FileBase(BaseModel): arch: Arch platform: Platform + class FileGet(FileBase): pass + class FileSend(FileBase): sha256sum: constr(min_length=64, max_length=64) filename: str file_length: int - diff --git a/upgrade_server/upgrade_server/models/oper.py b/upgrade_server/upgrade_server/models/oper.py index 2f6f364..751ae27 100644 --- a/upgrade_server/upgrade_server/models/oper.py +++ b/upgrade_server/upgrade_server/models/oper.py @@ -14,4 +14,4 @@ from pydantic import BaseModel class CurrentVersion(BaseModel): - version: str \ No newline at end of file + version: str