lint cleanup

- pylint and misc variable/naming cleanup
- add docstrings
- fix typos
Bakerboy448 2023-03-31 13:37:33 -05:00
parent 382a9af45f
commit b352a99d4b
10 changed files with 250 additions and 165 deletions

View file

@ -1,3 +1,4 @@
"""Apprise notification class"""
from modules import util
from modules.util import Failed
@ -5,6 +6,8 @@ logger = util.logger
class Apprise:
"""Apprise notification class"""
def __init__(self, config, params):
self.config = config
self.api_url = params["api_url"]

View file

@ -1,13 +1,16 @@
"""Module for BeyondHD (BHD) tracker."""
from json import JSONDecodeError
from modules import util
from modules.util import Failed
logger = util.logger
base_url = "https://beyond-hd.me/api/"
BASE_URL = "https://beyond-hd.me/api/"
class BeyondHD:
"""BeyondHD (BHD) tracker class."""
def __init__(self, config, params):
self.config = config
self.apikey = params["apikey"]
@ -16,7 +19,8 @@ class BeyondHD:
self.search(json)
def search(self, json, path="torrents/"):
url = f"{base_url}{path}{self.apikey}"
"""Search BHD."""
url = f"{BASE_URL}{path}{self.apikey}"
json["action"] = "search"
logger.trace(url)
logger.trace(f"JSON: {json}")
@ -24,11 +28,11 @@ class BeyondHD:
response = self.config.post(url, json=json, headers={"User-Agent": "Chrome"})
logger.trace(response)
response_json = response.json()
except JSONDecodeError as e:
except JSONDecodeError as err:
if response.status_code >= 400:
raise Failed(e)
elif "Expecting value" in e:
logger.debug(e)
raise Failed(err) from err
elif "Expecting value" in err:
logger.debug(err)
if response.status_code >= 400:
logger.debug(f"Response: {response_json}")
raise Failed(f"({response.status_code} [{response.reason}]) {response_json}")

View file

@ -1,3 +1,4 @@
"""Config class for qBittorrent-Manage"""
import os
import re
import stat
@ -33,6 +34,8 @@ COMMANDS = [
class Config:
"""Config class for qBittorrent-Manage"""
def __init__(self, default_dir, args):
logger.info("Locating config...")
self.args = args
@ -161,7 +164,7 @@ class Config:
"cleanup_dirs": None,
}
self.webhooks = {
self.webhooks_factory = {
"error": self.util.check_for_attribute(self.data, "error", parent="webhooks", var_type="list", default_is_none=True),
"run_start": self.util.check_for_attribute(
self.data, "run_start", parent="webhooks", var_type="list", default_is_none=True
@ -178,12 +181,12 @@ class Config:
self.cat_change = self.data["cat_change"] if "cat_change" in self.data else {}
self.AppriseFactory = None
self.apprise_factory = None
if "apprise" in self.data:
if self.data["apprise"] is not None:
logger.info("Connecting to Apprise...")
try:
self.AppriseFactory = Apprise(
self.apprise_factory = Apprise(
self,
{
"api_url": self.util.check_for_attribute(
@ -194,16 +197,16 @@ class Config:
),
},
)
except Failed as e:
logger.error(e)
logger.info(f"Apprise Connection {'Failed' if self.AppriseFactory is None else 'Successful'}")
except Failed as err:
logger.error(err)
logger.info(f"Apprise Connection {'Failed' if self.apprise_factory is None else 'Successful'}")
self.NotifiarrFactory = None
self.notifiarr_factory = None
if "notifiarr" in self.data:
if self.data["notifiarr"] is not None:
logger.info("Connecting to Notifiarr...")
try:
self.NotifiarrFactory = Notifiarr(
self.notifiarr_factory = Notifiarr(
self,
{
"apikey": self.util.check_for_attribute(self.data, "apikey", parent="notifiarr", throw=True),
@ -212,29 +215,31 @@ class Config:
),
},
)
except Failed as e:
logger.error(e)
logger.info(f"Notifiarr Connection {'Failed' if self.NotifiarrFactory is None else 'Successful'}")
except Failed as err:
logger.error(err)
logger.info(f"Notifiarr Connection {'Failed' if self.notifiarr_factory is None else 'Successful'}")
self.Webhooks = Webhooks(self, self.webhooks, notifiarr=self.NotifiarrFactory, apprise=self.AppriseFactory)
self.webhooks_factory = Webhooks(
self, self.webhooks_factory, notifiarr=self.notifiarr_factory, apprise=self.apprise_factory
)
try:
self.Webhooks.start_time_hooks(self.start_time)
except Failed as e:
self.webhooks_factory.start_time_hooks(self.start_time)
except Failed as err:
logger.stacktrace()
logger.error(f"Webhooks Error: {e}")
logger.error(f"Webhooks Error: {err}")
self.BeyondHD = None
self.beyond_hd = None
if "bhd" in self.data:
if self.data["bhd"] is not None:
logger.info("Connecting to BHD API...")
try:
self.BeyondHD = BeyondHD(
self.beyond_hd = BeyondHD(
self, {"apikey": self.util.check_for_attribute(self.data, "apikey", parent="bhd", throw=True)}
)
except Failed as e:
logger.error(e)
self.notify(e, "BHD")
logger.info(f"BHD Connection {'Failed' if self.BeyondHD is None else 'Successful'}")
except Failed as err:
logger.error(err)
self.notify(err, "BHD")
logger.info(f"BHD Connection {'Failed' if self.beyond_hd is None else 'Successful'}")
# nohardlinks
self.nohardlinks = None
@ -336,15 +341,15 @@ class Config:
save=False,
)
else:
e = f"Config Error: Category {cat} is defined under nohardlinks attribute "
err = f"Config Error: Category {cat} is defined under nohardlinks attribute "
"but is not defined in the cat attribute."
self.notify(e, "Config")
raise Failed(e)
self.notify(err, "Config")
raise Failed(err)
else:
if self.commands["tag_nohardlinks"]:
e = "Config Error: nohardlinks attribute max_ratio not found"
self.notify(e, "Config")
raise Failed(e)
err = "Config Error: nohardlinks attribute max_ratio not found"
self.notify(err, "Config")
raise Failed(err)
# Add RecycleBin
self.recyclebin = {}
@ -416,9 +421,9 @@ class Config:
if self.recyclebin["enabled"] and self.recyclebin["save_torrents"]:
self.torrents_dir = self.util.check_for_attribute(self.data, "torrents_dir", parent="directory", var_type="path")
if not any(File.endswith(".torrent") for File in os.listdir(self.torrents_dir)):
e = f"Config Error: The location {self.torrents_dir} does not contain any .torrents"
self.notify(e, "Config")
raise Failed(e)
err = f"Config Error: The location {self.torrents_dir} does not contain any .torrents"
self.notify(err, "Config")
raise Failed(err)
else:
self.torrents_dir = self.util.check_for_attribute(
self.data, "torrents_dir", parent="directory", default_is_none=True
@ -728,9 +733,9 @@ class Config:
for s in save_path
]
location_path_list = [location_path]
for dir in cleaned_save_path:
if os.path.exists(dir):
location_path_list.append(dir)
for folder in cleaned_save_path:
if os.path.exists(folder):
location_path_list.append(folder)
else:
e = f"No categories defined. Checking {location} directory {location_path}."
self.notify(e, f"Empty {location}", False)
@ -796,25 +801,25 @@ class Config:
def send_notifications(self, attr):
try:
function = attr["function"]
config_webhooks = self.Webhooks.function_webhooks
config_webhooks = self.webhooks_factory.function_webhooks
config_function = None
for key in config_webhooks:
if key in function:
config_function = key
break
if config_function:
self.Webhooks.function_hooks([config_webhooks[config_function]], attr)
self.webhooks_factory.function_hooks([config_webhooks[config_function]], attr)
except Failed as e:
logger.stacktrace()
logger.error(f"Webhooks Error: {e}")
logger.error(f"webhooks_factory Error: {e}")
def notify(self, text, function=None, critical=True):
for error in util.get_list(text, split=False):
try:
self.Webhooks.error_hooks(error, function_error=function, critical=critical)
self.webhooks_factory.error_hooks(error, function_error=function, critical=critical)
except Failed as e:
logger.stacktrace()
logger.error(f"Webhooks Error: {e}")
logger.error(f"webhooks_factory Error: {e}")
def get_json(self, url, json=None, headers=None, params=None):
return self.get(url, json=json, headers=headers, params=params).json()

View file

@ -1,3 +1,4 @@
"""Logging module"""
import io
import logging
import os
@ -20,6 +21,7 @@ TRACE = 5
def fmt_filter(record):
"""Filter log message"""
record.levelname = f"[{record.levelname}]"
record.filename = f"[{record.filename}:{record.lineno}]"
return True
@ -29,7 +31,10 @@ _srcfile = os.path.normcase(fmt_filter.__code__.co_filename)
class MyLogger:
"""Logger class"""
def __init__(self, logger_name, log_file, log_level, default_dir, screen_width, separating_character, ignore_ghost):
"""Initialize logger"""
self.logger_name = logger_name
self.default_dir = default_dir
self.screen_width = screen_width
@ -60,9 +65,11 @@ class MyLogger:
self._logger.addHandler(cmd_handler)
def clear_errors(self):
"""Clear saved errors"""
self.saved_errors = []
def _get_handler(self, log_file, count=3):
"""Get handler for log file"""
max_bytes = 1024 * 1024 * 2
_handler = RotatingFileHandler(log_file, delay=True, mode="w", maxBytes=max_bytes, backupCount=count, encoding="utf-8")
self._formatter(_handler)
@ -71,20 +78,24 @@ class MyLogger:
return _handler
def _formatter(self, handler, border=True):
"""Format log message"""
text = f"| %(message)-{self.screen_width - 2}s |" if border else f"%(message)-{self.screen_width - 2}s"
if isinstance(handler, RotatingFileHandler):
text = f"[%(asctime)s] %(filename)-27s %(levelname)-10s {text}"
handler.setFormatter(logging.Formatter(text))
def add_main_handler(self):
"""Add main handler to logger"""
self.main_handler = self._get_handler(self.main_log, count=9)
self.main_handler.addFilter(fmt_filter)
self._logger.addHandler(self.main_handler)
def remove_main_handler(self):
"""Remove main handler from logger"""
self._logger.removeHandler(self.main_handler)
def add_config_handler(self, config_key):
"""Add config handler to logger"""
if config_key in self.config_handlers:
self._logger.addHandler(self.config_handlers[config_key])
else:
@ -92,10 +103,12 @@ class MyLogger:
self._logger.addHandler(self.config_handlers[config_key])
def remove_config_handler(self, config_key):
"""Remove config handler from logger"""
if config_key in self.config_handlers:
self._logger.removeHandler(self.config_handlers[config_key])
def _centered(self, text, sep=" ", side_space=True, left=False):
"""Center text"""
if len(text) > self.screen_width - 2:
return text
space = self.screen_width - len(text) - 2
@ -108,6 +121,7 @@ class MyLogger:
return final_text
def separator(self, text=None, space=True, border=True, side_space=True, left=False, loglevel="INFO"):
"""Print separator"""
sep = " " if space else self.separating_character
for handler in self._logger.handlers:
self._formatter(handler, border=False)
@ -116,8 +130,8 @@ class MyLogger:
self.print_line(border_text, loglevel)
if text:
text_list = text.split("\n")
for t in text_list:
self.print_line(f"|{sep}{self._centered(t, sep=sep, side_space=side_space, left=left)}{sep}|", loglevel)
for txt in text_list:
self.print_line(f"|{sep}{self._centered(txt, sep=sep, side_space=side_space, left=left)}{sep}|", loglevel)
if border:
self.print_line(border_text, loglevel)
for handler in self._logger.handlers:
@ -125,50 +139,61 @@ class MyLogger:
return [text]
def print_line(self, msg, loglevel="INFO", *args, **kwargs):
"""Print line"""
loglvl = getattr(logging, loglevel.upper())
if self._logger.isEnabledFor(loglvl):
self._log(loglvl, str(msg), args, **kwargs)
return [str(msg)]
def trace(self, msg, *args, **kwargs):
"""Print trace"""
if self._logger.isEnabledFor(TRACE):
self._log(TRACE, str(msg), args, **kwargs)
def debug(self, msg, *args, **kwargs):
"""Print debug"""
if self._logger.isEnabledFor(DEBUG):
self._log(DEBUG, str(msg), args, **kwargs)
def info_center(self, msg, *args, **kwargs):
"""Print info centered"""
self.info(self._centered(str(msg)), *args, **kwargs)
def info(self, msg, *args, **kwargs):
"""Print info"""
if self._logger.isEnabledFor(INFO):
self._log(INFO, str(msg), args, **kwargs)
def dryrun(self, msg, *args, **kwargs):
"""Print dryrun"""
if self._logger.isEnabledFor(DRYRUN):
self._log(DRYRUN, str(msg), args, **kwargs)
def warning(self, msg, *args, **kwargs):
"""Print warning"""
if self._logger.isEnabledFor(WARNING):
self._log(WARNING, str(msg), args, **kwargs)
def error(self, msg, *args, **kwargs):
"""Print error"""
if self.save_errors:
self.saved_errors.append(msg)
if self._logger.isEnabledFor(ERROR):
self._log(ERROR, str(msg), args, **kwargs)
def critical(self, msg, *args, **kwargs):
"""Print critical"""
if self.save_errors:
self.saved_errors.append(msg)
if self._logger.isEnabledFor(CRITICAL):
self._log(CRITICAL, str(msg), args, **kwargs)
def stacktrace(self):
"""Print stacktrace"""
self.debug(traceback.format_exc())
def _space(self, display_title):
"""Add spaces to display title"""
display_title = str(display_title)
space_length = self.spacing - len(display_title)
if space_length > 0:
@ -176,6 +201,7 @@ class MyLogger:
return display_title
def ghost(self, text):
"""Print ghost"""
if not self.ignore_ghost:
try:
final_text = f"| {text}"
@ -186,15 +212,18 @@ class MyLogger:
self.spacing = len(text) + 2
def exorcise(self):
"""Exorcise ghost"""
if not self.ignore_ghost:
print(self._space(" "), end="\r")
self.spacing = 0
def secret(self, text):
"""Add secret"""
if str(text) not in self.secrets and str(text):
self.secrets.append(str(text))
def insert_space(self, display_title, space_length=0):
"""Insert space"""
display_title = str(display_title)
if space_length == 0:
space_length = self.spacing - len(display_title)
@ -203,6 +232,7 @@ class MyLogger:
return display_title
def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False, stacklevel=1):
"""Log"""
if self.spacing > 0:
self.exorcise()
if "\n" in msg:
@ -226,43 +256,44 @@ class MyLogger:
try:
if not _srcfile:
raise ValueError
fn, lno, func, sinfo = self.findCaller(stack_info, stacklevel)
fname, lno, func, sinfo = self.find_caller(stack_info, stacklevel)
except ValueError:
fn, lno, func, sinfo = "(unknown file)", 0, "(unknown function)", None
func, lno, func, sinfo = "(unknown file)", 0, "(unknown function)", None
if exc_info:
if isinstance(exc_info, BaseException):
exc_info = (type(exc_info), exc_info, exc_info.__traceback__)
elif not isinstance(exc_info, tuple):
exc_info = sys.exc_info()
record = self._logger.makeRecord(self._logger.name, level, fn, lno, msg, args, exc_info, func, extra, sinfo)
record = self._logger.makeRecord(self._logger.name, level, fname, lno, msg, args, exc_info, func, extra, sinfo)
self._logger.handle(record)
def findCaller(self, stack_info=False, stacklevel=1):
f = logging.currentframe()
if f is not None:
f = f.f_back
orig_f = f
while f and stacklevel > 1:
f = f.f_back
def find_caller(self, stack_info=False, stacklevel=1):
"""Find caller"""
frm = logging.currentframe()
if frm is not None:
frm = frm.f_back
orig_f = frm
while frm and stacklevel > 1:
frm = frm.f_back
stacklevel -= 1
if not f:
f = orig_f
rv = "(unknown file)", 0, "(unknown function)", None
while hasattr(f, "f_code"):
co = f.f_code
filename = os.path.normcase(co.co_filename)
if not frm:
frm = orig_f
rvf = "(unknown file)", 0, "(unknown function)", None
while hasattr(frm, "f_code"):
code = frm.f_code
filename = os.path.normcase(code.co_filename)
if filename == _srcfile:
f = f.f_back
frm = frm.f_back
continue
sinfo = None
if stack_info:
sio = io.StringIO()
sio.write("Stack (most recent call last):\n")
traceback.print_stack(f, file=sio)
traceback.print_stack(frm, file=sio)
sinfo = sio.getvalue()
if sinfo[-1] == "\n":
sinfo = sinfo[:-1]
sio.close()
rv = (co.co_filename, f.f_lineno, co.co_name, sinfo)
rvf = (code.co_filename, frm.f_lineno, code.co_name, sinfo)
break
return rv
return rvf
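
find_caller() above mirrors logging.Logger.findCaller: it walks f_back links and skips any frame whose code object lives in the logger's own source file (_srcfile), so log records report the calling code rather than MyLogger internals. A minimal standalone sketch of that frame walk, with an illustrative helper name that is not part of this module:

import inspect
import os

def caller_outside(module_file):
    """Return (filename, lineno, funcname) for the first stack frame not defined in module_file."""
    norm = os.path.normcase(module_file)
    frame = inspect.currentframe()
    frame = frame.f_back if frame is not None else None  # skip this helper's own frame
    while frame is not None:
        code = frame.f_code
        if os.path.normcase(code.co_filename) != norm:
            return code.co_filename, frame.f_lineno, code.co_name
        frame = frame.f_back
    return "(unknown file)", 0, "(unknown function)"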

View file

@ -5,17 +5,22 @@ from modules.util import Failed
logger = util.logger
base_url = "https://notifiarr.com/api/v1/"
class Notifiarr:
"""Notifiarr API"""
BASE_URL = "https://notifiarr.com/api"
API_VERSION = "v1"
def __init__(self, config, params):
"""Initialize Notifiarr API"""
self.config = config
self.apikey = params["apikey"]
self.header = {"X-API-Key": self.apikey}
self.instance = params["instance"]
self.url = f"{self.BASE_URL}/{self.API_VERSION}/"
logger.secret(self.apikey)
response = self.config.get(f"{base_url}user/qbitManage/", headers=self.header, params={"fetch": "settings"})
response = self.config.get(f"{self.url}user/qbitManage/", headers=self.header, params={"fetch": "settings"})
response_json = None
try:
response_json = response.json()
@ -29,5 +34,6 @@ class Notifiarr:
raise Failed("Notifiarr Error: Invalid apikey")
def notification(self, json):
"""Send notification to Notifiarr"""
params = {"qbit_client": self.config.data["qbt"]["host"], "instance": self.instance}
return self.config.get(f"{base_url}notification/qbitManage/", json=json, headers=self.header, params=params)
return self.config.get(f"{self.url}notification/qbitManage/", json=json, headers=self.header, params=params)

View file

@ -953,11 +953,11 @@ class Qbt:
dir_cs_out = os.path.join(dir_cs, "qbit_manage_added")
os.makedirs(dir_cs_out, exist_ok=True)
for file in cs_files:
t_name = file.split("]", 2)[2].split(".torrent")[0]
tr_name = file.split("]", 2)[2].split(".torrent")[0]
t_tracker = file.split("]", 2)[1][1:]
# Substring Key match in dictionary (used because t_name might not match exactly with torrentdict key)
# Returned the dictionary of filtered item
torrentdict_file = dict(filter(lambda item: t_name in item[0], self.torrentinfo.items()))
torrentdict_file = dict(filter(lambda item: tr_name in item[0], self.torrentinfo.items()))
if torrentdict_file:
# Get the exact torrent match name from torrentdict
t_name = next(iter(torrentdict_file))
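
The filter/lambda above does a substring match against the torrentinfo keys because the name parsed out of the .torrent filename may not equal the stored key exactly; the first matching key is then taken as the canonical torrent name. A small self-contained illustration of the same pattern (the data is made up):

torrentinfo = {
    "Ubuntu 22.04.2 LTS Desktop amd64": {"count": 1},
    "Some Other Release 1080p": {"count": 2},
}
tr_name = "Ubuntu 22.04.2"

# keep only the entries whose key contains the partial name
matches = dict(filter(lambda item: tr_name in item[0], torrentinfo.items()))
if matches:
    t_name = next(iter(matches))  # exact key of the first match
    print(t_name)  # Ubuntu 22.04.2 LTS Desktop amd64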

View file

@ -1,3 +1,4 @@
""" Utility functions for qBit Manage. """
import json
import logging
import os
@ -12,6 +13,7 @@ logger = logging.getLogger("qBit Manage")
def get_list(data, lower=False, split=True, int_list=False):
"""Return a list from a string or list."""
if data is None:
return None
elif isinstance(data, list):
@ -32,6 +34,8 @@ def get_list(data, lower=False, split=True, int_list=False):
class check:
"""Check for attributes in config."""
def __init__(self, config):
self.config = config
@ -52,6 +56,7 @@ class check:
save=True,
make_dirs=False,
):
"""Check for attribute in config."""
endline = ""
if parent is not None:
if subparent is not None:
@ -188,10 +193,13 @@ class check:
class Failed(Exception):
"""Exception raised for errors in the input."""
pass
def list_in_text(text, search_list, match_all=False):
"""Check if a list of strings is in a string"""
if isinstance(search_list, list):
search_list = set(search_list)
contains = {x for x in search_list if " " in x}
@ -205,77 +213,80 @@ def list_in_text(text, search_list, match_all=False):
return False
# truncate the value of the torrent url to remove sensitive information
def trunc_val(s, d, n=3):
def trunc_val(stg, delm, num=3):
"""Truncate the value of the torrent url to remove sensitive information"""
try:
x = d.join(s.split(d, n)[:n])
val = delm.join(stg.split(delm, num)[:num])
except IndexError:
x = None
return x
val = None
return val
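
To make the truncation concrete: with the default num=3, trunc_val keeps only the first three delimiter-separated pieces, which is enough to drop the passkey segment of a typical announce URL (the URL below is invented):

from modules.util import trunc_val

# "https://tracker.example/announce/abc123secret".split("/", 3) gives
# ["https:", "", "tracker.example", "announce/abc123secret"]; rejoining the
# first three pieces drops everything after the host.
print(trunc_val("https://tracker.example/announce/abc123secret", "/"))
# -> https://tracker.example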
# Move files from source to destination, mod variable is to change the date modified of the file being moved
def move_files(src, dest, mod=False):
"""Move files from source to destination, mod variable is to change the date modified of the file being moved"""
dest_path = os.path.dirname(dest)
toDelete = False
to_delete = False
if os.path.isdir(dest_path) is False:
os.makedirs(dest_path)
try:
if mod is True:
modTime = time.time()
os.utime(src, (modTime, modTime))
mod_time = time.time()
os.utime(src, (mod_time, mod_time))
shutil.move(src, dest)
except PermissionError as p:
logger.warning(f"{p} : Copying files instead.")
except PermissionError as perm:
logger.warning(f"{perm} : Copying files instead.")
shutil.copyfile(src, dest)
toDelete = True
except FileNotFoundError as f:
logger.warning(f"{f} : source: {src} -> destination: {dest}")
except Exception as e:
to_delete = True
except FileNotFoundError as file:
logger.warning(f"{file} : source: {src} -> destination: {dest}")
except Exception as ex:
logger.stacktrace()
logger.error(e)
return toDelete
logger.error(ex)
return to_delete
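
A short usage sketch for move_files, showing what the return value signals; the paths are made up and not part of this commit:

from modules.util import move_files

# mod=True refreshes the source's modified time before the move. The function
# returns True only when a PermissionError forced a copy instead of a move,
# meaning the source file is still present and the caller must remove it.
copied_only = move_files("/data/torrents/old/file.mkv", "/data/recycle_bin/file.mkv", mod=True)
if copied_only:
    print("copied instead of moved; source still present")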
# Copy Files from source to destination
def copy_files(src, dest):
"""Copy files from source to destination"""
dest_path = os.path.dirname(dest)
if os.path.isdir(dest_path) is False:
os.makedirs(dest_path)
try:
shutil.copyfile(src, dest)
except Exception as e:
except Exception as ex:
logger.stacktrace()
logger.error(e)
logger.error(ex)
# Remove any empty directories after moving files
def remove_empty_directories(pathlib_root_dir, pattern):
"""Remove empty directories recursively."""
pathlib_root_dir = Path(pathlib_root_dir)
# list all directories recursively and sort them by path,
# longest first
L = sorted(
longest = sorted(
pathlib_root_dir.glob(pattern),
key=lambda p: len(str(p)),
reverse=True,
)
for pdir in L:
for pdir in longest:
try:
pdir.rmdir() # remove directory if empty
except OSError:
continue # catch and continue if non-empty
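
Sorting the matched directories longest-path-first means children are attempted before their parents, so a chain of nested empty folders collapses in a single pass while non-empty folders are skipped via the caught OSError. A small self-contained demonstration using a throwaway temp directory and the "**/*" glob as the pattern (both are illustrative choices):

import tempfile
from pathlib import Path
from modules.util import remove_empty_directories

root = Path(tempfile.mkdtemp())
(root / "a" / "b" / "c").mkdir(parents=True)      # nested empty directories
(root / "keep").mkdir()
(root / "keep" / "file.txt").write_text("data")   # non-empty, must survive

remove_empty_directories(root, "**/*")
print((root / "a").exists())     # False, the whole empty chain was removed
print((root / "keep").exists())  # True, rmdir on a non-empty dir raises OSError and is skipped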
# will check if there are any hard links if it passes a file or folder
# If a folder is passed, it will take the largest file in that folder and only check for hardlinks
# of the remaining files where the file is greater size a percentage of the largest file
# This fixes the bug in #192
def nohardlink(file, notify):
check = True
"""
Check if there are any hard links
Will check if there are any hard links if it passes a file or folder
If a folder is passed, it will take the largest file in that folder and only check for hardlinks
of the remaining files where the file is greater size a percentage of the largest file
This fixes the bug in #192
"""
check_for_hl = True
if os.path.isfile(file):
logger.trace(f"Checking file: {file}")
if os.stat(file).st_nlink > 1:
check = False
check_for_hl = False
else:
sorted_files = sorted(Path(file).rglob("*"), key=lambda x: os.stat(x).st_size, reverse=True)
logger.trace(f"Folder: {file}")
@ -292,36 +303,40 @@ def nohardlink(file, notify):
largest_file_size = os.stat(sorted_files[0]).st_size
logger.trace(f"Largest file: {sorted_files[0]}")
logger.trace(f"Largest file size: {largest_file_size}")
for x in sorted_files:
file_size = os.stat(x).st_size
file_no_hardlinks = os.stat(x).st_nlink
for files in sorted_files:
file_size = os.stat(files).st_size
file_no_hardlinks = os.stat(files).st_nlink
logger.trace(f"Checking file: {file}")
logger.trace(f"Checking file size: {file_size}")
logger.trace(f"Checking no of hard links: {file_no_hardlinks}")
if file_no_hardlinks > 1 and file_size >= (largest_file_size * threshold):
check = False
return check
check_for_hl = False
return check_for_hl
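
The core signal in nohardlink is st_nlink: a regular file that has been hard-linked elsewhere reports a link count greater than 1. A minimal standalone check along the same lines, with the sense inverted (True when extra links exist) and an assumed threshold value, since the threshold used above comes from configuration not shown in this hunk:

import os
from pathlib import Path

def has_hardlinks(path, threshold=0.5):
    """Return True if the file, or any sufficiently large file under the folder, has extra hard links."""
    path = Path(path)
    if path.is_file():
        return os.stat(path).st_nlink > 1
    files = [p for p in path.rglob("*") if p.is_file()]
    if not files:
        return False
    files.sort(key=lambda p: os.stat(p).st_size, reverse=True)
    largest = os.stat(files[0]).st_size
    # only files near the size of the largest file are considered, mirroring the folder case above
    return any(
        os.stat(p).st_nlink > 1 and os.stat(p).st_size >= largest * threshold
        for p in files
    )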
# Load json file if exists
def load_json(file):
"""Load json file if exists"""
if os.path.isfile(file):
f = open(file)
data = json.load(f)
f.close()
file = open(file)
data = json.load(file)
file.close()
else:
data = {}
return data
# Save json file overwrite if exists
def save_json(torrent_json, dest):
with open(dest, "w", encoding="utf-8") as f:
json.dump(torrent_json, f, ensure_ascii=False, indent=4)
"""Save json file to destination"""
with open(dest, "w", encoding="utf-8") as file:
json.dump(torrent_json, file, ensure_ascii=False, indent=4)
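
One note on load_json above: the rename from f to file reuses the parameter name for the open handle and leaves the file unclosed if json.load raises. A behaviour-equivalent sketch using a context manager, offered as an alternative rather than what this commit does:

import json
import os

def load_json(path):
    """Load a JSON file if it exists, otherwise return an empty dict."""
    if not os.path.isfile(path):
        return {}
    with open(path, encoding="utf-8") as json_file:
        return json.load(json_file)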
# Gracefully kill script when docker stops
class GracefulKiller:
"""
Class to catch SIGTERM and SIGINT signals.
Gracefully kill script when docker stops.
"""
kill_now = False
def __init__(self):
@ -329,10 +344,12 @@ class GracefulKiller:
signal.signal(signal.SIGTERM, self.exit_gracefully)
def exit_gracefully(self, *args):
"""Set kill_now to True to exit gracefully."""
self.kill_now = True
def human_readable_size(size, decimal_places=3):
"""Convert bytes to human readable size"""
for unit in ["B", "KiB", "MiB", "GiB", "TiB"]:
if size < 1024.0:
break
@ -341,6 +358,8 @@ def human_readable_size(size, decimal_places=3):
class YAML:
"""Class to load and save yaml files"""
def __init__(self, path=None, input_data=None, check_empty=False, create=False):
self.path = path
self.input_data = input_data
@ -355,19 +374,20 @@ class YAML:
pass
self.data = {}
else:
with open(self.path, encoding="utf-8") as fp:
self.data = self.yaml.load(fp)
except ruamel.yaml.error.YAMLError as e:
e = str(e).replace("\n", "\n ")
raise Failed(f"YAML Error: {e}")
except Exception as e:
raise Failed(f"YAML Error: {e}")
with open(self.path, encoding="utf-8") as filepath:
self.data = self.yaml.load(filepath)
except ruamel.yaml.error.YAMLError as yerr:
err = str(yerr).replace("\n", "\n ")
raise Failed(f"YAML Error: {err}") from yerr
except Exception as yerr:
raise Failed(f"YAML Error: {yerr}") from yerr
if not self.data or not isinstance(self.data, dict):
if check_empty:
raise Failed("YAML Error: File is empty")
self.data = {}
def save(self):
"""Save yaml file"""
if self.path:
with open(self.path, "w") as fp:
self.yaml.dump(self.data, fp)
with open(self.path, "w") as filepath:
self.yaml.dump(self.data, filepath)
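
A short usage sketch for the YAML helper, based only on the constructor arguments and save() shown here; the path is illustrative and the file is assumed to already exist:

from modules.util import YAML, Failed

try:
    config_yaml = YAML(path="config/config.yml", check_empty=True)
except Failed as err:
    # raised for unparseable YAML or, with check_empty=True, an empty file
    print(err)
else:
    config_yaml.data.setdefault("settings", {})["dry_run"] = True
    config_yaml.save()  # dumps self.data back to the same path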

View file

@ -1,3 +1,4 @@
"""Class to handle webhooks."""
from json import JSONDecodeError
from requests.exceptions import JSONDecodeError as requestsJSONDecodeError
@ -9,7 +10,10 @@ logger = util.logger
class Webhooks:
"""Class to handle webhooks."""
def __init__(self, config, system_webhooks, notifiarr=None, apprise=None):
"""Initialize the class."""
self.config = config
self.error_webhooks = system_webhooks["error"] if "error" in system_webhooks else []
self.run_start_webhooks = system_webhooks["run_start"] if "run_start" in system_webhooks else []
@ -25,6 +29,7 @@ class Webhooks:
self.apprise = apprise
def _request(self, webhooks, json):
"""Send a webhook request."""
logger.trace("")
logger.trace(f"JSON: {json}")
for webhook in list(set(webhooks)):
@ -36,20 +41,18 @@ class Webhooks:
if self.notifiarr is None:
break
else:
for x in range(6):
response = self.notifiarr.notification(json=json)
if response.status_code < 500:
break
response = self.notifiarr.notification(json=json)
if response.status_code < 500:
break
elif webhook == "apprise":
if self.apprise is None:
logger.warning("Webhook attribute set to apprise but apprise attribute is not configured.")
break
else:
json["urls"] = self.apprise.notify_url
for x in range(6):
response = self.config.post(f"{self.apprise.api_url}/notify", json=json)
if response.status_code < 500:
break
response = self.config.post(f"{self.apprise.api_url}/notify", json=json)
if response.status_code < 500:
break
else:
response = self.config.post(webhook, json=json)
if response:
@ -72,11 +75,12 @@ class Webhooks:
response.status_code >= 400 or ("result" in response_json and response_json["result"] == "error")
) and skip is False:
raise Failed(f"({response.status_code} [{response.reason}]) {response_json}")
except (JSONDecodeError, requestsJSONDecodeError):
except (JSONDecodeError, requestsJSONDecodeError) as exc:
if response.status_code >= 400:
raise Failed(f"({response.status_code} [{response.reason}])")
raise Failed(f"({response.status_code} [{response.reason}])") from exc
def start_time_hooks(self, start_time):
"""Send a webhook to notify that the run has started."""
if self.run_start_webhooks:
dry_run = self.config.commands["dry_run"]
if dry_run:
@ -95,6 +99,7 @@ class Webhooks:
)
def end_time_hooks(self, start_time, end_time, run_time, next_run, stats, body):
"""Send a webhook to notify that the run has ended."""
if self.run_end_webhooks:
self._request(
self.run_end_webhooks,
@ -125,19 +130,21 @@ class Webhooks:
)
def error_hooks(self, text, function_error=None, critical=True):
"""Send a webhook to notify that an error has occurred."""
if self.error_webhooks:
type = "failure" if critical is True else "warning"
err_type = "failure" if critical is True else "warning"
json = {
"function": "run_error",
"title": f"{function_error} Error",
"body": str(text),
"critical": critical,
"type": type,
"type": err_type,
}
if function_error:
json["function_error"] = function_error
self._request(self.error_webhooks, json)
def function_hooks(self, webhook, json):
"""Send a webhook to notify that a function has completed."""
if self.function_webhooks:
self._request(webhook, json)

View file

@ -1,4 +1,5 @@
#!/usr/bin/python3
"""qBittorrent Manager."""
import argparse
import glob
import os
@ -14,11 +15,13 @@ except ModuleNotFoundError:
print("Requirements Error: Requirements are not installed")
sys.exit(0)
REQUIRED_VERSION = (3, 8, 1)
current_version = sys.version_info
if sys.version_info < (3, 8, 1):
if current_version < (REQUIRED_VERSION):
print(
"Version Error: Version: %s.%s.%s incompatible please use Python 3.8.1+"
% (sys.version_info[0], sys.version_info[1], sys.version_info[2])
"Version Error: Version: %s.%s.%s incompatible with qbit_manage please use Python %s+"
% (current_version[0], current_version[1], current_version[2], REQUIRED_VERSION)
)
sys.exit(0)
@ -142,7 +145,7 @@ parser.add_argument(
dest="skip_cleanup",
action="store_true",
default=False,
help="Use this to skip cleaning up Reycle Bin/Orphaned directory.",
help="Use this to skip cleaning up Recycle Bin/Orphaned directory.",
)
parser.add_argument(
"-dr",
@ -163,6 +166,7 @@ args = parser.parse_args()
def get_arg(env_str, default, arg_bool=False, arg_int=False):
"""Get argument from environment variable or command line argument."""
env_vars = [env_str] if not isinstance(env_str, list) else env_str
final_value = None
for env_var in env_vars:
@ -284,11 +288,12 @@ from modules.util import GracefulKiller # noqa
from modules.util import Failed # noqa
def my_except_hook(exctype, value, tb):
def my_except_hook(exctype, value, tbi):
"""Handle uncaught exceptions"""
if issubclass(exctype, KeyboardInterrupt):
sys.__excepthook__(exctype, value, tb)
sys.__excepthook__(exctype, value, tbi)
else:
logger.critical("Uncaught Exception", exc_info=(exctype, value, tb))
logger.critical("Uncaught Exception", exc_info=(exctype, value, tbi))
sys.excepthook = my_except_hook
@ -303,6 +308,7 @@ with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "VERSION")) a
def start_loop():
"""Start the main loop"""
if len(config_files) == 1:
args["config_file"] = config_files[0]
start()
@ -316,6 +322,7 @@ def start_loop():
def start():
"""Start the run"""
start_time = datetime.now()
args["time"] = start_time.strftime("%H:%M")
args["time_obj"] = start_time
@ -345,13 +352,14 @@ def start():
"untagged_noHL": 0,
}
def FinishedRun():
def finished_run():
"""Handle the end of a run"""
nonlocal end_time, start_time, stats_summary, run_time, next_run, body
end_time = datetime.now()
run_time = str(end_time - start_time).split(".")[0]
_, nr = calc_next_run(sch, True)
next_run_str = nr["next_run_str"]
next_run = nr["next_run"]
run_time = str(end_time - start_time).split(".", maxsplit=1)[0]
_, nxt_run = calc_next_run(sch, True)
next_run_str = nxt_run["next_run_str"]
next_run = nxt_run["next_run"]
body = logger.separator(
f"Finished Run\n{os.linesep.join(stats_summary) if len(stats_summary)>0 else ''}"
f"\nRun Time: {run_time}\n{next_run_str if len(next_run_str)>0 else ''}".replace("\n\n", "\n").rstrip()
@ -360,15 +368,15 @@ def start():
try:
cfg = Config(default_dir, args)
except Exception as e:
if "Qbittorrent Error" in e.args[0]:
logger.print_line(e, "CRITICAL")
except Exception as ex:
if "Qbittorrent Error" in ex.args[0]:
logger.print_line(ex, "CRITICAL")
logger.print_line("Exiting scheduled Run.", "CRITICAL")
FinishedRun()
finished_run()
return None
else:
logger.stacktrace()
logger.print_line(e, "CRITICAL")
logger.print_line(ex, "CRITICAL")
if cfg:
# Set Category
@ -449,28 +457,29 @@ def start():
if stats["orphaned_emptied"] > 0:
stats_summary.append(f"Total Files Deleted from Orphaned Data: {stats['orphaned_emptied']}")
FinishedRun()
finished_run()
if cfg:
try:
cfg.Webhooks.end_time_hooks(start_time, end_time, run_time, next_run, stats, body)
except Failed as e:
except Failed as err:
logger.stacktrace()
logger.error(f"Webhooks Error: {e}")
logger.error(f"Webhooks Error: {err}")
def end():
"""Ends the program"""
logger.info("Exiting Qbit_manage")
logger.remove_main_handler()
sys.exit(0)
def calc_next_run(sch, print=False):
def calc_next_run(schd, write_out=False):
"""Calculates the next run time based on the schedule"""
current = datetime.now().strftime("%H:%M")
seconds = sch * 60
time_to_run = datetime.now() + timedelta(minutes=sch)
seconds = schd * 60
time_to_run = datetime.now() + timedelta(minutes=schd)
time_to_run_str = time_to_run.strftime("%H:%M")
new_seconds = (datetime.strptime(time_to_run_str, "%H:%M") - datetime.strptime(current, "%H:%M")).total_seconds()
time_str = ""
next_run = {}
if run is False:
next_run["next_run"] = time_to_run
@ -481,14 +490,14 @@ def calc_next_run(sch, print=False):
if seconds is not None:
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
time_str = f"{hours} Hour{'s' if hours > 1 else ''}{' and ' if minutes > 1 else ''}" if hours > 0 else ""
time_str += f"{minutes} Minute{'s' if minutes > 1 else ''}" if minutes > 0 else ""
if print:
next_run["next_run_str"] = f"Current Time: {current} | {time_str} until the next run at {time_to_run_str}"
time_until = f"{hours} Hour{'s' if hours > 1 else ''}{' and ' if minutes > 1 else ''}" if hours > 0 else ""
time_until += f"{minutes} Minute{'s' if minutes > 1 else ''}" if minutes > 0 else ""
if write_out:
next_run["next_run_str"] = f"Current Time: {current} | {time_until} until the next run at {time_to_run_str}"
else:
next_run["next_run"] = None
next_run["next_run_str"] = ""
return time_str, next_run
return time_until, next_run
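
To make the schedule arithmetic concrete: calc_next_run turns the interval in minutes into a next-run timestamp plus a human-readable countdown. A rough standalone sketch of the same arithmetic, not a call into the function itself (which also relies on the module-level run flag):

from datetime import datetime, timedelta

schd = 90  # minutes between runs, as passed in sch
time_to_run = datetime.now() + timedelta(minutes=schd)
hours, minutes = divmod(schd, 60)
time_until = (
    (f"{hours} Hour{'s' if hours > 1 else ''} and " if hours else "")
    + (f"{minutes} Minute{'s' if minutes > 1 else ''}" if minutes else "")
)
print(f"{time_until} until the next run at {time_to_run.strftime('%H:%M')}")
# e.g. "1 Hour and 30 Minutes until the next run at 15:35"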
if __name__ == "__main__":

View file

@ -49,7 +49,7 @@ def setup_services(qbt=False):
)
try:
qbt_client.auth_log_in()
print("Succesfully connected to qBittorrent!")
print("Successfully connected to qBittorrent!")
except:
print("Error: Could not log into qBittorrent. Please verify login details are correct and Web Ui is available.")
quit_program(1)