qbit_manage/modules/util.py
Bakerboy448 b352a99d4b lint cleanup
- pylint and misc variable/naming cleanup
- add docstrings
- fix typos
2023-03-31 17:35:43 -05:00

393 lines
14 KiB
Python
Executable file

""" Utility functions for qBit Manage. """
import json
import logging
import os
import shutil
import signal
import time
from pathlib import Path
import ruamel.yaml
logger = logging.getLogger("qBit Manage")
def get_list(data, lower=False, split=True, int_list=False):
"""Return a list from a string or list."""
if data is None:
return None
elif isinstance(data, list):
return data
elif isinstance(data, dict):
return [data]
elif split is False:
return [str(data)]
elif lower is True:
return [d.strip().lower() for d in str(data).split(",")]
elif int_list is True:
try:
return [int(d.strip()) for d in str(data).split(",")]
except ValueError:
return []
else:
return [d.strip() for d in str(data).split(",")]
class check:
"""Check for attributes in config."""
def __init__(self, config):
self.config = config
def check_for_attribute(
self,
data,
attribute,
parent=None,
subparent=None,
test_list=None,
default=None,
do_print=True,
default_is_none=False,
req_default=False,
var_type="str",
min_int=0,
throw=False,
save=True,
make_dirs=False,
):
"""Check for attribute in config."""
endline = ""
if parent is not None:
if subparent is not None:
if data and parent in data and subparent in data[parent]:
data = data[parent][subparent]
else:
data = None
do_print = False
else:
if data and parent in data:
data = data[parent]
else:
data = None
do_print = False
if subparent is not None:
text = f"{parent}->{subparent} sub-attribute {attribute}"
elif parent is None:
text = f"{attribute} attribute"
else:
text = f"{parent} sub-attribute {attribute}"
if data is None or attribute not in data or (attribute in data and data[attribute] is None and not default_is_none):
message = f"{text} not found"
if parent and save is True:
yaml = YAML(self.config.config_path)
if subparent:
endline = f"\n{subparent} sub-attribute {attribute} added to config"
if subparent not in yaml.data[parent] or not yaml.data[parent][subparent]:
yaml.data[parent][subparent] = {attribute: default}
elif attribute not in yaml.data[parent]:
if isinstance(yaml.data[parent][subparent], str):
yaml.data[parent][subparent] = {attribute: default}
yaml.data[parent][subparent][attribute] = default
else:
endline = ""
else:
endline = f"\n{parent} sub-attribute {attribute} added to config"
if parent not in yaml.data or not yaml.data[parent]:
yaml.data[parent] = {attribute: default}
elif attribute not in yaml.data[parent] or (
attribute in yaml.data[parent] and yaml.data[parent][attribute] is None
):
yaml.data[parent][attribute] = default
else:
endline = ""
yaml.save()
if default_is_none and var_type in ["list", "int_list"]:
return []
elif data[attribute] is None:
if default_is_none and var_type == "list":
return []
elif default_is_none:
return None
else:
message = f"{text} is blank"
elif var_type == "url":
if data[attribute].endswith(("\\", "/")):
return data[attribute][:-1]
else:
return data[attribute]
elif var_type == "bool":
if isinstance(data[attribute], bool):
return data[attribute]
else:
message = f"{text} must be either true or false"
throw = True
elif var_type == "int":
if isinstance(data[attribute], int) and data[attribute] >= min_int:
return data[attribute]
else:
message = f"{text} must an integer >= {min_int}"
throw = True
elif var_type == "float":
try:
data[attribute] = float(data[attribute])
except:
pass
if isinstance(data[attribute], float) and data[attribute] >= min_int:
return data[attribute]
else:
message = f"{text} must a float >= {float(min_int)}"
throw = True
elif var_type == "path":
if os.path.exists(os.path.abspath(data[attribute])):
return os.path.join(data[attribute], "")
else:
message = f"Path {os.path.abspath(data[attribute])} does not exist"
elif var_type == "list":
return get_list(data[attribute], split=False)
elif var_type == "list_path":
temp_list = [p for p in get_list(data[attribute], split=False) if os.path.exists(os.path.abspath(p))]
if len(temp_list) > 0:
return temp_list
else:
message = "No Paths exist"
elif var_type == "lower_list":
return get_list(data[attribute], lower=True)
elif test_list is None or data[attribute] in test_list:
return data[attribute]
else:
message = f"{text}: {data[attribute]} is an invalid input"
if var_type == "path" and default and os.path.exists(os.path.abspath(default)):
return os.path.join(default, "")
elif var_type == "path" and default and make_dirs:
os.makedirs(default, exist_ok=True)
return os.path.join(default, "")
elif var_type == "path" and default:
if data and attribute in data and data[attribute]:
message = f"neither {data[attribute]} or the default path {default} could be found"
else:
message = f"no {text} found and the default path {default} could not be found"
default = None
if (default is not None or default_is_none) and not message:
message = message + f" using {default} as default"
message = message + endline
if req_default and default is None:
raise Failed(f"Config Error: {attribute} attribute must be set under {parent}.")
options = ""
if test_list:
for option, description in test_list.items():
if len(options) > 0:
options = f"{options}\n"
options = f"{options} {option} ({description})"
if (default is None and not default_is_none) or throw:
if len(options) > 0:
message = message + "\n" + options
raise Failed(f"Config Error: {message}")
if do_print:
logger.print_line(f"Config Warning: {message}", "warning")
if data and attribute in data and data[attribute] and test_list is not None and data[attribute] not in test_list:
logger.print_line(options)
return default
class Failed(Exception):
"""Exception raised for errors in the input."""
pass
def list_in_text(text, search_list, match_all=False):
"""Check if a list of strings is in a string"""
if isinstance(search_list, list):
search_list = set(search_list)
contains = {x for x in search_list if " " in x}
exception = search_list - contains
if match_all:
if all(x == m for m in text.split(" ") for x in exception) or all(x in text for x in contains):
return True
else:
if any(x == m for m in text.split(" ") for x in exception) or any(x in text for x in contains):
return True
return False
def trunc_val(stg, delm, num=3):
"""Truncate the value of the torrent url to remove sensitive information"""
try:
val = delm.join(stg.split(delm, num)[:num])
except IndexError:
val = None
return val
def move_files(src, dest, mod=False):
"""Move files from source to destination, mod variable is to change the date modified of the file being moved"""
dest_path = os.path.dirname(dest)
to_delete = False
if os.path.isdir(dest_path) is False:
os.makedirs(dest_path)
try:
if mod is True:
mod_time = time.time()
os.utime(src, (mod_time, mod_time))
shutil.move(src, dest)
except PermissionError as perm:
logger.warning(f"{perm} : Copying files instead.")
shutil.copyfile(src, dest)
to_delete = True
except FileNotFoundError as file:
logger.warning(f"{file} : source: {src} -> destination: {dest}")
except Exception as ex:
logger.stacktrace()
logger.error(ex)
return to_delete
def copy_files(src, dest):
"""Copy files from source to destination"""
dest_path = os.path.dirname(dest)
if os.path.isdir(dest_path) is False:
os.makedirs(dest_path)
try:
shutil.copyfile(src, dest)
except Exception as ex:
logger.stacktrace()
logger.error(ex)
def remove_empty_directories(pathlib_root_dir, pattern):
"""Remove empty directories recursively."""
pathlib_root_dir = Path(pathlib_root_dir)
# list all directories recursively and sort them by path,
# longest first
longest = sorted(
pathlib_root_dir.glob(pattern),
key=lambda p: len(str(p)),
reverse=True,
)
for pdir in longest:
try:
pdir.rmdir() # remove directory if empty
except OSError:
continue # catch and continue if non-empty
def nohardlink(file, notify):
"""
Check if there are any hard links
Will check if there are any hard links if it passes a file or folder
If a folder is passed, it will take the largest file in that folder and only check for hardlinks
of the remaining files where the file is greater size a percentage of the largest file
This fixes the bug in #192
"""
check_for_hl = True
if os.path.isfile(file):
logger.trace(f"Checking file: {file}")
if os.stat(file).st_nlink > 1:
check_for_hl = False
else:
sorted_files = sorted(Path(file).rglob("*"), key=lambda x: os.stat(x).st_size, reverse=True)
logger.trace(f"Folder: {file}")
logger.trace(f"Files Sorted by size: {sorted_files}")
threshold = 0.5
if not sorted_files:
msg = (
f"Nohardlink Error: Unable to open the folder {file}. "
"Please make sure folder exists and qbit_manage has access to this directory."
)
notify(msg, "nohardlink")
logger.warning(msg)
else:
largest_file_size = os.stat(sorted_files[0]).st_size
logger.trace(f"Largest file: {sorted_files[0]}")
logger.trace(f"Largest file size: {largest_file_size}")
for files in sorted_files:
file_size = os.stat(files).st_size
file_no_hardlinks = os.stat(files).st_nlink
logger.trace(f"Checking file: {file}")
logger.trace(f"Checking file size: {file_size}")
logger.trace(f"Checking no of hard links: {file_no_hardlinks}")
if file_no_hardlinks > 1 and file_size >= (largest_file_size * threshold):
check_for_hl = False
return check_for_hl
def load_json(file):
"""Load json file if exists"""
if os.path.isfile(file):
file = open(file)
data = json.load(file)
file.close()
else:
data = {}
return data
def save_json(torrent_json, dest):
"""Save json file to destination"""
with open(dest, "w", encoding="utf-8") as file:
json.dump(torrent_json, file, ensure_ascii=False, indent=4)
class GracefulKiller:
"""
Class to catch SIGTERM and SIGINT signals.
Gracefully kill script when docker stops.
"""
kill_now = False
def __init__(self):
# signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)
def exit_gracefully(self, *args):
"""Set kill_now to True to exit gracefully."""
self.kill_now = True
def human_readable_size(size, decimal_places=3):
"""Convert bytes to human readable size"""
for unit in ["B", "KiB", "MiB", "GiB", "TiB"]:
if size < 1024.0:
break
size /= 1024.0
return f"{size:.{decimal_places}f}{unit}"
class YAML:
"""Class to load and save yaml files"""
def __init__(self, path=None, input_data=None, check_empty=False, create=False):
self.path = path
self.input_data = input_data
self.yaml = ruamel.yaml.YAML()
self.yaml.indent(mapping=2, sequence=2)
try:
if input_data:
self.data = self.yaml.load(input_data)
else:
if create and not os.path.exists(self.path):
with open(self.path, "w"):
pass
self.data = {}
else:
with open(self.path, encoding="utf-8") as filepath:
self.data = self.yaml.load(filepath)
except ruamel.yaml.error.YAMLError as yerr:
err = str(yerr).replace("\n", "\n ")
raise Failed(f"YAML Error: {err}") from yerr
except Exception as yerr:
raise Failed(f"YAML Error: {yerr}") from yerr
if not self.data or not isinstance(self.data, dict):
if check_empty:
raise Failed("YAML Error: File is empty")
self.data = {}
def save(self):
"""Save yaml file"""
if self.path:
with open(self.path, "w") as filepath:
self.yaml.dump(self.data, filepath)