mirror of
https://github.com/StuffAnThings/qbit_manage.git
synced 2025-11-11 08:50:59 +08:00
refactor(path): replace string replace with cross-platform path utility
Replace all instances of string `.replace()` for path manipulation with a new `util.path_replace()` function that handles cross-platform path separators safely, improving compatibility between Windows and Unix-like systems. This includes updates in config, remove_orphaned, share_limits, tag_nohardlinks, qbittorrent modules, and the addition of the utility function itself.
This commit is contained in:
parent
ac14d4da7a
commit
e0905b214f
7 changed files with 65 additions and 31 deletions
2
VERSION
2
VERSION
|
|
@ -1 +1 @@
|
|||
4.6.1-develop1
|
||||
4.6.1-develop2
|
||||
|
|
|
|||
|
|
@ -1040,7 +1040,8 @@ class Config:
|
|||
save_path = list(self.data["cat"].values())
|
||||
cleaned_save_path = [
|
||||
os.path.join(
|
||||
s.replace(self.root_dir, self.remote_dir), os.path.basename(location_path.rstrip(os.sep))
|
||||
util.path_replace(s, self.root_dir, self.remote_dir),
|
||||
os.path.basename(location_path.rstrip(os.sep)),
|
||||
)
|
||||
for s in save_path
|
||||
]
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ class RemoveOrphaned:
|
|||
if self.config.orphaned["exclude_patterns"]:
|
||||
logger.print_line("Processing orphan exclude patterns")
|
||||
exclude_patterns = [
|
||||
exclude_pattern.replace(self.remote_dir, self.root_dir)
|
||||
util.path_replace(exclude_pattern, self.remote_dir, self.root_dir)
|
||||
for exclude_pattern in self.config.orphaned["exclude_patterns"]
|
||||
]
|
||||
|
||||
|
|
@ -130,7 +130,7 @@ class RemoveOrphaned:
|
|||
else:
|
||||
body += logger.print_line(
|
||||
f"{'Not moving' if self.config.dry_run else 'Moving'} {num_orphaned} Orphaned files "
|
||||
f"to {self.orphaned_dir.replace(self.remote_dir, self.root_dir)}",
|
||||
f"to {util.path_replace(self.orphaned_dir, self.remote_dir, self.root_dir)}",
|
||||
self.config.loglevel,
|
||||
)
|
||||
|
||||
|
|
@ -139,7 +139,7 @@ class RemoveOrphaned:
|
|||
"title": f"Removing {num_orphaned} Orphaned Files",
|
||||
"body": "\n".join(body),
|
||||
"orphaned_files": list(orphaned_files),
|
||||
"orphaned_directory": self.orphaned_dir.replace(self.remote_dir, self.root_dir),
|
||||
"orphaned_directory": util.path_replace(self.orphaned_dir, self.remote_dir, self.root_dir),
|
||||
"total_orphaned_files": num_orphaned,
|
||||
}
|
||||
self.config.send_notifications(attr)
|
||||
|
|
@ -159,7 +159,7 @@ class RemoveOrphaned:
|
|||
if orphaned_parent_paths:
|
||||
logger.print_line("Removing newly empty directories", self.config.loglevel)
|
||||
exclude_patterns = [
|
||||
exclude_pattern.replace(self.remote_dir, self.root_dir)
|
||||
util.path_replace(exclude_pattern, self.remote_dir, self.root_dir)
|
||||
for exclude_pattern in self.config.orphaned.get("exclude_patterns", [])
|
||||
]
|
||||
|
||||
|
|
@ -177,9 +177,9 @@ class RemoveOrphaned:
|
|||
|
||||
def handle_orphaned_files(self, file):
|
||||
"""Handle orphaned file with improved error handling and batching"""
|
||||
src = file.replace(self.root_dir, self.remote_dir)
|
||||
dest = os.path.join(self.orphaned_dir, file.replace(self.root_dir, ""))
|
||||
orphaned_parent_path = os.path.dirname(file).replace(self.root_dir, self.remote_dir)
|
||||
src = util.path_replace(file, self.root_dir, self.remote_dir)
|
||||
dest = os.path.join(self.orphaned_dir, util.path_replace(file, self.root_dir, ""))
|
||||
orphaned_parent_path = util.path_replace(os.path.dirname(file), self.root_dir, self.remote_dir)
|
||||
|
||||
try:
|
||||
if self.config.orphaned["empty_after_x_days"] == 0:
|
||||
|
|
@ -198,12 +198,7 @@ class RemoveOrphaned:
|
|||
"""Get full paths for torrent files with improved path handling"""
|
||||
save_path = torrent.save_path
|
||||
|
||||
# Use list comprehension for better performance
|
||||
fullpath_torrent_files = [
|
||||
os.path.join(save_path, file.name).replace(r"/", "\\")
|
||||
if ":\\" in os.path.join(save_path, file.name)
|
||||
else os.path.join(save_path, file.name)
|
||||
for file in torrent.files
|
||||
]
|
||||
# Use list comprehension for better performance with cross-platform path normalization
|
||||
fullpath_torrent_files = [os.path.normpath(os.path.join(save_path, file.name)) for file in torrent.files]
|
||||
|
||||
return fullpath_torrent_files
|
||||
|
|
|
|||
|
|
@ -110,7 +110,7 @@ class ShareLimits:
|
|||
t_msg = self.qbt.torrentinfo[t_name]["msg"]
|
||||
t_status = self.qbt.torrentinfo[t_name]["status"]
|
||||
# Double check that the content path is the same before we delete anything
|
||||
if torrent["content_path"].replace(self.root_dir, self.remote_dir) == torrent_dict["content_path"]:
|
||||
if util.path_replace(torrent["content_path"], self.root_dir, self.remote_dir) == torrent_dict["content_path"]:
|
||||
tracker = self.qbt.get_tags(self.qbt.get_tracker_urls(torrent.trackers))
|
||||
body = []
|
||||
body += logger.print_line(logger.insert_space(f"Torrent Name: {t_name}", 3), self.config.loglevel)
|
||||
|
|
@ -130,7 +130,7 @@ class ShareLimits:
|
|||
"torrent_tracker": tracker["url"],
|
||||
"notifiarr_indexer": tracker["notifiarr"],
|
||||
}
|
||||
if os.path.exists(torrent["content_path"].replace(self.root_dir, self.remote_dir)):
|
||||
if os.path.exists(util.path_replace(torrent["content_path"], self.root_dir, self.remote_dir)):
|
||||
# Checks if any of the original torrents are working
|
||||
if self.qbt.has_cross_seed(torrent) and ("" in t_msg or 2 in t_status):
|
||||
self.stats_deleted += 1
|
||||
|
|
@ -160,7 +160,7 @@ class ShareLimits:
|
|||
body += logger.print_line(
|
||||
logger.insert_space(
|
||||
"Deleted .torrent but NOT content files. Reason: path does not exist [path="
|
||||
+ torrent["content_path"].replace(self.root_dir, self.remote_dir)
|
||||
+ util.path_replace(torrent["content_path"], self.root_dir, self.remote_dir)
|
||||
+ "].",
|
||||
8,
|
||||
),
|
||||
|
|
@ -338,7 +338,9 @@ class ShareLimits:
|
|||
if t_hash not in self.tdel_dict:
|
||||
self.tdel_dict[t_hash] = {}
|
||||
self.tdel_dict[t_hash]["torrent"] = torrent
|
||||
self.tdel_dict[t_hash]["content_path"] = torrent["content_path"].replace(self.root_dir, self.remote_dir)
|
||||
self.tdel_dict[t_hash]["content_path"] = util.path_replace(
|
||||
torrent["content_path"], self.root_dir, self.remote_dir
|
||||
)
|
||||
self.tdel_dict[t_hash]["body"] = tor_reached_seed_limit
|
||||
else:
|
||||
# New behavior: throttle upload speed instead of pausing/removing
|
||||
|
|
|
|||
|
|
@ -92,7 +92,7 @@ class TagNoHardLinks:
|
|||
"""Helper method to process a single torrent for nohardlinks tagging."""
|
||||
tracker = self.qbt.get_tags(self.qbt.get_tracker_urls(torrent.trackers))
|
||||
has_nohardlinks = check_hardlinks.nohardlink(
|
||||
torrent["content_path"].replace(self.root_dir, self.remote_dir),
|
||||
util.path_replace(torrent["content_path"], self.root_dir, self.remote_dir),
|
||||
self.config.notify,
|
||||
ignore_root_dir,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -429,7 +429,7 @@ class Qbt:
|
|||
save_paths = set()
|
||||
categories = self.client.torrent_categories.categories
|
||||
for cat in categories:
|
||||
save_path = categories[cat].savePath.replace(self.config.root_dir, self.config.remote_dir)
|
||||
save_path = util.path_replace(categories[cat].savePath, self.config.root_dir, self.config.remote_dir)
|
||||
if save_path:
|
||||
save_paths.add(save_path)
|
||||
# Also add root_dir to the list
|
||||
|
|
@ -448,7 +448,7 @@ class Qbt:
|
|||
@handle_qbit_api_errors(context="tor_delete_recycle_get_files", retry_attempts=1)
|
||||
def get_torrent_files():
|
||||
info_hash = torrent.hash
|
||||
save_path = torrent.save_path.replace(self.config.root_dir, self.config.remote_dir)
|
||||
save_path = util.path_replace(torrent.save_path, self.config.root_dir, self.config.remote_dir)
|
||||
# Define torrent files/folders
|
||||
for file in torrent.files:
|
||||
tor_files.append(os.path.join(save_path, file.name))
|
||||
|
|
@ -529,7 +529,7 @@ class Qbt:
|
|||
logger.info(backup_str)
|
||||
torrent_json["tracker_torrent_files"] = tracker_torrent_files
|
||||
if "files" not in torrent_json:
|
||||
files_cleaned = [f.replace(self.config.remote_dir, "") for f in tor_files]
|
||||
files_cleaned = [util.path_replace(f, self.config.remote_dir, "") for f in tor_files]
|
||||
torrent_json["files"] = files_cleaned
|
||||
if "deleted_contents" not in torrent_json:
|
||||
torrent_json["deleted_contents"] = info["torrents_deleted_and_contents"]
|
||||
|
|
@ -546,13 +546,14 @@ class Qbt:
|
|||
else:
|
||||
logger.print_line("\n".join(tor_files), "DEBUG")
|
||||
logger.debug(
|
||||
f"Moved {len(tor_files)} files to {recycle_path.replace(self.config.remote_dir, self.config.root_dir)}"
|
||||
f"Moved {len(tor_files)} files to "
|
||||
f"{util.path_replace(recycle_path, self.config.remote_dir, self.config.root_dir)}"
|
||||
)
|
||||
|
||||
# Move files from torrent contents to Recycle bin
|
||||
for file in tor_files:
|
||||
src = file
|
||||
dest = os.path.join(recycle_path, file.replace(self.config.remote_dir, ""))
|
||||
dest = os.path.join(recycle_path, util.path_replace(file, self.config.remote_dir, ""))
|
||||
# Move files and change date modified
|
||||
try:
|
||||
to_delete = util.move_files(src, dest, True)
|
||||
|
|
@ -560,7 +561,7 @@ class Qbt:
|
|||
ex = logger.print_line(f"RecycleBin Warning - FileNotFound: No such file or directory: {src} ", "WARNING")
|
||||
self.config.notify(ex, "Deleting Torrent", False)
|
||||
# Add src file to orphan exclusion since sometimes deleting files are slow in certain environments
|
||||
exclude_file = src.replace(self.config.remote_dir, self.config.root_dir)
|
||||
exclude_file = util.path_replace(src, self.config.remote_dir, self.config.root_dir)
|
||||
if exclude_file not in self.config.orphaned["exclude_patterns"]:
|
||||
self.config.orphaned["exclude_patterns"].append(exclude_file)
|
||||
# Delete torrent and files
|
||||
|
|
@ -573,7 +574,7 @@ class Qbt:
|
|||
if info["torrents_deleted_and_contents"] is True:
|
||||
for file in tor_files:
|
||||
# Add src file to orphan exclusion since sometimes deleting files are slow in certain environments
|
||||
exclude_file = file.replace(self.config.remote_dir, self.config.root_dir)
|
||||
exclude_file = util.path_replace(file, self.config.remote_dir, self.config.root_dir)
|
||||
if exclude_file not in self.config.orphaned["exclude_patterns"]:
|
||||
self.config.orphaned["exclude_patterns"].append(exclude_file)
|
||||
torrent.delete(delete_files=True)
|
||||
|
|
|
|||
|
|
@ -1034,7 +1034,7 @@ class CheckHardLinks:
|
|||
continue
|
||||
else:
|
||||
try:
|
||||
inode_no = os.stat(file.replace(self.root_dir, self.remote_dir)).st_ino
|
||||
inode_no = os.stat(path_replace(file, self.root_dir, self.remote_dir)).st_ino
|
||||
except PermissionError as perm:
|
||||
logger.warning(f"{perm} : file {file} has permission issues. Skipping...")
|
||||
continue
|
||||
|
|
@ -1169,7 +1169,7 @@ def get_root_files(root_dir, remote_dir, exclude_dir=None):
|
|||
else:
|
||||
# Convert an exclude in remote namespace to root namespace for comparison after replacement
|
||||
try:
|
||||
local_exclude_dir = exclude_dir.replace(remote_dir, root_dir, 1)
|
||||
local_exclude_dir = path_replace(exclude_dir, remote_dir, root_dir)
|
||||
except Exception:
|
||||
local_exclude_dir = None
|
||||
|
||||
|
|
@ -1185,7 +1185,7 @@ def get_root_files(root_dir, remote_dir, exclude_dir=None):
|
|||
else:
|
||||
# Walk the accessible remote_dir and convert to root_dir representation once per directory
|
||||
for path, subdirs, files in os.walk(base_to_walk):
|
||||
replaced_path = path.replace(remote_dir, root_dir, 1)
|
||||
replaced_path = path_replace(path, remote_dir, root_dir)
|
||||
if local_exclude_dir and os.path.normcase(local_exclude_dir) in os.path.normcase(replaced_path):
|
||||
continue
|
||||
for name in files:
|
||||
|
|
@ -1325,6 +1325,41 @@ def parse_size_to_bytes(value):
|
|||
return int(num * mul)
|
||||
|
||||
|
||||
def path_replace(path, old_path, new_path):
|
||||
"""
|
||||
Cross-platform safe path replacement that handles different path separators.
|
||||
|
||||
This function replaces old_path with new_path in the given path, accounting for
|
||||
differences in path separators between Windows (\\) and Unix-like systems (/).
|
||||
|
||||
Args:
|
||||
path (str): The path to modify
|
||||
old_path (str): The path segment to replace
|
||||
new_path (str): The replacement path segment
|
||||
|
||||
Returns:
|
||||
str: The modified path with cross-platform compatibility
|
||||
"""
|
||||
if not path or not old_path:
|
||||
return path
|
||||
|
||||
# Normalize all paths to use forward slashes for comparison
|
||||
path_norm = path.replace("\\", "/")
|
||||
old_norm = old_path.replace("\\", "/")
|
||||
new_norm = new_path.replace("\\", "/") if new_path else ""
|
||||
|
||||
# Perform the replacement on normalized paths
|
||||
if path_norm.startswith(old_norm):
|
||||
result = new_norm + path_norm[len(old_norm) :]
|
||||
elif old_norm in path_norm:
|
||||
result = path_norm.replace(old_norm, new_norm, 1)
|
||||
else:
|
||||
return path
|
||||
|
||||
# Convert back to the platform's preferred separator
|
||||
return os.path.normpath(result)
|
||||
|
||||
|
||||
class YAML:
|
||||
"""Class to load and save yaml files with !ENV tag preservation and environment variable resolution"""
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue