""" Utility functions for qBit Manage. """ import json import logging import os import shutil import signal import time from pathlib import Path import requests import ruamel.yaml logger = logging.getLogger("qBit Manage") def get_list(data, lower=False, split=True, int_list=False): """Return a list from a string or list.""" if data is None: return None elif isinstance(data, list): return data elif isinstance(data, dict): return [data] elif split is False: return [str(data)] elif lower is True: return [d.strip().lower() for d in str(data).split(",")] elif int_list is True: try: return [int(d.strip()) for d in str(data).split(",")] except ValueError: return [] else: return [d.strip() for d in str(data).split(",")] class TorrentMessages: """Contains list of messages to check against a status of a torrent""" UNREGISTERED_MSGS = [ "UNREGISTERED", "TORRENT NOT FOUND", "TORRENT IS NOT FOUND", "NOT REGISTERED", "NOT EXIST", "UNKNOWN TORRENT", "TRUMP", "RETITLED", "TRUNCATED", "TORRENT IS NOT AUTHORIZED FOR USE ON THIS TRACKER", ] IGNORE_MSGS = [ "YOU HAVE REACHED THE CLIENT LIMIT FOR THIS TORRENT", "MISSING PASSKEY", "MISSING INFO_HASH", "PASSKEY IS INVALID", "INVALID PASSKEY", "EXPECTED VALUE (LIST, DICT, INT OR STRING) IN BENCODED STRING", "COULD NOT PARSE BENCODED DATA", "STREAM TRUNCATED", ] EXCEPTIONS_MSGS = [ "DOWN", "DOWN.", "IT MAY BE DOWN,", "UNREACHABLE", "(UNREACHABLE)", "BAD GATEWAY", "TRACKER UNAVAILABLE", ] def guess_branch(version, env_version, git_branch): if git_branch: return git_branch elif env_version == "develop": return env_version elif version[2] > 0: dev_version = get_develop() if version[1] != dev_version[1] or version[2] <= dev_version[2]: return "develop" else: return "master" def current_version(version, branch=None): if branch == "develop": return get_develop() elif version[2] > 0: new_version = get_develop() if version[1] != new_version[1] or new_version[2] >= version[2]: return new_version else: return get_master() develop_version = None def get_develop(): global develop_version if develop_version is None: develop_version = get_version("develop") return develop_version master_version = None def get_master(): global master_version if master_version is None: master_version = get_version("master") return master_version def get_version(level): try: url = f"https://raw.githubusercontent.com/StuffAnThings/qbit_manage/{level}/VERSION" return parse_version(requests.get(url).content.decode().strip(), text=level) except requests.exceptions.ConnectionError: return "Unknown", "Unknown", 0 def parse_version(version, text="develop"): version = version.replace("develop", text) split_version = version.split(f"-{text}") return version, split_version[0], int(split_version[1]) if len(split_version) > 1 else 0 class check: """Check for attributes in config.""" def __init__(self, config): self.config = config def overwrite_attributes(self, data, attribute): """Overwrite attributes in config.""" yaml = YAML(self.config.config_path) if data is not None and attribute in yaml.data: yaml.data[attribute] = data yaml.save() def check_for_attribute( self, data, attribute, parent=None, subparent=None, test_list=None, default=None, do_print=True, default_is_none=False, req_default=False, var_type="str", min_int=0, throw=False, save=True, make_dirs=False, ): """ Check for attribute in config. Args: data (dict): The configuration data to search. attribute (str): The name of the attribute key to search for. parent (str, optional): The name of the top level attribute to search under. Defaults to None. subparent (str, optional): The name of the second level attribute to search under. Defaults to None. test_list (dict, optional): A dictionary of valid values for the attribute. Defaults to None. default (any, optional): The default value to use if the attribute is not found. Defaults to None. do_print (bool, optional): Whether to print warning messages. Defaults to True. default_is_none (bool, optional): Whether to treat a None value as a valid default. Defaults to False. req_default (bool, optional): Whether to raise an error if no default value is provided. Defaults to False. var_type (str, optional): The expected type of the attribute value. Defaults to "str". min_int (int, optional): The minimum value for an integer attribute. Defaults to 0. throw (bool, optional): Whether to raise an error if the attribute value is invalid. Defaults to False. save (bool, optional): Whether to save the default value to the config if it is used. Defaults to True. make_dirs (bool, optional): Whether to create directories for path attributes if they do not exist. Defaults to False. Returns: any: The value of the attribute, or the default value if it is not found. Raises: Failed: If the attribute value is invalid or a required default value is missing. """ endline = "" if parent is not None: if subparent is not None: if data and parent in data and subparent in data[parent]: data = data[parent][subparent] else: data = None do_print = False else: if data and parent in data: data = data[parent] else: data = None do_print = False if subparent is not None: text = f"{parent}->{subparent} sub-attribute {attribute}" elif parent is None: text = f"{attribute} attribute" else: text = f"{parent} sub-attribute {attribute}" if data is None or attribute not in data or (attribute in data and data[attribute] is None and not default_is_none): message = f"{text} not found" if parent and save is True: yaml = YAML(self.config.config_path) if subparent: endline = f"\n{subparent} sub-attribute {attribute} added to config" if subparent not in yaml.data[parent] or not yaml.data[parent][subparent]: yaml.data[parent][subparent] = {attribute: default} elif attribute not in yaml.data[parent]: if isinstance(yaml.data[parent][subparent], str): yaml.data[parent][subparent] = {attribute: default} yaml.data[parent][subparent][attribute] = default else: endline = "" else: endline = f"\n{parent} sub-attribute {attribute} added to config" if parent not in yaml.data or not yaml.data[parent]: yaml.data[parent] = {attribute: default} elif attribute not in yaml.data[parent] or ( attribute in yaml.data[parent] and yaml.data[parent][attribute] is None ): yaml.data[parent][attribute] = default else: endline = "" yaml.save() if default_is_none and var_type in ["list", "int_list"]: return [] elif data[attribute] is None: if default_is_none and var_type == "list": return [] elif default_is_none: return None else: message = f"{text} is blank" elif var_type == "url": if data[attribute].endswith(("\\", "/")): return data[attribute][:-1] else: return data[attribute] elif var_type == "bool": if isinstance(data[attribute], bool): return data[attribute] else: message = f"{text} must be either true or false" throw = True elif var_type == "int": if isinstance(data[attribute], int) and data[attribute] >= min_int: return data[attribute] else: message = f"{text} must an integer >= {min_int}" throw = True elif var_type == "float": try: data[attribute] = float(data[attribute]) except: pass if isinstance(data[attribute], float) and data[attribute] >= min_int: return data[attribute] else: message = f"{text} must a float >= {float(min_int)}" throw = True elif var_type == "path": if os.path.exists(os.path.abspath(data[attribute])): return os.path.join(data[attribute], "") else: if make_dirs: try: os.makedirs(data[attribute], exist_ok=True) return os.path.join(data[attribute], "") except OSError: message = f"Path {os.path.abspath(data[attribute])} does not exist and can't be created" else: message = f"Path {os.path.abspath(data[attribute])} does not exist" elif var_type == "list": return get_list(data[attribute], split=False) elif var_type == "list_path": temp_list = [p for p in get_list(data[attribute], split=False) if os.path.exists(os.path.abspath(p))] if len(temp_list) > 0: return temp_list else: message = "No Paths exist" elif var_type == "lower_list": return get_list(data[attribute], lower=True) elif test_list is None or data[attribute] in test_list: return data[attribute] else: message = f"{text}: {data[attribute]} is an invalid input" if var_type == "path" and default: default_path = os.path.abspath(default) if make_dirs and not os.path.exists(default_path): os.makedirs(default, exist_ok=True) if os.path.exists(default_path): default = os.path.join(default, "") message = message + f", using {default} as default" elif var_type == "path" and default: if data and attribute in data and data[attribute]: message = f"neither {data[attribute]} or the default path {default} could be found" else: message = f"no {text} found and the default path {default} could not be found" default = None if (default is not None or default_is_none) and not message: message = message + f" using {default} as default" message = message + endline if req_default and default is None: raise Failed(f"Config Error: {attribute} attribute must be set under {parent}.") options = "" if test_list: for option, description in test_list.items(): if len(options) > 0: options = f"{options}\n" options = f"{options} {option} ({description})" if (default is None and not default_is_none) or throw: if len(options) > 0: message = message + "\n" + options raise Failed(f"Config Error: {message}") if do_print: logger.print_line(f"Config Warning: {message}", "warning") if data and attribute in data and data[attribute] and test_list is not None and data[attribute] not in test_list: logger.print_line(options) return default class Failed(Exception): """Exception raised for errors in the input.""" pass def list_in_text(text, search_list, match_all=False): """Check if a list of strings is in a string""" if isinstance(search_list, list): search_list = set(search_list) contains = {x for x in search_list if " " in x} exception = search_list - contains if match_all: if all(x == m for m in text.split(" ") for x in exception) or all(x in text for x in contains): return True else: if any(x == m for m in text.split(" ") for x in exception) or any(x in text for x in contains): return True return False def trunc_val(stg, delm, num=3): """Truncate the value of the torrent url to remove sensitive information""" try: val = delm.join(stg.split(delm, num)[:num]) except IndexError: val = None return val def move_files(src, dest, mod=False): """Move files from source to destination, mod variable is to change the date modified of the file being moved""" dest_path = os.path.dirname(dest) to_delete = False if os.path.isdir(dest_path) is False: os.makedirs(dest_path, exist_ok=True) try: if mod is True: mod_time = time.time() os.utime(src, (mod_time, mod_time)) shutil.move(src, dest) except PermissionError as perm: logger.warning(f"{perm} : Copying files instead.") shutil.copyfile(src, dest) to_delete = True except FileNotFoundError as file: logger.warning(f"{file} : source: {src} -> destination: {dest}") except Exception as ex: logger.stacktrace() logger.error(ex) return to_delete def copy_files(src, dest): """Copy files from source to destination""" dest_path = os.path.dirname(dest) if os.path.isdir(dest_path) is False: os.makedirs(dest_path) try: shutil.copyfile(src, dest) except Exception as ex: logger.stacktrace() logger.error(ex) def remove_empty_directories(pathlib_root_dir, pattern): """Remove empty directories recursively.""" pathlib_root_dir = Path(pathlib_root_dir) try: # list all directories recursively and sort them by path, # longest first longest = sorted( pathlib_root_dir.glob(pattern), key=lambda p: len(str(p)), reverse=True, ) longest.append(pathlib_root_dir) # delete the folder itself if it's empty for pdir in longest: try: pdir.rmdir() # remove directory if empty except (FileNotFoundError, OSError): continue # catch and continue if non-empty, folders within could already be deleted if run in parallel except FileNotFoundError: pass # if this is being run in parallel, pathlib_root_dir could already be deleted class CheckHardLinks: """ Class to check for hardlinks """ def __init__(self, root_dir, remote_dir): self.root_dir = root_dir self.remote_dir = remote_dir self.root_files = set(get_root_files(self.root_dir, self.remote_dir)) self.get_inode_count() def get_inode_count(self): self.inode_count = {} for file in self.root_files: try: inode_no = os.stat(file.replace(self.root_dir, self.remote_dir)).st_ino except PermissionError as perm: logger.warning(f"{perm} : file {file} has permission issues. Skipping...") continue except FileNotFoundError as file_not_found_error: logger.warning(f"{file_not_found_error} : File {file} not found. Skipping...") continue except Exception as ex: logger.stacktrace() logger.error(ex) continue if inode_no in self.inode_count: self.inode_count[inode_no] += 1 else: self.inode_count[inode_no] = 1 def nohardlink(self, file, notify): """ Check if there are any hard links Will check if there are any hard links if it passes a file or folder If a folder is passed, it will take the largest file in that folder and only check for hardlinks of the remaining files where the file is greater size a percentage of the largest file This fixes the bug in #192 """ check_for_hl = True try: if os.path.isfile(file): if os.path.islink(file): logger.warning(f"Symlink found in {file}, unable to determine hardlinks. Skipping...") return False logger.trace(f"Checking file: {file}") logger.trace(f"Checking file inum: {os.stat(file).st_ino}") logger.trace(f"Checking no of hard links: {os.stat(file).st_nlink}") logger.trace(f"Checking inode_count dict: {self.inode_count.get(os.stat(file).st_ino)}") # https://github.com/StuffAnThings/qbit_manage/issues/291 for more details if os.stat(file).st_nlink - self.inode_count.get(os.stat(file).st_ino, 1) > 0: check_for_hl = False else: sorted_files = sorted(Path(file).rglob("*"), key=lambda x: os.stat(x).st_size, reverse=True) logger.trace(f"Folder: {file}") logger.trace(f"Files Sorted by size: {sorted_files}") threshold = 0.5 if not sorted_files: msg = ( f"Nohardlink Error: Unable to open the folder {file}. " "Please make sure folder exists and qbit_manage has access to this directory." ) notify(msg, "nohardlink") logger.warning(msg) else: largest_file_size = os.stat(sorted_files[0]).st_size logger.trace(f"Largest file: {sorted_files[0]}") logger.trace(f"Largest file size: {largest_file_size}") for files in sorted_files: if os.path.islink(files): logger.warning(f"Symlink found in {files}, unable to determine hardlinks. Skipping...") continue file_size = os.stat(files).st_size file_no_hardlinks = os.stat(files).st_nlink logger.trace(f"Checking file: {file}") logger.trace(f"Checking file inum: {os.stat(file).st_ino}") logger.trace(f"Checking file size: {file_size}") logger.trace(f"Checking no of hard links: {file_no_hardlinks}") logger.trace(f"Checking inode_count dict: {self.inode_count.get(os.stat(file).st_ino)}") if file_no_hardlinks - self.inode_count.get(os.stat(file).st_ino, 1) > 0 and file_size >= ( largest_file_size * threshold ): check_for_hl = False except PermissionError as perm: logger.warning(f"{perm} : file {file} has permission issues. Skipping...") return False except FileNotFoundError as file_not_found_error: logger.warning(f"{file_not_found_error} : File {file} not found. Skipping...") return False except Exception as ex: logger.stacktrace() logger.error(ex) return False return check_for_hl def get_root_files(root_dir, remote_dir, exclude_dir=None): local_exclude_dir = exclude_dir.replace(remote_dir, root_dir) if exclude_dir and remote_dir != root_dir else exclude_dir root_files = [ os.path.join(path.replace(remote_dir, root_dir) if remote_dir != root_dir else path, name) for path, subdirs, files in os.walk(remote_dir if remote_dir != root_dir else root_dir) for name in files if not local_exclude_dir or local_exclude_dir not in path ] return root_files def load_json(file): """Load json file if exists""" if os.path.isfile(file): file = open(file) data = json.load(file) file.close() else: data = {} return data def save_json(torrent_json, dest): """Save json file to destination""" with open(dest, "w", encoding="utf-8") as file: json.dump(torrent_json, file, ensure_ascii=False, indent=4) class GracefulKiller: """ Class to catch SIGTERM and SIGINT signals. Gracefully kill script when docker stops. """ kill_now = False def __init__(self): # signal.signal(signal.SIGINT, self.exit_gracefully) signal.signal(signal.SIGTERM, self.exit_gracefully) def exit_gracefully(self, *args): """Set kill_now to True to exit gracefully.""" self.kill_now = True def human_readable_size(size, decimal_places=3): """Convert bytes to human readable size""" for unit in ["B", "KiB", "MiB", "GiB", "TiB"]: if size < 1024.0: break size /= 1024.0 return f"{size:.{decimal_places}f}{unit}" class YAML: """Class to load and save yaml files""" def __init__(self, path=None, input_data=None, check_empty=False, create=False): self.path = path self.input_data = input_data self.yaml = ruamel.yaml.YAML() self.yaml.indent(mapping=2, sequence=2) try: if input_data: self.data = self.yaml.load(input_data) else: if create and not os.path.exists(self.path): with open(self.path, "w"): pass self.data = {} else: with open(self.path, encoding="utf-8") as filepath: self.data = self.yaml.load(filepath) except ruamel.yaml.error.YAMLError as yerr: err = str(yerr).replace("\n", "\n ") raise Failed(f"YAML Error: {err}") from yerr except Exception as yerr: raise Failed(f"YAML Error: {yerr}") from yerr if not self.data or not isinstance(self.data, dict): if check_empty: raise Failed("YAML Error: File is empty") self.data = {} def save(self): """Save yaml file""" if self.path: with open(self.path, "w") as filepath: self.yaml.dump(self.data, filepath)