mirror of
https://github.com/StuffAnThings/qbit_manage.git
synced 2025-10-11 14:27:09 +08:00
- Update get_default_config_dir to check for existing YAML files in addition to sample files
1661 lines
63 KiB
Python
Executable file
1661 lines
63 KiB
Python
Executable file
"""Utility functions for qBit Manage."""
|
|
|
|
import glob
|
|
import json
|
|
import logging
|
|
import os
|
|
import platform
|
|
import re
|
|
import shutil
|
|
import signal
|
|
import sys
|
|
import time
|
|
from fnmatch import fnmatch
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
import ruamel.yaml
|
|
from pytimeparse2 import parse
|
|
|
|
|
|
class LoggerProxy:
|
|
"""Proxy that defers attribute access to the active logger instance.
|
|
|
|
This allows modules that import `util.logger` at import time to still
|
|
route all logging calls to the final MyLogger instance once it is
|
|
initialized and set via `set_logger`.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._logger = None
|
|
|
|
def set_logger(self, logger):
|
|
self._logger = logger
|
|
|
|
def __getattr__(self, name):
|
|
# If MyLogger is set, delegate to it; otherwise, fallback to std logging.
|
|
if self._logger is not None:
|
|
return getattr(self._logger, name)
|
|
fallback = logging.getLogger("qBit Manage")
|
|
return getattr(fallback, name)
|
|
|
|
|
|
logger = LoggerProxy()
|
|
|
|
|
|
def get_list(data, lower=False, split=True, int_list=False, upper=False):
|
|
"""Return a list from a string or list."""
|
|
if data is None:
|
|
return None
|
|
elif isinstance(data, list):
|
|
if lower is True:
|
|
return [d.strip().lower() for d in data]
|
|
if upper is True:
|
|
return [d.strip().upper() for d in data]
|
|
return data
|
|
elif isinstance(data, dict):
|
|
return [data]
|
|
elif split is False:
|
|
return [str(data)]
|
|
elif lower is True:
|
|
return [d.strip().lower() for d in str(data).split(",")]
|
|
elif upper is True:
|
|
return [d.strip().upper() for d in str(data).split(",")]
|
|
elif int_list is True:
|
|
try:
|
|
return [int(d.strip()) for d in str(data).split(",")]
|
|
except ValueError:
|
|
return []
|
|
else:
|
|
return [d.strip() for d in str(data).split(",")]
|
|
|
|
|
|
def is_tag_in_torrent(check_tag, torrent_tags, exact=True):
|
|
"""Check if tag is in torrent_tags"""
|
|
tags = get_list(torrent_tags)
|
|
if isinstance(check_tag, str):
|
|
if exact:
|
|
return check_tag in tags
|
|
else:
|
|
tags_to_remove = []
|
|
for tag in tags:
|
|
if check_tag in tag:
|
|
tags_to_remove.append(tag)
|
|
return tags_to_remove
|
|
elif isinstance(check_tag, list):
|
|
if exact:
|
|
return all(tag in tags for tag in check_tag)
|
|
else:
|
|
tags_to_remove = []
|
|
for tag in tags:
|
|
for ctag in check_tag:
|
|
if ctag in tag:
|
|
tags_to_remove.append(tag)
|
|
return tags_to_remove
|
|
|
|
|
|
def format_stats_summary(stats: dict, config) -> list[str]:
|
|
"""
|
|
Formats the statistics summary into a human-readable list of strings.
|
|
|
|
Args:
|
|
stats (dict): The dictionary containing the statistics.
|
|
config (Config): The Config object to access tracker_error_tag and nohardlinks_tag.
|
|
|
|
Returns:
|
|
list[str]: A list of formatted strings, each representing a statistic.
|
|
"""
|
|
stats_output = []
|
|
for stat_key, stat_value in stats.items():
|
|
if stat_key == "executed_commands":
|
|
if stat_value:
|
|
stats_output.append(f"Executed Commands: {', '.join(stat_value)}")
|
|
elif isinstance(stat_value, (int, float)) and stat_value > 0:
|
|
display_key = stat_key.replace("_", " ").title()
|
|
if stat_key == "tagged_tracker_error" and hasattr(config, "tracker_error_tag"):
|
|
display_key = f"{config.tracker_error_tag} Torrents Tagged"
|
|
elif stat_key == "untagged_tracker_error" and hasattr(config, "tracker_error_tag"):
|
|
display_key = f"{config.tracker_error_tag} Torrents Untagged"
|
|
elif stat_key == "tagged_noHL" and hasattr(config, "nohardlinks_tag"):
|
|
display_key = f"{config.nohardlinks_tag} Torrents Tagged"
|
|
elif stat_key == "untagged_noHL" and hasattr(config, "nohardlinks_tag"):
|
|
display_key = f"{config.nohardlinks_tag} Torrents Untagged"
|
|
elif stat_key == "rem_unreg":
|
|
display_key = "Unregistered Torrents Removed"
|
|
elif stat_key == "deleted_contents":
|
|
display_key = "Torrents + Contents Deleted"
|
|
elif stat_key == "updated_share_limits":
|
|
display_key = "Share Limits Updated"
|
|
elif stat_key == "cleaned_share_limits":
|
|
display_key = "Torrents Removed from Meeting Share Limits"
|
|
elif stat_key == "recycle_emptied":
|
|
display_key = "Files Deleted from Recycle Bin"
|
|
elif stat_key == "orphaned_emptied":
|
|
display_key = "Files Deleted from Orphaned Data"
|
|
elif stat_key == "orphaned":
|
|
display_key = "Orphaned Files"
|
|
elif stat_key == "added":
|
|
display_key = "Torrents Added"
|
|
elif stat_key == "resumed":
|
|
display_key = "Torrents Resumed"
|
|
elif stat_key == "rechecked":
|
|
display_key = "Torrents Rechecked"
|
|
elif stat_key == "deleted":
|
|
display_key = "Torrents Deleted"
|
|
elif stat_key == "categorized":
|
|
display_key = "Torrents Categorized"
|
|
elif stat_key == "tagged":
|
|
display_key = "Torrents Tagged"
|
|
|
|
stats_output.append(f"Total {display_key}: {stat_value}")
|
|
return stats_output
|
|
|
|
|
|
def in_docker():
|
|
# Docker 1.13+ puts this file inside containers
|
|
if os.path.exists("/.dockerenv"):
|
|
return True
|
|
|
|
# Fallback: check cgroup info
|
|
try:
|
|
with open("/proc/1/cgroup") as f:
|
|
return any("docker" in line or "kubepods" in line or "containerd" in line or "lxc" in line for line in f)
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
return False
|
|
|
|
|
|
# Global variables for get_arg function
|
|
test_value = None
|
|
static_envs = []
|
|
|
|
|
|
def get_arg(env_str, default, arg_bool=False, arg_int=False):
|
|
"""
|
|
Get value from environment variable(s) with type conversion and fallback support.
|
|
|
|
Args:
|
|
env_str (str or list): Environment variable name(s) to check
|
|
default: Default value to return if no environment variable is set
|
|
arg_bool (bool): Convert result to boolean
|
|
arg_int (bool): Convert result to integer
|
|
|
|
Returns:
|
|
Value from environment variable or default, with optional type conversion
|
|
"""
|
|
global test_value
|
|
env_vars = [env_str] if not isinstance(env_str, list) else env_str
|
|
final_value = None
|
|
static_envs.extend(env_vars)
|
|
for env_var in env_vars:
|
|
env_value = os.environ.get(env_var)
|
|
if env_var == "BRANCH_NAME":
|
|
test_value = env_value
|
|
if env_value is not None:
|
|
final_value = env_value
|
|
break
|
|
if final_value or (arg_int and final_value == 0):
|
|
if arg_bool:
|
|
if final_value is True or final_value is False:
|
|
return final_value
|
|
elif final_value.lower() in ["t", "true"]:
|
|
return True
|
|
else:
|
|
return False
|
|
elif arg_int:
|
|
try:
|
|
return int(final_value)
|
|
except ValueError:
|
|
return default
|
|
else:
|
|
return str(final_value)
|
|
else:
|
|
return default
|
|
|
|
|
|
def runtime_path(*parts) -> Path:
|
|
"""
|
|
Resolve a bundled/runtime-safe path for assets.
|
|
- In PyInstaller bundles, files are extracted under sys._MEIPASS.
|
|
- In source runs, resolve relative to the project root.
|
|
"""
|
|
if hasattr(sys, "_MEIPASS"): # type: ignore[attr-defined]
|
|
return Path(getattr(sys, "_MEIPASS")).joinpath(*parts) # type: ignore[arg-type]
|
|
# modules/util.py => project root is parent of modules/
|
|
return Path(__file__).resolve().parent.parent.joinpath(*parts)
|
|
|
|
|
|
def _platform_config_base() -> Path:
|
|
"""Return the platform-specific base directory for app config."""
|
|
system = platform.system()
|
|
home = Path.home()
|
|
|
|
if system == "Windows":
|
|
appdata = os.environ.get("APPDATA")
|
|
base = Path(appdata) if appdata else home / "AppData" / "Roaming"
|
|
return base / "qbit-manage"
|
|
elif system == "Darwin":
|
|
return home / "Library" / "Application Support" / "qbit-manage"
|
|
else:
|
|
xdg = os.environ.get("XDG_CONFIG_HOME")
|
|
base = Path(xdg) if xdg else home / ".config"
|
|
return base / "qbit-manage"
|
|
|
|
|
|
def get_default_config_dir(config_hint: str = None) -> str:
|
|
"""
|
|
Determine the default persistent config directory, leveraging a provided config path/pattern first.
|
|
|
|
Resolution order:
|
|
1) If config_hint is an absolute path or contains a directory component, use its parent directory
|
|
2) Otherwise, if config_hint is a name/pattern (e.g. 'config.yml'), search common bases for:
|
|
- A direct match to that filename/pattern
|
|
- OR a persisted scheduler file 'schedule.yml' (so we don't lose an existing schedule when config.yml is absent)
|
|
Common bases (in order):
|
|
- /config (container volume)
|
|
- repository ./config
|
|
- user OS config directory
|
|
Return the first base containing either.
|
|
3) Fallback to legacy-ish behavior:
|
|
- /config if it contains any *.yml.sample / *.yaml.sample
|
|
- otherwise user OS config directory
|
|
"""
|
|
# 1) If a direct path is provided, prefer its parent directory
|
|
if config_hint:
|
|
primary = str(config_hint).split(",")[0].strip() # take first if comma-separated
|
|
if primary:
|
|
p = Path(primary).expanduser()
|
|
# If absolute or contains a parent component, use that directory
|
|
if p.is_absolute() or (str(p.parent) not in (".", "")):
|
|
base = p if p.is_dir() else p.parent
|
|
return str(base.resolve())
|
|
|
|
# 2) Try to resolve a plain filename/pattern or schedule.yml in common bases
|
|
candidates = []
|
|
if os.path.isdir("/config"):
|
|
candidates.append(Path("/config"))
|
|
repo_config = Path(__file__).resolve().parent.parent / "config"
|
|
candidates.append(repo_config)
|
|
candidates.append(_platform_config_base())
|
|
|
|
for base in candidates:
|
|
try:
|
|
# Match the primary pattern OR detect existing schedule.yml (persistence)
|
|
if list(base.glob(primary)) or (base / "schedule.yml").exists():
|
|
return str(base.resolve())
|
|
except Exception:
|
|
# ignore and continue to next base
|
|
pass
|
|
|
|
# 3) Fallbacks
|
|
has_yaml_sample = glob.glob(os.path.join("/config", "*.yml.sample")) or glob.glob(os.path.join("/config", "*.yaml.sample"))
|
|
has_yaml = glob.glob(os.path.join("/config", "*.yml")) or glob.glob(os.path.join("/config", "*.yaml"))
|
|
if os.path.isdir("/config") and (has_yaml_sample or has_yaml):
|
|
return "/config"
|
|
return str(_platform_config_base())
|
|
|
|
|
|
def ensure_config_dir_initialized(config_dir) -> str:
|
|
"""
|
|
Ensure the config directory exists and is initialized:
|
|
- Creates the config directory
|
|
- Creates logs/ and .backups/ subdirectories
|
|
- Creates an empty config.yml if no *.yml/*.yaml present
|
|
Returns the absolute config directory as a string.
|
|
"""
|
|
p = Path(config_dir).expanduser().resolve()
|
|
p.mkdir(parents=True, exist_ok=True)
|
|
(p / "logs").mkdir(parents=True, exist_ok=True)
|
|
(p / ".backups").mkdir(parents=True, exist_ok=True)
|
|
|
|
has_yaml = any(p.glob("*.yml")) or any(p.glob("*.yaml"))
|
|
if not has_yaml:
|
|
dest = p / "config.yml"
|
|
try:
|
|
dest.touch() # Create empty file
|
|
except Exception:
|
|
# Non-fatal; if creation fails, user can create a config manually
|
|
pass
|
|
|
|
return str(p)
|
|
|
|
|
|
class TorrentMessages:
|
|
"""Contains list of messages to check against a status of a torrent"""
|
|
|
|
UNREGISTERED_MSGS = [
|
|
"UNREGISTERED",
|
|
"TORRENT NOT FOUND",
|
|
"TORRENT IS NOT FOUND",
|
|
"NOT REGISTERED",
|
|
"NOT EXIST",
|
|
"UNKNOWN TORRENT",
|
|
"TRUMP",
|
|
"RETITLED",
|
|
"TRUNCATED",
|
|
"TORRENT IS NOT AUTHORIZED FOR USE ON THIS TRACKER",
|
|
"INFOHASH NOT FOUND.", # blutopia
|
|
"TORRENT HAS BEEN DELETED.", # blutopia
|
|
"TRACKER NICHT REGISTRIERT.",
|
|
"TORRENT EXISTIERT NICHT",
|
|
"TORRENT NICHT GEFUNDEN",
|
|
"TORRENT DELETED", # NexusPHP
|
|
"TORRENT BANNED", # NexusPHP
|
|
]
|
|
|
|
UNREGISTERED_MSGS_BHD = [
|
|
"DEAD",
|
|
"DUPE",
|
|
"COMPLETE SEASON UPLOADED",
|
|
"COMPLETE SEASON UPLOADED:",
|
|
"PROBLEM WITH DESCRIPTION",
|
|
"PROBLEM WITH FILE",
|
|
"PROBLEM WITH PACK",
|
|
"SPECIFICALLY BANNED",
|
|
"TRUMPED",
|
|
"OTHER",
|
|
"TORRENT HAS BEEN DELETED",
|
|
"NUKED",
|
|
"SEASON PACK:",
|
|
"SEASON PACK OUT",
|
|
"SEASON PACK UPLOADED",
|
|
]
|
|
|
|
IGNORE_MSGS = [
|
|
"YOU HAVE REACHED THE CLIENT LIMIT FOR THIS TORRENT",
|
|
"PASSKEY", # Any mention of passkeys should be a clear sign it should NOT be deleted
|
|
"MISSING INFO_HASH",
|
|
"EXPECTED VALUE (LIST, DICT, INT OR STRING) IN BENCODED STRING",
|
|
"COULD NOT PARSE BENCODED DATA",
|
|
"STREAM TRUNCATED",
|
|
"GATEWAY TIMEOUT", # BHD Gateway Timeout
|
|
"ANNOUNCE IS CURRENTLY UNAVAILABLE", # BHD Announce unavailable
|
|
"TORRENT HAS BEEN POSTPONED", # BHD Status
|
|
"520 (UNKNOWN HTTP ERROR)",
|
|
]
|
|
|
|
EXCEPTIONS_MSGS = [
|
|
"DOWN",
|
|
"DOWN.",
|
|
"IT MAY BE DOWN,",
|
|
"UNREACHABLE",
|
|
"(UNREACHABLE)",
|
|
"BAD GATEWAY",
|
|
"TRACKER UNAVAILABLE",
|
|
]
|
|
|
|
|
|
def guess_branch(version, env_version, git_branch):
|
|
if git_branch:
|
|
return git_branch
|
|
elif env_version == "develop":
|
|
return env_version
|
|
elif version[2] > 0:
|
|
dev_version = get_develop()
|
|
if version[1] != dev_version[1] or version[2] <= dev_version[2]:
|
|
return "develop"
|
|
else:
|
|
return "master"
|
|
|
|
|
|
def current_version(version, branch=None):
|
|
if branch == "develop":
|
|
return get_develop()
|
|
elif version[2] > 0:
|
|
new_version = get_develop()
|
|
if version[1] != new_version[1] or new_version[2] >= version[2]:
|
|
return new_version
|
|
else:
|
|
return get_master()
|
|
|
|
|
|
develop_version = None
|
|
develop_version_ts = 0.0
|
|
|
|
|
|
def get_develop():
|
|
"""Return latest develop version using TTL cache."""
|
|
global develop_version, develop_version_ts
|
|
ttl = _get_version_cache_ttl_seconds()
|
|
now = time.time()
|
|
if develop_version is not None and (now - develop_version_ts) < ttl:
|
|
return develop_version
|
|
value = get_version("develop")
|
|
# Only cache successful lookups
|
|
if value and value[0] != "Unknown":
|
|
develop_version = value
|
|
develop_version_ts = now
|
|
return value
|
|
|
|
|
|
master_version = None
|
|
master_version_ts = 0.0
|
|
|
|
|
|
def _get_version_cache_ttl_seconds() -> int:
|
|
"""Resolve TTL for version cache from env QBM_VERSION_CACHE_TTL.
|
|
|
|
Accepts seconds (e.g., "600") or human strings (e.g., "10m", "1h").
|
|
Defaults to 600 seconds (10 minutes) if unset or invalid.
|
|
"""
|
|
raw = os.environ.get("QBM_VERSION_CACHE_TTL", "600")
|
|
secs = None
|
|
try:
|
|
secs = int(raw)
|
|
except Exception:
|
|
try:
|
|
secs = parse(raw) if raw else None
|
|
except Exception:
|
|
secs = None
|
|
if not secs or secs < 1:
|
|
secs = 600
|
|
return int(secs)
|
|
|
|
|
|
def get_master():
|
|
"""Return latest master version using TTL cache."""
|
|
global master_version, master_version_ts
|
|
ttl = _get_version_cache_ttl_seconds()
|
|
now = time.time()
|
|
if master_version is not None and (now - master_version_ts) < ttl:
|
|
return master_version
|
|
value = get_version("master")
|
|
# Only cache successful lookups
|
|
if value and value[0] != "Unknown":
|
|
master_version = value
|
|
master_version_ts = now
|
|
return value
|
|
|
|
|
|
def get_version(level):
|
|
try:
|
|
# Always fetch fresh; bust caches and disable intermediaries
|
|
url = f"https://raw.githubusercontent.com/StuffAnThings/qbit_manage/refs/heads/{level}/VERSION"
|
|
params = {"ts": int(time.time())}
|
|
headers = {
|
|
"Cache-Control": "no-cache",
|
|
"Pragma": "no-cache",
|
|
"Accept": "text/plain",
|
|
"User-Agent": "qbit_manage-version-check",
|
|
}
|
|
resp = requests.get(url, headers=headers, params=params, timeout=5)
|
|
resp.raise_for_status()
|
|
return parse_version(resp.text.strip(), text=level)
|
|
except Exception:
|
|
return "Unknown", "Unknown", 0
|
|
|
|
|
|
def parse_version(version, text="develop"):
|
|
version = version.replace("develop", text)
|
|
split_version = version.split(f"-{text}")
|
|
return version, split_version[0], int(split_version[1]) if len(split_version) > 1 else 0
|
|
|
|
|
|
def get_current_version():
|
|
"""
|
|
Get the current qBit Manage version using the same logic as qbit_manage.py:400-411.
|
|
This function centralizes version parsing logic to avoid duplication.
|
|
|
|
Returns:
|
|
tuple: (version_tuple, branch) where version_tuple is (version_string, base_version, build_number)
|
|
and branch is the detected branch name
|
|
"""
|
|
# Initialize version tuple
|
|
version = ("Unknown", "Unknown", 0)
|
|
|
|
# Read and parse VERSION file with PyInstaller-safe resolution
|
|
try:
|
|
# Prefer bundled path when running as a frozen app
|
|
version_path = None
|
|
try:
|
|
bundled = runtime_path("VERSION")
|
|
if bundled.exists():
|
|
version_path = bundled
|
|
except Exception:
|
|
pass
|
|
|
|
# Fallback to repository structure: modules/../VERSION
|
|
if version_path is None:
|
|
repo_relative = Path(__file__).resolve().parent.parent / "VERSION"
|
|
if repo_relative.exists():
|
|
version_path = repo_relative
|
|
|
|
# If we found a version file, parse it
|
|
if version_path is not None:
|
|
with open(version_path, encoding="utf-8") as handle:
|
|
for line in handle:
|
|
line = line.strip()
|
|
if line:
|
|
version = parse_version(line)
|
|
break
|
|
# If not found, leave version as ("Unknown", "Unknown", 0)
|
|
except Exception as e:
|
|
# Non-fatal in frozen apps; keep noise low if VERSION is missing
|
|
logger.debug(f"VERSION read fallback hit: {e}")
|
|
|
|
# Get environment version (same as qbit_manage.py:282)
|
|
env_version = os.environ.get("BRANCH_NAME", "master")
|
|
|
|
# Get git branch (same logic as qbit_manage.py:275-280)
|
|
git_branch = None
|
|
try:
|
|
from git import InvalidGitRepositoryError
|
|
from git import Repo
|
|
|
|
try:
|
|
git_branch = Repo(path=".").head.ref.name # noqa
|
|
except InvalidGitRepositoryError:
|
|
git_branch = None
|
|
except ImportError:
|
|
git_branch = None
|
|
|
|
# Guess branch and format version (same logic as qbit_manage.py:407-410)
|
|
branch = guess_branch(version, env_version, git_branch)
|
|
if branch is None:
|
|
branch = "Unknown"
|
|
version = (version[0].replace("develop", branch), version[1].replace("develop", branch), version[2])
|
|
|
|
return version, branch
|
|
|
|
|
|
class check:
|
|
"""Check for attributes in config."""
|
|
|
|
def __init__(self, config):
|
|
self.config = config
|
|
|
|
def overwrite_attributes(self, data, attribute, parent=None):
|
|
"""
|
|
Overwrite attributes in config.
|
|
|
|
Args:
|
|
data: The new data to replace the attribute with
|
|
attribute: The attribute name to search for
|
|
parent: Optional parent attribute to restrict the search to
|
|
"""
|
|
if data is None:
|
|
return
|
|
|
|
yaml = YAML(self.config.config_path)
|
|
|
|
# Define the recursive search function once
|
|
def find_and_replace_attribute(dictionary, attr, new_data):
|
|
"""Recursively search for attribute in nested dictionaries and replace it."""
|
|
for key, value in dictionary.items():
|
|
if key == attr:
|
|
dictionary[key] = new_data
|
|
return True
|
|
elif isinstance(value, dict):
|
|
if find_and_replace_attribute(value, attr, new_data):
|
|
return True
|
|
return False
|
|
|
|
# Determine the root dictionary to search in
|
|
if parent is not None:
|
|
# Only search within parent if it exists and is a dictionary
|
|
if parent not in yaml.data or not isinstance(yaml.data[parent], dict):
|
|
return
|
|
|
|
root_dict = yaml.data[parent]
|
|
|
|
# Check if attribute exists directly in parent
|
|
if attribute in root_dict:
|
|
root_dict[attribute] = data
|
|
yaml.save()
|
|
return
|
|
else:
|
|
# Search in the entire yaml.data
|
|
root_dict = yaml.data
|
|
|
|
# Check if attribute exists at top level
|
|
if attribute in root_dict:
|
|
root_dict[attribute] = data
|
|
yaml.save()
|
|
return
|
|
|
|
# If not found directly, search recursively
|
|
if find_and_replace_attribute(root_dict, attribute, data):
|
|
yaml.save()
|
|
|
|
def check_for_attribute(
|
|
self,
|
|
data,
|
|
attribute,
|
|
parent=None,
|
|
subparent=None,
|
|
test_list=None,
|
|
default=None,
|
|
do_print=True,
|
|
default_is_none=False,
|
|
req_default=False,
|
|
var_type="str",
|
|
min_int=0,
|
|
throw=False,
|
|
save=True,
|
|
make_dirs=False,
|
|
):
|
|
"""
|
|
Check for attribute in config.
|
|
|
|
Args:
|
|
data (dict): The configuration data to search.
|
|
attribute (str): The name of the attribute key to search for.
|
|
parent (str, optional): The name of the top level attribute to search under. Defaults to None.
|
|
subparent (str, optional): The name of the second level attribute to search under. Defaults to None.
|
|
test_list (dict, optional): A dictionary of valid values for the attribute. Defaults to None.
|
|
default (any, optional): The default value to use if the attribute is not found. Defaults to None.
|
|
do_print (bool, optional): Whether to print warning messages. Defaults to True.
|
|
default_is_none (bool, optional): Whether to treat a None value as a valid default. Defaults to False.
|
|
req_default (bool, optional): Whether to raise an error if no default value is provided. Defaults to False.
|
|
var_type (str, optional): The expected type of the attribute value. Defaults to "str".
|
|
min_int (int, optional): The minimum value for an integer attribute. Defaults to 0.
|
|
throw (bool, optional): Whether to raise an error if the attribute value is invalid. Defaults to False.
|
|
save (bool, optional): Whether to save the default value to the config if it is used. Defaults to True.
|
|
make_dirs (bool, optional): Whether to create directories for path attributes if they do not exist. Defaults to False.
|
|
|
|
Returns:
|
|
any: The value of the attribute, or the default value if it is not found.
|
|
|
|
Raises:
|
|
Failed: If the attribute value is invalid or a required default value is missing.
|
|
"""
|
|
endline = ""
|
|
if parent is not None:
|
|
if subparent is not None:
|
|
if data and parent in data and subparent in data[parent]:
|
|
data = data[parent][subparent]
|
|
else:
|
|
data = None
|
|
do_print = False
|
|
else:
|
|
if data and parent in data:
|
|
data = data[parent]
|
|
else:
|
|
data = None
|
|
do_print = False
|
|
|
|
if subparent is not None:
|
|
text = f"{parent}->{subparent} sub-attribute {attribute}"
|
|
elif parent is None:
|
|
text = f"{attribute} attribute"
|
|
else:
|
|
text = f"{parent} sub-attribute {attribute}"
|
|
|
|
if data is None or attribute not in data or (attribute in data and data[attribute] is None and not default_is_none):
|
|
message = f"{text} not found"
|
|
if parent and save is True:
|
|
yaml = YAML(self.config.config_path)
|
|
if subparent:
|
|
endline = f"\n{subparent} sub-attribute {attribute} added to config"
|
|
if subparent not in yaml.data[parent] or not yaml.data[parent][subparent]:
|
|
yaml.data[parent][subparent] = {attribute: default}
|
|
elif attribute not in yaml.data[parent]:
|
|
if isinstance(yaml.data[parent][subparent], str):
|
|
yaml.data[parent][subparent] = {attribute: default}
|
|
yaml.data[parent][subparent][attribute] = default
|
|
else:
|
|
endline = ""
|
|
else:
|
|
endline = f"\n{parent} sub-attribute {attribute} added to config"
|
|
if parent not in yaml.data or not yaml.data[parent]:
|
|
yaml.data[parent] = {attribute: default}
|
|
elif attribute not in yaml.data[parent] or (
|
|
attribute in yaml.data[parent] and yaml.data[parent][attribute] is None
|
|
):
|
|
yaml.data[parent][attribute] = default
|
|
else:
|
|
endline = ""
|
|
yaml.save()
|
|
if default_is_none and var_type in ["list", "int_list"]:
|
|
return []
|
|
elif data[attribute] is None:
|
|
if default_is_none and var_type == "list":
|
|
return []
|
|
elif default_is_none:
|
|
return None
|
|
else:
|
|
message = f"{text} is blank"
|
|
elif var_type == "url":
|
|
if data[attribute].endswith(("\\", "/")):
|
|
return data[attribute][:-1]
|
|
else:
|
|
return data[attribute]
|
|
elif var_type == "bool":
|
|
if isinstance(data[attribute], bool):
|
|
return data[attribute]
|
|
else:
|
|
message = f"{text} must be either true or false"
|
|
throw = True
|
|
elif var_type == "int":
|
|
if isinstance(data[attribute], int) and data[attribute] >= min_int:
|
|
return data[attribute]
|
|
else:
|
|
message = f"{text} must an integer >= {min_int}"
|
|
throw = True
|
|
elif var_type == "float":
|
|
try:
|
|
data[attribute] = float(data[attribute])
|
|
except Exception:
|
|
pass
|
|
if isinstance(data[attribute], float) and data[attribute] >= min_int:
|
|
return data[attribute]
|
|
else:
|
|
message = f"{text} must a float >= {float(min_int)}"
|
|
throw = True
|
|
elif var_type == "time_parse":
|
|
if isinstance(data[attribute], int) and data[attribute] >= min_int:
|
|
return data[attribute]
|
|
else:
|
|
try:
|
|
parsed_seconds = parse(data[attribute])
|
|
if parsed_seconds is not None:
|
|
return int(parsed_seconds / 60)
|
|
else:
|
|
message = f"Unable to parse {text}, must be a valid time format."
|
|
throw = True
|
|
except Exception:
|
|
message = f"Unable to parse {text}, must be a valid time format."
|
|
throw = True
|
|
elif var_type == "size_parse":
|
|
# Accepts values like "200MB", "1.5GB", "750MiB", "1024", case-insensitive
|
|
# Returns bytes as an integer
|
|
try:
|
|
# If already an int and valid, treat as bytes
|
|
if isinstance(data[attribute], int) and data[attribute] >= min_int:
|
|
return int(data[attribute])
|
|
# If float-like numeric provided, also treat as bytes
|
|
if isinstance(data[attribute], float) and data[attribute] >= float(min_int):
|
|
return int(data[attribute])
|
|
parsed_bytes = parse_size_to_bytes(str(data[attribute]))
|
|
if parsed_bytes is not None and parsed_bytes >= min_int:
|
|
return int(parsed_bytes)
|
|
else:
|
|
message = f"Unable to parse {text}, must be a valid size format like '500MB', '4GB', or '1024'."
|
|
throw = True
|
|
except Exception:
|
|
message = f"Unable to parse {text}, must be a valid size format like '500MB', '4GB', or '1024'."
|
|
throw = True
|
|
elif var_type == "path":
|
|
if os.path.exists(os.path.abspath(data[attribute])):
|
|
return os.path.join(data[attribute], "")
|
|
else:
|
|
if make_dirs:
|
|
try:
|
|
os.makedirs(data[attribute], exist_ok=True)
|
|
return os.path.join(data[attribute], "")
|
|
except OSError:
|
|
message = f"Path {os.path.abspath(data[attribute])} does not exist and can't be created"
|
|
else:
|
|
message = f"Path {os.path.abspath(data[attribute])} does not exist"
|
|
elif var_type == "list":
|
|
return get_list(data[attribute], split=False)
|
|
elif var_type == "list_path":
|
|
temp_list = [p for p in get_list(data[attribute], split=False) if os.path.exists(os.path.abspath(p))]
|
|
if len(temp_list) > 0:
|
|
return temp_list
|
|
else:
|
|
message = "No Paths exist"
|
|
elif var_type == "lower_list":
|
|
return get_list(data[attribute], lower=True)
|
|
elif var_type == "upper_list":
|
|
return get_list(data[attribute], upper=True)
|
|
elif test_list is None or data[attribute] in test_list:
|
|
return data[attribute]
|
|
else:
|
|
message = f"{text}: {data[attribute]} is an invalid input"
|
|
if var_type == "path" and default:
|
|
default_path = os.path.abspath(default)
|
|
if make_dirs and not os.path.exists(default_path):
|
|
os.makedirs(default, exist_ok=True)
|
|
if os.path.exists(default_path):
|
|
default = os.path.join(default, "")
|
|
message = message + f", using {default} as default"
|
|
elif var_type == "path" and default:
|
|
if data and attribute in data and data[attribute]:
|
|
message = f"neither {data[attribute]} or the default path {default} could be found"
|
|
else:
|
|
message = f"no {text} found and the default path {default} could not be found"
|
|
default = None
|
|
if (default is not None or default_is_none) and not message:
|
|
message = message + f" using {default} as default"
|
|
message = message + endline
|
|
if req_default and default is None:
|
|
raise Failed(f"Config Error: {attribute} attribute must be set under {parent}.")
|
|
options = ""
|
|
if test_list:
|
|
for option, description in test_list.items():
|
|
if len(options) > 0:
|
|
options = f"{options}\n"
|
|
options = f"{options} {option} ({description})"
|
|
if (default is None and not default_is_none) or throw:
|
|
if len(options) > 0:
|
|
message = message + "\n" + options
|
|
raise Failed(f"Config Error: {message}")
|
|
if do_print:
|
|
logger.print_line(f"Config Warning: {message}", "warning")
|
|
if data and attribute in data and data[attribute] and test_list is not None and data[attribute] not in test_list:
|
|
logger.print_line(options)
|
|
return default
|
|
|
|
|
|
class Failed(Exception):
|
|
"""Exception raised for errors in the input."""
|
|
|
|
pass
|
|
|
|
|
|
def list_in_text(text, search_list, match_all=False):
|
|
"""
|
|
Check if elements from a search list are present in a given text.
|
|
|
|
Args:
|
|
text (str): The text to search in.
|
|
search_list (list or set): The list of elements to search for in the text.
|
|
match_all (bool, optional): If True, all elements in the search list must be present in the text.
|
|
If False, at least one element must be present. Defaults to False.
|
|
|
|
Returns:
|
|
bool: True if the search list elements are found in the text, False otherwise.
|
|
"""
|
|
if isinstance(search_list, list):
|
|
search_list = set(search_list)
|
|
contains = {x for x in search_list if " " in x}
|
|
exception = search_list - contains
|
|
if match_all:
|
|
if all(x == m for m in text.split(" ") for x in exception) or all(x in text for x in contains):
|
|
return True
|
|
else:
|
|
if any(x == m for m in text.split(" ") for x in exception) or any(x in text for x in contains):
|
|
return True
|
|
return False
|
|
|
|
|
|
def trunc_val(stg, delm, num=3):
|
|
"""Truncate the value of the torrent url to remove sensitive information"""
|
|
try:
|
|
val = delm.join(stg.split(delm, num)[:num])
|
|
except IndexError:
|
|
val = None
|
|
return val
|
|
|
|
|
|
def move_files(src, dest, mod=False):
|
|
"""Move files from source to destination, mod variable is to change the date modified of the file being moved"""
|
|
dest_path = os.path.dirname(dest)
|
|
to_delete = False
|
|
if os.path.isdir(dest_path) is False:
|
|
os.makedirs(dest_path, exist_ok=True)
|
|
try:
|
|
if mod is True:
|
|
mod_time = time.time()
|
|
os.utime(src, (mod_time, mod_time))
|
|
shutil.move(src, dest)
|
|
except PermissionError as perm:
|
|
logger.warning(f"{perm} : Copying files instead.")
|
|
try:
|
|
shutil.copyfile(src, dest)
|
|
except Exception as ex:
|
|
logger.stacktrace()
|
|
logger.error(ex)
|
|
return to_delete
|
|
if os.path.isfile(src):
|
|
logger.warning(f"Removing original file: {src}")
|
|
try:
|
|
os.remove(src)
|
|
except OSError as e:
|
|
logger.warning(f"Error: {e.filename} - {e.strerror}.")
|
|
to_delete = True
|
|
except FileNotFoundError as file:
|
|
logger.warning(f"{file} : source: {src} -> destination: {dest}")
|
|
except Exception as ex:
|
|
logger.stacktrace()
|
|
logger.error(ex)
|
|
return to_delete
|
|
|
|
|
|
def delete_files(file_path):
|
|
"""Try to delete the file directly."""
|
|
try:
|
|
os.remove(file_path)
|
|
except FileNotFoundError as e:
|
|
logger.warning(f"File not found: {e.filename} - {e.strerror}.")
|
|
except PermissionError as e:
|
|
logger.warning(f"Permission denied: {e.filename} - {e.strerror}.")
|
|
except OSError as e:
|
|
logger.error(f"Error deleting file: {e.filename} - {e.strerror}.")
|
|
|
|
|
|
def copy_files(src, dest):
|
|
"""Copy files from source to destination"""
|
|
dest_path = os.path.dirname(dest)
|
|
if os.path.isdir(dest_path) is False:
|
|
os.makedirs(dest_path)
|
|
try:
|
|
shutil.copyfile(src, dest)
|
|
except Exception as ex:
|
|
logger.stacktrace()
|
|
logger.error(ex)
|
|
|
|
|
|
def remove_empty_directories(pathlib_root_dir, excluded_paths=None, exclude_patterns=[]):
|
|
"""Remove empty directories recursively with optimized performance."""
|
|
pathlib_root_dir = Path(pathlib_root_dir)
|
|
|
|
# Early return for non-existent paths
|
|
if not pathlib_root_dir.exists():
|
|
return
|
|
|
|
# Optimize excluded paths handling
|
|
excluded_paths_set = set()
|
|
if excluded_paths is not None:
|
|
excluded_paths_set = {Path(p).resolve() for p in excluded_paths}
|
|
|
|
# Pre-compile exclude patterns for better performance
|
|
compiled_patterns = []
|
|
for pattern in exclude_patterns:
|
|
# Convert to regex for faster matching
|
|
import re
|
|
|
|
regex_pattern = fnmatch.translate(pattern)
|
|
compiled_patterns.append(re.compile(regex_pattern))
|
|
|
|
# Cache directory checks to avoid redundant operations
|
|
directories_to_check = []
|
|
|
|
# Collect all directories in single pass
|
|
for root, dirs, files in os.walk(pathlib_root_dir, topdown=False):
|
|
root_path = Path(root).resolve()
|
|
|
|
# Skip excluded paths efficiently
|
|
if excluded_paths_set and root_path in excluded_paths_set:
|
|
continue
|
|
|
|
# Check exclude patterns efficiently
|
|
if compiled_patterns:
|
|
root_str = str(root_path) + os.sep
|
|
if any(pattern.match(root_str) for pattern in compiled_patterns):
|
|
continue
|
|
|
|
# Only add directories that might be empty (no files)
|
|
if not files:
|
|
directories_to_check.append(root_path)
|
|
|
|
# Remove empty directories in batch
|
|
removed_dirs = set()
|
|
for dir_path in directories_to_check:
|
|
try:
|
|
os.rmdir(dir_path)
|
|
removed_dirs.add(dir_path)
|
|
except PermissionError as perm:
|
|
logger.warning(f"{perm} : Unable to delete folder {dir_path} as it has permission issues. Skipping...")
|
|
except OSError:
|
|
# Directory not empty - expected
|
|
pass
|
|
|
|
# Attempt root directory removal if it's now empty
|
|
if not excluded_paths_set or pathlib_root_dir.resolve() not in excluded_paths_set:
|
|
try:
|
|
pathlib_root_dir.rmdir()
|
|
except PermissionError as perm:
|
|
logger.warning(f"{perm} : Unable to delete root folder {pathlib_root_dir} as it has permission issues. Skipping...")
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
class CheckHardLinks:
|
|
"""
|
|
Class to check for hardlinks
|
|
"""
|
|
|
|
def __init__(self, config):
|
|
self.root_dir = config.root_dir
|
|
self.remote_dir = config.remote_dir
|
|
self.orphaned_dir = config.orphaned_dir if config.orphaned_dir else ""
|
|
self.recycle_dir = config.recycle_dir if config.recycle_dir else ""
|
|
self.root_files = set(
|
|
get_root_files(self.root_dir, self.remote_dir)
|
|
+ get_root_files(self.orphaned_dir, "")
|
|
+ get_root_files(self.recycle_dir, "")
|
|
)
|
|
self.get_inode_count()
|
|
|
|
def get_inode_count(self):
|
|
self.inode_count = {}
|
|
for file in self.root_files:
|
|
# Only check hardlinks for files that are symlinks
|
|
if os.path.isfile(file) and os.path.islink(file):
|
|
continue
|
|
else:
|
|
try:
|
|
inode_no = os.stat(file.replace(self.root_dir, self.remote_dir)).st_ino
|
|
except PermissionError as perm:
|
|
logger.warning(f"{perm} : file {file} has permission issues. Skipping...")
|
|
continue
|
|
except FileNotFoundError as file_not_found_error:
|
|
logger.warning(f"{file_not_found_error} : File {file} not found. Skipping...")
|
|
continue
|
|
except Exception as ex:
|
|
logger.stacktrace()
|
|
logger.error(ex)
|
|
continue
|
|
if inode_no in self.inode_count:
|
|
self.inode_count[inode_no] += 1
|
|
else:
|
|
self.inode_count[inode_no] = 1
|
|
|
|
def nohardlink(self, file, notify, ignore_root_dir):
|
|
"""
|
|
Check if there are any hard links
|
|
Will check if there are any hard links if it passes a file or folder
|
|
If a folder is passed, it will take the largest file in that folder and only check for hardlinks
|
|
of the remaining files where the file is greater size a percentage of the largest file
|
|
This fixes the bug in #192
|
|
"""
|
|
|
|
def has_hardlinks(self, file, ignore_root_dir):
|
|
"""
|
|
Check if a file has hard links.
|
|
|
|
Args:
|
|
file (str): The path to the file.
|
|
ignore_root_dir (bool): Whether to ignore the root directory.
|
|
|
|
Returns:
|
|
bool: True if the file has hard links, False otherwise.
|
|
"""
|
|
if ignore_root_dir:
|
|
return os.stat(file).st_nlink - self.inode_count.get(os.stat(file).st_ino, 1) > 0
|
|
else:
|
|
return os.stat(file).st_nlink > 1
|
|
|
|
check_for_hl = True
|
|
try:
|
|
if os.path.isfile(file):
|
|
if os.path.islink(file):
|
|
logger.warning(f"Symlink found in {file}, unable to determine hardlinks. Skipping...")
|
|
return False
|
|
logger.trace(f"Checking file: {file}")
|
|
logger.trace(f"Checking file inum: {os.stat(file).st_ino}")
|
|
logger.trace(f"Checking no of hard links: {os.stat(file).st_nlink}")
|
|
logger.trace(f"Checking inode_count dict: {self.inode_count.get(os.stat(file).st_ino)}")
|
|
logger.trace(f"ignore_root_dir: {ignore_root_dir}")
|
|
# https://github.com/StuffAnThings/qbit_manage/issues/291 for more details
|
|
if has_hardlinks(self, file, ignore_root_dir):
|
|
logger.trace(f"Hardlinks found in {file}.")
|
|
check_for_hl = False
|
|
else:
|
|
sorted_files = sorted(Path(file).rglob("*"), key=lambda x: os.stat(x).st_size, reverse=True)
|
|
logger.trace(f"Folder: {file}")
|
|
logger.trace(f"Files Sorted by size: {sorted_files}")
|
|
threshold = 0.1
|
|
if not sorted_files:
|
|
msg = (
|
|
f"Nohardlink Error: Unable to open the folder {file}. "
|
|
"Please make sure folder exists and qbit_manage has access to this directory."
|
|
)
|
|
notify(msg, "nohardlink")
|
|
logger.warning(msg)
|
|
else:
|
|
largest_file_size = os.stat(sorted_files[0]).st_size
|
|
logger.trace(f"Largest file: {sorted_files[0]}")
|
|
logger.trace(f"Largest file size: {largest_file_size}")
|
|
for files in sorted_files:
|
|
if os.path.islink(files):
|
|
logger.warning(f"Symlink found in {files}, unable to determine hardlinks. Skipping...")
|
|
continue
|
|
file_size = os.stat(files).st_size
|
|
file_no_hardlinks = os.stat(files).st_nlink
|
|
logger.trace(f"Checking file: {files}")
|
|
logger.trace(f"Checking file inum: {os.stat(files).st_ino}")
|
|
logger.trace(f"Checking file size: {file_size}")
|
|
logger.trace(f"Checking no of hard links: {file_no_hardlinks}")
|
|
logger.trace(f"Checking inode_count dict: {self.inode_count.get(os.stat(files).st_ino)}")
|
|
logger.trace(f"ignore_root_dir: {ignore_root_dir}")
|
|
if has_hardlinks(self, files, ignore_root_dir) and file_size >= (largest_file_size * threshold):
|
|
logger.trace(f"Hardlinks found in {files}.")
|
|
check_for_hl = False
|
|
except PermissionError as perm:
|
|
logger.warning(f"{perm} : file {file} has permission issues. Skipping...")
|
|
return False
|
|
except FileNotFoundError as file_not_found_error:
|
|
logger.warning(f"{file_not_found_error} : File {file} not found. Skipping...")
|
|
return False
|
|
except Exception as ex:
|
|
logger.stacktrace()
|
|
logger.error(ex)
|
|
return False
|
|
return check_for_hl
|
|
|
|
|
|
def get_root_files(root_dir, remote_dir, exclude_dir=None):
|
|
"""
|
|
Get all files in root directory with optimized path handling and filtering.
|
|
|
|
Windows/UNC-safe:
|
|
- If remote_dir is empty or effectively the same as root_dir, walk root_dir directly.
|
|
- Otherwise, walk remote_dir (the accessible path) and map paths back to the root_dir representation.
|
|
"""
|
|
if not root_dir:
|
|
return []
|
|
|
|
# Normalize for robust equality checks across platforms (handles UNC vs local, slashes, case on Windows)
|
|
try:
|
|
rd_norm = os.path.normcase(os.path.normpath(root_dir)) if root_dir else ""
|
|
rem_norm = os.path.normcase(os.path.normpath(remote_dir)) if remote_dir else ""
|
|
except Exception:
|
|
rd_norm = root_dir or ""
|
|
rem_norm = remote_dir or ""
|
|
|
|
# Treat missing/empty remote_dir as "same path" (walk root_dir directly)
|
|
is_same_path = (not remote_dir) or (rem_norm == rd_norm)
|
|
|
|
# Determine which base directory to walk and validate it exists
|
|
base_to_walk = root_dir if is_same_path else remote_dir
|
|
if not base_to_walk or not os.path.isdir(base_to_walk):
|
|
return []
|
|
|
|
# Build an exclude path in the correct namespace
|
|
local_exclude_dir = None
|
|
if exclude_dir:
|
|
if is_same_path:
|
|
local_exclude_dir = exclude_dir
|
|
else:
|
|
# Convert an exclude in remote namespace to root namespace for comparison after replacement
|
|
try:
|
|
local_exclude_dir = exclude_dir.replace(remote_dir, root_dir, 1)
|
|
except Exception:
|
|
local_exclude_dir = None
|
|
|
|
root_files = []
|
|
|
|
if is_same_path:
|
|
# Fast path when paths are the same or remote_dir not provided
|
|
for path, subdirs, files in os.walk(base_to_walk):
|
|
if local_exclude_dir and os.path.normcase(local_exclude_dir) in os.path.normcase(path):
|
|
continue
|
|
for name in files:
|
|
root_files.append(os.path.join(path, name))
|
|
else:
|
|
# Walk the accessible remote_dir and convert to root_dir representation once per directory
|
|
for path, subdirs, files in os.walk(base_to_walk):
|
|
replaced_path = path.replace(remote_dir, root_dir, 1)
|
|
if local_exclude_dir and os.path.normcase(local_exclude_dir) in os.path.normcase(replaced_path):
|
|
continue
|
|
for name in files:
|
|
root_files.append(os.path.join(replaced_path, name))
|
|
|
|
return root_files
|
|
|
|
|
|
def load_json(file):
|
|
"""Load json file if exists"""
|
|
if os.path.isfile(truncate_filename(file)):
|
|
file = open(file)
|
|
data = json.load(file)
|
|
file.close()
|
|
else:
|
|
data = {}
|
|
return data
|
|
|
|
|
|
def truncate_filename(filename, max_length=255, offset=0):
|
|
"""
|
|
Truncate filename if necessary.
|
|
|
|
Args:
|
|
filename (str): The original filename.
|
|
max_length (int, optional): The maximum length of the truncated filename. Defaults to 255.
|
|
offset (int, optional): The number of characters to keep from the end of the base name. Defaults to 0.
|
|
|
|
Returns:
|
|
str: The truncated filename.
|
|
|
|
"""
|
|
base, ext = os.path.splitext(filename)
|
|
if len(filename) > max_length:
|
|
max_base_length = max_length - len(ext) - offset
|
|
truncated_base = base[:max_base_length]
|
|
truncated_base_offset = base[-offset:] if offset > 0 else ""
|
|
truncated_filename = f"{truncated_base}{truncated_base_offset}{ext}"
|
|
else:
|
|
truncated_filename = filename
|
|
return truncated_filename
|
|
|
|
|
|
def save_json(torrent_json, dest):
|
|
"""Save json file to destination, truncating filename if necessary."""
|
|
max_filename_length = 255 # Typical maximum filename length on many filesystems
|
|
directory, filename = os.path.split(dest)
|
|
filename, ext = os.path.splitext(filename)
|
|
|
|
if len(filename) > (max_filename_length - len(ext)):
|
|
truncated_filename = truncate_filename(filename, max_filename_length)
|
|
dest = os.path.join(directory, truncated_filename)
|
|
logger.warning(f"Filename too long, truncated to: {dest}")
|
|
|
|
try:
|
|
with open(dest, "w", encoding="utf-8") as file:
|
|
json.dump(torrent_json, file, ensure_ascii=False, indent=4)
|
|
except FileNotFoundError as e:
|
|
logger.error(f"Failed to save JSON file: {e.filename} - {e.strerror}.")
|
|
except OSError as e:
|
|
logger.error(f"OS error occurred: {e.filename} - {e.strerror}.")
|
|
|
|
|
|
class GracefulKiller:
|
|
"""
|
|
Class to catch SIGTERM and SIGINT signals.
|
|
Gracefully kill script when docker stops.
|
|
"""
|
|
|
|
kill_now = False
|
|
|
|
def __init__(self):
|
|
# signal.signal(signal.SIGINT, self.exit_gracefully)
|
|
signal.signal(signal.SIGTERM, self.exit_gracefully)
|
|
|
|
def exit_gracefully(self, *args):
|
|
"""Set kill_now to True to exit gracefully."""
|
|
self.kill_now = True
|
|
|
|
|
|
def human_readable_size(size, decimal_places=3):
|
|
"""Convert bytes to human readable size"""
|
|
for unit in ["B", "KiB", "MiB", "GiB", "TiB"]:
|
|
if size < 1024.0:
|
|
break
|
|
size /= 1024.0
|
|
return f"{size:.{decimal_places}f}{unit}"
|
|
|
|
|
|
def parse_size_to_bytes(value):
|
|
"""
|
|
Parse a human-readable size string into bytes.
|
|
Accepts units: B, KB, MB, GB, TB, PB and binary variants KiB, MiB, GiB, TiB, PiB (case-insensitive).
|
|
Examples: "200MB", "1.5GB", "750MiB", "1024", 2048
|
|
Returns:
|
|
int: number of bytes, or None if parsing fails.
|
|
"""
|
|
if value is None:
|
|
return None
|
|
if isinstance(value, (int, float)):
|
|
try:
|
|
return int(value)
|
|
except Exception:
|
|
return None
|
|
s = str(value).strip()
|
|
if s == "":
|
|
return None
|
|
# Match number and optional unit
|
|
m = re.match(r"^\s*(\d+(?:\.\d+)?)\s*([kKmMgGtTpP]i?[bB])?\s*$", s)
|
|
if not m:
|
|
# If pure integer without unit
|
|
try:
|
|
return int(float(s))
|
|
except Exception:
|
|
return None
|
|
num = float(m.group(1))
|
|
unit = m.group(2).lower() if m.group(2) else "b"
|
|
|
|
# Normalize common forms to binary multiples (base 1024) to match qBittorrent bytes
|
|
# Treat KB/MB/GB as KiB/MiB/GiB equivalents
|
|
multipliers = {
|
|
"b": 1,
|
|
"kb": 1024,
|
|
"kib": 1024,
|
|
"mb": 1024**2,
|
|
"mib": 1024**2,
|
|
"gb": 1024**3,
|
|
"gib": 1024**3,
|
|
"tb": 1024**4,
|
|
"tib": 1024**4,
|
|
"pb": 1024**5,
|
|
"pib": 1024**5,
|
|
}
|
|
mul = multipliers.get(unit, None)
|
|
if mul is None:
|
|
return None
|
|
return int(num * mul)
|
|
|
|
|
|
class YAML:
|
|
"""Class to load and save yaml files with !ENV tag preservation and environment variable resolution"""
|
|
|
|
def __init__(self, path=None, input_data=None, check_empty=False, create=False):
|
|
self.path = path
|
|
self.input_data = input_data
|
|
self.yaml = ruamel.yaml.YAML()
|
|
self.yaml.indent(mapping=2, sequence=2)
|
|
|
|
# Add constructor for !ENV tag
|
|
self.yaml.Constructor.add_constructor("!ENV", self._env_constructor)
|
|
# Add representer for !ENV tag
|
|
self.yaml.Representer.add_representer(EnvStr, self._env_representer)
|
|
|
|
try:
|
|
if input_data is not None:
|
|
if input_data == "":
|
|
# Empty string means initialize with empty data for writing
|
|
self.data = {}
|
|
else:
|
|
self.data = self.yaml.load(input_data)
|
|
else:
|
|
if create and not os.path.exists(self.path):
|
|
with open(self.path, "w"):
|
|
pass
|
|
self.data = {}
|
|
else:
|
|
with open(self.path, encoding="utf-8") as filepath:
|
|
self.data = self.yaml.load(filepath)
|
|
except ruamel.yaml.error.YAMLError as yerr:
|
|
err = str(yerr).replace("\n", "\n ")
|
|
raise Failed(f"YAML Error: {err}") from yerr
|
|
except Exception as yerr:
|
|
raise Failed(f"YAML Error: {yerr}") from yerr
|
|
if not self.data or not isinstance(self.data, dict):
|
|
if check_empty:
|
|
raise Failed("YAML Error: File is empty")
|
|
self.data = {}
|
|
|
|
def _env_constructor(self, loader, node):
|
|
"""Constructor for !ENV tag"""
|
|
value = loader.construct_scalar(node)
|
|
# Resolve the environment variable at runtime
|
|
env_value = os.getenv(value)
|
|
# If environment variable is not found, use an empty string as default for schema generation
|
|
if env_value is None:
|
|
logger.warning(f"Environment variable '{value}' not found. Using empty string for schema generation.")
|
|
env_value = ""
|
|
# Return a custom string subclass that preserves the !ENV tag
|
|
return EnvStr(value, env_value)
|
|
|
|
def _env_representer(self, dumper, data):
|
|
"""Representer for EnvStr class"""
|
|
return dumper.represent_scalar("!ENV", data.env_var)
|
|
|
|
def save(self):
|
|
"""Save yaml file with !ENV tags preserved"""
|
|
if self.path:
|
|
with open(self.path, "w", encoding="utf-8") as filepath:
|
|
self.yaml.dump(self.data, filepath)
|
|
else:
|
|
raise ValueError("YAML path is None or empty")
|
|
|
|
def save_preserving_format(self, new_data):
|
|
"""Save yaml file while preserving original formatting, comments, and structure"""
|
|
if not self.path:
|
|
raise ValueError("YAML path is None or empty")
|
|
|
|
# Load the original file to preserve formatting
|
|
original_yaml = ruamel.yaml.YAML()
|
|
original_yaml.preserve_quotes = True
|
|
original_yaml.map_indent = 2
|
|
original_yaml.sequence_indent = 2
|
|
original_yaml.sequence_dash_offset = 0
|
|
|
|
# Add constructor and representer for !ENV tag
|
|
original_yaml.Constructor.add_constructor("!ENV", self._env_constructor)
|
|
original_yaml.Representer.add_representer(EnvStr, self._env_representer)
|
|
|
|
try:
|
|
# Load the original file with formatting preserved
|
|
with open(self.path, encoding="utf-8") as filepath:
|
|
original_data = original_yaml.load(filepath)
|
|
|
|
# If original file is empty or None, use new data directly
|
|
if not original_data:
|
|
original_data = original_yaml.load("{}")
|
|
|
|
# Recursively update the original data with new values while preserving structure
|
|
self._deep_update_preserving_format(original_data, new_data)
|
|
|
|
# Save with preserved formatting
|
|
with open(self.path, "w", encoding="utf-8") as filepath:
|
|
original_yaml.dump(original_data, filepath)
|
|
|
|
except FileNotFoundError:
|
|
# If file doesn't exist, create it with new data
|
|
with open(self.path, "w", encoding="utf-8") as filepath:
|
|
original_yaml.dump(new_data, filepath)
|
|
except Exception as e:
|
|
logger.error(f"Error preserving YAML format: {e}")
|
|
# Fallback to regular save
|
|
self.data = new_data
|
|
self.save()
|
|
|
|
def _deep_update_preserving_format(self, original, new_data):
|
|
"""Recursively update original data with new data while preserving formatting"""
|
|
if not isinstance(new_data, dict):
|
|
return new_data
|
|
|
|
if not isinstance(original, dict):
|
|
return new_data
|
|
|
|
for key, value in new_data.items():
|
|
if key in original:
|
|
if isinstance(value, dict) and isinstance(original[key], dict):
|
|
# Recursively update nested dictionaries
|
|
self._deep_update_preserving_format(original[key], value)
|
|
else:
|
|
# Update the value while preserving any YAML formatting
|
|
original[key] = value
|
|
else:
|
|
# Add new key-value pairs
|
|
original[key] = value
|
|
|
|
# Remove keys that exist in original but not in new_data
|
|
keys_to_remove = []
|
|
for key in original:
|
|
if key not in new_data:
|
|
keys_to_remove.append(key)
|
|
|
|
for key in keys_to_remove:
|
|
del original[key]
|
|
|
|
return original
|
|
|
|
|
|
class EnvStr(str):
|
|
"""Custom string subclass to preserve !ENV tags"""
|
|
|
|
def __new__(cls, env_var, resolved_value):
|
|
# Create a new string instance with the resolved value
|
|
instance = super().__new__(cls, resolved_value)
|
|
instance.env_var = env_var # Store the environment variable name
|
|
return instance
|
|
|
|
def __repr__(self):
|
|
"""Return the resolved value as a string"""
|
|
return super().__repr__()
|
|
|
|
|
|
def get_matching_config_files(config_pattern: str, default_dir: str) -> list:
|
|
"""Get list of config files matching a pattern.
|
|
|
|
Args:
|
|
config_pattern (str): Config file pattern (e.g. "config.yml" or "config*.yml")
|
|
default_dir (str): Default directory to look for configs
|
|
|
|
Returns:
|
|
list: List of matching config file names
|
|
|
|
Raises:
|
|
Failed: If no matching config files found
|
|
"""
|
|
# Check docker config first
|
|
if os.path.isdir("/config") and glob.glob(os.path.join("/config", config_pattern)):
|
|
search_dir = "/config"
|
|
else:
|
|
search_dir = default_dir
|
|
|
|
# Handle single file vs pattern
|
|
if "*" not in config_pattern:
|
|
return [config_pattern]
|
|
else:
|
|
glob_configs = glob.glob(os.path.join(search_dir, config_pattern))
|
|
if glob_configs:
|
|
# Return just the filenames without paths
|
|
return [os.path.split(x)[-1] for x in glob_configs]
|
|
else:
|
|
raise Failed(f"Config Error: Unable to find any config files in the pattern '{config_pattern}'")
|
|
|
|
|
|
def execute_qbit_commands(qbit_manager, commands, stats, hashes=None):
|
|
"""Execute qBittorrent management commands and update stats.
|
|
|
|
Args:
|
|
qbit_manager: The qBittorrent manager instance
|
|
commands: Dictionary of command flags (e.g., {"cat_update": True, "tag_update": False})
|
|
stats: Dictionary to update with execution statistics
|
|
hashes: Optional list of torrent hashes to process (for web API)
|
|
|
|
Returns:
|
|
None (modifies stats dictionary in place)
|
|
"""
|
|
# Import here to avoid circular imports
|
|
from modules.core.category import Category
|
|
from modules.core.recheck import ReCheck
|
|
from modules.core.remove_orphaned import RemoveOrphaned
|
|
from modules.core.remove_unregistered import RemoveUnregistered
|
|
from modules.core.share_limits import ShareLimits
|
|
from modules.core.tag_nohardlinks import TagNoHardLinks
|
|
from modules.core.tags import Tags
|
|
from modules.qbit_error_handler import safe_execute_with_qbit_error_handling
|
|
|
|
# Initialize executed_commands list
|
|
if "executed_commands" not in stats:
|
|
stats["executed_commands"] = []
|
|
|
|
# Set Category
|
|
if commands.get("cat_update"):
|
|
if hashes is not None:
|
|
result = safe_execute_with_qbit_error_handling(
|
|
lambda: Category(qbit_manager, hashes).stats, "Category Update (with hashes)"
|
|
)
|
|
else:
|
|
result = safe_execute_with_qbit_error_handling(lambda: Category(qbit_manager).stats, "Category Update")
|
|
|
|
if result is not None:
|
|
if "categorized" not in stats:
|
|
stats["categorized"] = 0
|
|
stats["categorized"] += result
|
|
stats["executed_commands"].append("cat_update")
|
|
else:
|
|
logger.warning("Category Update operation skipped due to API errors")
|
|
|
|
# Set Tags
|
|
if commands.get("tag_update"):
|
|
if hashes is not None:
|
|
result = safe_execute_with_qbit_error_handling(lambda: Tags(qbit_manager, hashes).stats, "Tags Update (with hashes)")
|
|
else:
|
|
result = safe_execute_with_qbit_error_handling(lambda: Tags(qbit_manager).stats, "Tags Update")
|
|
|
|
if result is not None:
|
|
stats["tagged"] += result
|
|
stats["executed_commands"].append("tag_update")
|
|
else:
|
|
logger.warning("Tags Update operation skipped due to API errors")
|
|
|
|
# Remove Unregistered Torrents and tag errors
|
|
if commands.get("rem_unregistered") or commands.get("tag_tracker_error"):
|
|
if hashes is not None:
|
|
rem_unreg = safe_execute_with_qbit_error_handling(
|
|
lambda: RemoveUnregistered(qbit_manager, hashes), "Remove Unregistered Torrents (with hashes)"
|
|
)
|
|
else:
|
|
rem_unreg = safe_execute_with_qbit_error_handling(
|
|
lambda: RemoveUnregistered(qbit_manager), "Remove Unregistered Torrents"
|
|
)
|
|
|
|
if rem_unreg is not None:
|
|
# Initialize stats if they don't exist
|
|
for key in ["rem_unreg", "deleted", "deleted_contents", "tagged_tracker_error", "untagged_tracker_error"]:
|
|
if key not in stats:
|
|
stats[key] = 0
|
|
|
|
stats["rem_unreg"] += rem_unreg.stats_deleted + rem_unreg.stats_deleted_contents
|
|
stats["deleted"] += rem_unreg.stats_deleted
|
|
stats["deleted_contents"] += rem_unreg.stats_deleted_contents
|
|
stats["tagged_tracker_error"] += rem_unreg.stats_tagged
|
|
stats["untagged_tracker_error"] += rem_unreg.stats_untagged
|
|
stats["tagged"] += rem_unreg.stats_tagged
|
|
stats["executed_commands"].extend([cmd for cmd in ["rem_unregistered", "tag_tracker_error"] if commands.get(cmd)])
|
|
else:
|
|
logger.warning("Remove Unregistered Torrents operation skipped due to API errors")
|
|
|
|
# Recheck Torrents
|
|
if commands.get("recheck"):
|
|
if hashes is not None:
|
|
recheck = safe_execute_with_qbit_error_handling(
|
|
lambda: ReCheck(qbit_manager, hashes), "Recheck Torrents (with hashes)"
|
|
)
|
|
else:
|
|
recheck = safe_execute_with_qbit_error_handling(lambda: ReCheck(qbit_manager), "Recheck Torrents")
|
|
|
|
if recheck is not None:
|
|
if "rechecked" not in stats:
|
|
stats["rechecked"] = 0
|
|
if "resumed" not in stats:
|
|
stats["resumed"] = 0
|
|
stats["rechecked"] += recheck.stats_rechecked
|
|
stats["resumed"] += recheck.stats_resumed
|
|
stats["executed_commands"].append("recheck")
|
|
else:
|
|
logger.warning("Recheck Torrents operation skipped due to API errors")
|
|
|
|
# Remove Orphaned Files
|
|
if commands.get("rem_orphaned"):
|
|
result = safe_execute_with_qbit_error_handling(lambda: RemoveOrphaned(qbit_manager).stats, "Remove Orphaned Files")
|
|
|
|
if result is not None:
|
|
if "orphaned" not in stats:
|
|
stats["orphaned"] = 0
|
|
stats["orphaned"] += result
|
|
stats["executed_commands"].append("rem_orphaned")
|
|
else:
|
|
logger.warning("Remove Orphaned Files operation skipped due to API errors")
|
|
|
|
# Tag NoHardLinks
|
|
if commands.get("tag_nohardlinks"):
|
|
if hashes is not None:
|
|
no_hardlinks = safe_execute_with_qbit_error_handling(
|
|
lambda: TagNoHardLinks(qbit_manager, hashes), "Tag NoHardLinks (with hashes)"
|
|
)
|
|
else:
|
|
no_hardlinks = safe_execute_with_qbit_error_handling(lambda: TagNoHardLinks(qbit_manager), "Tag NoHardLinks")
|
|
|
|
if no_hardlinks is not None:
|
|
if "tagged_noHL" not in stats:
|
|
stats["tagged_noHL"] = 0
|
|
if "untagged_noHL" not in stats:
|
|
stats["untagged_noHL"] = 0
|
|
stats["tagged"] += no_hardlinks.stats_tagged
|
|
stats["tagged_noHL"] += no_hardlinks.stats_tagged
|
|
stats["untagged_noHL"] += no_hardlinks.stats_untagged
|
|
stats["executed_commands"].append("tag_nohardlinks")
|
|
else:
|
|
logger.warning("Tag NoHardLinks operation skipped due to API errors")
|
|
|
|
# Set Share Limits
|
|
if commands.get("share_limits"):
|
|
if hashes is not None:
|
|
share_limits = safe_execute_with_qbit_error_handling(
|
|
lambda: ShareLimits(qbit_manager, hashes), "Share Limits (with hashes)"
|
|
)
|
|
else:
|
|
share_limits = safe_execute_with_qbit_error_handling(lambda: ShareLimits(qbit_manager), "Share Limits")
|
|
|
|
if share_limits is not None:
|
|
if "updated_share_limits" not in stats:
|
|
stats["updated_share_limits"] = 0
|
|
if "cleaned_share_limits" not in stats:
|
|
stats["cleaned_share_limits"] = 0
|
|
stats["tagged"] += share_limits.stats_tagged
|
|
stats["updated_share_limits"] += share_limits.stats_tagged
|
|
stats["deleted"] += share_limits.stats_deleted
|
|
stats["deleted_contents"] += share_limits.stats_deleted_contents
|
|
stats["cleaned_share_limits"] += share_limits.stats_deleted + share_limits.stats_deleted_contents
|
|
stats["executed_commands"].append("share_limits")
|
|
else:
|
|
logger.warning("Share Limits operation skipped due to API errors")
|