qbit_manage/modules/util.py

import json
import logging
import os
import shutil
import signal
import time
from pathlib import Path

import ruamel.yaml

logger = logging.getLogger("qBit Manage")


def get_list(data, lower=False, split=True, int_list=False):
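    """Coerce a raw config value into a list: pass lists through, wrap dicts, and optionally split
    comma-separated strings, lower-casing or casting each entry to int."""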
if data is None:
return None
elif isinstance(data, list):
return data
elif isinstance(data, dict):
return [data]
elif split is False:
return [str(data)]
elif lower is True:
return [d.strip().lower() for d in str(data).split(",")]
elif int_list is True:
try:
return [int(d.strip()) for d in str(data).split(",")]
except ValueError:
return []
else:
return [d.strip() for d in str(data).split(",")]


class check:
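    """Helper for validating and normalizing attributes from a loaded config."""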
def __init__(self, config):
self.config = config
def check_for_attribute(
self,
data,
attribute,
parent=None,
subparent=None,
test_list=None,
default=None,
do_print=True,
default_is_none=False,
req_default=False,
var_type="str",
min_int=0,
throw=False,
save=True,
make_dirs=False,
):
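        """Fetch `attribute` from `data` (optionally nested under parent/subparent), validate it against
        `var_type` and `test_list`, and fall back to `default`, optionally writing that default back to
        the config file. Raises Failed when a required value is missing or invalid."""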
endline = ""
if parent is not None:
if subparent is not None:
if data and parent in data and subparent in data[parent]:
data = data[parent][subparent]
else:
data = None
do_print = False
else:
if data and parent in data:
data = data[parent]
else:
data = None
do_print = False
if subparent is not None:
text = f"{parent}->{subparent} sub-attribute {attribute}"
elif parent is None:
text = f"{attribute} attribute"
else:
text = f"{parent} sub-attribute {attribute}"
if data is None or attribute not in data or (attribute in data and data[attribute] is None and not default_is_none):
message = f"{text} not found"
if parent and save is True:
yaml = YAML(self.config.config_path)
if subparent:
endline = f"\n{subparent} sub-attribute {attribute} added to config"
if subparent not in yaml.data[parent] or not yaml.data[parent][subparent]:
yaml.data[parent][subparent] = {attribute: default}
                    elif attribute not in yaml.data[parent][subparent]:
if isinstance(yaml.data[parent][subparent], str):
yaml.data[parent][subparent] = {attribute: default}
yaml.data[parent][subparent][attribute] = default
else:
endline = ""
else:
endline = f"\n{parent} sub-attribute {attribute} added to config"
if parent not in yaml.data or not yaml.data[parent]:
yaml.data[parent] = {attribute: default}
elif attribute not in yaml.data[parent] or (
attribute in yaml.data[parent] and yaml.data[parent][attribute] is None
):
yaml.data[parent][attribute] = default
else:
endline = ""
yaml.save()
if default_is_none and var_type in ["list", "int_list"]:
return []
elif data[attribute] is None:
if default_is_none and var_type == "list":
return []
elif default_is_none:
return None
else:
message = f"{text} is blank"
elif var_type == "url":
if data[attribute].endswith(("\\", "/")):
return data[attribute][:-1]
else:
return data[attribute]
elif var_type == "bool":
if isinstance(data[attribute], bool):
return data[attribute]
else:
message = f"{text} must be either true or false"
throw = True
elif var_type == "int":
if isinstance(data[attribute], int) and data[attribute] >= min_int:
return data[attribute]
else:
message = f"{text} must an integer >= {min_int}"
throw = True
elif var_type == "float":
try:
                data[attribute] = float(data[attribute])
            except (TypeError, ValueError):
                pass
if isinstance(data[attribute], float) and data[attribute] >= min_int:
return data[attribute]
else:
message = f"{text} must a float >= {float(min_int)}"
throw = True
elif var_type == "path":
if os.path.exists(os.path.abspath(data[attribute])):
return os.path.join(data[attribute], "")
else:
message = f"Path {os.path.abspath(data[attribute])} does not exist"
elif var_type == "list":
return get_list(data[attribute], split=False)
elif var_type == "list_path":
temp_list = [p for p in get_list(data[attribute], split=False) if os.path.exists(os.path.abspath(p))]
if len(temp_list) > 0:
return temp_list
else:
message = "No Paths exist"
elif var_type == "lower_list":
return get_list(data[attribute], lower=True)
elif test_list is None or data[attribute] in test_list:
return data[attribute]
else:
message = f"{text}: {data[attribute]} is an invalid input"
if var_type == "path" and default and os.path.exists(os.path.abspath(default)):
return os.path.join(default, "")
elif var_type == "path" and default and make_dirs:
os.makedirs(default, exist_ok=True)
return os.path.join(default, "")
elif var_type == "path" and default:
if data and attribute in data and data[attribute]:
message = f"neither {data[attribute]} or the default path {default} could be found"
else:
message = f"no {text} found and the default path {default} could not be found"
default = None
        if (default is not None or default_is_none) and message:
message = message + f" using {default} as default"
message = message + endline
if req_default and default is None:
raise Failed(f"Config Error: {attribute} attribute must be set under {parent}.")
options = ""
if test_list:
for option, description in test_list.items():
if len(options) > 0:
options = f"{options}\n"
options = f"{options} {option} ({description})"
if (default is None and not default_is_none) or throw:
if len(options) > 0:
message = message + "\n" + options
raise Failed(f"Config Error: {message}")
if do_print:
logger.print_line(f"Config Warning: {message}", "warning")
if data and attribute in data and data[attribute] and test_list is not None and data[attribute] not in test_list:
logger.print_line(options)
return default


class Failed(Exception):
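    """Generic exception raised for configuration and validation errors."""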
pass


def list_in_text(text, search_list, match_all=False):
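    """Return True when terms from search_list appear in text: multi-word terms are matched as substrings,
    single-word terms against whole words; match_all requires every term to be present."""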
if isinstance(search_list, list):
search_list = set(search_list)
contains = {x for x in search_list if " " in x}
exception = search_list - contains
    if match_all:
        if all(x in text.split(" ") for x in exception) and all(x in text for x in contains):
            return True
    else:
        if any(x in text.split(" ") for x in exception) or any(x in text for x in contains):
            return True
return False


# Truncate the value of the torrent URL to remove sensitive information
def trunc_val(s, d, n=3):
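    """Keep only the first n delimiter-separated segments of s, e.g. the scheme and host of a tracker URL."""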
try:
x = d.join(s.split(d, n)[:n])
except IndexError:
x = None
return x


# Move files from source to destination; mod=True also updates the file's modified time before moving
def move_files(src, dest, mod=False):
dest_path = os.path.dirname(dest)
toDelete = False
if os.path.isdir(dest_path) is False:
os.makedirs(dest_path)
try:
if mod is True:
modTime = time.time()
os.utime(src, (modTime, modTime))
shutil.move(src, dest)
except PermissionError as p:
logger.warning(f"{p} : Copying files instead.")
shutil.copyfile(src, dest)
toDelete = True
except Exception as e:
logger.stacktrace()
logger.error(e)
return toDelete


# Copy files from source to destination
def copy_files(src, dest):
dest_path = os.path.dirname(dest)
if os.path.isdir(dest_path) is False:
os.makedirs(dest_path)
try:
shutil.copyfile(src, dest)
except Exception as e:
logger.stacktrace()
logger.error(e)


# Remove any empty directories after moving files
def remove_empty_directories(pathlib_root_dir, pattern):
pathlib_root_dir = Path(pathlib_root_dir)
# list all directories recursively and sort them by path,
# longest first
L = sorted(
pathlib_root_dir.glob(pattern),
key=lambda p: len(str(p)),
reverse=True,
)
for pdir in L:
try:
pdir.rmdir() # remove directory if empty
except OSError:
continue # catch and continue if non-empty


# Check whether a file or folder has any hard links.
# If a folder is passed, find its largest file and only check for hard links
# on the remaining files whose size is at least a percentage of that largest file.
# This fixes the bug in #192
def nohardlink(file):
check = True
if os.path.isfile(file):
logger.trace(f"Checking file: {file}")
if os.stat(file).st_nlink > 1:
check = False
else:
sorted_files = sorted(Path(file).rglob("*"), key=lambda x: os.stat(x).st_size, reverse=True)
logger.trace(f"Folder: {file}")
logger.trace(f"Files Sorted by size: {sorted_files}")
threshold = 0.5
if not sorted_files:
            msg = (
                f"Nohardlink Error: Unable to open the folder {file}. "
                "Please make sure qbit_manage has access to this directory."
            )
raise Failed(msg)
largest_file_size = os.stat(sorted_files[0]).st_size
logger.trace(f"Largest file: {sorted_files[0]}")
logger.trace(f"Largest file size: {largest_file_size}")
for x in sorted_files:
file_size = os.stat(x).st_size
file_no_hardlinks = os.stat(x).st_nlink
logger.trace(f"Checking file: {file}")
logger.trace(f"Checking file size: {file_size}")
logger.trace(f"Checking no of hard links: {file_no_hardlinks}")
if file_no_hardlinks > 1 and file_size >= (largest_file_size * threshold):
check = False
return check


# Load json file if exists
def load_json(file):
    if os.path.isfile(file):
        with open(file) as f:
            data = json.load(f)
    else:
        data = {}
    return data


# Save json file, overwriting it if it already exists
def save_json(torrent_json, dest):
with open(dest, "w", encoding="utf-8") as f:
json.dump(torrent_json, f, ensure_ascii=False, indent=4)


# Gracefully stop the script when the Docker container is stopped (SIGTERM)
class GracefulKiller:
kill_now = False
def __init__(self):
# signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)
def exit_gracefully(self, *args):
self.kill_now = True


def human_readable_size(size, decimal_places=3):
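    """Convert a size in bytes to a human readable string such as '1.500KiB'."""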
for unit in ["B", "KiB", "MiB", "GiB", "TiB"]:
if size < 1024.0:
break
size /= 1024.0
return f"{size:.{decimal_places}f}{unit}"


class YAML:
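    """Thin wrapper around ruamel.yaml that loads YAML from a file or string and can write changes back."""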
def __init__(self, path=None, input_data=None, check_empty=False, create=False):
self.path = path
self.input_data = input_data
self.yaml = ruamel.yaml.YAML()
self.yaml.indent(mapping=2, sequence=2)
try:
if input_data:
self.data = self.yaml.load(input_data)
else:
if create and not os.path.exists(self.path):
with open(self.path, "w"):
pass
self.data = {}
else:
with open(self.path, encoding="utf-8") as fp:
self.data = self.yaml.load(fp)
except ruamel.yaml.error.YAMLError as e:
e = str(e).replace("\n", "\n ")
raise Failed(f"YAML Error: {e}")
except Exception as e:
raise Failed(f"YAML Error: {e}")
if not self.data or not isinstance(self.data, dict):
if check_empty:
raise Failed("YAML Error: File is empty")
self.data = {}
def save(self):
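        """Write the current YAML data back to the file it was loaded from."""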
if self.path:
with open(self.path, "w") as fp:
self.yaml.dump(self.data, fp)