mirror of
				https://github.com/StuffAnThings/qbit_manage.git
				synced 2025-10-31 08:26:29 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			613 lines
		
	
	
	
		
			29 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			613 lines
		
	
	
	
		
			29 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
| """Config class for qBittorrent-Manage"""
 | |
| import os
 | |
| import re
 | |
| import stat
 | |
| import time
 | |
| 
 | |
| import requests
 | |
| from retrying import retry
 | |
| 
 | |
| from modules import util
 | |
| from modules.apprise import Apprise
 | |
| from modules.bhd import BeyondHD
 | |
| from modules.notifiarr import Notifiarr
 | |
| from modules.qbittorrent import Qbt
 | |
| from modules.util import check
 | |
| from modules.util import Failed
 | |
| from modules.util import YAML
 | |
| from modules.webhooks import Webhooks
 | |
| 
 | |
| logger = util.logger
 | |
| 
 | |
| COMMANDS = [
 | |
|     "cross_seed",
 | |
|     "recheck",
 | |
|     "cat_update",
 | |
|     "tag_update",
 | |
|     "rem_unregistered",
 | |
|     "tag_tracker_error",
 | |
|     "rem_orphaned",
 | |
|     "tag_nohardlinks",
 | |
|     "skip_cleanup",
 | |
|     "dry_run",
 | |
| ]
 | |
| 
 | |
| 
 | |
| class Config:
 | |
|     """Config class for qBittorrent-Manage"""
 | |
| 
 | |
|     def __init__(self, default_dir, args):
 | |
|         logger.info("Locating config...")
 | |
|         self.args = args
 | |
|         config_file = args["config_file"]
 | |
|         if config_file and os.path.exists(config_file):
 | |
|             self.config_path = os.path.abspath(config_file)
 | |
|         elif config_file and os.path.exists(os.path.join(default_dir, config_file)):
 | |
|             self.config_path = os.path.abspath(os.path.join(default_dir, config_file))
 | |
|         elif config_file and not os.path.exists(config_file):
 | |
|             raise Failed(f"Config Error: config not found at {os.path.abspath(config_file)}")
 | |
|         elif os.path.exists(os.path.join(default_dir, "config.yml")):
 | |
|             self.config_path = os.path.abspath(os.path.join(default_dir, "config.yml"))
 | |
|         else:
 | |
|             raise Failed(f"Config Error: config not found at {os.path.abspath(default_dir)}")
 | |
|         logger.info(f"Using {self.config_path} as config")
 | |
| 
 | |
|         self.util = check(self)
 | |
|         self.default_dir = default_dir
 | |
|         self.start_time = args["time_obj"]
 | |
| 
 | |
|         loaded_yaml = YAML(self.config_path)
 | |
|         self.data = loaded_yaml.data
 | |
| 
 | |
|         # Replace env variables with config commands
 | |
|         if "commands" in self.data:
 | |
|             if self.data["commands"] is not None:
 | |
|                 logger.info(f"Commands found in {config_file}, ignoring env variables and using config commands instead.")
 | |
|                 self.commands = {}
 | |
|                 for command in COMMANDS:
 | |
|                     self.commands[command] = self.util.check_for_attribute(
 | |
|                         self.data,
 | |
|                         command,
 | |
|                         parent="commands",
 | |
|                         var_type="bool",
 | |
|                         default=False,
 | |
|                         save=True,
 | |
|                     )
 | |
|                 logger.debug(f"    --cross-seed (QBT_CROSS_SEED): {self.commands['cross_seed']}")
 | |
|                 logger.debug(f"    --recheck (QBT_RECHECK): {self.commands['recheck']}")
 | |
|                 logger.debug(f"    --cat-update (QBT_CAT_UPDATE): {self.commands['cat_update']}")
 | |
|                 logger.debug(f"    --tag-update (QBT_TAG_UPDATE): {self.commands['tag_update']}")
 | |
|                 logger.debug(f"    --rem-unregistered (QBT_REM_UNREGISTERED): {self.commands['rem_unregistered']}")
 | |
|                 logger.debug(f"    --tag-tracker-error (QBT_TAG_TRACKER_ERROR): {self.commands['tag_tracker_error']}")
 | |
|                 logger.debug(f"    --rem-orphaned (QBT_REM_ORPHANED): {self.commands['rem_orphaned']}")
 | |
|                 logger.debug(f"    --tag-nohardlinks (QBT_TAG_NOHARDLINKS): {self.commands['tag_nohardlinks']}")
 | |
|                 logger.debug(f"    --skip-cleanup (QBT_SKIP_CLEANUP): {self.commands['skip_cleanup']}")
 | |
|                 logger.debug(f"    --dry-run (QBT_DRY_RUN): {self.commands['dry_run']}")
 | |
|         else:
 | |
|             self.commands = args
 | |
| 
 | |
|         if "qbt" in self.data:
 | |
|             self.data["qbt"] = self.data.pop("qbt")
 | |
|         self.data["settings"] = self.data.pop("settings") if "settings" in self.data else {}
 | |
|         if "directory" in self.data:
 | |
|             self.data["directory"] = self.data.pop("directory")
 | |
|         self.data["cat"] = self.data.pop("cat") if "cat" in self.data else {}
 | |
|         if "cat_change" in self.data:
 | |
|             self.data["cat_change"] = self.data.pop("cat_change")
 | |
|         if "tracker" in self.data:
 | |
|             self.data["tracker"] = self.data.pop("tracker")
 | |
|         else:
 | |
|             self.data["tracker"] = {}
 | |
|         if "nohardlinks" in self.data:
 | |
|             self.data["nohardlinks"] = self.data.pop("nohardlinks")
 | |
|         if "recyclebin" in self.data:
 | |
|             self.data["recyclebin"] = self.data.pop("recyclebin")
 | |
|         if "orphaned" in self.data:
 | |
|             self.data["orphaned"] = self.data.pop("orphaned")
 | |
|         if "apprise" in self.data:
 | |
|             self.data["apprise"] = self.data.pop("apprise")
 | |
|         if "notifiarr" in self.data:
 | |
|             self.data["notifiarr"] = self.data.pop("notifiarr")
 | |
|         if "webhooks" in self.data:
 | |
|             temp = self.data.pop("webhooks")
 | |
|             if "function" not in temp or ("function" in temp and temp["function"] is None):
 | |
|                 temp["function"] = {}
 | |
| 
 | |
|             def hooks(attr):
 | |
|                 if attr in temp:
 | |
|                     items = temp.pop(attr)
 | |
|                     if items:
 | |
|                         temp["function"][attr] = items
 | |
|                 if attr not in temp["function"]:
 | |
|                     temp["function"][attr] = {}
 | |
|                     temp["function"][attr] = None
 | |
| 
 | |
|             hooks("cross_seed")
 | |
|             hooks("recheck")
 | |
|             hooks("cat_update")
 | |
|             hooks("tag_update")
 | |
|             hooks("rem_unregistered")
 | |
|             hooks("rem_orphaned")
 | |
|             hooks("tag_nohardlinks")
 | |
|             hooks("cleanup_dirs")
 | |
|             self.data["webhooks"] = temp
 | |
|         if "bhd" in self.data:
 | |
|             self.data["bhd"] = self.data.pop("bhd")
 | |
|         self.dry_run = self.commands["dry_run"]
 | |
|         self.loglevel = "DRYRUN" if self.dry_run else "INFO"
 | |
|         self.session = requests.Session()
 | |
| 
 | |
|         self.settings = {
 | |
|             "force_auto_tmm": self.util.check_for_attribute(
 | |
|                 self.data, "force_auto_tmm", parent="settings", var_type="bool", default=False
 | |
|             ),
 | |
|             "tracker_error_tag": self.util.check_for_attribute(
 | |
|                 self.data, "tracker_error_tag", parent="settings", default="issue"
 | |
|             ),
 | |
|             "nohardlinks_tag": self.util.check_for_attribute(self.data, "nohardlinks_tag", parent="settings", default="noHL"),
 | |
|         }
 | |
| 
 | |
|         self.tracker_error_tag = self.settings["tracker_error_tag"]
 | |
|         self.nohardlinks_tag = self.settings["nohardlinks_tag"]
 | |
| 
 | |
|         default_ignore_tags = [self.nohardlinks_tag, self.tracker_error_tag, "cross-seed"]
 | |
|         self.settings["ignoreTags_OnUpdate"] = self.util.check_for_attribute(
 | |
|             self.data, "ignoreTags_OnUpdate", parent="settings", default=default_ignore_tags, var_type="list"
 | |
|         )
 | |
| 
 | |
|         default_function = {
 | |
|             "cross_seed": None,
 | |
|             "recheck": None,
 | |
|             "cat_update": None,
 | |
|             "tag_update": None,
 | |
|             "rem_unregistered": None,
 | |
|             "tag_tracker_error": None,
 | |
|             "rem_orphaned": None,
 | |
|             "tag_nohardlinks": None,
 | |
|             "cleanup_dirs": None,
 | |
|         }
 | |
| 
 | |
|         self.webhooks_factory = {
 | |
|             "error": self.util.check_for_attribute(self.data, "error", parent="webhooks", var_type="list", default_is_none=True),
 | |
|             "run_start": self.util.check_for_attribute(
 | |
|                 self.data, "run_start", parent="webhooks", var_type="list", default_is_none=True
 | |
|             ),
 | |
|             "run_end": self.util.check_for_attribute(
 | |
|                 self.data, "run_end", parent="webhooks", var_type="list", default_is_none=True
 | |
|             ),
 | |
|             "function": self.util.check_for_attribute(
 | |
|                 self.data, "function", parent="webhooks", var_type="list", default=default_function
 | |
|             ),
 | |
|         }
 | |
|         for func in default_function:
 | |
|             self.util.check_for_attribute(self.data, func, parent="webhooks", subparent="function", default_is_none=True)
 | |
| 
 | |
|         self.cat_change = self.data["cat_change"] if "cat_change" in self.data else {}
 | |
| 
 | |
|         self.apprise_factory = None
 | |
|         if "apprise" in self.data:
 | |
|             if self.data["apprise"] is not None:
 | |
|                 logger.info("Connecting to Apprise...")
 | |
|                 try:
 | |
|                     self.apprise_factory = Apprise(
 | |
|                         self,
 | |
|                         {
 | |
|                             "api_url": self.util.check_for_attribute(
 | |
|                                 self.data, "api_url", parent="apprise", var_type="url", throw=True
 | |
|                             ),
 | |
|                             "notify_url": self.util.check_for_attribute(
 | |
|                                 self.data, "notify_url", parent="apprise", var_type="list", throw=True
 | |
|                             ),
 | |
|                         },
 | |
|                     )
 | |
|                 except Failed as err:
 | |
|                     logger.error(err)
 | |
|                 logger.info(f"Apprise Connection {'Failed' if self.apprise_factory is None else 'Successful'}")
 | |
| 
 | |
|         self.notifiarr_factory = None
 | |
|         if "notifiarr" in self.data:
 | |
|             if self.data["notifiarr"] is not None:
 | |
|                 logger.info("Connecting to Notifiarr...")
 | |
|                 try:
 | |
|                     self.notifiarr_factory = Notifiarr(
 | |
|                         self,
 | |
|                         {
 | |
|                             "apikey": self.util.check_for_attribute(self.data, "apikey", parent="notifiarr", throw=True),
 | |
|                             "instance": self.util.check_for_attribute(
 | |
|                                 self.data, "instance", parent="notifiarr", default=False, do_print=False, save=False
 | |
|                             ),
 | |
|                         },
 | |
|                     )
 | |
|                 except Failed as err:
 | |
|                     logger.error(err)
 | |
|                 logger.info(f"Notifiarr Connection {'Failed' if self.notifiarr_factory is None else 'Successful'}")
 | |
| 
 | |
|         self.webhooks_factory = Webhooks(
 | |
|             self, self.webhooks_factory, notifiarr=self.notifiarr_factory, apprise=self.apprise_factory
 | |
|         )
 | |
|         try:
 | |
|             self.webhooks_factory.start_time_hooks(self.start_time)
 | |
|         except Failed as err:
 | |
|             logger.stacktrace()
 | |
|             logger.error(f"Webhooks Error: {err}")
 | |
| 
 | |
|         self.beyond_hd = None
 | |
|         if "bhd" in self.data:
 | |
|             if self.data["bhd"] is not None:
 | |
|                 logger.info("Connecting to BHD API...")
 | |
|                 try:
 | |
|                     self.beyond_hd = BeyondHD(
 | |
|                         self, {"apikey": self.util.check_for_attribute(self.data, "apikey", parent="bhd", throw=True)}
 | |
|                     )
 | |
|                 except Failed as err:
 | |
|                     logger.error(err)
 | |
|                     self.notify(err, "BHD")
 | |
|                 logger.info(f"BHD Connection {'Failed' if self.beyond_hd is None else 'Successful'}")
 | |
| 
 | |
|         # nohardlinks
 | |
|         self.nohardlinks = None
 | |
|         if "nohardlinks" in self.data and self.commands["tag_nohardlinks"]:
 | |
|             self.nohardlinks = {}
 | |
|             for cat in self.data["nohardlinks"]:
 | |
|                 if cat in list(self.data["cat"].keys()):
 | |
|                     is_max_ratio_defined = self.data["nohardlinks"][cat].get("max_ratio")
 | |
|                     is_max_seeding_time_defined = self.data["nohardlinks"][cat].get("max_seeding_time")
 | |
|                     self.nohardlinks[cat] = {}
 | |
|                     self.nohardlinks[cat]["exclude_tags"] = self.util.check_for_attribute(
 | |
|                         self.data,
 | |
|                         "exclude_tags",
 | |
|                         parent="nohardlinks",
 | |
|                         subparent=cat,
 | |
|                         var_type="list",
 | |
|                         default_is_none=True,
 | |
|                         do_print=False,
 | |
|                     )
 | |
|                     self.nohardlinks[cat]["cleanup"] = self.util.check_for_attribute(
 | |
|                         self.data, "cleanup", parent="nohardlinks", subparent=cat, var_type="bool", default=False, do_print=False
 | |
|                     )
 | |
|                     if is_max_ratio_defined or is_max_seeding_time_defined:
 | |
|                         self.nohardlinks[cat]["max_ratio"] = self.util.check_for_attribute(
 | |
|                             self.data,
 | |
|                             "max_ratio",
 | |
|                             parent="nohardlinks",
 | |
|                             subparent=cat,
 | |
|                             var_type="float",
 | |
|                             min_int=-2,
 | |
|                             do_print=False,
 | |
|                             default=-1,
 | |
|                             save=False,
 | |
|                         )
 | |
|                         self.nohardlinks[cat]["max_seeding_time"] = self.util.check_for_attribute(
 | |
|                             self.data,
 | |
|                             "max_seeding_time",
 | |
|                             parent="nohardlinks",
 | |
|                             subparent=cat,
 | |
|                             var_type="int",
 | |
|                             min_int=-2,
 | |
|                             do_print=False,
 | |
|                             default=-1,
 | |
|                             save=False,
 | |
|                         )
 | |
|                     else:
 | |
|                         self.nohardlinks[cat]["max_ratio"] = self.util.check_for_attribute(
 | |
|                             self.data,
 | |
|                             "max_ratio",
 | |
|                             parent="nohardlinks",
 | |
|                             subparent=cat,
 | |
|                             var_type="float",
 | |
|                             min_int=-2,
 | |
|                             do_print=False,
 | |
|                             default_is_none=True,
 | |
|                             save=False,
 | |
|                         )
 | |
|                         self.nohardlinks[cat]["max_seeding_time"] = self.util.check_for_attribute(
 | |
|                             self.data,
 | |
|                             "max_seeding_time",
 | |
|                             parent="nohardlinks",
 | |
|                             subparent=cat,
 | |
|                             var_type="int",
 | |
|                             min_int=-2,
 | |
|                             do_print=False,
 | |
|                             default_is_none=True,
 | |
|                             save=False,
 | |
|                         )
 | |
|                     self.nohardlinks[cat]["min_seeding_time"] = self.util.check_for_attribute(
 | |
|                         self.data,
 | |
|                         "min_seeding_time",
 | |
|                         parent="nohardlinks",
 | |
|                         subparent=cat,
 | |
|                         var_type="int",
 | |
|                         min_int=0,
 | |
|                         do_print=False,
 | |
|                         default=0,
 | |
|                         save=False,
 | |
|                     )
 | |
|                     self.nohardlinks[cat]["limit_upload_speed"] = self.util.check_for_attribute(
 | |
|                         self.data,
 | |
|                         "limit_upload_speed",
 | |
|                         parent="nohardlinks",
 | |
|                         subparent=cat,
 | |
|                         var_type="int",
 | |
|                         min_int=-1,
 | |
|                         do_print=False,
 | |
|                         default=0,
 | |
|                         save=False,
 | |
|                     )
 | |
|                     self.nohardlinks[cat]["resume_torrent_after_untagging_noHL"] = self.util.check_for_attribute(
 | |
|                         self.data,
 | |
|                         "resume_torrent_after_untagging_noHL",
 | |
|                         parent="nohardlinks",
 | |
|                         subparent=cat,
 | |
|                         var_type="bool",
 | |
|                         default=True,
 | |
|                         do_print=False,
 | |
|                         save=False,
 | |
|                     )
 | |
|                 else:
 | |
|                     err = f"Config Error: Category {cat} is defined under nohardlinks attribute "
 | |
|                     "but is not defined in the cat attribute."
 | |
|                     self.notify(err, "Config")
 | |
|                     raise Failed(err)
 | |
|         else:
 | |
|             if self.commands["tag_nohardlinks"]:
 | |
|                 err = "Config Error: nohardlinks attribute max_ratio not found"
 | |
|                 self.notify(err, "Config")
 | |
|                 raise Failed(err)
 | |
| 
 | |
|         # Add RecycleBin
 | |
|         self.recyclebin = {}
 | |
|         self.recyclebin["enabled"] = self.util.check_for_attribute(
 | |
|             self.data, "enabled", parent="recyclebin", var_type="bool", default=True
 | |
|         )
 | |
|         self.recyclebin["empty_after_x_days"] = self.util.check_for_attribute(
 | |
|             self.data, "empty_after_x_days", parent="recyclebin", var_type="int", default_is_none=True
 | |
|         )
 | |
|         self.recyclebin["save_torrents"] = self.util.check_for_attribute(
 | |
|             self.data, "save_torrents", parent="recyclebin", var_type="bool", default=False
 | |
|         )
 | |
|         self.recyclebin["split_by_category"] = self.util.check_for_attribute(
 | |
|             self.data, "split_by_category", parent="recyclebin", var_type="bool", default=False
 | |
|         )
 | |
| 
 | |
|         # Assign directories
 | |
|         if "directory" in self.data:
 | |
|             self.root_dir = os.path.join(
 | |
|                 self.util.check_for_attribute(self.data, "root_dir", parent="directory", default_is_none=True), ""
 | |
|             )
 | |
|             self.remote_dir = os.path.join(
 | |
|                 self.util.check_for_attribute(self.data, "remote_dir", parent="directory", default=self.root_dir), ""
 | |
|             )
 | |
|             if self.commands["cross_seed"] or self.commands["tag_nohardlinks"] or self.commands["rem_orphaned"]:
 | |
|                 self.remote_dir = self.util.check_for_attribute(
 | |
|                     self.data, "remote_dir", parent="directory", var_type="path", default=self.root_dir
 | |
|                 )
 | |
|             else:
 | |
|                 if self.recyclebin["enabled"]:
 | |
|                     self.remote_dir = self.util.check_for_attribute(
 | |
|                         self.data, "remote_dir", parent="directory", var_type="path", default=self.root_dir
 | |
|                     )
 | |
|             if self.commands["cross_seed"]:
 | |
|                 self.cross_seed_dir = self.util.check_for_attribute(self.data, "cross_seed", parent="directory", var_type="path")
 | |
|             else:
 | |
|                 self.cross_seed_dir = self.util.check_for_attribute(
 | |
|                     self.data, "cross_seed", parent="directory", default_is_none=True
 | |
|                 )
 | |
|             if self.commands["rem_orphaned"]:
 | |
|                 if "orphaned_dir" in self.data["directory"] and self.data["directory"]["orphaned_dir"] is not None:
 | |
|                     default_orphaned = os.path.join(
 | |
|                         self.remote_dir, os.path.basename(self.data["directory"]["orphaned_dir"].rstrip(os.sep))
 | |
|                     )
 | |
|                 else:
 | |
|                     default_orphaned = os.path.join(self.remote_dir, "orphaned_data")
 | |
|                 self.orphaned_dir = self.util.check_for_attribute(
 | |
|                     self.data, "orphaned_dir", parent="directory", var_type="path", default=default_orphaned, make_dirs=True
 | |
|                 )
 | |
|             else:
 | |
|                 self.orphaned_dir = None
 | |
|             if self.recyclebin["enabled"]:
 | |
|                 if "recycle_bin" in self.data["directory"] and self.data["directory"]["recycle_bin"] is not None:
 | |
|                     default_recycle = os.path.join(
 | |
|                         self.remote_dir, os.path.basename(self.data["directory"]["recycle_bin"].rstrip(os.sep))
 | |
|                     )
 | |
|                 else:
 | |
|                     default_recycle = os.path.join(self.remote_dir, ".RecycleBin")
 | |
|                 if self.recyclebin["split_by_category"]:
 | |
|                     self.recycle_dir = self.util.check_for_attribute(
 | |
|                         self.data, "recycle_bin", parent="directory", default=default_recycle
 | |
|                     )
 | |
|                 else:
 | |
|                     self.recycle_dir = self.util.check_for_attribute(
 | |
|                         self.data, "recycle_bin", parent="directory", var_type="path", default=default_recycle, make_dirs=True
 | |
|                     )
 | |
|             else:
 | |
|                 self.recycle_dir = None
 | |
|             if self.recyclebin["enabled"] and self.recyclebin["save_torrents"]:
 | |
|                 self.torrents_dir = self.util.check_for_attribute(self.data, "torrents_dir", parent="directory", var_type="path")
 | |
|                 if not any(File.endswith(".torrent") for File in os.listdir(self.torrents_dir)):
 | |
|                     err = f"Config Error: The location {self.torrents_dir} does not contain any .torrents"
 | |
|                     self.notify(err, "Config")
 | |
|                     raise Failed(err)
 | |
|             else:
 | |
|                 self.torrents_dir = self.util.check_for_attribute(
 | |
|                     self.data, "torrents_dir", parent="directory", default_is_none=True
 | |
|                 )
 | |
|         else:
 | |
|             e = "Config Error: directory attribute not found"
 | |
|             self.notify(e, "Config")
 | |
|             raise Failed(e)
 | |
| 
 | |
|         # Add Orphaned
 | |
|         self.orphaned = {}
 | |
|         self.orphaned["empty_after_x_days"] = self.util.check_for_attribute(
 | |
|             self.data, "empty_after_x_days", parent="orphaned", var_type="int", default_is_none=True
 | |
|         )
 | |
|         self.orphaned["exclude_patterns"] = self.util.check_for_attribute(
 | |
|             self.data, "exclude_patterns", parent="orphaned", var_type="list", default_is_none=True, do_print=False
 | |
|         )
 | |
|         if self.commands["rem_orphaned"]:
 | |
|             exclude_orphaned = f"**{os.sep}{os.path.basename(self.orphaned_dir.rstrip(os.sep))}{os.sep}*"
 | |
|             self.orphaned["exclude_patterns"].append(exclude_orphaned) if exclude_orphaned not in self.orphaned[
 | |
|                 "exclude_patterns"
 | |
|             ] else self.orphaned["exclude_patterns"]
 | |
|         if self.recyclebin["enabled"]:
 | |
|             exclude_recycle = f"**{os.sep}{os.path.basename(self.recycle_dir.rstrip(os.sep))}{os.sep}*"
 | |
|             self.orphaned["exclude_patterns"].append(exclude_recycle) if exclude_recycle not in self.orphaned[
 | |
|                 "exclude_patterns"
 | |
|             ] else self.orphaned["exclude_patterns"]
 | |
| 
 | |
|         # Connect to Qbittorrent
 | |
|         self.qbt = None
 | |
|         if "qbt" in self.data:
 | |
|             logger.info("Connecting to Qbittorrent...")
 | |
|             self.qbt = Qbt(
 | |
|                 self,
 | |
|                 {
 | |
|                     "host": self.util.check_for_attribute(self.data, "host", parent="qbt", throw=True),
 | |
|                     "username": self.util.check_for_attribute(self.data, "user", parent="qbt", default_is_none=True),
 | |
|                     "password": self.util.check_for_attribute(self.data, "pass", parent="qbt", default_is_none=True),
 | |
|                 },
 | |
|             )
 | |
|         else:
 | |
|             e = "Config Error: qbt attribute not found"
 | |
|             self.notify(e, "Config")
 | |
|             raise Failed(e)
 | |
| 
 | |
|     # Empty old files from recycle bin or orphaned
 | |
|     def cleanup_dirs(self, location):
 | |
|         num_del = 0
 | |
|         files = []
 | |
|         size_bytes = 0
 | |
|         skip = self.commands["skip_cleanup"]
 | |
|         if location == "Recycle Bin":
 | |
|             enabled = self.recyclebin["enabled"]
 | |
|             empty_after_x_days = self.recyclebin["empty_after_x_days"]
 | |
|             function = "cleanup_dirs"
 | |
|             location_path = self.recycle_dir
 | |
| 
 | |
|         elif location == "Orphaned Data":
 | |
|             enabled = self.commands["rem_orphaned"]
 | |
|             empty_after_x_days = self.orphaned["empty_after_x_days"]
 | |
|             function = "cleanup_dirs"
 | |
|             location_path = self.orphaned_dir
 | |
| 
 | |
|         if not skip:
 | |
|             if enabled and empty_after_x_days is not None:
 | |
|                 if location == "Recycle Bin" and self.recyclebin["split_by_category"]:
 | |
|                     if "cat" in self.data and self.data["cat"] is not None:
 | |
|                         save_path = list(self.data["cat"].values())
 | |
|                         cleaned_save_path = [
 | |
|                             os.path.join(
 | |
|                                 s.replace(self.root_dir, self.remote_dir), os.path.basename(location_path.rstrip(os.sep))
 | |
|                             )
 | |
|                             for s in save_path
 | |
|                         ]
 | |
|                         location_path_list = [location_path]
 | |
|                         for folder in cleaned_save_path:
 | |
|                             if os.path.exists(folder):
 | |
|                                 location_path_list.append(folder)
 | |
|                     else:
 | |
|                         e = f"No categories defined. Checking {location} directory {location_path}."
 | |
|                         self.notify(e, f"Empty {location}", False)
 | |
|                         logger.warning(e)
 | |
|                         location_path_list = [location_path]
 | |
|                 else:
 | |
|                     location_path_list = [location_path]
 | |
|                 location_files = [
 | |
|                     os.path.join(path, name)
 | |
|                     for r_path in location_path_list
 | |
|                     for path, subdirs, files in os.walk(r_path)
 | |
|                     for name in files
 | |
|                 ]
 | |
|                 location_files = sorted(location_files)
 | |
|                 logger.trace(f"location_files: {location_files}")
 | |
|                 if location_files:
 | |
|                     body = []
 | |
|                     logger.separator(f"Emptying {location} (Files > {empty_after_x_days} days)", space=True, border=True)
 | |
|                     prevfolder = ""
 | |
|                     for file in location_files:
 | |
|                         folder = re.search(f".*{os.path.basename(location_path.rstrip(os.sep))}", file).group(0)
 | |
|                         if folder != prevfolder:
 | |
|                             body += logger.separator(f"Searching: {folder}", space=False, border=False)
 | |
|                         try:
 | |
|                             fileStats = os.stat(file)
 | |
|                             filename = os.path.basename(file)
 | |
|                             last_modified = fileStats[stat.ST_MTIME]  # in seconds (last modified time)
 | |
|                         except FileNotFoundError:
 | |
|                             ex = logger.print_line(
 | |
|                                 f"{location} Warning - FileNotFound: No such file or directory: {file} ", "WARNING"
 | |
|                             )
 | |
|                             self.config.notify(ex, "Cleanup Dirs", False)
 | |
|                             continue
 | |
|                         now = time.time()  # in seconds
 | |
|                         days = (now - last_modified) / (60 * 60 * 24)
 | |
|                         if empty_after_x_days <= days:
 | |
|                             num_del += 1
 | |
|                             body += logger.print_line(
 | |
|                                 f"{'Did not delete' if self.dry_run else 'Deleted'} "
 | |
|                                 f"{filename} from {folder} (Last modified {round(days)} days ago).",
 | |
|                                 self.loglevel,
 | |
|                             )
 | |
|                             files += [str(filename)]
 | |
|                             size_bytes += os.path.getsize(file)
 | |
|                             if not self.dry_run:
 | |
|                                 os.remove(file)
 | |
|                         prevfolder = re.search(f".*{os.path.basename(location_path.rstrip(os.sep))}", file).group(0)
 | |
|                     if num_del > 0:
 | |
|                         if not self.dry_run:
 | |
|                             for path in location_path_list:
 | |
|                                 util.remove_empty_directories(path, "**/*")
 | |
|                         body += logger.print_line(
 | |
|                             f"{'Did not delete' if self.dry_run else 'Deleted'} {num_del} files "
 | |
|                             f"({util.human_readable_size(size_bytes)}) from the {location}.",
 | |
|                             self.loglevel,
 | |
|                         )
 | |
|                         attr = {
 | |
|                             "function": function,
 | |
|                             "location": location,
 | |
|                             "title": f"Emptying {location} (Files > {empty_after_x_days} days)",
 | |
|                             "body": "\n".join(body),
 | |
|                             "files": files,
 | |
|                             "empty_after_x_days": empty_after_x_days,
 | |
|                             "size_in_bytes": size_bytes,
 | |
|                         }
 | |
|                         self.send_notifications(attr)
 | |
|                 else:
 | |
|                     logger.debug(f'No files found in "{(",".join(location_path_list))}"')
 | |
|         return num_del
 | |
| 
 | |
|     def send_notifications(self, attr):
 | |
|         try:
 | |
|             function = attr["function"]
 | |
|             config_webhooks = self.webhooks_factory.function_webhooks
 | |
|             config_function = None
 | |
|             for key in config_webhooks:
 | |
|                 if key in function:
 | |
|                     config_function = key
 | |
|                     break
 | |
|             if config_function:
 | |
|                 self.webhooks_factory.function_hooks([config_webhooks[config_function]], attr)
 | |
|         except Failed as e:
 | |
|             logger.stacktrace()
 | |
|             logger.error(f"webhooks_factory Error: {e}")
 | |
| 
 | |
|     def notify(self, text, function=None, critical=True):
 | |
|         for error in util.get_list(text, split=False):
 | |
|             try:
 | |
|                 self.webhooks_factory.error_hooks(error, function_error=function, critical=critical)
 | |
|             except Failed as e:
 | |
|                 logger.stacktrace()
 | |
|                 logger.error(f"webhooks_factory Error: {e}")
 | |
| 
 | |
|     def get_json(self, url, json=None, headers=None, params=None):
 | |
|         return self.get(url, json=json, headers=headers, params=params).json()
 | |
| 
 | |
|     @retry(stop_max_attempt_number=6, wait_fixed=10000)
 | |
|     def get(self, url, json=None, headers=None, params=None):
 | |
|         return self.session.get(url, json=json, headers=headers, params=params)
 | |
| 
 | |
|     def post_json(self, url, data=None, json=None, headers=None):
 | |
|         return self.post(url, data=data, json=json, headers=headers).json()
 | |
| 
 | |
|     @retry(stop_max_attempt_number=6, wait_fixed=10000)
 | |
|     def post(self, url, data=None, json=None, headers=None):
 | |
|         return self.session.post(url, data=data, json=json, headers=headers)
 |