let's try pyrogram 2.x again

This commit is contained in:
Benny 2023-12-16 18:37:38 +01:00
parent acacfe58c9
commit f7243a4a93
No known key found for this signature in database
GPG key ID: 6CD0DBDA5235D481
16 changed files with 156 additions and 238 deletions

View file

@ -1,4 +1,5 @@
env
venv
db_data
.ash_history
.DS_Store
.DS_Store

3
.gitignore vendored
View file

@ -170,3 +170,6 @@ reinforcement/*
/ytdlbot/ytdl-main.session
/ytdlbot/ytdl-celery.session-journal
/ytdlbot/ytdl-celery.session
/ytdlbot/main.session
/ytdlbot/tasks.session
/ytdlbot/tasks.session-journal

0
.gitmodules vendored
View file

View file

@ -1,9 +0,0 @@
# This configuration file was automatically generated by Gitpod.
# Please adjust to your needs (see https://www.gitpod.io/docs/introduction/learn-gitpod/gitpod-yaml)
# and commit this file to your remote git repository to share the goodness with others.
# Learn more from ready-to-use templates: https://www.gitpod.io/docs/introduction/getting-started/quickstart
tasks:
- init: pip3 install -r requirements.txt
- init: sudo apt update && sudo apt install ffmpeg -y

View file

@ -1,16 +1,13 @@
FROM python:3.11-alpine as builder
RUN apk update && apk add --no-cache tzdata alpine-sdk libffi-dev ca-certificates
FROM python:3.11 as builder
ADD requirements.txt /tmp/
RUN pip3 install --user -r /tmp/requirements.txt && rm /tmp/requirements.txt
FROM python:3.11-alpine
FROM python:3.11-slim
WORKDIR /ytdlbot/ytdlbot
ENV TZ=Europe/Stockholm
ENV TZ=Europe/London
COPY apk.txt /tmp/
RUN apk update && xargs apk add < /tmp/apk.txt
RUN apt update && apt install -y --no-install-recommends --no-install-suggests ffmpeg vnstat git aria2
COPY --from=builder /root/.local /usr/local
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /usr/share/zoneinfo /usr/share/zoneinfo

View file

@ -74,7 +74,7 @@ This bot can be deployed on any platform that supports Python.
To deploy this bot, follow these steps:
1. Install bot dependencies
* Install Python 3.8 or a later version, FFmpeg.
* Install Python 3.10 or a later version, FFmpeg.
* (optional)Aria2 and add it to the PATH.
2. Clone the code from the repository and cd into it.
@ -153,7 +153,6 @@ You can configure all the following environment variables:
* AUTHORIZED_USER: only authorized users can use the bot
* REQUIRED_MEMBERSHIP: group or channel username, user must join this group to use the bot
* ENABLE_CELERY: celery mode, default: disable
* ENABLE_QUEUE: celery queue
* BROKER: celery broker, should be redis://redis:6379/0
* MYSQL_HOST:MySQL host
* MYSQL_USER: MySQL username
@ -291,11 +290,11 @@ https://twitter.com/BennyThinks/status/1475836588542341124
## Test instagram
https://www.instagram.com/p/ClBSqo3PkJw/
https://www.instagram.com/p/CaiAHoWDnrM/
https://www.instagram.com/p/CZtUDyyv1u1/
* single image: https://www.instagram.com/p/CXpxSyOrWCA/
* single video: https://www.instagram.com/p/Cah_7gnDVUW/
* reels: https://www.instagram.com/p/C0ozGsjtY0W/
* image carousel: https://www.instagram.com/p/C0ozPQ5o536/
* video and image carousel: https://www.instagram.com/p/C0ozhsVo-m8/
# Donation

View file

@ -1 +0,0 @@
ffmpeg vnstat git aria2

View file

@ -1,4 +1,4 @@
pyrogram==1.4.16
pyrogram==2.0.106
tgcrypto==1.2.5
yt-dlp==2023.11.16
APScheduler==3.10.4
@ -11,7 +11,7 @@ flower==2.0.1
psutil==5.9.6
influxdb==5.3.1
beautifulsoup4==4.12.2
fakeredis==2.20.0
fakeredis==2.20.1
supervisor==4.2.5
tgbot-ping==1.0.7
redis==5.0.1

View file

@ -12,14 +12,12 @@ from pyrogram import Client
from config import APP_HASH, APP_ID, PYRO_WORKERS, TOKEN, IPv6
def create_app(session: str, workers: int = PYRO_WORKERS) -> Client:
_app = Client(
session,
def create_app(name: str, workers: int = PYRO_WORKERS) -> Client:
return Client(
name,
APP_ID,
APP_HASH,
bot_token=TOKEN,
workers=workers,
ipv6=IPv6,
)
return _app

View file

@ -12,8 +12,8 @@ import os
from blinker import signal
# general settings
WORKERS: int = int(os.getenv("WORKERS", 100))
PYRO_WORKERS: int = int(os.getenv("PYRO_WORKERS", min(64, (os.cpu_count() + 4) * 10)))
WORKERS: int = int(os.getenv("WORKERS", 10))
PYRO_WORKERS: int = int(os.getenv("PYRO_WORKERS", min(32, (os.cpu_count() or 0) + 4)))
APP_ID: int = int(os.getenv("APP_ID", 198214))
APP_HASH = os.getenv("APP_HASH", "1234b90")
TOKEN = os.getenv("TOKEN", "1234")
@ -36,7 +36,6 @@ REQUIRED_MEMBERSHIP: str = os.getenv("REQUIRED_MEMBERSHIP", "")
# celery related
ENABLE_CELERY = os.getenv("ENABLE_CELERY", False)
ENABLE_QUEUE = os.getenv("ENABLE_QUEUE", False)
BROKER = os.getenv("BROKER", f"redis://{REDIS}:6379/4")
MYSQL_HOST = os.getenv("MYSQL_HOST", "mysql")
@ -49,7 +48,6 @@ ARCHIVE_ID = os.getenv("ARCHIVE_ID")
IPv6 = os.getenv("IPv6", False)
ENABLE_FFMPEG = os.getenv("ENABLE_FFMPEG", False)
PLAYLIST_SUPPORT = os.getenv("PLAYLIST_SUPPORT", False)
M3U8_SUPPORT = os.getenv("M3U8_SUPPORT", False)
ENABLE_ARIA2 = os.getenv("ENABLE_ARIA2", False)

View file

@ -75,7 +75,7 @@ class Redis:
self.r = redis.StrictRedis(host=REDIS, db=0, decode_responses=True)
self.r.ping()
except Exception:
logging.warning("Redis connection failed, using fake redis instead.")
logging.debug("Redis connection failed, using fake redis instead.")
self.r = fakeredis.FakeStrictRedis(host=REDIS, db=0, decode_responses=True)
db_banner = "=" * 20 + "DB data" + "=" * 20
@ -256,7 +256,7 @@ class MySQL:
host=MYSQL_HOST, user=MYSQL_USER, passwd=MYSQL_PASS, db="ytdl", charset="utf8mb4"
)
except Exception:
logging.warning("MySQL connection failed, using fake mysql instead.")
logging.debug("MySQL connection failed, using fake mysql instead.")
self.con = FakeMySQL()
self.con.ping(reconnect=True)

View file

@ -24,15 +24,10 @@ import ffpb
import filetype
import requests
import yt_dlp as ytdl
from pyrogram import types
from tqdm import tqdm
from config import (
AUDIO_FORMAT,
ENABLE_ARIA2,
ENABLE_FFMPEG,
TG_MAX_SIZE,
IPv6,
)
from config import AUDIO_FORMAT, ENABLE_ARIA2, ENABLE_FFMPEG, TG_MAX_SIZE, IPv6
from limit import Payment
from utils import adjust_formats, apply_log_formatter, current_time, sizeof_fmt
@ -40,8 +35,8 @@ r = fakeredis.FakeStrictRedis()
apply_log_formatter()
def edit_text(bot_msg, text: str):
key = f"{bot_msg.chat.id}-{bot_msg.message_id}"
def edit_text(bot_msg: types.Message, text: str):
key = f"{bot_msg.chat.id}-{bot_msg.id}"
# if the key exists, we shouldn't send edit message
if not r.exists(key):
time.sleep(random.random())
@ -87,7 +82,7 @@ def remove_bash_color(text):
def download_hook(d: dict, bot_msg):
# since we're using celery, server location may be located in different continent.
# since we're using celery, server location may be located in different region.
# Therefore, we can't trigger the hook very often.
# the key is user_id + download_link
original_url = d["info_dict"]["original_url"]

View file

@ -155,8 +155,8 @@ class TronTrx:
cur.execute("select user_id, payment_id from payment where payment_id like 'tron,0,T%'")
data = cur.fetchall()
logging.info("Checking unpaid payment...%s", data)
for row in data:
logging.info("Checking user payment %s", row)
user_id = row[0]
addr, index = row[1].split(",")[2:]
try:

View file

@ -7,11 +7,10 @@
__author__ = "Benny <benny.think@gmail.com>"
import asyncio
import logging
import math
import os
import pathlib
import random
import re
import shutil
import subprocess
@ -20,7 +19,7 @@ import threading
import time
import traceback
import typing
from hashlib import md5
from typing import Any
from urllib.parse import quote_plus
import filetype
@ -30,9 +29,7 @@ import requests
from apscheduler.schedulers.background import BackgroundScheduler
from celery import Celery
from celery.worker.control import Panel
from pyrogram import Client, idle, types
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Message
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
from pyrogram import Client, enums, idle, types
from channel import Channel
from client_init import create_app
@ -40,7 +37,6 @@ from config import (
ARCHIVE_ID,
BROKER,
ENABLE_CELERY,
ENABLE_QUEUE,
ENABLE_VIP,
OWNER,
RATE_LIMIT,
@ -66,37 +62,33 @@ apply_log_formatter()
bot_text = BotText()
logging.getLogger("apscheduler.executors.default").propagate = False
# celery -A tasks worker --loglevel=info --pool=solo
# app = Celery('celery', broker=BROKER, accept_content=['pickle'], task_serializer='pickle')
app = Celery("tasks", broker=BROKER)
bot = create_app("tasks")
channel = Channel()
session = "ytdl-celery"
celery_client = create_app(session)
def get_messages(chat_id, message_id):
def get_messages(chat_id: int, message_id: int):
try:
return celery_client.get_messages(chat_id, message_id)
return bot.get_messages(chat_id, message_id)
except ConnectionError as e:
logging.critical("WTH!!! %s", e)
celery_client.start()
return celery_client.get_messages(chat_id, message_id)
logging.critical("CRITICAL ERROR: %s", e)
bot.start()
return bot.get_messages(chat_id, message_id)
@app.task(rate_limit=f"{RATE_LIMIT}/m")
def ytdl_download_task(chat_id, message_id, url: str):
def ytdl_download_task(chat_id: int, message_id: int, url: str):
logging.info("YouTube celery tasks started for %s", url)
bot_msg = get_messages(chat_id, message_id)
ytdl_normal_download(celery_client, bot_msg, url)
ytdl_normal_download(bot_msg, url)
logging.info("YouTube celery tasks ended.")
@app.task()
def audio_task(chat_id, message_id):
def audio_task(chat_id: int, message_id: int):
logging.info("Audio celery tasks started for %s-%s", chat_id, message_id)
bot_msg = get_messages(chat_id, message_id)
normal_audio(celery_client, bot_msg)
normal_audio(bot_msg)
logging.info("Audio celery tasks ended.")
@ -113,14 +105,14 @@ def get_unique_clink(original_url: str, user_id: int):
@app.task()
def direct_download_task(chat_id, message_id, url):
def direct_download_task(chat_id: int, message_id: int, url: str):
logging.info("Direct download celery tasks started for %s", url)
bot_msg = get_messages(chat_id, message_id)
direct_normal_download(celery_client, bot_msg, url)
direct_normal_download(bot, bot_msg, url)
logging.info("Direct download celery tasks ended.")
def forward_video(client, bot_msg, url: str):
def forward_video(bot_msg: types.Message, url: str):
redis = Redis()
chat_id = bot_msg.chat.id
unique = get_unique_clink(url, chat_id)
@ -129,28 +121,27 @@ def forward_video(client, bot_msg, url: str):
redis.update_metrics("cache_miss")
return False
res_msg: "Message" = upload_processor(client, bot_msg, url, cached_fid)
res_msg = upload_processor(bot_msg, url, cached_fid)
obj = res_msg.document or res_msg.video or res_msg.audio or res_msg.animation or res_msg.photo
caption, _ = gen_cap(bot_msg, url, obj)
res_msg.edit_text(caption, reply_markup=gen_video_markup())
bot_msg.edit_text(f"Download success!✅✅✅")
bot_msg.edit_text(f"Download success!✅")
redis.update_metrics("cache_hit")
return True
def ytdl_download_entrance(client: Client, bot_msg: types.Message, url: str, mode=None):
def ytdl_download_entrance(bot_msg: types.Message, url: str, mode=None):
payment = Payment()
chat_id = bot_msg.chat.id
try:
if forward_video(client, bot_msg, url):
if forward_video(bot_msg, url):
return
mode = mode or payment.get_user_settings(chat_id)[-1]
if ENABLE_CELERY and mode in [None, "Celery"]:
async_task(ytdl_download_task, chat_id, bot_msg.message_id, url)
# ytdl_download_task.delay(chat_id, bot_msg.message_id, url)
ytdl_download_task.delay(chat_id, bot_msg.id, url)
else:
ytdl_normal_download(client, bot_msg, url)
ytdl_normal_download(bot_msg, url)
except Exception as e:
logging.error("Failed to download %s, error: %s", url, e)
bot_msg.edit_text(f"Download failed!❌\n\n`{traceback.format_exc()[0:4000]}`", disable_web_page_preview=True)
@ -159,23 +150,22 @@ def ytdl_download_entrance(client: Client, bot_msg: types.Message, url: str, mod
def direct_download_entrance(client: Client, bot_msg: typing.Union[types.Message, typing.Coroutine], url: str):
if ENABLE_CELERY:
direct_normal_download(client, bot_msg, url)
# direct_download_task.delay(bot_msg.chat.id, bot_msg.message_id, url)
# direct_download_task.delay(bot_msg.chat.id, bot_msg.id, url)
else:
direct_normal_download(client, bot_msg, url)
def audio_entrance(client, bot_msg):
def audio_entrance(bot_msg: types.Message):
if ENABLE_CELERY:
async_task(audio_task, bot_msg.chat.id, bot_msg.message_id)
# audio_task.delay(bot_msg.chat.id, bot_msg.message_id)
audio_task.delay(bot_msg.chat.id, bot_msg.id)
else:
normal_audio(client, bot_msg)
normal_audio(bot_msg)
def direct_normal_download(client: Client, bot_msg: typing.Union[types.Message, typing.Coroutine], url: str):
chat_id = bot_msg.chat.id
headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36"
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.3987.149 Safari/537.36"
}
length = 0
@ -206,7 +196,7 @@ def direct_normal_download(client: Client, bot_msg: typing.Union[types.Message,
logging.info("Downloaded file %s", filename)
st_size = os.stat(filepath).st_size
client.send_chat_action(chat_id, "upload_document")
client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_DOCUMENT)
client.send_document(
bot_msg.chat.id,
filepath,
@ -217,7 +207,7 @@ def direct_normal_download(client: Client, bot_msg: typing.Union[types.Message,
bot_msg.edit_text("Download success!✅")
def normal_audio(client: Client, bot_msg: typing.Union[types.Message, typing.Coroutine]):
def normal_audio(bot_msg: typing.Union[types.Message, typing.Coroutine]):
chat_id = bot_msg.chat.id
# fn = getattr(bot_msg.video, "file_name", None) or getattr(bot_msg.document, "file_name", None)
status_msg: typing.Union[types.Message, typing.Coroutine] = bot_msg.reply_text(
@ -225,59 +215,36 @@ def normal_audio(client: Client, bot_msg: typing.Union[types.Message, typing.Cor
)
orig_url: str = re.findall(r"https?://.*", bot_msg.caption)[0]
with tempfile.TemporaryDirectory(prefix="ytdl-", dir=TMPFILE_PATH) as tmp:
client.send_chat_action(chat_id, "record_audio")
bot.send_chat_action(chat_id, enums.ChatAction.RECORD_AUDIO)
# just try to download the audio using yt-dlp
filepath = ytdl_download(orig_url, tmp, status_msg, hijack="bestaudio[ext=m4a]")
status_msg.edit_text("Sending audio now...")
client.send_chat_action(chat_id, "upload_audio")
bot.send_chat_action(chat_id, enums.ChatAction.UPLOAD_AUDIO)
for f in filepath:
client.send_audio(chat_id, f)
bot.send_audio(chat_id, f)
status_msg.edit_text("✅ Conversion complete.")
Redis().update_metrics("audio_success")
def get_dl_source():
worker_name = os.getenv("WORKER_NAME")
if worker_name:
return f"Downloaded by {worker_name}"
return ""
def upload_transfer_sh(bm, paths: list) -> str:
d = {p.name: (md5(p.name.encode("utf8")).hexdigest() + p.suffix, p.open("rb")) for p in paths}
monitor = MultipartEncoderMonitor(MultipartEncoder(fields=d), lambda x: upload_hook(x.bytes_read, x.len, bm))
headers = {"Content-Type": monitor.content_type}
try:
req = requests.post("https://transfer.sh", data=monitor, headers=headers)
bm.edit_text(f"Download success!✅")
return re.sub(r"https://", "\nhttps://", req.text)
except requests.exceptions.RequestException as e:
return f"Upload failed!❌\n\n```{e}```"
def flood_owner_message(client, ex):
client.send_message(OWNER, f"CRITICAL INFO: {ex}")
def ytdl_normal_download(client: Client, bot_msg: typing.Union[types.Message, typing.Coroutine], url: str):
def ytdl_normal_download(bot_msg: types.Message | typing.Any, url: str):
chat_id = bot_msg.chat.id
temp_dir = tempfile.TemporaryDirectory(prefix="ytdl-", dir=TMPFILE_PATH)
video_paths = ytdl_download(url, temp_dir.name, bot_msg)
logging.info("Download complete.")
client.send_chat_action(chat_id, "upload_document")
bot.send_chat_action(chat_id, enums.ChatAction.UPLOAD_DOCUMENT)
bot_msg.edit_text("Download complete. Sending now...")
try:
upload_processor(client, bot_msg, url, video_paths)
upload_processor(bot_msg, url, video_paths)
except pyrogram.errors.Flood as e:
logging.critical("FloodWait from Telegram: %s", e)
client.send_message(
bot.send_message(
chat_id,
f"I'm being rate limited by Telegram. Your video will come after {e.x} seconds. Please wait patiently.",
f"I'm being rate limited by Telegram. Your video will come after {e} seconds. Please wait patiently.",
)
flood_owner_message(client, e)
time.sleep(e.x)
upload_processor(client, bot_msg, url, video_paths)
bot.send_message(OWNER, f"CRITICAL INFO: {e}")
time.sleep(e.value)
upload_processor(bot_msg, url, video_paths)
bot_msg.edit_text("Download success!✅")
@ -306,7 +273,7 @@ def generate_input_media(file_paths: list, cap: str) -> list:
return input_media
def upload_processor(client, bot_msg, url, vp_or_fid: typing.Union[str, list]):
def upload_processor(bot_msg: types.Message, url: str, vp_or_fid: str | list):
redis = Redis()
# raise pyrogram.errors.exceptions.FloodWait(13)
# if is str, it's a file id; else it's a list of paths
@ -316,7 +283,7 @@ def upload_processor(client, bot_msg, url, vp_or_fid: typing.Union[str, list]):
if isinstance(vp_or_fid, list) and len(vp_or_fid) > 1:
# just generate the first for simplicity, send as media group(2-20)
cap, meta = gen_cap(bot_msg, url, vp_or_fid[0])
res_msg = client.send_media_group(chat_id, generate_input_media(vp_or_fid, cap))
res_msg: list["types.Message"] | Any = bot.send_media_group(chat_id, generate_input_media(vp_or_fid, cap))
# TODO no cache for now
return res_msg[0]
elif isinstance(vp_or_fid, list) and len(vp_or_fid) == 1:
@ -335,7 +302,7 @@ def upload_processor(client, bot_msg, url, vp_or_fid: typing.Union[str, list]):
logging.info("Sending as document")
try:
# send as document could be sent as video even if it's a document
res_msg = client.send_document(
res_msg = bot.send_document(
chat_id,
vp_or_fid,
caption=cap,
@ -347,7 +314,7 @@ def upload_processor(client, bot_msg, url, vp_or_fid: typing.Union[str, list]):
)
except ValueError:
logging.error("Retry to send as video")
res_msg = client.send_video(
res_msg = bot.send_video(
chat_id,
vp_or_fid,
supports_streaming=True,
@ -359,7 +326,7 @@ def upload_processor(client, bot_msg, url, vp_or_fid: typing.Union[str, list]):
)
elif settings[2] == "audio":
logging.info("Sending as audio")
res_msg = client.send_audio(
res_msg = bot.send_audio(
chat_id,
vp_or_fid,
caption=cap,
@ -370,7 +337,7 @@ def upload_processor(client, bot_msg, url, vp_or_fid: typing.Union[str, list]):
# settings==video
logging.info("Sending as video")
try:
res_msg = client.send_video(
res_msg = bot.send_video(
chat_id,
vp_or_fid,
supports_streaming=True,
@ -384,7 +351,7 @@ def upload_processor(client, bot_msg, url, vp_or_fid: typing.Union[str, list]):
# try to send as annimation, photo
try:
logging.warning("Retry to send as animation")
res_msg = client.send_animation(
res_msg = bot.send_animation(
chat_id,
vp_or_fid,
caption=cap,
@ -396,7 +363,7 @@ def upload_processor(client, bot_msg, url, vp_or_fid: typing.Union[str, list]):
except Exception:
# this is likely a photo
logging.warning("Retry to send as photo")
res_msg = client.send_photo(
res_msg = bot.send_photo(
chat_id,
vp_or_fid,
caption=cap,
@ -409,7 +376,7 @@ def upload_processor(client, bot_msg, url, vp_or_fid: typing.Union[str, list]):
redis.add_send_cache(unique, getattr(obj, "file_id", None))
redis.update_metrics("video_success")
if ARCHIVE_ID and isinstance(vp_or_fid, pathlib.Path):
client.forward_messages(bot_msg.chat.id, ARCHIVE_ID, res_msg.message_id)
bot.forward_messages(bot_msg.chat.id, ARCHIVE_ID, res_msg.id)
return res_msg
@ -441,7 +408,11 @@ def gen_cap(bm, url, video_path):
remain = f"Download token count: free {free}, pay {pay}"
else:
remain = ""
worker = get_dl_source()
if worker_name := os.getenv("WORKER_NAME"):
worker = f"Downloaded by {worker_name}"
else:
worker = ""
cap = (
f"{user_info}\n{file_name}\n\n{url}\n\nInfo: {meta['width']}x{meta['height']} {file_size}\t"
f"{meta['duration']}s\n{remain}\n{worker}\n{bot_text.custom_text}"
@ -450,10 +421,10 @@ def gen_cap(bm, url, video_path):
def gen_video_markup():
markup = InlineKeyboardMarkup(
markup = types.InlineKeyboardMarkup(
[
[ # First row
InlineKeyboardButton( # Generates a callback query when pressed
types.InlineKeyboardButton( # Generates a callback query when pressed
"convert to audio", callback_data="convert"
)
]
@ -472,7 +443,6 @@ def hot_patch(*args):
app_path = pathlib.Path().cwd().parent
logging.info("Hot patching on path %s...", app_path)
apk_install = "xargs apk add < apk.txt"
pip_install = "pip install -r requirements.txt"
unset = "git config --unset http.https://github.com/.extraheader"
pull_unshallow = "git pull origin --unshallow"
@ -484,52 +454,32 @@ def hot_patch(*args):
subprocess.call(pull, shell=True, cwd=app_path)
logging.info("Code is updated, applying hot patch now...")
subprocess.call(apk_install, shell=True, cwd=app_path)
subprocess.call(pip_install, shell=True, cwd=app_path)
psutil.Process().kill()
def async_task(task_name, *args):
if not ENABLE_QUEUE:
task_name.delay(*args)
return
t0 = time.time()
inspect = app.control.inspect()
worker_stats = inspect.stats()
route_queues = []
padding = math.ceil(sum([i["pool"]["max-concurrency"] for i in worker_stats.values()]) / len(worker_stats))
for worker_name, stats in worker_stats.items():
route = worker_name.split("@")[1]
concurrency = stats["pool"]["max-concurrency"]
route_queues.extend([route] * (concurrency + padding))
destination = random.choice(route_queues)
logging.info("Selecting worker %s from %s in %.2fs", destination, route_queues, time.time() - t0)
task_name.apply_async(args=args, queue=destination)
def run_celery():
worker_name = os.getenv("WORKER_NAME", "")
argv = ["-A", "tasks", "worker", "--loglevel=info", "--pool=threads", f"--concurrency={WORKERS}", "-n", worker_name]
if ENABLE_QUEUE:
argv.extend(["-Q", worker_name])
app.worker_main(argv)
def purge_tasks():
count = app.control.purge()
return f"purged {count} tasks."
def run_celery():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
worker_name = os.getenv("WORKER_NAME", "")
argv = ["-A", "tasks", "worker", "--loglevel=info", "--pool=threads", f"--concurrency={WORKERS}", "-n", worker_name]
app.worker_main(argv)
if __name__ == "__main__":
# celery_client.start()
bot.start()
print("Bootstrapping Celery worker now.....")
time.sleep(5)
time.sleep(3)
threading.Thread(target=run_celery, daemon=True).start()
scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
scheduler = BackgroundScheduler(timezone="Europe/London")
scheduler.add_job(auto_restart, "interval", seconds=900)
scheduler.start()
idle()
celery_client.stop()
bot.stop()

View file

@ -166,7 +166,6 @@ class Detector:
"types.UpdatesTooLong",
"Got shutdown from remote",
"Code is updated",
'Retrying "messages.GetMessages"',
"OSError: Connection lost",
"[Errno -3] Try again",
"MISCONF",
@ -179,7 +178,7 @@ class Detector:
def next_salt_detector(self):
text = "Next salt in"
if self.logs.count(text) >= 4:
if self.logs.count(text) >= 5:
logging.critical("Next salt crash: %s", self.func_name())
return True
@ -187,17 +186,17 @@ class Detector:
text = "The msg_id is too low"
if text in self.logs:
logging.critical("msg id crash: %s ", self.func_name())
for item in pathlib.Path(__file__).parent.glob("ytdl-*"):
for item in pathlib.Path(__file__).parent.glob("*.session"):
logging.warning("Removing session file: %s", item)
item.unlink(missing_ok=True)
time.sleep(3)
return True
# def idle_detector(self):
# mtime = os.stat("/var/log/ytdl.log").st_mtime
# cur_ts = time.time()
# if cur_ts - mtime > 7200:
# logging.warning("Potential crash detected by %s, it's time to commit suicide...", self.func_name())
# return True
def sqlite_locked_detector(self):
text = "sqlite3.OperationalError: database is locked"
if text in self.logs:
logging.critical("database is locked: %s ", self.func_name())
return True
def auto_restart():

View file

@ -15,18 +15,17 @@ import re
import tempfile
import time
import traceback
import typing
from io import BytesIO
from typing import Any
import pyrogram.errors
import qrcode
import yt_dlp
from apscheduler.schedulers.background import BackgroundScheduler
from pyrogram import Client, filters, types
from pyrogram import Client, enums, filters, types
from pyrogram.errors.exceptions.bad_request_400 import UserNotParticipant
from pyrogram.raw import functions
from pyrogram.raw import types as raw_types
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from tgbot_ping import get_runtime
from youtubesearchpython import VideosSearch
@ -59,12 +58,11 @@ from tasks import (
)
from utils import auto_restart, clean_tempfile, customize_logger, get_revision
logging.info("Authorized users are %s", AUTHORIZED_USER)
customize_logger(["pyrogram.client", "pyrogram.session.session", "pyrogram.connection.connection"])
logging.getLogger("apscheduler.executors.default").propagate = False
app = create_app(":memory:")
logging.info("Authorized users are %s", AUTHORIZED_USER)
app = create_app("main")
channel = Channel()
@ -73,7 +71,7 @@ def private_use(func):
chat_id = getattr(message.from_user, "id", None)
# message type check
if message.chat.type != "private" and not message.text.lower().startswith("/ytdl"):
if message.chat.type != enums.ChatType.PRIVATE and not message.text.lower().startswith("/ytdl"):
logging.debug("%s, it's annoying me...🙄️ ", message.text)
return
@ -89,14 +87,11 @@ def private_use(func):
if REQUIRED_MEMBERSHIP:
try:
member: typing.Union[types.ChatMember, typing.Coroutine] = app.get_chat_member(
REQUIRED_MEMBERSHIP, chat_id
)
member: types.ChatMember | Any = app.get_chat_member(REQUIRED_MEMBERSHIP, chat_id)
if member.status not in [
"creator",
"administrator",
"member",
"owner",
enums.ChatMemberStatus.ADMINISTRATOR,
enums.ChatMemberStatus.MEMBER,
enums.ChatMemberStatus.OWNER,
]:
raise UserNotParticipant()
else:
@ -116,11 +111,11 @@ def start_handler(client: Client, message: types.Message):
payment = Payment()
from_id = message.from_user.id
logging.info("%s welcome to youtube-dl bot!", message.from_user.id)
client.send_chat_action(from_id, "typing")
client.send_chat_action(from_id, enums.ChatAction.TYPING)
is_old_user = payment.check_old_user(from_id)
if is_old_user:
info = ""
elif ENABLE_VIP:
if ENABLE_VIP:
free_token, pay_token, reset = payment.get_token(from_id)
info = f"Free token: {free_token}, Pay token: {pay_token}, Reset: {reset}"
else:
@ -132,21 +127,21 @@ def start_handler(client: Client, message: types.Message):
@app.on_message(filters.command(["help"]))
def help_handler(client: Client, message: types.Message):
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
client.send_message(chat_id, BotText.help, disable_web_page_preview=True)
@app.on_message(filters.command(["about"]))
def about_handler(client: Client, message: types.Message):
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
client.send_message(chat_id, BotText.about)
@app.on_message(filters.command(["sub"]))
def subscribe_handler(client: Client, message: types.Message):
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
if message.text == "/sub":
result = channel.get_user_subscription(chat_id)
else:
@ -161,7 +156,7 @@ def subscribe_handler(client: Client, message: types.Message):
@app.on_message(filters.command(["unsub"]))
def unsubscribe_handler(client: Client, message: types.Message):
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
text = message.text.split(" ")
if len(text) == 1:
client.send_message(chat_id, "/unsub channel_id", disable_web_page_preview=True)
@ -181,7 +176,7 @@ def patch_handler(client: Client, message: types.Message):
chat_id = message.chat.id
if username == OWNER:
celery_app.control.broadcast("hot_patch")
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
client.send_message(chat_id, "Oorah!")
hot_patch()
@ -206,7 +201,7 @@ def purge_handler(client: Client, message: types.Message):
def ping_handler(client: Client, message: types.Message):
redis = Redis()
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
if os.uname().sysname == "Darwin" or ".heroku" in os.getenv("PYTHONHOME", ""):
bot_info = "ping unavailable."
else:
@ -233,7 +228,7 @@ def sub_count_handler(client: Client, message: types.Message):
def direct_handler(client: Client, message: types.Message):
redis = Redis()
chat_id = message.from_user.id
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
url = re.sub(r"/direct\s*", "", message.text)
logging.info("direct start %s", url)
if not re.findall(r"^https?://", url.lower()):
@ -250,27 +245,27 @@ def direct_handler(client: Client, message: types.Message):
def settings_handler(client: Client, message: types.Message):
chat_id = message.chat.id
payment = Payment()
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
data = MySQL().get_user_settings(chat_id)
set_mode = data[-1]
text = {"Local": "Celery", "Celery": "Local"}.get(set_mode, "Local")
mode_text = f"Download mode: **{set_mode}**"
if message.chat.username == OWNER or payment.get_pay_token(chat_id):
extra = [InlineKeyboardButton(f"Change download mode to {text}", callback_data=text)]
extra = [types.InlineKeyboardButton(f"Change download mode to {text}", callback_data=text)]
else:
extra = []
markup = InlineKeyboardMarkup(
markup = types.InlineKeyboardMarkup(
[
[ # First row
InlineKeyboardButton("send as document", callback_data="document"),
InlineKeyboardButton("send as video", callback_data="video"),
InlineKeyboardButton("send as audio", callback_data="audio"),
types.InlineKeyboardButton("send as document", callback_data="document"),
types.InlineKeyboardButton("send as video", callback_data="video"),
types.InlineKeyboardButton("send as audio", callback_data="audio"),
],
[ # second row
InlineKeyboardButton("High Quality", callback_data="high"),
InlineKeyboardButton("Medium Quality", callback_data="medium"),
InlineKeyboardButton("Low Quality", callback_data="low"),
types.InlineKeyboardButton("High Quality", callback_data="high"),
types.InlineKeyboardButton("Medium Quality", callback_data="medium"),
types.InlineKeyboardButton("Low Quality", callback_data="low"),
],
extra,
]
@ -288,7 +283,7 @@ def settings_handler(client: Client, message: types.Message):
def buy_handler(client: Client, message: types.Message):
# process as chat.id, not from_user.id
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
# currency USD
token_count = message.text.replace("/buy", "").strip()
if token_count.isdigit():
@ -296,11 +291,11 @@ def buy_handler(client: Client, message: types.Message):
else:
price = 100
markup = InlineKeyboardMarkup(
markup = types.InlineKeyboardMarkup(
[
[
InlineKeyboardButton("Bot Payments", callback_data=f"bot-payments-{price}"),
InlineKeyboardButton("TRON(TRX)", callback_data="tron-trx"),
types.InlineKeyboardButton("Bot Payments", callback_data=f"bot-payments-{price}"),
types.InlineKeyboardButton("TRON(TRX)", callback_data="tron-trx"),
],
]
)
@ -311,7 +306,7 @@ def buy_handler(client: Client, message: types.Message):
def tronpayment_btn_calback(client: Client, callback_query: types.CallbackQuery):
callback_query.answer("Generating QR code...")
chat_id = callback_query.message.chat.id
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
addr = TronTrx().get_payment_address(chat_id)
with BytesIO() as bio:
@ -324,13 +319,13 @@ def tronpayment_btn_calback(client: Client, callback_query: types.CallbackQuery)
def bot_payment_btn_calback(client: Client, callback_query: types.CallbackQuery):
callback_query.answer("Generating invoice...")
chat_id = callback_query.message.chat.id
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
data = callback_query.data
price = int(data.split("-")[-1])
payload = f"{chat_id}-buy"
invoice = generate_invoice(price, f"Buy {TOKEN_PRICE} download tokens", "Pay by card", payload)
app.send(
app.invoke(
functions.messages.SendMedia(
peer=(raw_types.InputPeerUser(user_id=chat_id, access_hash=0)),
media=invoice,
@ -384,10 +379,9 @@ def link_checker(url: str) -> str:
def search_ytb(kw: str):
videosSearch = VideosSearch(kw, limit=10)
videos_search = VideosSearch(kw, limit=10)
text = ""
results = videosSearch.result()["result"]
results = videos_search.result()["result"]
for item in results:
title = item.get("title")
link = item.get("link")
@ -402,7 +396,7 @@ def download_handler(client: Client, message: types.Message):
redis = Redis()
payment = Payment()
chat_id = message.from_user.id
client.send_chat_action(chat_id, "typing")
client.send_chat_action(chat_id, enums.ChatAction.TYPING)
redis.user_count(chat_id)
if message.document:
with tempfile.NamedTemporaryFile(mode="r+") as tf:
@ -431,9 +425,7 @@ def download_handler(client: Client, message: types.Message):
if ENABLE_VIP and not payment.check_old_user(chat_id):
free, pay, reset = payment.get_token(chat_id)
if free + pay <= 0:
message.reply_text(
f"You don't have enough token. Please wait until {reset} or /buy more token.", quote=True
)
message.reply_text(f"You don't have enough token. Please wait until {reset} or /buy .", quote=True)
redis.update_metrics("reject_token")
return
else:
@ -444,22 +436,22 @@ def download_handler(client: Client, message: types.Message):
text = BotText.get_receive_link_text()
try:
# raise pyrogram.errors.exceptions.FloodWait(10)
bot_msg: typing.Union[types.Message, typing.Coroutine] = message.reply_text(text, quote=True)
bot_msg: types.Message | Any = message.reply_text(text, quote=True)
except pyrogram.errors.Flood as e:
f = BytesIO()
f.write(str(e).encode())
f.write(b"Your job will be done soon. Just wait! Don't rush.")
f.name = "Please don't flood me.txt"
bot_msg = message.reply_document(
f, caption=f"Flood wait! Please wait {e.x} seconds...." f"Your job will start automatically", quote=True
f, caption=f"Flood wait! Please wait {e} seconds...." f"Your job will start automatically", quote=True
)
f.close()
client.send_message(OWNER, f"Flood wait! 🙁 {e.x} seconds....")
time.sleep(e.x)
client.send_message(OWNER, f"Flood wait! 🙁 {e} seconds....")
time.sleep(e.value)
client.send_chat_action(chat_id, "upload_video")
client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_VIDEO)
bot_msg.chat = message.chat
ytdl_download_entrance(client, bot_msg, url)
ytdl_download_entrance(bot_msg, url)
@app.on_callback_query(filters.regex(r"document|video|audio"))
@ -484,14 +476,13 @@ def download_resolution_callback(client: Client, callback_query: types.CallbackQ
def audio_callback(client: Client, callback_query: types.CallbackQuery):
redis = Redis()
if not ENABLE_FFMPEG:
callback_query.answer("Audio conversion is disabled now.")
callback_query.answer("Request rejected.")
callback_query.message.reply_text("Audio conversion is disabled now.")
return
callback_query.answer(f"Converting to audio...please wait patiently")
redis.update_metrics("audio_request")
vmsg = callback_query.message
audio_entrance(client, vmsg)
audio_entrance(callback_query.message)
@app.on_callback_query(filters.regex(r"Local|Celery"))
@ -509,14 +500,12 @@ def periodic_sub_check():
logging.info(f"periodic update:{video_url} - {uids}")
for uid in uids:
try:
bot_msg: typing.Union[types.Message, typing.Coroutine] = app.send_message(
uid, f"{video_url} is out. Watch it on YouTube"
)
bot_msg: types.Message | Any = app.send_message(uid, f"{video_url} is out. Watch it on YouTube")
# ytdl_download_entrance(app, bot_msg, video_url, mode="direct")
except (exceptions.bad_request_400.PeerIdInvalid, exceptions.bad_request_400.UserIsBlocked) as e:
logging.warning("User is blocked or deleted. %s", e)
channel.deactivate_user_subscription(uid)
except Exception as e:
except Exception:
logging.error("Unknown error when sending message to user. %s", traceback.format_exc())
finally:
time.sleep(random.random() * 3)
@ -527,7 +516,7 @@ def raw_update(client: Client, update, users, chats):
payment = Payment()
action = getattr(getattr(update, "message", None), "action", None)
if update.QUALNAME == "types.UpdateBotPrecheckoutQuery":
client.send(
client.invoke(
functions.messages.SetBotPrecheckoutResults(
query_id=update.query_id,
success=True,
@ -549,14 +538,13 @@ def trx_notify(_, **kwargs):
if __name__ == "__main__":
redis = Redis()
MySQL()
TRX_SIGNAL.connect(trx_notify)
scheduler = BackgroundScheduler(timezone="Europe/London", job_defaults={"max_instances": 6})
scheduler.add_job(auto_restart, "interval", seconds=600)
scheduler.add_job(clean_tempfile, "interval", seconds=120)
if not IS_BACKUP_BOT:
scheduler.add_job(redis.reset_today, "cron", hour=0, minute=0)
scheduler.add_job(Redis().reset_today, "cron", hour=0, minute=0)
scheduler.add_job(InfluxDB().collect_data, "interval", seconds=120)
scheduler.add_job(TronTrx().check_payment, "interval", seconds=60, max_instances=1)
# default quota allocation of 10,000 units per day