forward enhancement

This commit is contained in:
BennyThink 2022-02-19 21:59:41 +08:00
parent 800b638ca6
commit f99addd94f
No known key found for this signature in database
GPG key ID: 6CD0DBDA5235D481
3 changed files with 126 additions and 105 deletions

View file

@ -141,21 +141,14 @@ class Redis:
file.name = f"{date}.txt" file.name = f"{date}.txt"
return file return file
def add_send_cache(self, unique, uid, mid): def add_send_cache(self, unique, file_id):
# unique: video_url+resolution+send_type self.r.hset("cache", unique, file_id)
# value in redis [uid1,uid2]
values = []
v = self.r.hget(unique, uid)
if v:
values = json.loads(v)
values.append(mid)
self.r.hset(unique, uid, json.dumps(values))
def get_send_cache(self, unique) -> "dict": def get_send_cache(self, unique) -> "str":
return self.r.hgetall(unique) return self.r.hget("cache", unique)
def del_send_cache(self, unique, uid): def del_send_cache(self, unique):
self.r.hdel(unique, uid) self.r.hdel("cache", unique)
class MySQL: class MySQL:

View file

@ -78,7 +78,7 @@ class VIP(Redis, MySQL):
if ENABLE_VIP: if ENABLE_VIP:
self.cur.execute("select count(user_id) from subscribe where user_id=%s", (user_id,)) self.cur.execute("select count(user_id) from subscribe where user_id=%s", (user_id,))
usage = int(self.cur.fetchone()[0]) usage = int(self.cur.fetchone()[0])
if usage >= 3 and not self.check_vip(user_id): if usage >= 5 and not self.check_vip(user_id):
logging.warning("User %s is not VIP but has subscribed %s channels", user_id, usage) logging.warning("User %s is not VIP but has subscribed %s channels", user_id, usage)
return "You have subscribed too many channels. Please upgrade to VIP to subscribe more channels." return "You have subscribed too many channels. Please upgrade to VIP to subscribe more channels."

View file

@ -16,6 +16,8 @@ import subprocess
import tempfile import tempfile
import threading import threading
import time import time
import traceback
import typing
from urllib.parse import quote_plus from urllib.parse import quote_plus
import psutil import psutil
@ -23,8 +25,8 @@ import requests
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from celery import Celery from celery import Celery
from celery.worker.control import Panel from celery.worker.control import Panel
from pyrogram import idle from pyrogram import Client, idle
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Message
from requests_toolbelt.multipart.encoder import MultipartEncoder from requests_toolbelt.multipart.encoder import MultipartEncoder
from client_init import create_app from client_init import create_app
@ -75,7 +77,9 @@ def audio_task(chat_id, message_id):
logging.info("Audio celery tasks ended.") logging.info("Audio celery tasks ended.")
def get_unique_clink(clink, settings): def get_unique_clink(original_url, user_id):
settings = get_user_settings(str(user_id))
clink = VIP().extract_canonical_link(original_url)
try: try:
unique = "{}?p={}{}".format(clink, *settings[1:]) unique = "{}?p={}{}".format(clink, *settings[1:])
except IndexError: except IndexError:
@ -91,43 +95,42 @@ def direct_download_task(chat_id, message_id, url):
logging.info("Direct download celery tasks ended.") logging.info("Direct download celery tasks ended.")
def forward_video(chat_id, url, client): def forward_video(url, client, bot_msg):
chat_id = bot_msg.chat.id
red = Redis() red = Redis()
vip = VIP() vip = VIP()
settings = get_user_settings(str(chat_id)) unique = get_unique_clink(url, chat_id)
clink = vip.extract_canonical_link(url)
unique = get_unique_clink(clink, settings)
cache = red.get_send_cache(unique) cached_fid = red.get_send_cache(unique)
if not cache: if not cached_fid:
return False return False
for uid, mid in cache.items():
uid, mid = int(uid), json.loads(mid)
try: try:
fwd_msg = client.forward_messages(chat_id, uid, mid) res_msg: "Message" = upload_processor(client, bot_msg, url, cached_fid)
if not fwd_msg: if not res_msg:
raise ValueError("Failed to forward message") raise ValueError("Failed to forward message")
red.update_metrics("cache_hit") obj = res_msg.document or res_msg.video or res_msg.audio
if not isinstance(fwd_msg, list): caption, _ = gen_cap(chat_id, url, obj)
fwd_msg = [fwd_msg] res_msg.edit_text(caption, reply_markup=gen_video_markup())
for fwd in fwd_msg:
if ENABLE_VIP: if ENABLE_VIP:
file_size = getattr(fwd.document, "file_size", None) or getattr(fwd.video, "file_size", 1024) file_size = getattr(obj, "file_size", None) \
or getattr(obj, "file_size", None) \
or getattr(obj, "file_size", 10)
# TODO: forward file size may exceed the limit # TODO: forward file size may exceed the limit
vip.use_quota(chat_id, file_size) vip.use_quota(chat_id, file_size)
red.add_send_cache(unique, chat_id, fwd.message_id) red.update_metrics("cache_hit")
return True return True
except Exception as e: except Exception as e:
traceback.print_exc()
logging.error("Failed to forward message %s", e) logging.error("Failed to forward message %s", e)
red.del_send_cache(unique, uid) red.del_send_cache(unique)
red.update_metrics("cache_miss") red.update_metrics("cache_miss")
def ytdl_download_entrance(bot_msg, client, url): def ytdl_download_entrance(bot_msg, client, url):
chat_id = bot_msg.chat.id chat_id = bot_msg.chat.id
if forward_video(chat_id, url, client): if forward_video(url, client, bot_msg):
return return
mode = get_user_settings(str(chat_id))[-1] mode = get_user_settings(str(chat_id))[-1]
if ENABLE_CELERY and mode in [None, "Celery"]: if ENABLE_CELERY and mode in [None, "Celery"]:
@ -251,9 +254,93 @@ def ytdl_normal_download(bot_msg, client, url):
chat_id = bot_msg.chat.id chat_id = bot_msg.chat.id
temp_dir = tempfile.TemporaryDirectory(prefix="ytdl-") temp_dir = tempfile.TemporaryDirectory(prefix="ytdl-")
red = Redis()
result = ytdl_download(url, temp_dir.name, bot_msg) result = ytdl_download(url, temp_dir.name, bot_msg)
logging.info("Download complete.") logging.info("Download complete.")
if result["status"]:
client.send_chat_action(chat_id, 'upload_document')
video_paths = result["filepath"]
bot_msg.edit_text('Download complete. Sending now...')
for video_path in video_paths:
# normally there's only one video in that path...
st_size = os.stat(video_path).st_size
if st_size > TG_MAX_SIZE:
t = f"Your video({sizeof_fmt(st_size)}) is too large for Telegram. I'll upload it to transfer.sh"
bot_msg.edit_text(t)
client.send_chat_action(chat_id, 'upload_document')
client.send_message(chat_id, upload_transfer_sh(video_paths))
return
upload_processor(client, bot_msg, url, video_path)
bot_msg.edit_text('Download success!✅')
else:
client.send_chat_action(chat_id, 'typing')
tb = result["error"][0:4000]
bot_msg.edit_text(f"Download failed!❌\n\n```{tb}```", disable_web_page_preview=True)
temp_dir.cleanup()
def upload_processor(client, bot_msg, url, vp_or_fid: "typing.Any[str, pathlib.Path]"):
chat_id = bot_msg.chat.id
red = Redis()
markup = gen_video_markup()
cap, meta = gen_cap(chat_id, url, vp_or_fid)
settings = get_user_settings(str(chat_id))
if ARCHIVE_ID and isinstance(vp_or_fid, pathlib.Path):
chat_id = ARCHIVE_ID
if settings[2] == "document":
logging.info("Sending as document")
res_msg = client.send_document(chat_id, vp_or_fid,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
reply_markup=markup,
thumb=meta["thumb"]
)
elif settings[2] == "audio":
logging.info("Sending as audio")
res_msg = client.send_audio(chat_id, vp_or_fid,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
)
else:
logging.info("Sending as video")
res_msg = client.send_video(chat_id, vp_or_fid,
supports_streaming=True,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
reply_markup=markup,
**meta
)
unique = get_unique_clink(url, bot_msg.chat.id)
obj = res_msg.document or res_msg.video or res_msg.audio
red.add_send_cache(unique, getattr(obj, "file_id", None))
red.update_metrics("video_success")
if ARCHIVE_ID and isinstance(vp_or_fid, pathlib.Path):
client.forward_messages(bot_msg.chat.id, ARCHIVE_ID, res_msg.message_id)
return res_msg
def gen_cap(chat_id, url, video_path):
if isinstance(video_path, pathlib.Path):
meta = get_metadata(video_path)
file_name = video_path.name
file_size = sizeof_fmt(os.stat(video_path).st_size)
else:
file_name = getattr(video_path, "file_name", "")
file_size = sizeof_fmt(getattr(video_path, "file_size", (2 << 6) - (2 << 4) - (2 << 2) + (0 ^ 1) + (2 << 5)))
meta = dict(
width=getattr(video_path, "width", 0),
height=getattr(video_path, "height", 0),
duration=getattr(video_path, "duration", 0),
thumb=getattr(video_path, "thumb", None),
)
remain = bot_text.remaining_quota_caption(chat_id)
worker = get_dl_source()
cap = f"`{file_name}`\n\n{url}\n\nInfo: {meta['width']}x{meta['height']} {file_size}\t" \
f"{meta['duration']}s\n{remain}\n{worker}"
return cap, meta
def gen_video_markup():
markup = InlineKeyboardMarkup( markup = InlineKeyboardMarkup(
[ [
[ # First row [ # First row
@ -264,66 +351,7 @@ def ytdl_normal_download(bot_msg, client, url):
] ]
] ]
) )
if result["status"]: return markup
client.send_chat_action(chat_id, 'upload_document')
video_paths = result["filepath"]
bot_msg.edit_text('Download complete. Sending now...')
for video_path in video_paths:
# normally there's only one video in that path...
filename = video_path.name
remain = bot_text.remaining_quota_caption(chat_id)
st_size = os.stat(video_path).st_size
size = sizeof_fmt(st_size)
if st_size > TG_MAX_SIZE:
t = f"Your video size is {size} which is too large for Telegram. I'll upload it to transfer.sh"
bot_msg.edit_text(t)
client.send_chat_action(chat_id, 'upload_document')
client.send_message(chat_id, upload_transfer_sh(video_paths))
return
meta = get_metadata(video_path)
worker = get_dl_source()
cap = f"`{filename}`\n\n{url}\n\nInfo: {meta['width']}x{meta['height']} {size} {meta['duration']}s" \
f"\n{remain}\n{worker}"
settings = get_user_settings(str(chat_id))
if ARCHIVE_ID:
chat_id = ARCHIVE_ID
if settings[2] == "document":
logging.info("Sending as document")
res_msg = client.send_document(chat_id, video_path,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
reply_markup=markup,
thumb=meta["thumb"]
)
elif settings[2] == "audio":
logging.info("Sending as audio")
res_msg = client.send_audio(chat_id, video_path,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
)
else:
logging.info("Sending as video")
res_msg = client.send_video(chat_id, video_path,
supports_streaming=True,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
reply_markup=markup,
**meta
)
clink = VIP().extract_canonical_link(url)
unique = get_unique_clink(clink, settings)
red.add_send_cache(unique, res_msg.chat.id, res_msg.message_id)
red.update_metrics("video_success")
if ARCHIVE_ID:
client.forward_messages(bot_msg.chat.id, ARCHIVE_ID, res_msg.message_id)
bot_msg.edit_text('Download success!✅')
else:
client.send_chat_action(chat_id, 'typing')
tb = result["error"][0:4000]
bot_msg.edit_text(f"Download failed!❌\n\n```{tb}```", disable_web_page_preview=True)
temp_dir.cleanup()
@Panel.register @Panel.register