Pyrogram2 (#291)

* upgrade to pyrogram 2.x

#288

* reformat code

* update dependencies

* remove instagram cookies

* add cryptography

* upgrade to py3.11

* temp fix
This commit is contained in:
Benny 2023-09-10 14:08:11 +02:00 committed by GitHub
parent 0607ae8b70
commit d90133211c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 111 additions and 76 deletions

View file

@ -1,11 +1,11 @@
FROM python:3.10-alpine as builder
FROM python:3.11-alpine as builder
RUN apk update && apk add --no-cache tzdata alpine-sdk libffi-dev ca-certificates
ADD requirements.txt /tmp/
RUN pip3 install --user -r /tmp/requirements.txt && rm /tmp/requirements.txt
FROM python:3.10-alpine
FROM python:3.11-alpine
WORKDIR /ytdlbot/ytdlbot
ENV TZ=Europe/Stockholm

View file

@ -21,6 +21,7 @@ services:
- ./db_data:/var/lib/mysql
environment:
MYSQL_ROOT_PASSWORD: 'root'
command: --default-authentication-plugin=mysql_native_password
logging:
driver: none
@ -33,7 +34,6 @@ services:
- socat
- redis
volumes:
- ./data/instagram.com_cookies.txt:/ytdlbot/ytdlbot/instagram.com_cookies.txt
- ./data/vnstat/:/var/lib/vnstat/
labels:
- "com.centurylinklabs.watchtower.enable=true"

View file

@ -1,11 +1,11 @@
pyrogram==1.4.16
pyrogram==2.0.106
tgcrypto==1.2.5
yt-dlp==2023.7.6
APScheduler==3.10.4
beautifultable==1.1.0
ffmpeg-python==0.2.0
PyMySQL==1.1.0
celery==5.3.1
celery==5.3.4
filetype==1.2.0
flower==2.0.1
psutil==5.9.5

View file

@ -7,11 +7,9 @@ services:
- env/ytdl.env
restart: always
command: [ "/usr/local/bin/supervisord", "-c" ,"/ytdlbot/conf/supervisor_worker.conf" ]
volumes:
- ./data/instagram.com_cookies.txt:/ytdlbot/ytdlbot/instagram.com_cookies.txt
# network_mode: "host"
# deploy:
# resources:
# limits:
# cpus: '0.3'
# memory: 1500M
# memory: 1500M

View file

@ -254,7 +254,9 @@ class MySQL:
self.con = pymysql.connect(
host=MYSQL_HOST, user=MYSQL_USER, passwd=MYSQL_PASS, db="ytdl", charset="utf8mb4"
)
logging.debug("Used real MySQL connection.")
except pymysql.err.OperationalError:
logging.warning("Using fake MySQL connection.")
self.con = FakeMySQL()
self.con.ping(reconnect=True)
@ -273,9 +275,8 @@ class MySQL:
self.con.close()
def get_user_settings(self, user_id: int) -> tuple:
cur = self.con.cursor()
cur.execute("SELECT * FROM settings WHERE user_id = %s", (user_id,))
data = cur.fetchone()
self.cur.execute("SELECT * FROM settings WHERE user_id = %s", (user_id,))
data = self.cur.fetchone()
if data is None:
return 100, "high", "video", "Celery"
return data

View file

@ -26,7 +26,14 @@ import requests
import yt_dlp as ytdl
from tqdm import tqdm
from config import AUDIO_FORMAT, ENABLE_ARIA2, ENABLE_FFMPEG, TG_MAX_SIZE, IPv6, SS_YOUTUBE
from config import (
AUDIO_FORMAT,
ENABLE_ARIA2,
ENABLE_FFMPEG,
SS_YOUTUBE,
TG_MAX_SIZE,
IPv6,
)
from limit import Payment
from utils import adjust_formats, apply_log_formatter, current_time, sizeof_fmt
@ -35,7 +42,7 @@ apply_log_formatter()
def edit_text(bot_msg, text: str):
key = f"{bot_msg.chat.id}-{bot_msg.message_id}"
key = f"{bot_msg.chat.id}-{bot_msg.id}"
# if the key exists, we shouldn't send edit message
if not r.exists(key):
time.sleep(random.random())

View file

@ -11,4 +11,4 @@ from celery import Celery
from config import BROKER
app = Celery('tasks', broker=BROKER, timezone="Asia/Shanghai")
app = Celery("tasks", broker=BROKER, timezone="Asia/Shanghai")

View file

@ -7,6 +7,7 @@
__author__ = "Benny <benny.think@gmail.com>"
import asyncio
import logging
import math
import os
@ -15,6 +16,7 @@ import random
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import time
@ -30,7 +32,7 @@ import requests
from apscheduler.schedulers.background import BackgroundScheduler
from celery import Celery
from celery.worker.control import Panel
from pyrogram import Client, idle, types
from pyrogram import Client, enums, idle, types
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Message
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
@ -43,8 +45,8 @@ from config import (
ENABLE_QUEUE,
ENABLE_VIP,
OWNER,
RCLONE_PATH,
RATE_LIMIT,
RCLONE_PATH,
WORKERS,
)
from constant import BotText
@ -71,24 +73,23 @@ app = Celery("tasks", broker=BROKER)
redis = Redis()
channel = Channel()
session = "ytdl-celery"
celery_client = create_app(session)
bot = create_app("ytdl-celery")
def get_messages(chat_id, message_id):
try:
return celery_client.get_messages(chat_id, message_id)
return bot.get_messages(chat_id, message_id)
except ConnectionError as e:
logging.critical("WTH!!! %s", e)
celery_client.start()
return celery_client.get_messages(chat_id, message_id)
bot.start()
return bot.get_messages(chat_id, message_id)
@app.task(rate_limit=f"{RATE_LIMIT}/m")
def ytdl_download_task(chat_id, message_id, url: str):
logging.info("YouTube celery tasks started for %s", url)
bot_msg = get_messages(chat_id, message_id)
ytdl_normal_download(celery_client, bot_msg, url)
ytdl_normal_download(bot, bot_msg, url)
logging.info("YouTube celery tasks ended.")
@ -96,7 +97,7 @@ def ytdl_download_task(chat_id, message_id, url: str):
def audio_task(chat_id, message_id):
logging.info("Audio celery tasks started for %s-%s", chat_id, message_id)
bot_msg = get_messages(chat_id, message_id)
normal_audio(celery_client, bot_msg)
normal_audio(bot, bot_msg)
logging.info("Audio celery tasks ended.")
@ -116,7 +117,7 @@ def get_unique_clink(original_url: str, user_id: int):
def direct_download_task(chat_id, message_id, url):
logging.info("Direct download celery tasks started for %s", url)
bot_msg = get_messages(chat_id, message_id)
direct_normal_download(celery_client, bot_msg, url)
direct_normal_download(bot, bot_msg, url)
logging.info("Direct download celery tasks ended.")
@ -146,8 +147,8 @@ def ytdl_download_entrance(client: Client, bot_msg: types.Message, url: str, mod
return
mode = mode or payment.get_user_settings(chat_id)[-1]
if ENABLE_CELERY and mode in [None, "Celery"]:
async_task(ytdl_download_task, chat_id, bot_msg.message_id, url)
# ytdl_download_task.delay(chat_id, bot_msg.message_id, url)
async_task(ytdl_download_task, chat_id, bot_msg.id, url)
# ytdl_download_task.delay(chat_id, bot_msg.id, url)
else:
ytdl_normal_download(client, bot_msg, url)
except Exception as e:
@ -158,15 +159,15 @@ def ytdl_download_entrance(client: Client, bot_msg: types.Message, url: str, mod
def direct_download_entrance(client: Client, bot_msg: typing.Union[types.Message, typing.Coroutine], url: str):
if ENABLE_CELERY:
direct_normal_download(client, bot_msg, url)
# direct_download_task.delay(bot_msg.chat.id, bot_msg.message_id, url)
# direct_download_task.delay(bot_msg.chat.id, bot_msg.id, url)
else:
direct_normal_download(client, bot_msg, url)
def audio_entrance(client, bot_msg):
if ENABLE_CELERY:
async_task(audio_task, bot_msg.chat.id, bot_msg.message_id)
# audio_task.delay(bot_msg.chat.id, bot_msg.message_id)
async_task(audio_task, bot_msg.chat.id, bot_msg.id)
# audio_task.delay(bot_msg.chat.id, bot_msg.id)
else:
normal_audio(client, bot_msg)
@ -205,7 +206,7 @@ def direct_normal_download(client: Client, bot_msg: typing.Union[types.Message,
logging.info("Downloaded file %s", filename)
st_size = os.stat(filepath).st_size
client.send_chat_action(chat_id, "upload_document")
client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_DOCUMENT)
client.send_document(
bot_msg.chat.id,
filepath,
@ -224,11 +225,11 @@ def normal_audio(client: Client, bot_msg: typing.Union[types.Message, typing.Cor
)
orig_url: str = re.findall(r"https?://.*", bot_msg.caption)[0]
with tempfile.TemporaryDirectory(prefix="ytdl-") as tmp:
client.send_chat_action(chat_id, "record_audio")
client.send_chat_action(chat_id, enums.ChatAction.RECORD_AUDIO)
# just try to download the audio using yt-dlp
filepath = ytdl_download(orig_url, tmp, status_msg, hijack="bestaudio[ext=m4a]")
status_msg.edit_text("Sending audio now...")
client.send_chat_action(chat_id, "upload_audio")
client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_AUDIO)
for f in filepath:
client.send_audio(chat_id, f)
status_msg.edit_text("✅ Conversion complete.")
@ -264,18 +265,18 @@ def ytdl_normal_download(client: Client, bot_msg: typing.Union[types.Message, ty
video_paths = ytdl_download(url, temp_dir.name, bot_msg)
logging.info("Download complete.")
client.send_chat_action(chat_id, "upload_document")
client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_DOCUMENT)
bot_msg.edit_text("Download complete. Sending now...")
try:
upload_processor(client, bot_msg, url, video_paths)
except pyrogram.errors.Flood as e:
except pyrogram.errors.FloodWait as e:
logging.critical("FloodWait from Telegram: %s", e)
client.send_message(
chat_id,
f"I'm being rate limited by Telegram. Your video will come after {e.x} seconds. Please wait patiently.",
f"I'm being rate limited by Telegram. Your video will come after {e.value} seconds. Please wait patiently.",
)
flood_owner_message(client, e)
time.sleep(e.x)
time.sleep(e.value)
upload_processor(client, bot_msg, url, video_paths)
bot_msg.edit_text("Download success!✅")
@ -407,7 +408,7 @@ def upload_processor(client, bot_msg, url, vp_or_fid: typing.Union[str, list]):
redis.add_send_cache(unique, getattr(obj, "file_id", None))
redis.update_metrics("video_success")
if ARCHIVE_ID and isinstance(vp_or_fid, pathlib.Path):
client.forward_messages(bot_msg.chat.id, ARCHIVE_ID, res_msg.message_id)
client.forward_messages(bot_msg.chat.id, ARCHIVE_ID, res_msg.id)
return res_msg
@ -449,14 +450,8 @@ def gen_cap(bm, url, video_path):
def gen_video_markup():
markup = InlineKeyboardMarkup(
[
[ # First row
InlineKeyboardButton( # Generates a callback query when pressed
"convert to audio", callback_data="convert"
)
]
]
)
[[InlineKeyboardButton("convert to audio", callback_data="convert")]]
) # First row # Generates a callback query when pressed
return markup
@ -506,28 +501,43 @@ def async_task(task_name, *args):
task_name.apply_async(args=args, queue=destination)
def run_celery():
worker_name = os.getenv("WORKER_NAME", "")
argv = ["-A", "tasks", "worker", "--loglevel=info", "--pool=threads", f"--concurrency={WORKERS}", "-n", worker_name]
if ENABLE_QUEUE:
argv.extend(["-Q", worker_name])
app.worker_main(argv)
def purge_tasks():
count = app.control.purge()
return f"purged {count} tasks."
def run_celery():
# 创建一个新的事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
worker_name = os.getenv("WORKER_NAME", "")
argv = [
"-A",
"tasks",
"worker",
"--loglevel=info",
"--pool=threads",
f"--concurrency={WORKERS}",
"-n",
worker_name,
]
if ENABLE_QUEUE:
argv.extend(["-Q", worker_name])
app.worker_main(argv)
except:
logging.warning("Celery worker failed to start.")
sys.exit(1)
if __name__ == "__main__":
# celery_client.start()
print("Bootstrapping Celery worker now.....")
time.sleep(5)
threading.Thread(target=run_celery, daemon=True).start()
scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
scheduler.add_job(auto_restart, "interval", seconds=900)
scheduler.add_job(auto_restart, "interval", seconds=120)
scheduler.start()
idle()
celery_client.stop()
bot.stop()

View file

@ -12,11 +12,13 @@ import inspect as pyinspect
import logging
import os
import pathlib
import re
import shutil
import subprocess
import tempfile
import time
import uuid
from datetime import datetime
import coloredlogs
import ffmpeg
@ -189,6 +191,21 @@ class Detector:
# logging.warning("Potential crash detected by %s, it's time to commit suicide...", self.func_name())
# return True
def fail_connect_detector(self):
# TODO: don't know why sometimes it stops connected to DC
last_line = self.logs.strip().split("\n")[-1]
try:
log_time_str = re.findall(r"\[(.*),", last_line)[0]
log_time = datetime.strptime(log_time_str, "%Y-%m-%d %H:%M:%S")
except Exception:
return
time_difference = (datetime.now() - log_time).total_seconds()
if ("Sending as video" in last_line or "PingTask started" in last_line) and time_difference > 60:
logging.warning("Can't connect to Telegram DC")
return True
def auto_restart():
log_path = "/var/log/ytdl.log"

View file

@ -22,7 +22,7 @@ import pyrogram.errors
import requests
import yt_dlp
from apscheduler.schedulers.background import BackgroundScheduler
from pyrogram import Client, filters, types
from pyrogram import Client, enums, filters, types
from pyrogram.errors.exceptions.bad_request_400 import UserNotParticipant
from pyrogram.raw import functions
from pyrogram.raw import types as raw_types
@ -71,7 +71,7 @@ def private_use(func):
chat_id = getattr(message.from_user, "id", None)
# message type check
if message.chat.type != "private" and not message.text.lower().startswith("/ytdl"):
if message.chat.type != enums.ChatType.PRIVATE and not message.text.lower().startswith("/ytdl"):
logging.debug("%s, it's annoying me...🙄️ ", message.text)
return
@ -114,7 +114,7 @@ def start_handler(client: Client, message: types.Message):
payment = Payment()
from_id = message.from_user.id
logging.info("Welcome to youtube-dl bot!")
client.send_chat_action(from_id, "typing")
client.send_chat_action(from_id, enums.ChatAction.TYPING)
is_old_user = payment.check_old_user(from_id)
if is_old_user:
info = ""
@ -130,21 +130,21 @@ def start_handler(client: Client, message: types.Message):
@app.on_message(filters.command(["help"]))
def help_handler(client: Client, message: types.Message):
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
client.send_message(chat_id, BotText.help, disable_web_page_preview=True)
@app.on_message(filters.command(["about"]))
def about_handler(client: Client, message: types.Message):
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
client.send_message(chat_id, BotText.about)
@app.on_message(filters.command(["sub"]))
def subscribe_handler(client: Client, message: types.Message):
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
if message.text == "/sub":
result = channel.get_user_subscription(chat_id)
else:
@ -159,7 +159,7 @@ def subscribe_handler(client: Client, message: types.Message):
@app.on_message(filters.command(["unsub"]))
def unsubscribe_handler(client: Client, message: types.Message):
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
text = message.text.split(" ")
if len(text) == 1:
client.send_message(chat_id, "/unsub channel_id", disable_web_page_preview=True)
@ -179,7 +179,7 @@ def patch_handler(client: Client, message: types.Message):
chat_id = message.chat.id
if username == OWNER:
celery_app.control.broadcast("hot_patch")
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
client.send_message(chat_id, "Oorah!")
hot_patch()
@ -203,7 +203,7 @@ def purge_handler(client: Client, message: types.Message):
@app.on_message(filters.command(["ping"]))
def ping_handler(client: Client, message: types.Message):
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
if os.uname().sysname == "Darwin" or ".heroku" in os.getenv("PYTHONHOME", ""):
bot_info = "ping unavailable."
else:
@ -229,7 +229,7 @@ def sub_count_handler(client: Client, message: types.Message):
@app.on_message(filters.command(["direct"]))
def direct_handler(client: Client, message: types.Message):
chat_id = message.from_user.id
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
url = re.sub(r"/direct\s*", "", message.text)
logging.info("direct start %s", url)
if not re.findall(r"^https?://", url.lower()):
@ -246,7 +246,7 @@ def direct_handler(client: Client, message: types.Message):
def settings_handler(client: Client, message: types.Message):
chat_id = message.chat.id
payment = Payment()
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
data = MySQL().get_user_settings(chat_id)
set_mode = data[-1]
text = {"Local": "Celery", "Celery": "Local"}.get(set_mode, "Local")
@ -285,7 +285,7 @@ def buy_handler(client: Client, message: types.Message):
# process as chat.id, not from_user.id
chat_id = message.chat.id
text = message.text.strip()
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
client.send_message(chat_id, BotText.buy, disable_web_page_preview=True)
# generate telegram invoice here
payload = f"{message.chat.id}-buy"
@ -299,7 +299,7 @@ def buy_handler(client: Client, message: types.Message):
price, f"Buy {TOKEN_PRICE} download tokens", "You can pay by Telegram payment or using link above", payload
)
app.send(
app.invoke(
functions.messages.SendMedia(
peer=(raw_types.InputPeerUser(user_id=chat_id, access_hash=0)),
media=invoice,
@ -369,7 +369,7 @@ def link_checker(url: str) -> str:
def download_handler(client: Client, message: types.Message):
payment = Payment()
chat_id = message.from_user.id
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
redis.user_count(chat_id)
if message.document:
with tempfile.NamedTemporaryFile(mode="r+") as tf:
@ -412,19 +412,21 @@ def download_handler(client: Client, message: types.Message):
try:
# raise pyrogram.errors.exceptions.FloodWait(10)
bot_msg: typing.Union[types.Message, typing.Coroutine] = message.reply_text(text, quote=True)
except pyrogram.errors.Flood as e:
except pyrogram.errors.FloodWait as e:
f = BytesIO()
f.write(str(e).encode())
f.write(b"Your job will be done soon. Just wait! Don't rush.")
f.name = "Please don't flood me.txt"
bot_msg = message.reply_document(
f, caption=f"Flood wait! Please wait {e.x} seconds...." f"Your job will start automatically", quote=True
f,
caption=f"Flood wait! Please wait {e.value} seconds...." f"Your job will start automatically",
quote=True,
)
f.close()
client.send_message(OWNER, f"Flood wait! 🙁 {e.x} seconds....")
time.sleep(e.x)
client.send_message(OWNER, f"Flood wait! 🙁 {e.value} seconds....")
time.sleep(e.value)
client.send_chat_action(chat_id, "upload_video")
client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_VIDEO)
bot_msg.chat = message.chat
ytdl_download_entrance(client, bot_msg, url)
@ -493,7 +495,7 @@ def raw_update(client: Client, update, users, chats):
payment = Payment()
action = getattr(getattr(update, "message", None), "action", None)
if update.QUALNAME == "types.UpdateBotPrecheckoutQuery":
client.send(
client.invoke(
functions.messages.SetBotPrecheckoutResults(
query_id=update.query_id,
success=True,