From d90133211c903c389881aaa148277fa96ef33532 Mon Sep 17 00:00:00 2001 From: Benny Date: Sun, 10 Sep 2023 14:08:11 +0200 Subject: [PATCH] Pyrogram2 (#291) * upgrade to pyrogram 2.x #288 * reformat code * update dependencies * remove instagram cookies * add cryptography * upgrade to py3.11 * temp fix --- Dockerfile | 4 +- docker-compose.yml | 2 +- requirements.txt | 4 +- worker.yml | 4 +- ytdlbot/database.py | 7 +-- ytdlbot/downloader.py | 11 ++++- ytdlbot/flower_tasks.py | 2 +- ytdlbot/tasks.py | 94 +++++++++++++++++++++++------------------ ytdlbot/utils.py | 17 ++++++++ ytdlbot/ytdl_bot.py | 42 +++++++++--------- 10 files changed, 111 insertions(+), 76 deletions(-) diff --git a/Dockerfile b/Dockerfile index 8dde3a9..3b8c86c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,11 +1,11 @@ -FROM python:3.10-alpine as builder +FROM python:3.11-alpine as builder RUN apk update && apk add --no-cache tzdata alpine-sdk libffi-dev ca-certificates ADD requirements.txt /tmp/ RUN pip3 install --user -r /tmp/requirements.txt && rm /tmp/requirements.txt -FROM python:3.10-alpine +FROM python:3.11-alpine WORKDIR /ytdlbot/ytdlbot ENV TZ=Europe/Stockholm diff --git a/docker-compose.yml b/docker-compose.yml index 6c50943..d43a3f8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,6 +21,7 @@ services: - ./db_data:/var/lib/mysql environment: MYSQL_ROOT_PASSWORD: 'root' + command: --default-authentication-plugin=mysql_native_password logging: driver: none @@ -33,7 +34,6 @@ services: - socat - redis volumes: - - ./data/instagram.com_cookies.txt:/ytdlbot/ytdlbot/instagram.com_cookies.txt - ./data/vnstat/:/var/lib/vnstat/ labels: - "com.centurylinklabs.watchtower.enable=true" diff --git a/requirements.txt b/requirements.txt index cd20e45..86ed364 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,11 @@ -pyrogram==1.4.16 +pyrogram==2.0.106 tgcrypto==1.2.5 yt-dlp==2023.7.6 APScheduler==3.10.4 beautifultable==1.1.0 ffmpeg-python==0.2.0 PyMySQL==1.1.0 -celery==5.3.1 +celery==5.3.4 filetype==1.2.0 flower==2.0.1 psutil==5.9.5 diff --git a/worker.yml b/worker.yml index 30ba3a4..990e245 100644 --- a/worker.yml +++ b/worker.yml @@ -7,11 +7,9 @@ services: - env/ytdl.env restart: always command: [ "/usr/local/bin/supervisord", "-c" ,"/ytdlbot/conf/supervisor_worker.conf" ] - volumes: - - ./data/instagram.com_cookies.txt:/ytdlbot/ytdlbot/instagram.com_cookies.txt # network_mode: "host" # deploy: # resources: # limits: # cpus: '0.3' -# memory: 1500M \ No newline at end of file +# memory: 1500M diff --git a/ytdlbot/database.py b/ytdlbot/database.py index f064c54..db72114 100644 --- a/ytdlbot/database.py +++ b/ytdlbot/database.py @@ -254,7 +254,9 @@ class MySQL: self.con = pymysql.connect( host=MYSQL_HOST, user=MYSQL_USER, passwd=MYSQL_PASS, db="ytdl", charset="utf8mb4" ) + logging.debug("Used real MySQL connection.") except pymysql.err.OperationalError: + logging.warning("Using fake MySQL connection.") self.con = FakeMySQL() self.con.ping(reconnect=True) @@ -273,9 +275,8 @@ class MySQL: self.con.close() def get_user_settings(self, user_id: int) -> tuple: - cur = self.con.cursor() - cur.execute("SELECT * FROM settings WHERE user_id = %s", (user_id,)) - data = cur.fetchone() + self.cur.execute("SELECT * FROM settings WHERE user_id = %s", (user_id,)) + data = self.cur.fetchone() if data is None: return 100, "high", "video", "Celery" return data diff --git a/ytdlbot/downloader.py b/ytdlbot/downloader.py index 965b3ab..51d66a4 100644 --- a/ytdlbot/downloader.py +++ b/ytdlbot/downloader.py @@ -26,7 +26,14 @@ import requests import yt_dlp as ytdl from tqdm import tqdm -from config import AUDIO_FORMAT, ENABLE_ARIA2, ENABLE_FFMPEG, TG_MAX_SIZE, IPv6, SS_YOUTUBE +from config import ( + AUDIO_FORMAT, + ENABLE_ARIA2, + ENABLE_FFMPEG, + SS_YOUTUBE, + TG_MAX_SIZE, + IPv6, +) from limit import Payment from utils import adjust_formats, apply_log_formatter, current_time, sizeof_fmt @@ -35,7 +42,7 @@ apply_log_formatter() def edit_text(bot_msg, text: str): - key = f"{bot_msg.chat.id}-{bot_msg.message_id}" + key = f"{bot_msg.chat.id}-{bot_msg.id}" # if the key exists, we shouldn't send edit message if not r.exists(key): time.sleep(random.random()) diff --git a/ytdlbot/flower_tasks.py b/ytdlbot/flower_tasks.py index f580e95..31a40dc 100644 --- a/ytdlbot/flower_tasks.py +++ b/ytdlbot/flower_tasks.py @@ -11,4 +11,4 @@ from celery import Celery from config import BROKER -app = Celery('tasks', broker=BROKER, timezone="Asia/Shanghai") +app = Celery("tasks", broker=BROKER, timezone="Asia/Shanghai") diff --git a/ytdlbot/tasks.py b/ytdlbot/tasks.py index 94a1e5c..99f71fd 100644 --- a/ytdlbot/tasks.py +++ b/ytdlbot/tasks.py @@ -7,6 +7,7 @@ __author__ = "Benny " +import asyncio import logging import math import os @@ -15,6 +16,7 @@ import random import re import shutil import subprocess +import sys import tempfile import threading import time @@ -30,7 +32,7 @@ import requests from apscheduler.schedulers.background import BackgroundScheduler from celery import Celery from celery.worker.control import Panel -from pyrogram import Client, idle, types +from pyrogram import Client, enums, idle, types from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Message from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor @@ -43,8 +45,8 @@ from config import ( ENABLE_QUEUE, ENABLE_VIP, OWNER, - RCLONE_PATH, RATE_LIMIT, + RCLONE_PATH, WORKERS, ) from constant import BotText @@ -71,24 +73,23 @@ app = Celery("tasks", broker=BROKER) redis = Redis() channel = Channel() -session = "ytdl-celery" -celery_client = create_app(session) +bot = create_app("ytdl-celery") def get_messages(chat_id, message_id): try: - return celery_client.get_messages(chat_id, message_id) + return bot.get_messages(chat_id, message_id) except ConnectionError as e: logging.critical("WTH!!! %s", e) - celery_client.start() - return celery_client.get_messages(chat_id, message_id) + bot.start() + return bot.get_messages(chat_id, message_id) @app.task(rate_limit=f"{RATE_LIMIT}/m") def ytdl_download_task(chat_id, message_id, url: str): logging.info("YouTube celery tasks started for %s", url) bot_msg = get_messages(chat_id, message_id) - ytdl_normal_download(celery_client, bot_msg, url) + ytdl_normal_download(bot, bot_msg, url) logging.info("YouTube celery tasks ended.") @@ -96,7 +97,7 @@ def ytdl_download_task(chat_id, message_id, url: str): def audio_task(chat_id, message_id): logging.info("Audio celery tasks started for %s-%s", chat_id, message_id) bot_msg = get_messages(chat_id, message_id) - normal_audio(celery_client, bot_msg) + normal_audio(bot, bot_msg) logging.info("Audio celery tasks ended.") @@ -116,7 +117,7 @@ def get_unique_clink(original_url: str, user_id: int): def direct_download_task(chat_id, message_id, url): logging.info("Direct download celery tasks started for %s", url) bot_msg = get_messages(chat_id, message_id) - direct_normal_download(celery_client, bot_msg, url) + direct_normal_download(bot, bot_msg, url) logging.info("Direct download celery tasks ended.") @@ -146,8 +147,8 @@ def ytdl_download_entrance(client: Client, bot_msg: types.Message, url: str, mod return mode = mode or payment.get_user_settings(chat_id)[-1] if ENABLE_CELERY and mode in [None, "Celery"]: - async_task(ytdl_download_task, chat_id, bot_msg.message_id, url) - # ytdl_download_task.delay(chat_id, bot_msg.message_id, url) + async_task(ytdl_download_task, chat_id, bot_msg.id, url) + # ytdl_download_task.delay(chat_id, bot_msg.id, url) else: ytdl_normal_download(client, bot_msg, url) except Exception as e: @@ -158,15 +159,15 @@ def ytdl_download_entrance(client: Client, bot_msg: types.Message, url: str, mod def direct_download_entrance(client: Client, bot_msg: typing.Union[types.Message, typing.Coroutine], url: str): if ENABLE_CELERY: direct_normal_download(client, bot_msg, url) - # direct_download_task.delay(bot_msg.chat.id, bot_msg.message_id, url) + # direct_download_task.delay(bot_msg.chat.id, bot_msg.id, url) else: direct_normal_download(client, bot_msg, url) def audio_entrance(client, bot_msg): if ENABLE_CELERY: - async_task(audio_task, bot_msg.chat.id, bot_msg.message_id) - # audio_task.delay(bot_msg.chat.id, bot_msg.message_id) + async_task(audio_task, bot_msg.chat.id, bot_msg.id) + # audio_task.delay(bot_msg.chat.id, bot_msg.id) else: normal_audio(client, bot_msg) @@ -205,7 +206,7 @@ def direct_normal_download(client: Client, bot_msg: typing.Union[types.Message, logging.info("Downloaded file %s", filename) st_size = os.stat(filepath).st_size - client.send_chat_action(chat_id, "upload_document") + client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_DOCUMENT) client.send_document( bot_msg.chat.id, filepath, @@ -224,11 +225,11 @@ def normal_audio(client: Client, bot_msg: typing.Union[types.Message, typing.Cor ) orig_url: str = re.findall(r"https?://.*", bot_msg.caption)[0] with tempfile.TemporaryDirectory(prefix="ytdl-") as tmp: - client.send_chat_action(chat_id, "record_audio") + client.send_chat_action(chat_id, enums.ChatAction.RECORD_AUDIO) # just try to download the audio using yt-dlp filepath = ytdl_download(orig_url, tmp, status_msg, hijack="bestaudio[ext=m4a]") status_msg.edit_text("Sending audio now...") - client.send_chat_action(chat_id, "upload_audio") + client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_AUDIO) for f in filepath: client.send_audio(chat_id, f) status_msg.edit_text("✅ Conversion complete.") @@ -264,18 +265,18 @@ def ytdl_normal_download(client: Client, bot_msg: typing.Union[types.Message, ty video_paths = ytdl_download(url, temp_dir.name, bot_msg) logging.info("Download complete.") - client.send_chat_action(chat_id, "upload_document") + client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_DOCUMENT) bot_msg.edit_text("Download complete. Sending now...") try: upload_processor(client, bot_msg, url, video_paths) - except pyrogram.errors.Flood as e: + except pyrogram.errors.FloodWait as e: logging.critical("FloodWait from Telegram: %s", e) client.send_message( chat_id, - f"I'm being rate limited by Telegram. Your video will come after {e.x} seconds. Please wait patiently.", + f"I'm being rate limited by Telegram. Your video will come after {e.value} seconds. Please wait patiently.", ) flood_owner_message(client, e) - time.sleep(e.x) + time.sleep(e.value) upload_processor(client, bot_msg, url, video_paths) bot_msg.edit_text("Download success!✅") @@ -407,7 +408,7 @@ def upload_processor(client, bot_msg, url, vp_or_fid: typing.Union[str, list]): redis.add_send_cache(unique, getattr(obj, "file_id", None)) redis.update_metrics("video_success") if ARCHIVE_ID and isinstance(vp_or_fid, pathlib.Path): - client.forward_messages(bot_msg.chat.id, ARCHIVE_ID, res_msg.message_id) + client.forward_messages(bot_msg.chat.id, ARCHIVE_ID, res_msg.id) return res_msg @@ -449,14 +450,8 @@ def gen_cap(bm, url, video_path): def gen_video_markup(): markup = InlineKeyboardMarkup( - [ - [ # First row - InlineKeyboardButton( # Generates a callback query when pressed - "convert to audio", callback_data="convert" - ) - ] - ] - ) + [[InlineKeyboardButton("convert to audio", callback_data="convert")]] + ) # First row # Generates a callback query when pressed return markup @@ -506,28 +501,43 @@ def async_task(task_name, *args): task_name.apply_async(args=args, queue=destination) -def run_celery(): - worker_name = os.getenv("WORKER_NAME", "") - argv = ["-A", "tasks", "worker", "--loglevel=info", "--pool=threads", f"--concurrency={WORKERS}", "-n", worker_name] - if ENABLE_QUEUE: - argv.extend(["-Q", worker_name]) - app.worker_main(argv) - - def purge_tasks(): count = app.control.purge() return f"purged {count} tasks." +def run_celery(): + # 创建一个新的事件循环 + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + worker_name = os.getenv("WORKER_NAME", "") + argv = [ + "-A", + "tasks", + "worker", + "--loglevel=info", + "--pool=threads", + f"--concurrency={WORKERS}", + "-n", + worker_name, + ] + if ENABLE_QUEUE: + argv.extend(["-Q", worker_name]) + app.worker_main(argv) + except: + logging.warning("Celery worker failed to start.") + sys.exit(1) + + if __name__ == "__main__": - # celery_client.start() print("Bootstrapping Celery worker now.....") time.sleep(5) threading.Thread(target=run_celery, daemon=True).start() scheduler = BackgroundScheduler(timezone="Asia/Shanghai") - scheduler.add_job(auto_restart, "interval", seconds=900) + scheduler.add_job(auto_restart, "interval", seconds=120) scheduler.start() idle() - celery_client.stop() + bot.stop() diff --git a/ytdlbot/utils.py b/ytdlbot/utils.py index 1c48172..4d80617 100644 --- a/ytdlbot/utils.py +++ b/ytdlbot/utils.py @@ -12,11 +12,13 @@ import inspect as pyinspect import logging import os import pathlib +import re import shutil import subprocess import tempfile import time import uuid +from datetime import datetime import coloredlogs import ffmpeg @@ -189,6 +191,21 @@ class Detector: # logging.warning("Potential crash detected by %s, it's time to commit suicide...", self.func_name()) # return True + def fail_connect_detector(self): + # TODO: don't know why sometimes it stops connected to DC + last_line = self.logs.strip().split("\n")[-1] + try: + log_time_str = re.findall(r"\[(.*),", last_line)[0] + log_time = datetime.strptime(log_time_str, "%Y-%m-%d %H:%M:%S") + except Exception: + return + + time_difference = (datetime.now() - log_time).total_seconds() + + if ("Sending as video" in last_line or "PingTask started" in last_line) and time_difference > 60: + logging.warning("Can't connect to Telegram DC") + return True + def auto_restart(): log_path = "/var/log/ytdl.log" diff --git a/ytdlbot/ytdl_bot.py b/ytdlbot/ytdl_bot.py index 5fdb536..d4eae5c 100644 --- a/ytdlbot/ytdl_bot.py +++ b/ytdlbot/ytdl_bot.py @@ -22,7 +22,7 @@ import pyrogram.errors import requests import yt_dlp from apscheduler.schedulers.background import BackgroundScheduler -from pyrogram import Client, filters, types +from pyrogram import Client, enums, filters, types from pyrogram.errors.exceptions.bad_request_400 import UserNotParticipant from pyrogram.raw import functions from pyrogram.raw import types as raw_types @@ -71,7 +71,7 @@ def private_use(func): chat_id = getattr(message.from_user, "id", None) # message type check - if message.chat.type != "private" and not message.text.lower().startswith("/ytdl"): + if message.chat.type != enums.ChatType.PRIVATE and not message.text.lower().startswith("/ytdl"): logging.debug("%s, it's annoying me...🙄️ ", message.text) return @@ -114,7 +114,7 @@ def start_handler(client: Client, message: types.Message): payment = Payment() from_id = message.from_user.id logging.info("Welcome to youtube-dl bot!") - client.send_chat_action(from_id, "typing") + client.send_chat_action(from_id, enums.ChatAction.TYPING) is_old_user = payment.check_old_user(from_id) if is_old_user: info = "" @@ -130,21 +130,21 @@ def start_handler(client: Client, message: types.Message): @app.on_message(filters.command(["help"])) def help_handler(client: Client, message: types.Message): chat_id = message.chat.id - client.send_chat_action(chat_id, "typing") + client.send_chat_action(chat_id, enums.ChatAction.TYPING) client.send_message(chat_id, BotText.help, disable_web_page_preview=True) @app.on_message(filters.command(["about"])) def about_handler(client: Client, message: types.Message): chat_id = message.chat.id - client.send_chat_action(chat_id, "typing") + client.send_chat_action(chat_id, enums.ChatAction.TYPING) client.send_message(chat_id, BotText.about) @app.on_message(filters.command(["sub"])) def subscribe_handler(client: Client, message: types.Message): chat_id = message.chat.id - client.send_chat_action(chat_id, "typing") + client.send_chat_action(chat_id, enums.ChatAction.TYPING) if message.text == "/sub": result = channel.get_user_subscription(chat_id) else: @@ -159,7 +159,7 @@ def subscribe_handler(client: Client, message: types.Message): @app.on_message(filters.command(["unsub"])) def unsubscribe_handler(client: Client, message: types.Message): chat_id = message.chat.id - client.send_chat_action(chat_id, "typing") + client.send_chat_action(chat_id, enums.ChatAction.TYPING) text = message.text.split(" ") if len(text) == 1: client.send_message(chat_id, "/unsub channel_id", disable_web_page_preview=True) @@ -179,7 +179,7 @@ def patch_handler(client: Client, message: types.Message): chat_id = message.chat.id if username == OWNER: celery_app.control.broadcast("hot_patch") - client.send_chat_action(chat_id, "typing") + client.send_chat_action(chat_id, enums.ChatAction.TYPING) client.send_message(chat_id, "Oorah!") hot_patch() @@ -203,7 +203,7 @@ def purge_handler(client: Client, message: types.Message): @app.on_message(filters.command(["ping"])) def ping_handler(client: Client, message: types.Message): chat_id = message.chat.id - client.send_chat_action(chat_id, "typing") + client.send_chat_action(chat_id, enums.ChatAction.TYPING) if os.uname().sysname == "Darwin" or ".heroku" in os.getenv("PYTHONHOME", ""): bot_info = "ping unavailable." else: @@ -229,7 +229,7 @@ def sub_count_handler(client: Client, message: types.Message): @app.on_message(filters.command(["direct"])) def direct_handler(client: Client, message: types.Message): chat_id = message.from_user.id - client.send_chat_action(chat_id, "typing") + client.send_chat_action(chat_id, enums.ChatAction.TYPING) url = re.sub(r"/direct\s*", "", message.text) logging.info("direct start %s", url) if not re.findall(r"^https?://", url.lower()): @@ -246,7 +246,7 @@ def direct_handler(client: Client, message: types.Message): def settings_handler(client: Client, message: types.Message): chat_id = message.chat.id payment = Payment() - client.send_chat_action(chat_id, "typing") + client.send_chat_action(chat_id, enums.ChatAction.TYPING) data = MySQL().get_user_settings(chat_id) set_mode = data[-1] text = {"Local": "Celery", "Celery": "Local"}.get(set_mode, "Local") @@ -285,7 +285,7 @@ def buy_handler(client: Client, message: types.Message): # process as chat.id, not from_user.id chat_id = message.chat.id text = message.text.strip() - client.send_chat_action(chat_id, "typing") + client.send_chat_action(chat_id, enums.ChatAction.TYPING) client.send_message(chat_id, BotText.buy, disable_web_page_preview=True) # generate telegram invoice here payload = f"{message.chat.id}-buy" @@ -299,7 +299,7 @@ def buy_handler(client: Client, message: types.Message): price, f"Buy {TOKEN_PRICE} download tokens", "You can pay by Telegram payment or using link above", payload ) - app.send( + app.invoke( functions.messages.SendMedia( peer=(raw_types.InputPeerUser(user_id=chat_id, access_hash=0)), media=invoice, @@ -369,7 +369,7 @@ def link_checker(url: str) -> str: def download_handler(client: Client, message: types.Message): payment = Payment() chat_id = message.from_user.id - client.send_chat_action(chat_id, "typing") + client.send_chat_action(chat_id, enums.ChatAction.TYPING) redis.user_count(chat_id) if message.document: with tempfile.NamedTemporaryFile(mode="r+") as tf: @@ -412,19 +412,21 @@ def download_handler(client: Client, message: types.Message): try: # raise pyrogram.errors.exceptions.FloodWait(10) bot_msg: typing.Union[types.Message, typing.Coroutine] = message.reply_text(text, quote=True) - except pyrogram.errors.Flood as e: + except pyrogram.errors.FloodWait as e: f = BytesIO() f.write(str(e).encode()) f.write(b"Your job will be done soon. Just wait! Don't rush.") f.name = "Please don't flood me.txt" bot_msg = message.reply_document( - f, caption=f"Flood wait! Please wait {e.x} seconds...." f"Your job will start automatically", quote=True + f, + caption=f"Flood wait! Please wait {e.value} seconds...." f"Your job will start automatically", + quote=True, ) f.close() - client.send_message(OWNER, f"Flood wait! 🙁 {e.x} seconds....") - time.sleep(e.x) + client.send_message(OWNER, f"Flood wait! 🙁 {e.value} seconds....") + time.sleep(e.value) - client.send_chat_action(chat_id, "upload_video") + client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_VIDEO) bot_msg.chat = message.chat ytdl_download_entrance(client, bot_msg, url) @@ -493,7 +495,7 @@ def raw_update(client: Client, update, users, chats): payment = Payment() action = getattr(getattr(update, "message", None), "action", None) if update.QUALNAME == "types.UpdateBotPrecheckoutQuery": - client.send( + client.invoke( functions.messages.SetBotPrecheckoutResults( query_id=update.query_id, success=True,