shortening URL in the caption if it's too long

This commit is contained in:
SanujaNS 2024-06-30 00:37:27 +05:30
parent 86295931e1
commit 6feb473dcf
4 changed files with 43 additions and 9 deletions

View file

@ -23,7 +23,7 @@ REDIS = os.getenv("REDIS", "redis")
TG_PREMIUM_MAX_SIZE = 4000 * 1024 * 1024
TG_NORMAL_MAX_SIZE = 2000 * 1024 * 1024
# TG_NORMAL_MAX_SIZE = 10 * 1024 * 1024
CAPTION_URL_LENGTH_LIMIT = 150
EXPIRE = 24 * 3600

View file

@ -189,11 +189,17 @@ def can_convert_mp4(video_path, uid):
return True
def ytdl_download(url: str, tempdir: str, bm, **kwargs) -> list:
def ytdl_download(url: str, tempdir: str, bm, custom_filename=None, **kwargs) -> list:
payment = Payment()
chat_id = bm.chat.id
hijack = kwargs.get("hijack")
output = pathlib.Path(tempdir, "%(title).70s.%(ext)s").as_posix()
output = None
if custom_filename:
output = pathlib.Path(tempdir, custom_filename).as_posix()
else:
output = pathlib.Path(tempdir, "%(title).70s.%(ext)s").as_posix()
ydl_opts = {
"progress_hooks": [lambda d: download_hook(d, bm)],
"outtmpl": output,

View file

@ -44,6 +44,7 @@ from config import (
TMPFILE_PATH,
WORKERS,
FileTooBig,
CAPTION_URL_LENGTH_LIMIT,
)
from constant import BotText
from database import Redis, MySQL
@ -57,6 +58,7 @@ from utils import (
get_metadata,
get_revision,
sizeof_fmt,
shorten_url,
)
customize_logger(["pyrogram.client", "pyrogram.session.session", "pyrogram.connection.connection"])
@ -108,8 +110,11 @@ def premium_button(user_id):
def ytdl_download_task(chat_id: int, message_id: int, url: str):
logging.info("YouTube celery tasks started for %s", url)
bot_msg = retrieve_message(chat_id, message_id)
url_parts = url.split(" -n ", maxsplit=1)
url = url_parts[0].strip() # Assuming space after -n
custom_filename = url_parts[1].strip() if len(url_parts) > 1 else None
try:
ytdl_normal_download(bot, bot_msg, url)
ytdl_normal_download(bot, bot_msg, url, custom_filename=custom_filename)
except FileTooBig as e:
# if you can go there, that means you have premium users set up
logging.warning("Seeking for help from premium user...")
@ -171,8 +176,14 @@ def ytdl_download_entrance(client: Client, bot_msg: types.Message, url: str, mod
payment = Payment()
redis = Redis()
chat_id = bot_msg.chat.id
unique = get_unique_clink(url, chat_id)
cached_fid = redis.get_send_cache(unique)
url_parts = url.split(" -n ", maxsplit=1)
url = url_parts[0].strip()
custom_filename = url_parts[1].strip() if len(url_parts) > 1 else None
cached_fid = None
if not custom_filename:
unique = get_unique_clink(url, chat_id)
cached_fid = redis.get_send_cache(unique)
try:
if cached_fid:
@ -185,7 +196,7 @@ def ytdl_download_entrance(client: Client, bot_msg: types.Message, url: str, mod
# in celery mode, producer has lost control of this task.
ytdl_download_task.delay(chat_id, bot_msg.id, url)
else:
ytdl_normal_download(client, bot_msg, url)
ytdl_normal_download(client, bot_msg, url, custom_filename=custom_filename)
except FileTooBig as e:
logging.warning("Seeking for help from premium user...")
# this is only for normal node. Celery node will need to do it in celery tasks
@ -314,7 +325,7 @@ def normal_audio(client: Client, bot_msg: typing.Union[types.Message, typing.Cor
Redis().update_metrics("audio_success")
def ytdl_normal_download(client: Client, bot_msg: types.Message | typing.Any, url: str):
def ytdl_normal_download(client: Client, bot_msg: types.Message | typing.Any, url: str, custom_filename: str):
"""
This function is called by celery task or directly by bot
:param client: bot client, either from main or bot(celery)
@ -324,7 +335,7 @@ def ytdl_normal_download(client: Client, bot_msg: types.Message | typing.Any, ur
chat_id = bot_msg.chat.id
temp_dir = tempfile.TemporaryDirectory(prefix="ytdl-", dir=TMPFILE_PATH)
video_paths = ytdl_download(url, temp_dir.name, bot_msg)
video_paths = ytdl_download(url, temp_dir.name, bot_msg, custom_filename)
logging.info("Download complete.")
client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_DOCUMENT)
bot_msg.edit_text("Download complete. Sending now...")
@ -544,6 +555,16 @@ def gen_cap(bm, url, video_path):
worker = f"Downloaded by {worker_name}"
else:
worker = ""
# Shorten the URL if necessary
try:
if len(url) > CAPTION_URL_LENGTH_LIMIT:
url_for_cap = shorten_url(url, CAPTION_URL_LENGTH_LIMIT)
else:
url_for_cap = url
except Exception as e:
logging.warning(f"Error shortening URL: {e}")
url_for_cap = url
cap = (
f"{user_info}\n{file_name}\n\n{url}\n\nInfo: {meta['width']}x{meta['height']} {file_size}\t"
f"{meta['duration']}s\n{remain}\n{worker}\n{bot_text.custom_text}"

View file

@ -242,5 +242,12 @@ def extract_code_from_instagram_url(url):
return None
def shorten_url(url, CAPTION_URL_LENGTH_LIMIT):
#Shortens a URL by cutting it to a specified length.
shortened_url = url[:CAPTION_URL_LENGTH_LIMIT - 3] + "..."
return shortened_url
if __name__ == "__main__":
auto_restart()