token mode instead of vip mode

This commit is contained in:
Benny 2023-04-06 22:33:24 +02:00
parent fe4f859aa1
commit 218b1d5ec9
No known key found for this signature in database
GPG key ID: 6CD0DBDA5235D481
19 changed files with 1001 additions and 1201 deletions

8
.gitignore vendored

@@ -159,4 +159,10 @@ ytdlbot/ytdl.session
data/*
upgrade_worker.sh
ytdl.session
reinforcement/*
/ytdlbot/session/celery.session
/.idea/prettier.xml
/.idea/watcherTasks.xml
/ytdlbot/session/ytdl.session-journal
/ytdlbot/unknown_errors.txt
/ytdlbot/ytdl.session-journal

8
FAQ.md

@@ -1,10 +1,11 @@
# Project status
This project is in zombie mode now.
This project is currently inactive and is no longer being maintained.
It is not maintained anymore. No features will be added, no bugs will be fixed.
No new features will be added, and no bugs will be fixed.
Existing code and bot will continue to work without any guarantees.
While the existing code and bot will continue to work, there are no guarantees regarding their reliability or
functionality.
# Can you support downloading from XXX?
@@ -14,7 +15,6 @@ Please reach out to [yt-dlp](https://github.com/yt-dlp/yt-dlp)
Just wait. It is a free service, and it is not guaranteed to work.
See [Grafana Dashboard](https://grafana.dmesg.app/d/9yXGmc1nk/youtube-download-celery?orgId=2) for current server status
# Why is the bot not responding?

303
README.md

@@ -4,7 +4,7 @@
YouTube Download Bot🚀
Download videos from YouTube and other platforms through a Telegram Bot
This Telegram bot allows you to download videos from YouTube and other supported platforms.
-----
**READ [FAQ](FAQ.md) FIRST IF YOU ENCOUNTER ANY ISSUES.**
@@ -12,7 +12,7 @@ Download videos from YouTube and other platforms through a Telegram Bot
-----
[![Deploy](https://www.herokucdn.com/deploy/button.svg)](https://heroku.com/deploy)
Can't deploy? Fork to your personal account and deploy it there!
If you are having trouble deploying, you can fork the project to your personal account and deploy it from there.
**Starting November 28, 2022, free Heroku Dynos, free Heroku Postgres, and free Heroku Data for Redis® plans will no
longer be available.**
@@ -27,14 +27,13 @@ Websites [supported by youtube-dl](https://ytdl-org.github.io/youtube-dl/supported)
# Limitations of my bot
I don't have unlimited servers and bandwidth, so I have to make some restrictions.
Due to limitations on servers and bandwidth, there are some restrictions on this service.
* 5 GiB one-way traffic per 24 hours for each user
* maximum 5 minutes streaming conversion support
* maximum 3 subscriptions
* limited request in certain time range
* Each user is limited to 5 free downloads per 24-hour period
* There is a maximum of three subscriptions allowed for YouTube channels.
You can choose to become 'VIP' if you really need large traffic. And also, you could always deploy your own bot.
If you require more downloads, you can purchase additional tokens. Additionally, you have the option of deploying your
own bot.
# Features
@@ -45,7 +44,7 @@ You can choose to become 'VIP' if you really need large traffic. And also, you c
3. support progress bar
4. audio conversion
5. playlist support
6. VIP support
6. payment support
7. support different video resolutions
8. support sending as file or streaming as video
9. supports celery worker distribution - faster than before.
@@ -56,25 +55,23 @@ You can choose to become 'VIP' if you really need large traffic. And also, you c
# How to deploy?
You can deploy this bot on any platform that supports Python.
## Heroku
Use the button above! It should work like magic, but with limited functionality.
This bot can be deployed on any platform that supports Python.
## Run natively on your machine
1. clone code
2. install ffmpeg
3. install Python 3.6+
4. install aria2 and add to PATH
5. pip3 install -r requirements.txt
6. set environment variables `TOKEN`, `APP_ID` and `APP_HASH`, and more if you like.
7. `python3 ytdl_bot.py`
To deploy this bot, follow these steps:
1. Clone the code from the repository.
2. Install FFmpeg.
3. Install Python 3.6 or a later version.
4. Install Aria2 and add it to the PATH.
5. Install the required packages by running `pip3 install -r requirements.txt`.
6. Set the environment variables `TOKEN`, `APP_ID`, `APP_HASH`, and any others that you may need.
7. Run `python3 ytdl_bot.py`.
## Docker
Some functions, such as VIP, ping will be disabled.
The bot can be started with a single command; some functions, such as VIP and ping, are disabled.
```shell
docker run -e APP_ID=111 -e APP_HASH=111 -e TOKEN=370FXI bennythink/ytdlbot
@@ -104,42 +101,35 @@ mkdir env
vim env/ytdl.env
```
you can configure all the following environment variables:
You can configure all the following environment variables:
* PYRO_WORKERS: number of workers for pyrogram, default is 100
* WORKERS: workers count for celery
* APP_ID: **REQUIRED**, get it from https://core.telegram.org/
* APP_HASH: **REQUIRED**
* TOKEN: **REQUIRED**
* REDIS: **REQUIRED if you need VIP mode and cache** ⚠️ Don't publish your redis server on the internet. ⚠️
* EXPIRE: token expire time, default: 1 day
* ENABLE_VIP: enable VIP mode
* OWNER: owner username
* QUOTA: quota in bytes
* EX: quota expire time
* MULTIPLY: vip quota comparing to normal quota
* USD2CNY: exchange rate
* VIP: VIP mode, default: disable
* AFD_LINK
* COFFEE_LINK
* COFFEE_TOKEN
* AFD_TOKEN
* AFD_USER_ID
* AUTHORIZED_USER: users that could use this bot, user_id, separated with `,`
* REQUIRED_MEMBERSHIP: group or channel username, user must join this group to use the bot. Could be use with
above `AUTHORIZED_USER`
* ENABLE_CELERY: Distribution mode, default: disable. You'll can setup workers in different locations.
* ENABLE_FFMPEG: enable ffmpeg so Telegram can stream
* MYSQL_HOST: you'll have to setup MySQL if you enable VIP mode
* MYSQL_USER
* MYSQL_PASS
* AUTHORIZED_USER: only authorized users can use the bot
* REQUIRED_MEMBERSHIP: group or channel username, user must join this group to use the bot
* ENABLE_CELERY: celery mode, default: disable
* ENABLE_QUEUE: celery queue
* BROKER: celery broker, should be redis://redis:6379/0
* MYSQL_HOST: MySQL host
* MYSQL_USER: MySQL username
* MYSQL_PASS: MySQL password
* AUDIO_FORMAT: default audio format
* ARCHIVE_ID: forward all downloads to this group/channel
* IPv6: enable IPv6, default is False
* ENABLE_FFMPEG: enable ffmpeg so Telegram can stream, default is False
* PROVIDER_TOKEN: stripe token on Telegram payment
* PLAYLIST_SUPPORT: download playlist support
* ENABLE_ARIA2: enable aria2c download
* FREE_DOWNLOAD: free download count per day
* TOKEN_PRICE: token price per 1 USD
* GOOGLE_API_KEY: YouTube API key, required for YouTube video subscription.
* AUDIO_FORMAT: audio format, default is m4a. You can set it to any format known to and supported by ffmpeg, for
example `mp3`, `flac`, etc. ⚠️ m4a is the fastest. Other formats may affect performance.
* ARCHIVE_ID: group or channel id/username. All downloads will send to this group first and then forward to end user.
* PLAYLIST_SUPPORT: `True` or `False`, whether the bot can download playlists/channels. Default: `False`.
**Inline button will be lost during the forwarding.**
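The variables above are plain environment variables read once at startup. A minimal sketch of that pattern, mirroring the style of `config.py` in this repo (the defaults below are illustrative placeholders, not the bot's real values):

```python
import os

# Hedged sketch: names follow config.py in this repo; defaults are placeholders.
APP_ID = int(os.getenv("APP_ID", "0"))                # REQUIRED in a real deployment
APP_HASH = os.getenv("APP_HASH", "")                  # REQUIRED
TOKEN = os.getenv("TOKEN", "")                        # REQUIRED
FREE_DOWNLOAD = int(os.getenv("FREE_DOWNLOAD", "5"))  # free downloads per 24 hours
TOKEN_PRICE = int(os.getenv("BUY_UNIT", "20"))        # tokens per 1 USD

print(FREE_DOWNLOAD, TOKEN_PRICE)
```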
## 3.2 Set up init data
@@ -187,7 +177,7 @@ vim data/instagram.com_cookies.txt
In `flower` service section, you may want to change your basic authentication username password and publish port.
You can also limit CPU and RAM usage by adding an `deploy' key:
You can also limit CPU and RAM usage by adding a `deploy` key:
```docker
deploy:
@@ -231,208 +221,12 @@ On the other machine:
docker-compose -f worker.yml up -d
```
**⚠️ Bear in mind don't publish redis directly on the internet! You can use WireGuard to wrap it up.**
**⚠️ Please bear in mind that you should not publish Redis directly on the internet.
Instead, you can use WireGuard to wrap it up for added security.**
## Kubernetes
Kubernetes, also known as K8s, is an open-source system for automating deployment, scaling, and management of
containerized applications
# Complete k8s deployment guide
* contains every functionality
* compatible with amd64, arm64 and armv7l
## First: Get all files in the k8s folder
Download the `k8s` folder to a directory on your k8s server and go to that folder
## 1. Create Redis deployment
```shell
kubectl apply -f 01.redis.yml
```
This command will create the ytdl namespace, redis pod and redis service
## 2. Create MariaDB deployment
```shell
kubectl apply -f 02.mariadb.yml
```
This deployment will claim 10GB of storage from storageClassName: longhorn. Please replace longhorn with your
storageClassName before applying.
## 3. Set environment variables
Create configMap for env
### 3.1 Edit configmap.yml
```shell
vim 03.configmap.yml
```
You can configure all the following environment variables:
* PYRO_WORKERS: number of workers for pyrogram, default is 100
* WORKERS: workers count for celery
* APP_ID: **REQUIRED**, get it from https://core.telegram.org/
* APP_HASH: **REQUIRED**
* TOKEN: **REQUIRED**
* REDIS: **REQUIRED if you need VIP mode and cache** ⚠️ Don't publish your redis server on the internet. ⚠️
* OWNER: owner username
* QUOTA: quota in bytes
* EX: quota expire time
* MULTIPLY: vip quota comparing to normal quota
* USD2CNY: exchange rate
* VIP: VIP mode, default: disable
* AFD_LINK
* COFFEE_LINK
* COFFEE_TOKEN
* AFD_TOKEN
* AFD_USER_ID
* AUTHORIZED_USER: users that can use this bot; user_id values, separated with `,`
* REQUIRED_MEMBERSHIP: group or channel username; the user must join this group to use the bot. Can be used with
`AUTHORIZED_USER` above
* ENABLE_CELERY: distribution mode, default: disable. You can set up workers in different locations.
* ENABLE_FFMPEG: enable ffmpeg so Telegram can stream
* MYSQL_HOST: you'll have to setup MySQL if you enable VIP mode
* MYSQL_USER
* MYSQL_PASS
* GOOGLE_API_KEY: YouTube API key, required for YouTube video subscription.
* AUDIO_FORMAT: audio format, default is m4a. You can set it to any format known to and supported by ffmpeg, for
example `mp3`, `flac`, etc. ⚠️ m4a is the fastest. Other formats may affect performance.
* ARCHIVE_ID: group or channel id/username. All downloads will send to this group first and then forward to end user.
**Inline button will be lost during the forwarding.**
### 3.2 Apply configMap for environment variables
```shell
kubectl apply -f 03.configmap.yml
```
## 4. Run Master Celery
```shell
kubectl apply -f 04.ytdl-master.yml
```
This deployment will create a ytdl-pvc PersistentVolumeClaim on storageClassName: longhorn. This claim will contain the
vnstat and cookies folders and the flower database. Please replace longhorn with your storageClassName before applying.
### 4.1 Set up Instagram cookies
Required if you want to support Instagram.
You can use this extension
[Get cookies.txt](https://chrome.google.com/webstore/detail/get-cookiestxt/bgaddhkoddajcdgocldbbfleckgcbcid)
to get Instagram cookies.
Find the pod running the ytdl master:
```shell
kubectl get pods --namespace ytdl
```
The name should look like ytdl-xxxxxxxx.
Access the pod:
```shell
kubectl --namespace=ytdl exec --stdin --tty ytdl-xxx -- sh
```
(replace ytdl-xxx with your pod name)
Go to the ytdl-pvc mounted folder:
```shell
cd /ytdlbot/ytdlbot/data/
vim instagram.com_cookies.txt
# paste your cookies
```
## 5. Run Worker Celery
```shell
kubectl apply -f 05.ytdl-worker.yml
```
## 6. Run Flower image (OPTIONAL)
### 6.1 Set up the flower database
Find the pod running the ytdl master:
```shell
kubectl get pods --namespace ytdl
```
The name should look like ytdl-xxxxxxxx.
Access the pod:
```shell
kubectl --namespace=ytdl exec --stdin --tty ytdl-xxx -- sh
```
(replace ytdl-xxx with your pod name)
Go to the ytdl-pvc mounted folder:
```shell
cd /var/lib/vnstat/
```
Create the flower database file:
```shell
python3
Python 3.9.9 (main, Nov 21 2021, 03:22:47)
[Clang 12.0.0 (clang-1200.0.32.29)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import dbm;dbm.open("flower","n");exit()
```
### 6.2 Configure Flower ingress
This step configures the ingress, starting at line 51 of 06.flower.yml, for your ingress service. It is needed for
access from the internet.
The YAML file should be adjusted depending on your load balancer, ingress and network setup.
To enable SSL:
```yml
cert-manager.io/cluster-issuer: letsencrypt-prod
```
Replace nginx with your ingress class name:
```yml
ingressClassName: nginx
```
Add your domain, for example:
```yml
tls:
  - hosts:
      - flower.benny.com
    secretName: flower-tls
rules:
  - host: flower.benny.com
```
### 6.3 Apply Flower deployment
```shell
kubectl apply -f 06.flower.yml
```
Refer to the guide here: [kubernetes](k8s.md)
# Command
@@ -450,8 +244,6 @@ unsub - Unsubscribe from YouTube Channel
sub_count - Check subscription status, owner only.
uncache - Delete cache for this link, owner only.
purge - Delete all tasks , owner only.
topup - Top up quota
tgvip - Using Telegram payment to pay for VIP
```
# Test data
@@ -481,8 +273,9 @@ https://twitter.com/BennyThinks/status/1475836588542341124
## Stripe
You can choose to donate via Stripe. Please click the button below to donate via Stripe.
Choose the currency and payment method that suits you.
You can choose to donate via Stripe by clicking the button below.
Select the currency and payment method that suits you.
| USD(Card, Apple Pay and Google Pay) | SEK(Card, Apple Pay and Google Pay) | CNY(Card, Apple Pay, Google Pay and Alipay) |
|--------------------------------------------------|--------------------------------------------------|--------------------------------------------------|

200
k8s.md Normal file

@@ -0,0 +1,200 @@
## Kubernetes
Kubernetes, also known as K8s, is an open-source system for automating deployment, scaling, and management of
containerized applications
# Complete k8s deployment guide
* contains every functionality
* compatible with amd64, arm64 and armv7l
## First: Get all files in the k8s folder
Download the `k8s` folder to a directory on your k8s server and go to that folder
## 1. Create Redis deployment
```shell
kubectl apply -f 01.redis.yml
```
This command will create the ytdl namespace, redis pod and redis service
## 2. Create MariaDB deployment
```shell
kubectl apply -f 02.mariadb.yml
```
This deployment will claim 10GB of storage from storageClassName: longhorn. Please replace longhorn with your
storageClassName before applying.
## 3. Set environment variables
Create configMap for env
### 3.1 Edit configmap.yml
```shell
vim 03.configmap.yml
```
You can configure all the following environment variables:
* PYRO_WORKERS: number of workers for pyrogram, default is 100
* WORKERS: workers count for celery
* APP_ID: **REQUIRED**, get it from https://core.telegram.org/
* APP_HASH: **REQUIRED**
* TOKEN: **REQUIRED**
* REDIS: **REQUIRED if you need VIP mode and cache** ⚠️ Don't publish your redis server on the internet. ⚠️
* OWNER: owner username
* QUOTA: quota in bytes
* EX: quota expire time
* MULTIPLY: vip quota comparing to normal quota
* USD2CNY: exchange rate
* VIP: VIP mode, default: disable
* AFD_LINK
* COFFEE_LINK
* COFFEE_TOKEN
* AFD_TOKEN
* AFD_USER_ID
* AUTHORIZED_USER: users that can use this bot; user_id values, separated with `,`
* REQUIRED_MEMBERSHIP: group or channel username; the user must join this group to use the bot. Can be used with
`AUTHORIZED_USER` above
* ENABLE_CELERY: distribution mode, default: disable. You can set up workers in different locations.
* ENABLE_FFMPEG: enable ffmpeg so Telegram can stream
* MYSQL_HOST: you'll have to setup MySQL if you enable VIP mode
* MYSQL_USER
* MYSQL_PASS
* GOOGLE_API_KEY: YouTube API key, required for YouTube video subscription.
* AUDIO_FORMAT: audio format, default is m4a. You can set it to any format known to and supported by ffmpeg, for
example `mp3`, `flac`, etc. ⚠️ m4a is the fastest. Other formats may affect performance.
* ARCHIVE_ID: group or channel id/username. All downloads will send to this group first and then forward to end user.
**Inline button will be lost during the forwarding.**
### 3.2 Apply configMap for environment variables
```shell
kubectl apply -f 03.configmap.yml
```
## 4. Run Master Celery
```shell
kubectl apply -f 04.ytdl-master.yml
```
This deployment will create a ytdl-pvc PersistentVolumeClaim on storageClassName: longhorn. This claim will contain the
vnstat and cookies folders and the flower database. Please replace longhorn with your storageClassName before applying.
### 4.1 Set up Instagram cookies
Required if you want to support Instagram.
You can use this extension
[Get cookies.txt](https://chrome.google.com/webstore/detail/get-cookiestxt/bgaddhkoddajcdgocldbbfleckgcbcid)
to get Instagram cookies.
Find the pod running the ytdl master:
```shell
kubectl get pods --namespace ytdl
```
The name should look like ytdl-xxxxxxxx.
Access the pod:
```shell
kubectl --namespace=ytdl exec --stdin --tty ytdl-xxx -- sh
```
(replace ytdl-xxx with your pod name)
Go to the ytdl-pvc mounted folder:
```shell
cd /ytdlbot/ytdlbot/data/
vim instagram.com_cookies.txt
# paste your cookies
```
## 5. Run Worker Celery
```shell
kubectl apply -f 05.ytdl-worker.yml
```
## 6. Run Flower image (OPTIONAL)
### 6.1 Set up the flower database
Find the pod running the ytdl master:
```shell
kubectl get pods --namespace ytdl
```
The name should look like ytdl-xxxxxxxx.
Access the pod:
```shell
kubectl --namespace=ytdl exec --stdin --tty ytdl-xxx -- sh
```
(replace ytdl-xxx with your pod name)
Go to the ytdl-pvc mounted folder:
```shell
cd /var/lib/vnstat/
```
Create the flower database file:
```shell
python3
Python 3.9.9 (main, Nov 21 2021, 03:22:47)
[Clang 12.0.0 (clang-1200.0.32.29)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import dbm;dbm.open("flower","n");exit()
```
### 6.2 Configure Flower ingress
This step configures the ingress, starting at line 51 of 06.flower.yml, for your ingress service. It is needed for
access from the internet.
The YAML file should be adjusted depending on your load balancer, ingress and network setup.
To enable SSL:
```yml
cert-manager.io/cluster-issuer: letsencrypt-prod
```
Replace nginx with your ingress class name:
```yml
ingressClassName: nginx
```
Add your domain, for example:
```yml
tls:
  - hosts:
      - flower.benny.com
    secretName: flower-tls
rules:
  - host: flower.benny.com
```
### 6.3 Apply Flower deployment
```shell
kubectl apply -f 06.flower.yml
```

View file

@@ -1,7 +1,7 @@
pyrogram==1.4.16
tgcrypto==1.2.5
yt-dlp==2023.3.3
APScheduler==3.10.0
yt-dlp==2023.3.4
APScheduler==3.10.1
beautifultable==1.1.0
ffmpeg-python==0.2.0
PyMySQL==1.0.2
@@ -10,14 +10,15 @@ filetype==1.2.0
flower==1.2.0
psutil==5.9.4
influxdb==5.3.1
beautifulsoup4==4.11.2
fakeredis==2.9.2
beautifulsoup4==4.12.1
fakeredis==2.10.3
supervisor==4.2.5
tgbot-ping==1.0.7
redis==4.3.3
requests==2.28.2
tqdm==4.64.1
tqdm==4.65.0
requests-toolbelt==0.10.1
ffpb==0.4.1
youtube-search-python==1.6.6
token-bucket==0.3.0
coloredlogs==15.0.1

View file

@@ -8,6 +8,7 @@
__author__ = "Benny <benny.think@gmail.com>"
import sqlite3
import pymysql
mysql_con = pymysql.connect(host='localhost', user='root', passwd='root', db='vip', charset='utf8mb4')

View file

@@ -9,9 +9,5 @@ docker run -d --restart unless-stopped --name ytdl \
-e WORKERS=4 \
-e VIP=True \
-e CUSTOM_TEXT=#StandWithUkraine \
-e WORKER_NAME=Pi \
-v "$(pwd)"/session:/ytdlbot/ytdlbot/session \
-v "$(pwd)"/ramdisk:/tmp/ \
--log-driver none \
bennythink/ytdlbot \
/usr/local/bin/supervisord -c "/ytdlbot/conf/supervisor_worker.conf"

View file

@@ -1,62 +0,0 @@
#!/usr/local/bin/python3
# coding: utf-8
# ytdlbot - broadcast.py
# 8/25/21 16:11
#
__author__ = "Benny <benny.think@gmail.com>"
import argparse
import contextlib
import logging
import random
import sys
import tempfile
import time
from tqdm import tqdm
from db import Redis
from ytdl_bot import create_app
parser = argparse.ArgumentParser(description='Broadcast to users')
parser.add_argument('-m', help='message', required=True)
parser.add_argument('-p', help='picture', default=None)
parser.add_argument('--notify', help='notify all users?', action="store_false")
parser.add_argument('-u', help='user_id', type=int)
logging.basicConfig(level=logging.INFO)
args = parser.parse_args()
r = Redis().r
keys = r.keys("*")
user_ids = set()
for key in keys:
    if key.isdigit():
        user_ids.add(key)

metrics = r.hgetall("metrics")
for key in metrics:
    if key.isdigit():
        user_ids.add(key)

if args.u:
    user_ids = [args.u]

if "YES" != input("Are you sure you want to send broadcast message to %s users?\n>" % len(user_ids)):
    logging.info("Abort")
    sys.exit(1)

with tempfile.NamedTemporaryFile() as tmp:
    app = create_app(tmp.name, 1)
    app.start()
    for user_id in tqdm(user_ids):
        time.sleep(random.random() * 5)
        if args.p:
            with contextlib.suppress(Exception):
                app.send_photo(user_id, args.p, caption=args.m, disable_notification=args.notify)
        else:
            with contextlib.suppress(Exception):
                app.send_message(user_id, args.m, disable_notification=args.notify)
    app.stop()

177
ytdlbot/channel.py Normal file

@@ -0,0 +1,177 @@
#!/usr/bin/env python3
# coding: utf-8
import http
import logging
import os
import re
import requests
from bs4 import BeautifulSoup
from config import ENABLE_VIP
from limit import Payment
class Channel(Payment):
    def subscribe_channel(self, user_id: "int", share_link: "str"):
        if not re.findall(r"youtube\.com|youtu\.be", share_link):
            raise ValueError("Is this a valid YouTube Channel link?")
        if ENABLE_VIP:
            self.cur.execute("select count(user_id) from subscribe where user_id=%s", (user_id,))
            usage = int(self.cur.fetchone()[0])
            if usage >= 5:
                # TODO: 5 tokens for one more subscription?
                logging.warning("User %s has subscribed %s channels", user_id, usage)
                return "You have subscribed too many channels. Maximum 5 channels."
        data = self.get_channel_info(share_link)
        channel_id = data["channel_id"]
        self.cur.execute("select user_id from subscribe where user_id=%s and channel_id=%s", (user_id, channel_id))
        if self.cur.fetchall():
            raise ValueError("You have already subscribed this channel.")
        self.cur.execute(
            "INSERT IGNORE INTO channel values"
            "(%(link)s,%(title)s,%(description)s,%(channel_id)s,%(playlist)s,%(last_video)s)",
            data,
        )
        self.cur.execute("INSERT INTO subscribe values(%s,%s, NULL)", (user_id, channel_id))
        self.con.commit()
        logging.info("User %s subscribed channel %s", user_id, data["title"])
        return "Subscribed to {}".format(data["title"])

    def unsubscribe_channel(self, user_id: "int", channel_id: "str"):
        affected_rows = self.cur.execute(
            "DELETE FROM subscribe WHERE user_id=%s AND channel_id=%s", (user_id, channel_id)
        )
        self.con.commit()
        logging.info("User %s tried to unsubscribe channel %s", user_id, channel_id)
        return affected_rows

    @staticmethod
    def extract_canonical_link(url):
        # the canonical link works for many websites; it strips out unnecessary stuff
        props = ["canonical", "alternate", "shortlinkUrl"]
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.163 Safari/537.36"
        }
        cookie = {"CONSENT": "YES+cb.20210328-17-p0.en+FX+999"}
        # send a HEAD request first
        r = requests.head(url, headers=headers, allow_redirects=True, cookies=cookie)
        if r.status_code != http.HTTPStatus.METHOD_NOT_ALLOWED and "text/html" not in r.headers.get("content-type"):
            # check content-type; if it's not text/html, there's no need to issue a GET request
            logging.warning("%s Content-type is not text/html, no need to GET for extract_canonical_link", url)
            return url
        html_doc = requests.get(url, headers=headers, cookies=cookie, timeout=5).text
        soup = BeautifulSoup(html_doc, "html.parser")
        for prop in props:
            element = soup.find("link", rel=prop)
            try:
                href = element["href"]
                if href not in ["null", "", None]:
                    return href
            except Exception:
                logging.warning("Canonical exception %s", url)
        return url

    def get_channel_info(self, url: "str"):
        api_key = os.getenv("GOOGLE_API_KEY")
        canonical_link = self.extract_canonical_link(url)
        try:
            channel_id = canonical_link.split("youtube.com/channel/")[1]
        except IndexError:
            channel_id = canonical_link.split("/")[-1]
        channel_api = (
            f"https://www.googleapis.com/youtube/v3/channels?part=snippet,contentDetails&id={channel_id}&key={api_key}"
        )
        data = requests.get(channel_api).json()
        snippet = data["items"][0]["snippet"]
        title = snippet["title"]
        description = snippet["description"]
        playlist = data["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]
        return {
            "link": url,
            "title": title,
            "description": description,
            "channel_id": channel_id,
            "playlist": playlist,
            "last_video": self.get_latest_video(playlist),
        }

    @staticmethod
    def get_latest_video(playlist_id: "str"):
        api_key = os.getenv("GOOGLE_API_KEY")
        video_api = (
            f"https://www.googleapis.com/youtube/v3/playlistItems?part=snippet&maxResults=1&"
            f"playlistId={playlist_id}&key={api_key}"
        )
        data = requests.get(video_api).json()
        video_id = data["items"][0]["snippet"]["resourceId"]["videoId"]
        logging.info("Latest video %s from %s", video_id, data["items"][0]["snippet"]["channelTitle"])
        return f"https://www.youtube.com/watch?v={video_id}"

    def has_newer_update(self, channel_id: "str"):
        self.cur.execute("SELECT playlist,latest_video FROM channel WHERE channel_id=%s", (channel_id,))
        data = self.cur.fetchone()
        playlist_id = data[0]
        old_video = data[1]
        newest_video = self.get_latest_video(playlist_id)
        if old_video != newest_video:
            logging.info("Newer update found for %s %s", channel_id, newest_video)
            self.cur.execute("UPDATE channel SET latest_video=%s WHERE channel_id=%s", (newest_video, channel_id))
            self.con.commit()
            return newest_video

    def get_user_subscription(self, user_id: "int"):
        self.cur.execute(
            """
            select title, link, channel.channel_id from channel, subscribe
            where subscribe.user_id = %s and channel.channel_id = subscribe.channel_id
            """,
            (user_id,),
        )
        data = self.cur.fetchall()
        text = ""
        for item in data:
            text += "[{}]({}) `{}\n`".format(*item)
        return text

    def group_subscriber(self):
        # {"channel_id": [user_id, user_id, ...]}
        self.cur.execute("select * from subscribe where is_valid=1")
        data = self.cur.fetchall()
        group = {}
        for item in data:
            group.setdefault(item[1], []).append(item[0])
        logging.info("Checking periodic subscriber...")
        return group

    def deactivate_user_subscription(self, user_id: "int"):
        self.cur.execute("UPDATE subscribe set is_valid=0 WHERE user_id=%s", (user_id,))
        self.con.commit()

    def sub_count(self):
        sql = """
            select user_id, channel.title, channel.link
            from subscribe, channel where subscribe.channel_id = channel.channel_id
        """
        self.cur.execute(sql)
        data = self.cur.fetchall()
        text = f"Total {len(data)} subscriptions found.\n\n"
        for item in data:
            text += "{} ==> [{}]({})\n".format(*item)
        return text

    def del_cache(self, user_link: "str"):
        unique = self.extract_canonical_link(user_link)
        caches = self.r.hgetall("cache")
        count = 0
        for key in caches:
            if key.startswith(unique):
                count += self.del_send_cache(key)
        return count
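The `group_subscriber` query above turns `(user_id, channel_id)` rows into a channel-to-subscribers map using `setdefault`. A standalone sketch of that grouping step (the helper name and sample rows are made up for illustration):

```python
def group_subscribers(rows):
    # rows: iterable of (user_id, channel_id) pairs, as in the subscribe table
    group = {}
    for user_id, channel_id in rows:
        group.setdefault(channel_id, []).append(user_id)
    return group

rows = [(1, "UCaaa"), (2, "UCaaa"), (3, "UCbbb")]
print(group_subscribers(rows))  # {'UCaaa': [1, 2], 'UCbbb': [3]}
```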

View file

@@ -12,26 +12,18 @@ import os
# general settings
WORKERS: "int" = int(os.getenv("WORKERS", 100))
PYRO_WORKERS: "int" = int(os.getenv("PYRO_WORKERS", 100))
APP_ID: "int" = int(os.getenv("APP_ID", 111))
APP_HASH = os.getenv("APP_HASH", "111")
TOKEN = os.getenv("TOKEN", "3703WLI")
APP_ID: "int" = int(os.getenv("APP_ID", 198214))
APP_HASH = os.getenv("APP_HASH", "1234b90")
TOKEN = os.getenv("TOKEN", "1234")
REDIS = os.getenv("REDIS")
# quota settings
QUOTA = int(os.getenv("QUOTA", 5 * 1024 * 1024 * 1024)) # 5G
if os.uname().sysname == "Darwin":
    QUOTA = 10 * 1024 * 1024  # 10M
REDIS = os.getenv("REDIS", "redis")
TG_MAX_SIZE = 2 * 1024 * 1024 * 1024 * 0.99
# TG_MAX_SIZE = 10 * 1024 * 1024
EX = int(os.getenv("EX", 24 * 3600))
MULTIPLY = os.getenv("MULTIPLY", 10) # VIP1 is 5*5-25G, VIP2 is 50G
USD2CNY = os.getenv("USD2CNY", 6) # $5 --> ¥30
EXPIRE = 24 * 3600
ENABLE_VIP = os.getenv("VIP", False)
MAX_DURATION = int(os.getenv("MAX_DURATION", 60))
AFD_LINK = os.getenv("AFD_LINK", "https://afdian.net/@BennyThink")
COFFEE_LINK = os.getenv("COFFEE_LINK", "https://www.buymeacoffee.com/bennythink")
COFFEE_TOKEN = os.getenv("COFFEE_TOKEN")
@@ -49,7 +41,7 @@ ENABLE_CELERY = os.getenv("ENABLE_CELERY", False)
ENABLE_QUEUE = os.getenv("ENABLE_QUEUE", False)
BROKER = os.getenv("BROKER", f"redis://{REDIS}:6379/4")
MYSQL_HOST = os.getenv("MYSQL_HOST")
MYSQL_HOST = os.getenv("MYSQL_HOST", "mysql")
MYSQL_USER = os.getenv("MYSQL_USER", "root")
MYSQL_PASS = os.getenv("MYSQL_PASS", "root")
@@ -58,9 +50,12 @@ ARCHIVE_ID = os.getenv("ARCHIVE_ID")
IPv6 = os.getenv("IPv6", False)
ENABLE_FFMPEG = os.getenv("ENABLE_FFMPEG", False)
# 0.01 means basically no limit
RATE = float(os.getenv("RATE", 0.01))
BURST = int(os.getenv("BURST", 3))
# Stripe setting
PROVIDER_TOKEN = os.getenv("PROVIDER_TOKEN") or "1234"
PLAYLIST_SUPPORT = os.getenv("PLAYLIST_SUPPORT", False)
ENABLE_ARIA2 = os.getenv("ENABLE_ARIA2", False)
FREE_DOWNLOAD = os.getenv("FREE_DOWNLOAD", 5)
TOKEN_PRICE = os.getenv("BUY_UNIT", 20) # one USD=20 credits

View file

@@ -8,13 +8,16 @@
__author__ = "Benny <benny.think@gmail.com>"
import os
import time
from config import (AFD_LINK, BURST, COFFEE_LINK, ENABLE_CELERY, ENABLE_VIP,
                    EX, MULTIPLY, RATE, REQUIRED_MEMBERSHIP, USD2CNY)
from db import InfluxDB
from downloader import sizeof_fmt
from limit import QUOTA, VIP
from config import (
    AFD_LINK,
    COFFEE_LINK,
    ENABLE_CELERY,
    FREE_DOWNLOAD,
    REQUIRED_MEMBERSHIP,
    TOKEN_PRICE,
)
from database import InfluxDB
from utils import get_func_queue
@@ -22,96 +25,64 @@ class BotText:
start = "Welcome to YouTube Download bot. Type /help for more information."
help = f"""
1. This bot should works at all times. If it doesn't, wait for a few minutes, try to send the link again.
1. This bot should work at all times. If it doesn't, please wait for a few minutes and try sending the link again.
2. At this time of writing, this bot consumes more than 100GB of network traffic per day.
In order to avoid being abused,
every one can use this bot within **{sizeof_fmt(QUOTA)} of quota for every {int(EX / 3600)} hours.**
2. At the time of writing, this bot consumes more than 100GB of network traffic per day.
To prevent abuse, each user is limited to 5 downloads per 24 hours.
3. You can optionally choose to become 'VIP' user if you need more traffic. Type /vip for more information.
3. You have the option to buy more tokens. Type /buy for more information.
4. Source code for this bot will always stay open, here-> https://github.com/tgbot-collection/ytdlbot
4. The source code for this bot will always remain open and can be found here: https://github.com/tgbot-collection/ytdlbot
"""
5. Request limit is applied for everyone, excluding VIP users.
""" if ENABLE_VIP else "Help text"
about = "YouTube Downloader by @BennyThink.\n\nOpen source on GitHub: https://github.com/tgbot-collection/ytdlbot"
about = "YouTube-DL by @BennyThink. Open source on GitHub: https://github.com/tgbot-collection/ytdlbot"
vip = f"""
buy = f"""
**Terms:**
1. You can use this service, free of charge, {sizeof_fmt(QUOTA)} per {int(EX / 3600)} hours.
2. The above traffic, is counted one-way. For example, if you download a video of 1GB, your will use 1GB instead of 2GB.
3. Streaming support is limited due to high costs of conversion.
4. I won't gather any personal information, which means I don't know how many and what videos did you download.
5. No rate limit for VIP users.
6. Possible to refund, but you'll have to bear with process fee.
7. I'll record your unique ID after a successful payment, usually it's payment ID or email address.
8. VIP identity won't expire.
9. Please try not to abuse this service. It's a open source project, you can always deploy your own bot.
1. You can use this service free of charge for up to {FREE_DOWNLOAD} downloads within a 24-hour period, regardless of whether the download is successful or not.
**Pay Tier:**
1. Everyone: {sizeof_fmt(QUOTA)} per {int(EX / 3600)} hours
2. VIP1: ${MULTIPLY} or ¥{MULTIPLY * USD2CNY}, {sizeof_fmt(QUOTA * 2)} per {int(EX / 3600)} hours
3. VIP2: ${MULTIPLY * 2} or ¥{MULTIPLY * USD2CNY * 2}, {sizeof_fmt(QUOTA * 2 * 2)} per {int(EX / 3600)} hours
4. VIP4....VIPn.
2. You can purchase additional download tokens, which will be valid indefinitely.
**Temporary top up**
Just want more traffic for a short period of time? Don't worry, you can use /topup command to top up your quota.
It's valid permanently, until you use it up.
3. I will not gather any personal information, so I won't know how many or which videos you have downloaded.
**Payment method:**
1. (afdian) Mainland China: {AFD_LINK}
2. (buy me a coffee) Other countries or regions: {COFFEE_LINK}
3. Telegram Payment(stripe), please directly using /tgvip command.
4. Refunds are possible, but you will be responsible for the processing fee charged by the payment provider (Stripe, Buy Me a Coffee, etc.).
5. I will record your unique ID after a successful payment, which is usually your payment ID or email address.
**Download token price:**
1. Everyone: {FREE_DOWNLOAD} tokens per 24 hours, free of charge.
2. 1 USD == {TOKEN_PRICE} tokens, valid indefinitely.
3. 7 CNY == {TOKEN_PRICE} tokens, valid indefinitely.
**Payment option:**
1. AFDIAN(AliPay, WeChat Pay and PayPal): {AFD_LINK}
2. Buy me a coffee: {COFFEE_LINK}
3. Telegram Payment (Stripe): see the following invoice.
**After payment:**
1. afdian: with your order number `/vip 123456`
2. buy me a coffee: with your email `/vip someone@else.com`
3. Telegram Payment: automatically activated
""" if ENABLE_VIP else "VIP is not enabled."
vip_pay = "Processing your payments...If it's not responding after one minute, please contact @BennyThink."
1. Afdian: Provide your order number with the /redeem command (e.g., `/redeem 123456`).
2. Buy Me a Coffee: Provide your email with the /redeem command (e.g., `/redeem some@one.com`). **Use a different email each time.**
3. Telegram Payment: Your payment will be automatically activated.
Want to buy more tokens at once? Let's say 100? Here you go: `/buy 100`
"""
private = "This bot is for private use"
membership_require = f"You need to join this group or channel to use this bot\n\nhttps://t.me/{REQUIRED_MEMBERSHIP}"
settings = """
Select sending format and video quality. **Only applies to YouTube**
High quality is recommended; Medium quality is aimed as 720P while low quality is aimed as 480P.
Remember if you choose to send as document, there will be no streaming.
Please choose the desired format and video quality for your video. Note that these settings only **apply to YouTube videos**.
High quality is recommended. Medium quality is 720P, while low quality is 480P.
Please keep in mind that if you choose to send the video as a document, it will not be possible to stream it.
Your current settings:
Video quality: **{0}**
Sending format: **{1}**
"""
custom_text = os.getenv("CUSTOM_TEXT", "")
topup_description = f"US$1 will give you {sizeof_fmt(QUOTA)} traffic permanently"
topup_title = "Pay US$1 for more traffic!"
def remaining_quota_caption(self, chat_id):
if not ENABLE_VIP:
return ""
used, total, ttl = self.return_remaining_quota(chat_id)
refresh_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ttl + time.time()))
caption = f"Remaining quota: **{sizeof_fmt(used)}/{sizeof_fmt(total)}**, " \
f"refresh at {refresh_time}\n"
return caption
@staticmethod
def return_remaining_quota(chat_id):
used, total, ttl = VIP().check_remaining_quota(chat_id)
return used, total, ttl
@staticmethod
def get_vip_greeting(chat_id):
if not ENABLE_VIP:
return ""
v = VIP().check_vip(chat_id)
if v:
return f"Hello {v[1]}, VIP{v[-2]}☺️\n\n"
else:
return ""
@staticmethod
def get_receive_link_text():
@@ -126,6 +97,7 @@ Sending format: **{1}**
@staticmethod
def ping_worker():
from tasks import app as celery_app
workers = InfluxDB().extract_dashboard_data()
# [{'celery@BennyのMBP': 'abc'}, {'celery@BennyのMBP': 'abc'}]
response = celery_app.control.broadcast("ping_revision", reply=True)
@@ -144,5 +116,3 @@ Sending format: **{1}**
text += f"{status}{hostname} **{active}** {load} {rev}\n"
return text
too_fast = f"You have reached the rate limit. The current rate limit is 1 request per {RATE} seconds, with {BURST - 1} bursts."


@@ -1,7 +1,7 @@
#!/usr/local/bin/python3
# coding: utf-8
# ytdlbot - db.py
# ytdlbot - database.py
# 12/7/21 16:57
#
@@ -13,6 +13,7 @@ import datetime
import logging
import os
import re
import sqlite3
import subprocess
import time
from io import BytesIO
@@ -24,13 +25,49 @@ import requests
from beautifultable import BeautifulTable
from influxdb import InfluxDBClient
from config import MYSQL_HOST, MYSQL_PASS, MYSQL_USER, QUOTA, REDIS
from fakemysql import FakeMySQL
from config import MYSQL_HOST, MYSQL_PASS, MYSQL_USER, REDIS
from utils import sizeof_fmt
init_con = sqlite3.connect(":memory:", check_same_thread=False)
class FakeMySQL:
@staticmethod
def cursor() -> "Cursor":
return Cursor()
def commit(self):
pass
def close(self):
pass
class Cursor:
def __init__(self):
self.con = init_con
self.cur = self.con.cursor()
def execute(self, *args, **kwargs):
sql = self.sub(args[0])
new_args = (sql,) + args[1:]
return self.cur.execute(*new_args, **kwargs)
def fetchall(self):
return self.cur.fetchall()
def fetchone(self):
return self.cur.fetchone()
@staticmethod
def sub(sql):
# pass flags by keyword: re.sub's fourth positional argument is count, not flags
sql = re.sub(r"CHARSET.*", "", sql, flags=re.IGNORECASE)
sql = sql.replace("%s", "?")
return sql
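The `FakeMySQL`/`Cursor` shim above lets the MySQL-flavoured queries in this module run against an in-memory sqlite3 database by stripping MySQL-only clauses and rewriting the paramstyle. A minimal, self-contained sketch of that translation (the helper name `to_sqlite` is illustrative):

```python
import re
import sqlite3

def to_sqlite(sql: str) -> str:
    # drop the MySQL-only CHARSET clause and swap %s placeholders for sqlite's ?
    sql = re.sub(r"CHARSET.*", "", sql, flags=re.IGNORECASE)
    return sql.replace("%s", "?")

con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute(to_sqlite("CREATE TABLE user(id int, name varchar(20)) CHARSET=utf8mb4"))
cur.execute(to_sqlite("INSERT INTO user VALUES (%s,%s)"), (1, "benny"))
cur.execute(to_sqlite("SELECT * FROM user"))
print(cur.fetchall())  # [(1, 'benny')]
```

This covers only the subset of MySQL syntax the bot actually emits; it is not a general dialect translator.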
class Redis:
def __init__(self):
super(Redis, self).__init__()
if REDIS:
self.r = redis.StrictRedis(host=REDIS, db=0, decode_responses=True)
else:
@@ -61,6 +98,7 @@ class Redis:
{usage_banner}
%s
"""
super().__init__()
def __del__(self):
self.r.close()
@@ -82,14 +120,13 @@ class Redis:
return table
def show_usage(self):
from downloader import sizeof_fmt
db = MySQL()
db.cur.execute("select * from vip")
db.cur.execute("select user_id,payment_amount,old_user,token from payment")
data = db.cur.fetchall()
fd = []
for item in data:
fd.append([item[0], item[1], sizeof_fmt(item[-1])])
db_text = self.generate_table(["ID", "username", "quota"], fd)
fd.append([item[0], item[1], item[2], item[3]])
db_text = self.generate_table(["ID", "pay amount", "old user", "token"], fd)
fd = []
hash_keys = self.r.hgetall("metrics")
@@ -117,10 +154,10 @@ class Redis:
# vnstat
if os.uname().sysname == "Darwin":
cmd = "/usr/local/bin/vnstat -i en0".split()
cmd = "/opt/homebrew/bin/vnstat -i en0".split()
else:
cmd = "/usr/bin/vnstat -i eth0".split()
vnstat_text = subprocess.check_output(cmd).decode('u8')
vnstat_text = subprocess.check_output(cmd).decode("u8")
return self.final_text % (db_text, vnstat_text, quota_text, metrics_text, usage_text)
def reset_today(self):
@@ -152,18 +189,16 @@ class Redis:
class MySQL:
vip_sql = """
create table if not exists vip
CREATE TABLE if not exists `payment`
(
user_id bigint not null,
username varchar(256) null,
payment_amount int null,
payment_id varchar(256) null,
level int default 1 null,
quota bigint default %s null,
constraint VIP_pk
primary key (user_id)
);
""" % QUOTA
`user_id` bigint NOT NULL,
`payment_amount` float DEFAULT NULL,
`payment_id` varchar(256) DEFAULT NULL,
`old_user` tinyint(1) DEFAULT NULL,
`token` int DEFAULT NULL,
UNIQUE KEY `payment_id` (`payment_id`)
) CHARSET = utf8mb4
"""
settings_sql = """
create table if not exists settings
@@ -202,13 +237,15 @@ class MySQL:
def __init__(self):
if MYSQL_HOST:
self.con = pymysql.connect(host=MYSQL_HOST, user=MYSQL_USER, passwd=MYSQL_PASS, db="ytdl",
charset="utf8mb4")
self.con = pymysql.connect(
host=MYSQL_HOST, user=MYSQL_USER, passwd=MYSQL_PASS, db="ytdl", charset="utf8mb4"
)
else:
self.con = FakeMySQL()
self.cur = self.con.cursor()
self.init_db()
super().__init__()
def init_db(self):
self.cur.execute(self.vip_sql)
@@ -220,6 +257,31 @@ class MySQL:
def __del__(self):
self.con.close()
def get_user_settings(self, user_id: "str") -> "tuple":
cur = self.con.cursor()
cur.execute("SELECT * FROM settings WHERE user_id = %s", (user_id,))
data = cur.fetchone()
if data is None:
return 100, "high", "video", "Celery"
return data
def set_user_settings(self, user_id: int, field: "str", value: "str"):
cur = self.con.cursor()
cur.execute("SELECT * FROM settings WHERE user_id = %s", (user_id,))
data = cur.fetchone()
if data is None:
resolution = method = ""
if field == "resolution":
method = "video"
resolution = value
if field == "method":
method = value
resolution = "high"
cur.execute("INSERT INTO settings VALUES (%s,%s,%s,%s)", (user_id, resolution, method, "Celery"))
else:
cur.execute(f"UPDATE settings SET {field} =%s WHERE user_id = %s", (value, user_id))
self.con.commit()
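The two settings methods above implement a manual upsert, and the UPDATE interpolates the column name into the SQL string, which is safe only while `field` comes from trusted code. A network-free sketch of the same flow against in-memory sqlite, with an explicit whitelist guard (names and the guard are illustrative additions, not part of the original):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("create table settings(user_id integer primary key, resolution text, method text, mode text)")

DEFAULTS = ("high", "video", "Celery")

def get_user_settings(user_id):
    row = con.execute("select resolution, method, mode from settings where user_id=?", (user_id,)).fetchone()
    return row or DEFAULTS

def set_user_settings(user_id, field, value):
    if con.execute("select 1 from settings where user_id=?", (user_id,)).fetchone() is None:
        # no row yet: insert a full row with defaults for the untouched fields
        resolution, method = "high", "video"
        if field == "resolution":
            resolution = value
        elif field == "method":
            method = value
        con.execute("insert into settings values(?,?,?,?)", (user_id, resolution, method, "Celery"))
    else:
        # column names cannot be bound as parameters, so whitelist them before interpolating
        assert field in ("resolution", "method", "mode")
        con.execute(f"update settings set {field}=? where user_id=?", (value, user_id))
    con.commit()

set_user_settings(1, "resolution", "low")
set_user_settings(1, "method", "audio")
print(get_user_settings(1))  # ('low', 'audio', 'Celery')
```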
class InfluxDB:
def __init__(self):
@@ -231,8 +293,8 @@ class InfluxDB:
@staticmethod
def get_worker_data():
password = os.getenv("FLOWER_PASSWORD", "123456abc")
username = os.getenv("FLOWER_USERNAME", "benny")
password = os.getenv("FLOWER_PASSWORD", "123456abc")
token = base64.b64encode(f"{username}:{password}".encode()).decode()
headers = {"Authorization": f"Basic {token}"}
r = requests.get("https://celery.dmesg.app/dashboard?json=1", headers=headers)
@@ -250,7 +312,6 @@ class InfluxDB:
"tags": {
"hostname": worker["hostname"],
},
"time": datetime.datetime.utcnow(),
"fields": {
"task-received": worker.get("task-received", 0),
@@ -262,7 +323,7 @@
"load1": load1,
"load5": load5,
"load15": load15,
}
},
}
json_body.append(t)
return json_body
@@ -273,26 +334,11 @@
def __fill_overall_data(self):
active = sum([i["active"] for i in self.data["data"]])
json_body = [
{
"measurement": "active",
"time": datetime.datetime.utcnow(),
"fields": {
"active": active
}
}
]
json_body = [{"measurement": "active", "time": datetime.datetime.utcnow(), "fields": {"active": active}}]
self.client.write_points(json_body)
def __fill_redis_metrics(self):
json_body = [
{
"measurement": "metrics",
"time": datetime.datetime.utcnow(),
"fields": {
}
}
]
json_body = [{"measurement": "metrics", "time": datetime.datetime.utcnow(), "fields": {}}]
r = Redis().r
hash_keys = r.hgetall("metrics")
for key, value in hash_keys.items():


@@ -24,23 +24,14 @@ import filetype
import yt_dlp as ytdl
from tqdm import tqdm
from config import (AUDIO_FORMAT, ENABLE_FFMPEG, ENABLE_VIP, MAX_DURATION, ENABLE_ARIA2,
TG_MAX_SIZE, IPv6)
from db import Redis
from limit import VIP
from utils import (adjust_formats, apply_log_formatter, current_time,
get_user_settings)
from config import AUDIO_FORMAT, ENABLE_ARIA2, ENABLE_FFMPEG, IPv6
from limit import Payment
from utils import adjust_formats, apply_log_formatter, current_time
from ytdlbot.config import TG_MAX_SIZE
r = fakeredis.FakeStrictRedis()
apply_log_formatter()
def sizeof_fmt(num: int, suffix='B'):
for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
if abs(num) < 1024.0:
return "%3.1f%s%s" % (num, unit, suffix)
num /= 1024.0
return "%.1f%s%s" % (num, 'Yi', suffix)
payment = Payment()
def edit_text(bot_msg, text):
@@ -60,9 +51,15 @@ def tqdm_progress(desc, total, finished, speed="", eta=""):
return ""
f = StringIO()
tqdm(total=total, initial=finished, file=f, ascii=False, unit_scale=True, ncols=30,
bar_format="{l_bar}{bar} |{n_fmt}/{total_fmt} "
)
tqdm(
total=total,
initial=finished,
file=f,
ascii=False,
unit_scale=True,
ncols=30,
bar_format="{l_bar}{bar} |{n_fmt}/{total_fmt} ",
)
raw_output = f.getvalue()
tqdm_output = raw_output.split("|")
progress = f"`[{tqdm_output[1]}]`"
@@ -80,7 +77,7 @@ def tqdm_progress(desc, total, finished, speed="", eta=""):
def remove_bash_color(text):
return re.sub(r'\u001b|\[0;94m|\u001b\[0m|\[0;32m|\[0m|\[0;33m', "", text)
return re.sub(r"\u001b|\[0;94m|\u001b\[0m|\[0;32m|\[0m|\[0;33m", "", text)
def download_hook(d: dict, bot_msg):
@@ -90,16 +87,12 @@ def download_hook(d: dict, bot_msg):
original_url = d["info_dict"]["original_url"]
key = f"{bot_msg.chat.id}-{original_url}"
if d['status'] == 'downloading':
if d["status"] == "downloading":
downloaded = d.get("downloaded_bytes", 0)
total = d.get("total_bytes") or d.get("total_bytes_estimate", 0)
# percent = remove_bash_color(d.get("_percent_str", "N/A"))
speed = remove_bash_color(d.get("_speed_str", "N/A"))
if ENABLE_VIP and not r.exists(key):
result, err_msg = check_quota(total, bot_msg.chat.id)
if result is False:
raise ValueError(err_msg)
eta = remove_bash_color(d.get("_eta_str", d.get("eta")))
text = tqdm_progress("Downloading...", total, downloaded, speed, eta)
edit_text(bot_msg, text)
@@ -107,25 +100,10 @@ def download_hook(d: dict, bot_msg):
def upload_hook(current, total, bot_msg):
# filesize = sizeof_fmt(total)
text = tqdm_progress("Uploading...", total, current)
edit_text(bot_msg, text)
def check_quota(file_size, chat_id) -> ("bool", "str"):
remain, _, ttl = VIP().check_remaining_quota(chat_id)
if file_size > remain:
refresh_time = current_time(ttl + time.time())
err = f"Quota exceed, you have {sizeof_fmt(remain)} remaining, " \
f"but you want to download a video with {sizeof_fmt(file_size)} in size. \n" \
f"Try again in {ttl} seconds({refresh_time})"
logging.warning(err)
Redis().update_metrics("quota_exceed")
return False, err
else:
return True, ""
def convert_to_mp4(resp: dict, bot_msg):
default_type = ["video/x-flv", "video/webm"]
if resp["status"]:
@@ -136,7 +114,9 @@ def convert_to_mp4(resp: dict, bot_msg):
if mime in default_type:
if not can_convert_mp4(path, bot_msg.chat.id):
logging.warning("Conversion abort for %s", bot_msg.chat.id)
bot_msg._client.send_message(bot_msg.chat.id, "Can't convert your video to streaming format.")
bot_msg._client.send_message(
bot_msg.chat.id, "Can't convert your video to streaming format. ffmpeg has been disabled."
)
break
edit_text(bot_msg, f"{current_time()}: Converting {path.name} to mp4. Please wait.")
new_file_path = path.with_suffix(".mp4")
@@ -170,18 +150,7 @@ def run_ffmpeg(cmd_list, bm):
def can_convert_mp4(video_path, uid):
if not ENABLE_FFMPEG:
return False
if not ENABLE_VIP:
return True
video_streams = ffmpeg.probe(video_path, select_streams="v")
try:
duration = int(float(video_streams["format"]["duration"]))
except Exception:
duration = 0
if duration > MAX_DURATION and not VIP().check_vip(uid):
logging.info("Video duration: %s, not vip, can't convert", duration)
return False
else:
return True
return True
def ytdl_download(url, tempdir, bm, **kwargs) -> dict:
@@ -190,33 +159,27 @@ def ytdl_download(url, tempdir, bm, **kwargs) -> dict:
response = {"status": True, "error": "", "filepath": []}
output = pathlib.Path(tempdir, "%(title).70s.%(ext)s").as_posix()
ydl_opts = {
'progress_hooks': [lambda d: download_hook(d, bm)],
'outtmpl': output,
'restrictfilenames': False,
'quiet': True,
"proxy": os.getenv("YTDL_PROXY")
"progress_hooks": [lambda d: download_hook(d, bm)],
"outtmpl": output,
"restrictfilenames": False,
"quiet": True,
"proxy": os.getenv("YTDL_PROXY"),
}
if ENABLE_ARIA2:
ydl_opts["external_downloader"] = "aria2c"
ydl_opts["external_downloader_args"] = ['--min-split-size=1M',
'--max-connection-per-server=16',
'--max-concurrent-downloads=16',
'--split=16'
]
ydl_opts["external_downloader_args"] = [
"--min-split-size=1M",
"--max-connection-per-server=16",
"--max-concurrent-downloads=16",
"--split=16",
]
formats = [
"bestvideo[ext=mp4]+bestaudio[ext=m4a]/bestvideo+bestaudio",
"bestvideo[vcodec^=avc]+bestaudio[acodec^=mp4a]/best[vcodec^=avc]/best",
None
None,
]
adjust_formats(chat_id, url, formats, hijack)
add_instagram_cookies(url, ydl_opts)
# check quota before download
if ENABLE_VIP:
# check quota after download
remain, _, ttl = VIP().check_remaining_quota(chat_id)
result, err_msg = check_quota(detect_filesize(url), chat_id)
if not result:
return {"status": False, "error": err_msg, "filepath": []}
address = ["::", "0.0.0.0"] if IPv6 else [None]
for format_ in formats:
@@ -230,6 +193,7 @@ def ytdl_download(url, tempdir, bm, **kwargs) -> dict:
ydl.download([url])
response["status"] = True
response["error"] = ""
response["filepath"] = list(pathlib.Path(tempdir).glob("*"))
break
except Exception as e:
logging.error("Download failed for %s ", url)
@@ -243,32 +207,15 @@ def ytdl_download(url, tempdir, bm, **kwargs) -> dict:
if response["status"] is False:
return response
for i in os.listdir(tempdir):
p = pathlib.Path(tempdir, i)
file_size = os.stat(p).st_size
if ENABLE_VIP:
# check quota after download
remain, _, ttl = VIP().check_remaining_quota(chat_id)
result, err_msg = check_quota(file_size, chat_id)
else:
result, err_msg = True, ""
if result is False:
response["status"] = False
response["error"] = err_msg
else:
VIP().use_quota(bm.chat.id, file_size)
response["status"] = True
response["filepath"].append(p)
# convert format if necessary
settings = get_user_settings(str(chat_id))
settings = payment.get_user_settings(str(chat_id))
if settings[2] == "video" or isinstance(settings[2], MagicMock):
# only convert if send type is video
convert_to_mp4(response, bm)
if settings[2] == "audio" or hijack == "bestaudio[ext=m4a]":
convert_audio_format(response, bm)
# enable it for now
split_large_video(response)
# disable it for now
# split_large_video(response)
return response
@@ -280,9 +227,7 @@ def convert_audio_format(resp: "dict", bm):
path: "pathlib.Path"
for path in resp["filepath"]:
streams = ffmpeg.probe(path)["streams"]
if (AUDIO_FORMAT is None and
len(streams) == 1 and
streams[0]["codec_type"] == "audio"):
if AUDIO_FORMAT is None and len(streams) == 1 and streams[0]["codec_type"] == "audio":
logging.info("%s is audio, default format, no need to convert", path)
elif AUDIO_FORMAT is None and len(streams) >= 2:
logging.info("%s is video, default format, need to extract audio", path)
@@ -311,11 +256,6 @@ def add_instagram_cookies(url: "str", opt: "dict"):
opt["cookiefile"] = pathlib.Path(__file__).parent.joinpath("instagram.com_cookies.txt").as_posix()
def run_splitter(video_path: "str"):
subprocess.check_output(f"sh split-video.sh {video_path} {TG_MAX_SIZE * 0.95} ".split())
os.remove(video_path)
def split_large_video(response: "dict"):
original_video = None
split = False
@@ -324,19 +264,8 @@ def split_large_video(response: "dict"):
if size > TG_MAX_SIZE:
split = True
logging.warning("file is too large %s, splitting...", size)
run_splitter(original_video)
subprocess.check_output(f"sh split-video.sh {original_video} {TG_MAX_SIZE * 0.95} ".split())
os.remove(original_video)
if split and original_video:
response["filepath"] = [i.as_posix() for i in pathlib.Path(original_video).parent.glob("*")]
def detect_filesize(url: "str") -> "int":
# find the largest file size
with ytdl.YoutubeDL() as ydl:
info_dict = ydl.extract_info(url, download=False)
max_size = 0
max_size_list = [i.get("filesize", 0) for i in info_dict["formats"] if i.get("filesize")]
if max_size_list:
max_size = max(max_size_list)
logging.info("%s max size is %s", url, max_size)
return max_size
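The `detect_filesize` helper removed in this commit reduces to taking the largest reported `filesize` across the `formats` list that yt-dlp returns. The selection logic in isolation, with a hand-built stand-in for the info dict (no network, no yt-dlp needed):

```python
def max_filesize(info_dict: dict) -> int:
    # keep only formats that actually report a size, then take the largest
    sizes = [f["filesize"] for f in info_dict.get("formats", []) if f.get("filesize")]
    return max(sizes) if sizes else 0

info = {"formats": [{"filesize": 10}, {"filesize": None}, {"filesize": 2048}, {}]}
print(max_filesize(info))  # 2048
```

Note that many extractors omit `filesize` entirely, which is why the helper falls back to 0 rather than failing.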


@@ -1,58 +0,0 @@
#!/usr/local/bin/python3
# coding: utf-8
# ytdlbot - fakemysql.py
# 2/20/22 20:08
#
__author__ = "Benny <benny.think@gmail.com>"
import re
import sqlite3
init_con = sqlite3.connect(":memory:", check_same_thread=False)
class FakeMySQL:
@staticmethod
def cursor() -> "Cursor":
return Cursor()
def commit(self):
pass
def close(self):
pass
class Cursor:
def __init__(self):
self.con = init_con
self.cur = self.con.cursor()
def execute(self, *args, **kwargs):
sql = self.sub(args[0])
new_args = (sql,) + args[1:]
return self.cur.execute(*new_args, **kwargs)
def fetchall(self):
return self.cur.fetchall()
def fetchone(self):
return self.cur.fetchone()
@staticmethod
def sub(sql):
sql = re.sub(r"CHARSET.*", "", sql, flags=re.IGNORECASE)
sql = sql.replace("%s", "?")
return sql
if __name__ == '__main__':
con = FakeMySQL()
cur = con.cursor()
cur.execute("create table user(id int, name varchar(20))")
cur.execute("insert into user values(%s,%s)", (1, "benny"))
cur.execute("select * from user")
data = cur.fetchall()
print(data)


@@ -8,239 +8,26 @@
__author__ = "Benny <benny.think@gmail.com>"
import hashlib
import http
import logging
import math
import os
import re
import time
from unittest.mock import MagicMock
import requests
from bs4 import BeautifulSoup
from config import (AFD_TOKEN, AFD_USER_ID, COFFEE_TOKEN, ENABLE_VIP, EX,
MULTIPLY, OWNER, QUOTA, USD2CNY)
from db import MySQL, Redis
from utils import apply_log_formatter
from config import (
AFD_TOKEN,
AFD_USER_ID,
COFFEE_TOKEN,
EXPIRE,
FREE_DOWNLOAD,
OWNER,
TOKEN_PRICE,
)
from database import MySQL, Redis
from utils import apply_log_formatter, current_time
apply_log_formatter()
class VIP(Redis, MySQL):
def check_vip(self, user_id: "int") -> "tuple":
self.cur.execute("SELECT * FROM vip WHERE user_id=%s", (user_id,))
data = self.cur.fetchone()
return data
def __add_vip(self, user_data: "dict"):
sql = "INSERT INTO vip VALUES (%s,%s,%s,%s,%s,%s);"
self.cur.execute(sql, list(user_data.values()))
self.con.commit()
# also remove redis cache
self.r.delete(user_data["user_id"])
def add_vip(self, user_data: "dict") -> "str":
# first select
self.cur.execute("SELECT * FROM vip WHERE payment_id=%s", (user_data["payment_id"],))
is_exist = self.cur.fetchone()
if is_exist:
return "Failed. {} is being used by user {}".format(user_data["payment_id"], is_exist[0])
self.__add_vip(user_data)
return "Success! You are VIP{} now!".format(user_data["level"])
def direct_add_vip(self, user_data: "dict") -> ("bool", "str"):
self.__add_vip(user_data)
return "Success payment from Telegram! You are VIP{} now!".format(user_data["level"])
def remove_vip(self, user_id: "int"):
raise NotImplementedError()
def get_user_quota(self, user_id: "int") -> int:
# even VIP have certain quota
q = self.check_vip(user_id)
topup = self.r.hget("topup", user_id)
if q:
return q[-1]
elif topup:
return int(topup) + QUOTA
else:
return QUOTA
def set_topup(self, user_id: "int"):
self.r.hset("topup", user_id, QUOTA)
def check_remaining_quota(self, user_id: "int"):
user_quota = self.get_user_quota(user_id)
ttl = self.r.ttl(user_id)
q = int(self.r.get(user_id)) if self.r.exists(user_id) else user_quota
if q <= 0:
q = 0
return q, user_quota, ttl
def use_quota(self, user_id: "int", traffic: "int"):
user_quota = self.get_user_quota(user_id)
# fix for standard mode
if isinstance(user_quota, MagicMock):
user_quota = 2 ** 32
if self.r.exists(user_id):
self.r.decr(user_id, traffic)
else:
self.r.set(user_id, user_quota - traffic, ex=EX)
def subscribe_channel(self, user_id: "int", share_link: "str"):
if not re.findall(r"youtube\.com|youtu\.be", share_link):
raise ValueError("Is this a valid YouTube Channel link?")
if ENABLE_VIP:
self.cur.execute("select count(user_id) from subscribe where user_id=%s", (user_id,))
usage = int(self.cur.fetchone()[0])
if usage >= 5 and not self.check_vip(user_id):
logging.warning("User %s is not VIP but has subscribed %s channels", user_id, usage)
return "You have subscribed too many channels. Please upgrade to VIP to subscribe more channels."
data = self.get_channel_info(share_link)
channel_id = data["channel_id"]
self.cur.execute("select user_id from subscribe where user_id=%s and channel_id=%s", (user_id, channel_id))
if self.cur.fetchall():
raise ValueError("You have already subscribed this channel.")
self.cur.execute("INSERT IGNORE INTO channel values"
"(%(link)s,%(title)s,%(description)s,%(channel_id)s,%(playlist)s,%(last_video)s)", data)
self.cur.execute("INSERT INTO subscribe values(%s,%s, NULL)", (user_id, channel_id))
self.con.commit()
logging.info("User %s subscribed channel %s", user_id, data["title"])
return "Subscribed to {}".format(data["title"])
def unsubscribe_channel(self, user_id: "int", channel_id: "str"):
affected_rows = self.cur.execute("DELETE FROM subscribe WHERE user_id=%s AND channel_id=%s",
(user_id, channel_id))
self.con.commit()
logging.info("User %s tried to unsubscribe channel %s", user_id, channel_id)
return affected_rows
@staticmethod
def extract_canonical_link(url):
# canonic link works for many websites. It will strip out unnecessary stuff
proxy = {"https": os.getenv("YTDL_PROXY")} if os.getenv("YTDL_PROXY") else None
props = ["canonical", "alternate", "shortlinkUrl"]
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.163 Safari/537.36"}
# send head request first
r = requests.head(url, headers=headers, proxies=proxy)
if r.status_code != http.HTTPStatus.METHOD_NOT_ALLOWED and "text/html" not in r.headers.get("content-type", ""):
# get content-type, if it's not text/html, there's no need to issue a GET request
logging.warning("%s Content-type is not text/html, no need to GET for extract_canonical_link", url)
return url
html_doc = requests.get(url, headers=headers, timeout=5, proxies=proxy).text
soup = BeautifulSoup(html_doc, "html.parser")
for prop in props:
element = soup.find("link", rel=prop)
try:
href = element["href"]
if href not in ["null", "", None]:
return href
except Exception:
logging.warning("Canonical exception %s", url)
return url
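`extract_canonical_link` boils down to fetching the page and reading the first `<link rel=...>` tag whose `rel` is one of the preferred properties. A network-free sketch of just the parsing step, using only the standard-library parser instead of BeautifulSoup (class and function names here are illustrative):

```python
from html.parser import HTMLParser

class LinkRelParser(HTMLParser):
    PROPS = ("canonical", "alternate", "shortlinkUrl")

    def __init__(self):
        super().__init__()
        self.found = {}

    def handle_starttag(self, tag, attrs):
        if tag == "link":
            a = dict(attrs)
            rel, href = a.get("rel"), a.get("href")
            # mirror the original guard: skip empty or literal "null" hrefs
            if rel in self.PROPS and href not in (None, "", "null"):
                self.found.setdefault(rel, href)

def canonical(html_doc: str, fallback: str) -> str:
    parser = LinkRelParser()
    parser.feed(html_doc)
    for prop in LinkRelParser.PROPS:
        if prop in parser.found:
            return parser.found[prop]
    return fallback

doc = '<html><head><link rel="canonical" href="https://www.youtube.com/channel/UC123"></head></html>'
print(canonical(doc, "https://youtu.be/abc"))  # https://www.youtube.com/channel/UC123
```

Falling back to the original URL when no canonical tag is present matches the behaviour of the method above.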
def get_channel_info(self, url: "str"):
api_key = os.getenv("GOOGLE_API_KEY")
canonical_link = self.extract_canonical_link(url)
try:
channel_id = canonical_link.split("https://www.youtube.com/channel/")[1]
except IndexError:
channel_id = canonical_link.split("https://youtube.com/channel/")[1]
channel_api = f"https://www.googleapis.com/youtube/v3/channels?part=snippet,contentDetails&" \
f"id={channel_id}&key={api_key}"
data = requests.get(channel_api).json()
snippet = data['items'][0]['snippet']
title = snippet['title']
description = snippet['description']
playlist = data['items'][0]['contentDetails']['relatedPlaylists']['uploads']
return {
"link": url,
"title": title,
"description": description,
"channel_id": channel_id,
"playlist": playlist,
"last_video": VIP.get_latest_video(playlist)
}
@staticmethod
def get_latest_video(playlist_id: "str"):
api_key = os.getenv("GOOGLE_API_KEY")
video_api = f"https://www.googleapis.com/youtube/v3/playlistItems?part=snippet&maxResults=1&" \
f"playlistId={playlist_id}&key={api_key}"
data = requests.get(video_api).json()
video_id = data['items'][0]['snippet']['resourceId']['videoId']
logging.info("Latest video %s from %s", video_id, data['items'][0]['snippet']['channelTitle'])
return f"https://www.youtube.com/watch?v={video_id}"
def has_newer_update(self, channel_id: "str"):
self.cur.execute("SELECT playlist,latest_video FROM channel WHERE channel_id=%s", (channel_id,))
data = self.cur.fetchone()
playlist_id = data[0]
old_video = data[1]
newest_video = VIP.get_latest_video(playlist_id)
if old_video != newest_video:
logging.info("Newer update found for %s %s", channel_id, newest_video)
self.cur.execute("UPDATE channel SET latest_video=%s WHERE channel_id=%s", (newest_video, channel_id))
self.con.commit()
return newest_video
def get_user_subscription(self, user_id: "int"):
self.cur.execute(
"""
select title, link, channel.channel_id from channel, subscribe
where subscribe.user_id = %s and channel.channel_id = subscribe.channel_id
""", (user_id,))
data = self.cur.fetchall()
text = ""
for item in data:
text += "[{}]({}) `{}\n`".format(*item)
return text
def group_subscriber(self):
# {"channel_id": [user_id, user_id, ...]}
self.cur.execute("select * from subscribe where is_valid=1")
data = self.cur.fetchall()
group = {}
for item in data:
group.setdefault(item[1], []).append(item[0])
logging.info("Checking periodic subscriber...")
return group
def deactivate_user_subscription(self, user_id: "int"):
self.cur.execute("UPDATE subscribe set is_valid=0 WHERE user_id=%s", (user_id,))
self.con.commit()
def sub_count(self):
sql = """
select user_id, channel.title, channel.link
from subscribe, channel where subscribe.channel_id = channel.channel_id
"""
self.cur.execute(sql)
data = self.cur.fetchall()
text = f"Total {len(data)} subscriptions found.\n\n"
for item in data:
text += "{} ==> [{}]({})\n".format(*item)
return text
def del_cache(self, user_link: "str"):
unique = self.extract_canonical_link(user_link)
caches = self.r.hgetall("cache")
count = 0
for key in caches:
if key.startswith(unique):
count += self.del_send_cache(key)
return count
class BuyMeACoffee:
def __init__(self):
self._token = COFFEE_TOKEN
@@ -266,8 +53,7 @@ class BuyMeACoffee:
price = float(order.get("support_coffee_price", 0))
cups = float(order.get("support_coffees", 1))
amount = price * cups
level = math.floor(amount / MULTIPLY)
return level, amount, email
return amount, email
class Afdian:
@@ -279,11 +65,11 @@ class Afdian:
def _generate_signature(self):
data = {
"user_id": self._user_id,
"params": "{\"x\":0}",
"params": '{"x":0}',
"ts": int(time.time()),
}
sign_text = "{token}params{params}ts{ts}user_id{user_id}".format(
token=self._token, params=data['params'], ts=data["ts"], user_id=data["user_id"]
token=self._token, params=data["params"], ts=data["ts"], user_id=data["user_id"]
)
md5 = hashlib.md5(sign_text.encode("u8"))
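The Afdian signature above is an MD5 digest over the token, params, timestamp and user id concatenated with the literal field names in a fixed order. A sketch with placeholder credentials (the real values come from the AFD_* config, not these demo strings):

```python
import hashlib

def afdian_sign(token: str, user_id: str, ts: int, params: str = '{"x":0}') -> str:
    # the literal words "params", "ts" and "user_id" are part of the signed string
    sign_text = f"{token}params{params}ts{ts}user_id{user_id}"
    return hashlib.md5(sign_text.encode("utf-8")).hexdigest()

print(afdian_sign("demo-token", "u123", 1700000000))
```

A fixed `ts` is used here only to make the result reproducible; the real client signs the current timestamp on every request.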
@@ -305,44 +91,75 @@ class Afdian:
def get_user_payment(self, trade_no: "str") -> ("int", "float", "str"):
order = self._get_afdian_status(trade_no)
amount = float(order.get("show_amount", 0))
level = math.floor(amount / (MULTIPLY * USD2CNY))
return level, amount, trade_no
# convert to USD
return amount / 7, trade_no
def verify_payment(user_id, unique, client) -> "str":
if not ENABLE_VIP:
return "VIP is not enabled."
logging.info("Verifying payment for %s - %s", user_id, unique)
if "@" in unique:
pay = BuyMeACoffee()
else:
pay = Afdian()
class Payment(Redis, MySQL):
# TODO transaction issues
def check_old_user(self, user_id: "int") -> "tuple":
self.cur.execute("SELECT * FROM payment WHERE user_id=%s AND old_user=1", (user_id,))
data = self.cur.fetchone()
return data
level, amount, pay_id = pay.get_user_payment(unique)
if amount == 0:
return f"You pay amount is {amount}. Did you input wrong order ID or email? " \
f"Talk to @{OWNER} if you need any assistant."
if not level:
return f"You pay amount {amount} is below minimum ${MULTIPLY}. " \
f"Talk to @{OWNER} if you need any assistant."
else:
vip = VIP()
ud = {
"user_id": user_id,
"username": client.get_chat(user_id).first_name,
"payment_amount": amount,
"payment_id": pay_id,
"level": level,
"quota": QUOTA * level * MULTIPLY
}
def get_pay_token(self, user_id: "int") -> int:
self.cur.execute("SELECT token FROM payment WHERE user_id=%s", (user_id,))
data = self.cur.fetchall() or [(0,)]
return sum([i[0] for i in data if i[0]])
message = vip.add_vip(ud)
return message
def get_free_token(self, user_id: "int") -> int:
if self.r.exists(user_id):
return int(self.r.get(user_id))
else:
# set and return
self.r.set(user_id, FREE_DOWNLOAD, ex=EXPIRE)
return FREE_DOWNLOAD
def get_token(self, user_id):
ttl = self.r.ttl(user_id)
return self.get_free_token(user_id), self.get_pay_token(user_id), current_time(time.time() + ttl)
def subscribe_query():
vip = VIP()
for cid, uid in vip.group_subscriber().items():
has = vip.has_newer_update(cid)
if has:
print(f"{has} - {uid}")
def use_free_token(self, user_id: "int"):
if self.r.exists(user_id):
self.r.decr(user_id, 1)
else:
# first time download
self.r.set(user_id, FREE_DOWNLOAD - 1, ex=EXPIRE)
def use_pay_token(self, user_id: int):
# a user may pay multiple times, so we'll need to filter the first payment with valid token
self.cur.execute("SELECT payment_id FROM payment WHERE user_id=%s AND token>0", (user_id,))
data = self.cur.fetchone()
payment_id = data[0]
logging.info("User %s use pay token with payment_id %s", user_id, payment_id)
self.cur.execute("UPDATE payment SET token=token-1 WHERE payment_id=%s", (payment_id,))
self.con.commit()
def use_token(self, user_id):
free = self.get_free_token(user_id)
if free > 0:
self.use_free_token(user_id)
else:
self.use_pay_token(user_id)
def add_pay_user(self, pay_data: "list"):
self.cur.execute("INSERT INTO payment VALUES (%s,%s,%s,%s,%s)", pay_data)
self.con.commit()
def verify_payment(self, user_id: "int", unique: "str") -> "str":
pay = BuyMeACoffee() if "@" in unique else Afdian()
self.cur.execute("SELECT * FROM payment WHERE payment_id=%s ", (unique,))
data = self.cur.fetchone()
if data:
# TODO: what if a user pays twice with the same email address?
return (
f"Failed. This payment has already been verified by another user. Please contact @{OWNER} if you have any questions."
)
amount, pay_id = pay.get_user_payment(unique)
logging.info("User %s paid %s, identifier is %s", user_id, amount, unique)
# amount is already in USD
if amount == 0:
return "Payment not found. Please check your payment ID or email address"
self.add_pay_user([user_id, amount, pay_id, 0, amount * TOKEN_PRICE])
return "Thanks! Your payment has been verified. /start to get your token details"
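The free-token flow above (grant `FREE_DOWNLOAD` on first sight, decrement on use, let the key expire for the periodic reset) can be sketched without a live Redis instance. The in-memory stand-in below only mirrors that get/use logic; the `FREE_DOWNLOAD` and `EXPIRE` values are assumptions standing in for the real config constants.

```python
import time

FREE_DOWNLOAD = 5   # assumed free tokens per window; the real value comes from config
EXPIRE = 24 * 3600  # assumed reset window in seconds

class FreeTokenSketch:
    """In-memory stand-in for the Redis-backed free-token counter."""

    def __init__(self):
        self._store = {}  # user_id -> (count, expires_at)

    def get_free_token(self, user_id: int) -> int:
        entry = self._store.get(user_id)
        if entry and entry[1] > time.time():
            return entry[0]
        # first sight (or window expired): grant a fresh allowance
        self._store[user_id] = (FREE_DOWNLOAD, time.time() + EXPIRE)
        return FREE_DOWNLOAD

    def use_free_token(self, user_id: int):
        count = self.get_free_token(user_id)
        expires_at = self._store[user_id][1]
        self._store[user_id] = (max(count - 1, 0), expires_at)

tokens = FreeTokenSketch()
tokens.use_free_token(42)
print(tokens.get_free_token(42))  # 4
```

In the real class the same shape falls out of `r.set(..., ex=EXPIRE)` plus `r.decr(...)`: Redis handles the expiry, so no timestamp bookkeeping is needed.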

View file

@ -1,3 +0,0 @@
alter table settings
add mode varchar(32) default 'Celery' null;

View file

@ -31,26 +31,33 @@ from pyrogram import idle
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Message
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
from channel import Channel
from client_init import create_app
from config import (ARCHIVE_ID, BROKER, ENABLE_CELERY, ENABLE_QUEUE,
ENABLE_VIP, TG_MAX_SIZE, WORKERS)
from config import ARCHIVE_ID, BROKER, ENABLE_CELERY, ENABLE_QUEUE, TG_MAX_SIZE, WORKERS, ENABLE_VIP
from constant import BotText
from db import Redis
from downloader import (edit_text, sizeof_fmt, tqdm_progress, upload_hook,
ytdl_download)
from limit import VIP
from utils import (apply_log_formatter, auto_restart, customize_logger,
get_metadata, get_revision, get_user_settings)
from database import Redis
from downloader import edit_text, tqdm_progress, upload_hook, ytdl_download
from limit import Payment
from utils import (
apply_log_formatter,
auto_restart,
customize_logger,
get_metadata,
get_revision,
sizeof_fmt,
)
customize_logger(["pyrogram.client", "pyrogram.session.session", "pyrogram.connection.connection"])
apply_log_formatter()
bot_text = BotText()
logging.getLogger('apscheduler.executors.default').propagate = False
logging.getLogger("apscheduler.executors.default").propagate = False
# celery -A tasks worker --loglevel=info --pool=solo
# app = Celery('celery', broker=BROKER, accept_content=['pickle'], task_serializer='pickle')
app = Celery('tasks', broker=BROKER)
app = Celery("tasks", broker=BROKER)
redis = Redis()
payment = Payment()
channel = Channel()
celery_client = create_app(":memory:")
@ -80,9 +87,10 @@ def audio_task(chat_id, message_id):
def get_unique_clink(original_url, user_id):
settings = get_user_settings(str(user_id))
clink = VIP().extract_canonical_link(original_url)
settings = payment.get_user_settings(str(user_id))
clink = channel.extract_canonical_link(original_url)
try:
# different user may have different resolution settings
unique = "{}?p={}{}".format(clink, *settings[1:])
except IndexError:
unique = clink
@ -97,45 +105,34 @@ def direct_download_task(chat_id, message_id, url):
logging.info("Direct download celery tasks ended.")
def forward_video(url, client, bot_msg):
def forward_video(client, bot_msg, url):
chat_id = bot_msg.chat.id
red = Redis()
vip = VIP()
unique = get_unique_clink(url, chat_id)
cached_fid = red.get_send_cache(unique)
cached_fid = redis.get_send_cache(unique)
if not cached_fid:
return False
try:
res_msg: "Message" = upload_processor(client, bot_msg, url, cached_fid)
if not res_msg:
raise ValueError("Failed to forward message")
obj = res_msg.document or res_msg.video or res_msg.audio
if ENABLE_VIP:
file_size = getattr(obj, "file_size", None) \
or getattr(obj, "file_size", None) \
or getattr(obj, "file_size", 10)
# TODO: forward file size may exceed the limit
vip.use_quota(chat_id, file_size)
obj = res_msg.document or res_msg.video or res_msg.audio or res_msg.animation
caption, _ = gen_cap(bot_msg, url, obj)
res_msg.edit_text(caption, reply_markup=gen_video_markup())
bot_msg.edit_text("Download success!✅✅✅")
red.update_metrics("cache_hit")
redis.update_metrics("cache_hit")
return True
except Exception as e:
traceback.print_exc()
logging.error("Failed to forward message %s", e)
red.del_send_cache(unique)
red.update_metrics("cache_miss")
redis.del_send_cache(unique)
redis.update_metrics("cache_miss")
def ytdl_download_entrance(bot_msg, client, url):
def ytdl_download_entrance(client, bot_msg, url):
chat_id = bot_msg.chat.id
if forward_video(url, client, bot_msg):
if forward_video(client, bot_msg, url):
return
mode = get_user_settings(str(chat_id))[-1]
mode = payment.get_user_settings(str(chat_id))[-1]
if ENABLE_CELERY and mode in [None, "Celery"]:
async_task(ytdl_download_task, chat_id, bot_msg.message_id, url)
# ytdl_download_task.delay(chat_id, bot_msg.message_id, url)
@ -163,19 +160,10 @@ def audio_entrance(bot_msg, client):
def direct_normal_download(bot_msg, client, url):
chat_id = bot_msg.chat.id
headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36"}
vip = VIP()
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36"
}
vip = Payment()
length = 0
if ENABLE_VIP:
remain, _, _ = vip.check_remaining_quota(chat_id)
try:
head_req = requests.head(url, headers=headers)
length = int(head_req.headers.get("content-length"))
except (TypeError, requests.exceptions.RequestException):
length = 0
if remain < length:
bot_msg.reply_text(f"Sorry, you have reached your quota.\n")
return
req = None
try:
@ -203,13 +191,15 @@ def direct_normal_download(bot_msg, client, url):
downloaded += len(chunk)
logging.info("Downloaded file %s", filename)
st_size = os.stat(filepath).st_size
if ENABLE_VIP:
vip.use_quota(chat_id, st_size)
client.send_chat_action(chat_id, "upload_document")
client.send_document(bot_msg.chat.id, filepath,
caption=f"filesize: {sizeof_fmt(st_size)}",
progress=upload_hook, progress_args=(bot_msg,),
)
client.send_document(
bot_msg.chat.id,
filepath,
caption=f"filesize: {sizeof_fmt(st_size)}",
progress=upload_hook,
progress_args=(bot_msg,),
)
bot_msg.edit_text("Download success!✅")
@ -219,11 +209,11 @@ def normal_audio(bot_msg, client):
status_msg = bot_msg.reply_text("Converting to audio...please wait patiently", quote=True)
orig_url: "str" = re.findall(r"https?://.*", bot_msg.caption)[0]
with tempfile.TemporaryDirectory(prefix="ytdl-") as tmp:
client.send_chat_action(chat_id, 'record_audio')
client.send_chat_action(chat_id, "record_audio")
# just try to download the audio using yt-dlp
resp = ytdl_download(orig_url, tmp, status_msg, hijack="bestaudio[ext=m4a]")
status_msg.edit_text("Sending audio now...")
client.send_chat_action(chat_id, 'upload_audio')
client.send_chat_action(chat_id, "upload_audio")
for f in resp["filepath"]:
client.send_audio(chat_id, f)
status_msg.edit_text("✅ Conversion complete.")
@ -240,7 +230,7 @@ def get_dl_source():
def upload_transfer_sh(bm, paths: list) -> "str":
d = {p.name: (md5(p.name.encode("utf8")).hexdigest() + p.suffix, p.open("rb")) for p in paths}
monitor = MultipartEncoderMonitor(MultipartEncoder(fields=d), lambda x: upload_hook(x.bytes_read, x.len, bm))
headers = {'Content-Type': monitor.content_type}
headers = {"Content-Type": monitor.content_type}
try:
req = requests.post("https://transfer.sh", data=monitor, headers=headers)
bm.edit_text(f"Download success!✅")
@ -256,9 +246,9 @@ def ytdl_normal_download(bot_msg, client, url):
result = ytdl_download(url, temp_dir.name, bot_msg)
logging.info("Download complete.")
if result["status"]:
client.send_chat_action(chat_id, 'upload_document')
client.send_chat_action(chat_id, "upload_document")
video_paths = result["filepath"]
bot_msg.edit_text('Download complete. Sending now...')
bot_msg.edit_text("Download complete. Sending now...")
for video_path in video_paths:
# normally there's only one video in that path...
st_size = os.stat(video_path).st_size
@ -268,9 +258,9 @@ def ytdl_normal_download(bot_msg, client, url):
# client.send_message(chat_id, upload_transfer_sh(bot_msg, video_paths))
continue
upload_processor(client, bot_msg, url, video_path)
bot_msg.edit_text('Download success!✅')
bot_msg.edit_text("Download success!✅")
else:
client.send_chat_action(chat_id, 'typing')
client.send_chat_action(chat_id, "typing")
tb = result["error"][0:4000]
bot_msg.edit_text(f"Download failed!❌\n\n```{tb}```", disable_web_page_preview=True)
@ -279,52 +269,75 @@ def ytdl_normal_download(bot_msg, client, url):
def upload_processor(client, bot_msg, url, vp_or_fid: "typing.Any[str, pathlib.Path]"):
chat_id = bot_msg.chat.id
red = Redis()
markup = gen_video_markup()
cap, meta = gen_cap(bot_msg, url, vp_or_fid)
settings = get_user_settings(str(chat_id))
settings = payment.get_user_settings(str(chat_id))
if ARCHIVE_ID and isinstance(vp_or_fid, pathlib.Path):
chat_id = ARCHIVE_ID
if settings[2] == "document":
logging.info("Sending as document")
try:
# send as document could be sent as video even if it's a document
res_msg = client.send_document(chat_id, vp_or_fid,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
reply_markup=markup,
thumb=meta["thumb"],
force_document=True
)
res_msg = client.send_document(
chat_id,
vp_or_fid,
caption=cap,
progress=upload_hook,
progress_args=(bot_msg,),
reply_markup=markup,
thumb=meta["thumb"],
force_document=True,
)
except ValueError:
logging.error("Retry to send as video")
res_msg = client.send_video(chat_id, vp_or_fid,
supports_streaming=True,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
reply_markup=markup,
**meta
)
res_msg = client.send_video(
chat_id,
vp_or_fid,
supports_streaming=True,
caption=cap,
progress=upload_hook,
progress_args=(bot_msg,),
reply_markup=markup,
**meta,
)
elif settings[2] == "audio":
logging.info("Sending as audio")
res_msg = client.send_audio(chat_id, vp_or_fid,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
)
res_msg = client.send_audio(
chat_id,
vp_or_fid,
caption=cap,
progress=upload_hook,
progress_args=(bot_msg,),
)
else:
logging.info("Sending as video")
res_msg = client.send_video(chat_id, vp_or_fid,
supports_streaming=True,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
reply_markup=markup,
**meta
)
try:
res_msg = client.send_video(
chat_id,
vp_or_fid,
supports_streaming=True,
caption=cap,
progress=upload_hook,
progress_args=(bot_msg,),
reply_markup=markup,
**meta,
)
except ValueError:
logging.info("Retry to send as animation")
res_msg = client.send_animation(
chat_id,
vp_or_fid,
caption=cap,
progress=upload_hook,
progress_args=(bot_msg,),
reply_markup=markup,
**meta,
)
unique = get_unique_clink(url, bot_msg.chat.id)
obj = res_msg.document or res_msg.video or res_msg.audio
red.add_send_cache(unique, getattr(obj, "file_id", None))
red.update_metrics("video_success")
obj = res_msg.document or res_msg.video or res_msg.audio or res_msg.animation
redis.add_send_cache(unique, getattr(obj, "file_id", None))
redis.update_metrics("video_success")
if ARCHIVE_ID and isinstance(vp_or_fid, pathlib.Path):
client.forward_messages(bot_msg.chat.id, ARCHIVE_ID, res_msg.message_id)
return res_msg
@ -334,11 +347,7 @@ def gen_cap(bm, url, video_path):
chat_id = bm.chat.id
user = bm.chat
try:
user_info = "@{}({})-{}".format(
user.username or "N/A",
user.first_name or "" + user.last_name or "",
user.id
)
user_info = "@{}({})-{}".format(user.username or "N/A", user.first_name or "" + user.last_name or "", user.id)
except Exception:
user_info = ""
@ -348,17 +357,24 @@ def gen_cap(bm, url, video_path):
file_size = sizeof_fmt(os.stat(video_path).st_size)
else:
file_name = getattr(video_path, "file_name", "")
file_size = sizeof_fmt(getattr(video_path, "file_size", (2 << 6) - (2 << 4) - (2 << 2) + (0 ^ 1) + (2 << 5)))
file_size = sizeof_fmt(getattr(video_path, "file_size", (2 << 2) + ((2 << 2) + 1) + (2 << 5)))  # opaque default; evaluates to 81
meta = dict(
width=getattr(video_path, "width", 0),
height=getattr(video_path, "height", 0),
duration=getattr(video_path, "duration", 0),
thumb=getattr(video_path, "thumb", None),
)
remain = bot_text.remaining_quota_caption(chat_id)
free = payment.get_free_token(chat_id)
pay = payment.get_pay_token(chat_id)
if ENABLE_VIP:
remain = f"Download token count: free {free}, pay {pay}"
else:
remain = ""
worker = get_dl_source()
cap = f"{user_info}\n{file_name}\n\n{url}\n\nInfo: {meta['width']}x{meta['height']} {file_size}\t" \
f"{meta['duration']}s\n{remain}\n{worker}\n{bot_text.custom_text}"
cap = (
f"{user_info}\n{file_name}\n\n{url}\n\nInfo: {meta['width']}x{meta['height']} {file_size}\t"
f"{meta['duration']}s\n{remain}\n{worker}\n{bot_text.custom_text}"
)
return cap, meta
@ -367,8 +383,7 @@ def gen_video_markup():
[
[ # First row
InlineKeyboardButton( # Generates a callback query when pressed
"convert to audio",
callback_data="convert"
"convert to audio", callback_data="convert"
)
]
]
@ -412,10 +427,10 @@ def async_task(task_name, *args):
inspect = app.control.inspect()
worker_stats = inspect.stats()
route_queues = []
padding = math.ceil(sum([i['pool']['max-concurrency'] for i in worker_stats.values()]) / len(worker_stats))
padding = math.ceil(sum([i["pool"]["max-concurrency"] for i in worker_stats.values()]) / len(worker_stats))
for worker_name, stats in worker_stats.items():
route = worker_name.split('@')[1]
concurrency = stats['pool']['max-concurrency']
route = worker_name.split("@")[1]
concurrency = stats["pool"]["max-concurrency"]
route_queues.extend([route] * (concurrency + padding))
destination = random.choice(route_queues)
logging.info("Selecting worker %s from %s in %.2fs", destination, route_queues, time.time() - t0)
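The queue-routing logic above amounts to a weighted random pick: every worker contributes one queue entry per unit of concurrency plus a uniform padding term (the mean concurrency across workers), and one route is drawn at random. A minimal sketch, with hypothetical worker names and stats in the shape returned by `app.control.inspect().stats()`:

```python
import math
import random

def pick_route(worker_stats: dict) -> str:
    """Weighted random pick: concurrency plus an equal padding entry count per worker."""
    padding = math.ceil(
        sum(s["pool"]["max-concurrency"] for s in worker_stats.values()) / len(worker_stats)
    )
    route_queues = []
    for worker_name, stats in worker_stats.items():
        route = worker_name.split("@")[1]  # queue name is the host part of "celery@host"
        route_queues.extend([route] * (stats["pool"]["max-concurrency"] + padding))
    return random.choice(route_queues)

# hypothetical stats: padding = ceil(10 / 2) = 5, so "fast" gets 13 entries, "slow" 7
stats = {
    "celery@fast": {"pool": {"max-concurrency": 8}},
    "celery@slow": {"pool": {"max-concurrency": 2}},
}
print(pick_route(stats))  # "fast" or "slow"; "fast" is drawn 13/20 of the time
```

The padding term flattens the distribution, so low-concurrency workers still receive a meaningful share of tasks instead of being starved by the busiest node.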
@ -424,11 +439,7 @@ def async_task(task_name, *args):
def run_celery():
worker_name = os.getenv("WORKER_NAME", "")
argv = [
"-A", "tasks", 'worker', '--loglevel=info',
"--pool=threads", f"--concurrency={WORKERS}",
"-n", worker_name
]
argv = ["-A", "tasks", "worker", "--loglevel=info", "--pool=threads", f"--concurrency={WORKERS}", "-n", worker_name]
if ENABLE_QUEUE:
argv.extend(["-Q", worker_name])
app.worker_main(argv)
@ -439,14 +450,14 @@ def purge_tasks():
return f"purged {count} tasks."
if __name__ == '__main__':
if __name__ == "__main__":
celery_client.start()
print("Bootstrapping Celery worker now.....")
time.sleep(5)
threading.Thread(target=run_celery, daemon=True).start()
scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
scheduler.add_job(auto_restart, 'interval', seconds=5)
scheduler.add_job(auto_restart, "interval", seconds=10)
scheduler.start()
idle()

View file

@ -18,20 +18,20 @@ import tempfile
import time
import uuid
import coloredlogs
import ffmpeg
import psutil
from db import MySQL
from flower_tasks import app
inspect = app.control.inspect()
def apply_log_formatter():
logging.basicConfig(
coloredlogs.install(
level=logging.INFO,
format='[%(asctime)s %(filename)s:%(lineno)d %(levelname).1s] %(message)s',
datefmt="%Y-%m-%d %H:%M:%S"
fmt="[%(asctime)s %(filename)s:%(lineno)d %(levelname).1s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
@ -41,33 +41,12 @@ def customize_logger(logger: "list"):
logging.getLogger(log).setLevel(level=logging.INFO)
def get_user_settings(user_id: "str") -> "tuple":
db = MySQL()
cur = db.cur
cur.execute("SELECT * FROM settings WHERE user_id = %s", (user_id,))
data = cur.fetchone()
if data is None:
return 100, "high", "video", "Celery"
return data
def set_user_settings(user_id: int, field: "str", value: "str"):
db = MySQL()
cur = db.cur
cur.execute("SELECT * FROM settings WHERE user_id = %s", (user_id,))
data = cur.fetchone()
if data is None:
resolution = method = ""
if field == "resolution":
method = "video"
resolution = value
if field == "method":
method = value
resolution = "high"
cur.execute("INSERT INTO settings VALUES (%s,%s,%s,%s)", (user_id, resolution, method, "Celery"))
else:
cur.execute(f"UPDATE settings SET {field} =%s WHERE user_id = %s", (value, user_id))
db.con.commit()
def sizeof_fmt(num: int, suffix="B"):
for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
if abs(num) < 1024.0:
return "%3.1f%s%s" % (num, unit, suffix)
num /= 1024.0
return "%.1f%s%s" % (num, "Yi", suffix)
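The `sizeof_fmt` helper divides by 1024 and walks the binary prefixes until the value fits. Repeated here verbatim so the example is self-contained:

```python
def sizeof_fmt(num: int, suffix="B"):
    # walk binary (IEC) prefixes until the magnitude drops below 1024
    for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
        if abs(num) < 1024.0:
            return "%3.1f%s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%.1f%s%s" % (num, "Yi", suffix)

print(sizeof_fmt(0))            # 0.0B
print(sizeof_fmt(1536))         # 1.5KiB
print(sizeof_fmt(3 * 1024**3))  # 3.0GiB
```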
def is_youtube(url: "str"):
@ -76,6 +55,8 @@ def is_youtube(url: "str"):
def adjust_formats(user_id: "str", url: "str", formats: "list", hijack=None):
from database import MySQL
# high: best quality 1080P, 2K, 4K, 8K
# medium: 720P
# low: 480P
@ -84,7 +65,7 @@ def adjust_formats(user_id: "str", url: "str", formats: "list", hijack=None):
return
mapping = {"high": [], "medium": [720], "low": [480]}
settings = get_user_settings(user_id)
settings = MySQL().get_user_settings(user_id)
if settings and is_youtube(url):
for m in mapping.get(settings[1], []):
formats.insert(0, f"bestvideo[ext=mp4][height={m}]+bestaudio[ext=m4a]")
@ -106,7 +87,7 @@ def get_metadata(video_path):
logging.error(e)
try:
thumb = pathlib.Path(video_path).parent.joinpath(f"{uuid.uuid4().hex}-thumbnail.png").as_posix()
ffmpeg.input(video_path, ss=duration / 2).filter('scale', width, -1).output(thumb, vframes=1).run()
ffmpeg.input(video_path, ss=duration / 2).filter("scale", width, -1).output(thumb, vframes=1).run()
except ffmpeg._run.Error:
thumb = None
@ -134,9 +115,9 @@ def get_func_queue(func) -> int:
return 0
def tail(f, lines=1, _buffer=4098):
def tail_log(f, lines=1, _buffer=4098):
"""Tail a file and get X lines from the end"""
# place holder for the lines found
# placeholder for the lines found
lines_found = []
# block counter will be multiplied by buffer
@ -214,7 +195,7 @@ def auto_restart():
if not os.path.exists(log_path):
return
with open(log_path) as f:
logs = "".join(tail(f, lines=10))
logs = "".join(tail_log(f, lines=10))
det = Detector(logs)
method_list = [getattr(det, func) for func in dir(det) if func.endswith("_detector")]
@ -233,5 +214,5 @@ def clean_tempfile():
shutil.rmtree(item, ignore_errors=True)
if __name__ == '__main__':
if __name__ == "__main__":
auto_restart()

View file

@ -24,33 +24,42 @@ from pyrogram.raw import functions
from pyrogram.raw import types as raw_types
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from tgbot_ping import get_runtime
from token_bucket import Limiter, MemoryStorage
from channel import Channel
from client_init import create_app
from config import (AUTHORIZED_USER, BURST, ENABLE_CELERY, ENABLE_FFMPEG,
ENABLE_VIP, MULTIPLY, OWNER, PROVIDER_TOKEN, QUOTA, RATE,
REQUIRED_MEMBERSHIP, PLAYLIST_SUPPORT)
from config import (
AUTHORIZED_USER,
ENABLE_CELERY,
ENABLE_FFMPEG,
ENABLE_VIP,
OWNER,
PLAYLIST_SUPPORT,
PROVIDER_TOKEN,
REQUIRED_MEMBERSHIP,
TOKEN_PRICE,
)
from constant import BotText
from db import InfluxDB, MySQL, Redis
from limit import VIP, verify_payment
from database import InfluxDB, MySQL, Redis
from limit import Payment
from tasks import app as celery_app
from tasks import (audio_entrance, direct_download_entrance, hot_patch,
purge_tasks, ytdl_download_entrance)
from utils import (auto_restart, clean_tempfile, customize_logger,
get_revision, get_user_settings, set_user_settings)
from tasks import (
audio_entrance,
direct_download_entrance,
hot_patch,
purge_tasks,
ytdl_download_entrance,
)
from utils import auto_restart, clean_tempfile, customize_logger, get_revision
customize_logger(["pyrogram.client", "pyrogram.session.session", "pyrogram.connection.connection"])
logging.getLogger('apscheduler.executors.default').propagate = False
logging.getLogger("apscheduler.executors.default").propagate = False
app = create_app()
bot_text = BotText()
db = MySQL()
logging.info("Authorized users are %s", AUTHORIZED_USER)
# rate, capacity
mem = MemoryStorage()
# 5 minutes, 2 bursts
lim = Limiter(1 / RATE, BURST, mem)
redis = Redis()
payment = Payment()
channel = Channel()
def private_use(func):
@ -69,10 +78,10 @@ def private_use(func):
users = []
if users and chat_id and chat_id not in users:
message.reply_text(bot_text.private, quote=True)
message.reply_text(BotText.private, quote=True)
return
# membership check
# TODO: fix #198 (membership check)
if REQUIRED_MEMBERSHIP:
try:
if app.get_chat_member(REQUIRED_MEMBERSHIP, chat_id).status != "member":
@ -81,7 +90,7 @@ def private_use(func):
logging.info("user %s check passed for group/channel %s.", chat_id, REQUIRED_MEMBERSHIP)
except UserNotParticipant:
logging.warning("user %s is not a member of group/channel %s", chat_id, REQUIRED_MEMBERSHIP)
message.reply_text(bot_text.membership_require, quote=True)
message.reply_text(BotText.membership_require, quote=True)
return
return func(client, message)
@ -94,32 +103,44 @@ def start_handler(client: "Client", message: "types.Message"):
from_id = message.from_user.id
logging.info("Welcome to youtube-dl bot!")
client.send_chat_action(from_id, "typing")
greeting = bot_text.get_vip_greeting(from_id)
quota = bot_text.remaining_quota_caption(from_id)
custom_text = bot_text.custom_text
text = f"{greeting}{bot_text.start}\n\n{quota}\n{custom_text}"
is_old_user = payment.check_old_user(from_id)
if is_old_user:
info = ""
elif ENABLE_VIP:
free_token, pay_token, reset = payment.get_token(from_id)
info = f"Free tokens: {free_token}, paid tokens: {pay_token}, reset at: {reset}"
else:
info = ""
text = f"{BotText.start}\n\n{info}\n{BotText.custom_text}"
client.send_message(message.chat.id, text)
# add to settings table
db.set_user_settings(from_id, "resolution", "high")
@app.on_message(filters.command(["help"]))
def help_handler(client: "Client", message: "types.Message"):
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
client.send_message(chat_id, bot_text.help, disable_web_page_preview=True)
client.send_message(chat_id, BotText.help, disable_web_page_preview=True)
@app.on_message(filters.command(["about"]))
def about_handler(client: "Client", message: "types.Message"):
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
client.send_message(chat_id, BotText.about)
@app.on_message(filters.command(["sub"]))
def subscribe_handler(client: "Client", message: "types.Message"):
vip = VIP()
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
if message.text == "/sub":
result = vip.get_user_subscription(chat_id)
result = channel.get_user_subscription(chat_id)
else:
link = message.text.split()[1]
try:
result = vip.subscribe_channel(chat_id, link)
result = channel.subscribe_channel(chat_id, link)
except (IndexError, ValueError):
result = f"Error: \n{traceback.format_exc()}"
client.send_message(chat_id, result or "You have no subscription.", disable_web_page_preview=True)
@ -127,15 +148,14 @@ def subscribe_handler(client: "Client", message: "types.Message"):
@app.on_message(filters.command(["unsub"]))
def unsubscribe_handler(client: "Client", message: "types.Message"):
vip = VIP()
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
text = message.text.split(" ")
if len(text) == 1:
client.send_message(chat_id, "/unsubscribe channel_id", disable_web_page_preview=True)
client.send_message(chat_id, "/unsub channel_id", disable_web_page_preview=True)
return
rows = vip.unsubscribe_channel(chat_id, text[1])
rows = channel.unsubscribe_channel(chat_id, text[1])
if rows:
text = f"Unsubscribed from {text[1]}"
else:
@ -159,7 +179,7 @@ def uncache_handler(client: "Client", message: "types.Message"):
username = message.from_user.username
link = message.text.split()[1]
if username == OWNER:
count = VIP().del_cache(link)
count = channel.del_cache(link)
message.reply_text(f"{count} cache(s) deleted.", quote=True)
@ -179,26 +199,19 @@ def ping_handler(client: "Client", message: "types.Message"):
else:
bot_info = get_runtime("ytdlbot_ytdl_1", "YouTube-dl")
if message.chat.username == OWNER:
stats = bot_text.ping_worker()[:1000]
client.send_document(chat_id, Redis().generate_file(), caption=f"{bot_info}\n\n{stats}")
stats = BotText.ping_worker()[:1000]
client.send_document(chat_id, redis.generate_file(), caption=f"{bot_info}\n\n{stats}")
else:
client.send_message(chat_id, f"{bot_info}")
@app.on_message(filters.command(["about"]))
def about_handler(client: "Client", message: "types.Message"):
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
client.send_message(chat_id, bot_text.about)
@app.on_message(filters.command(["sub_count"]))
def sub_count_handler(client: "Client", message: "types.Message"):
username = message.from_user.username
chat_id = message.chat.id
if username == OWNER:
with BytesIO() as f:
f.write(VIP().sub_count().encode("u8"))
f.write(channel.sub_count().encode("u8"))
f.name = "subscription count.txt"
client.send_document(chat_id, f)
@ -207,15 +220,15 @@ def sub_count_handler(client: "Client", message: "types.Message"):
def direct_handler(client: "Client", message: "types.Message"):
chat_id = message.from_user.id
client.send_chat_action(chat_id, "typing")
url = re.sub(r'/direct\s*', '', message.text)
url = re.sub(r"/direct\s*", "", message.text)
logging.info("direct start %s", url)
if not re.findall(r"^https?://", url.lower()):
Redis().update_metrics("bad_request")
redis.update_metrics("bad_request")
message.reply_text("Send me a DIRECT LINK.", quote=True)
return
bot_msg = message.reply_text("Request received.", quote=True)
Redis().update_metrics("direct_request")
redis.update_metrics("direct_request")
direct_download_entrance(bot_msg, client, url)
@ -223,8 +236,8 @@ def direct_handler(client: "Client", message: "types.Message"):
def settings_handler(client: "Client", message: "types.Message"):
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
data = get_user_settings(str(chat_id))
set_mode = (data[-1])
data = db.get_user_settings(str(chat_id))
set_mode = data[-1]
text = {"Local": "Celery", "Celery": "Local"}.get(set_mode, "Local")
mode_text = f"Download mode: **{set_mode}**"
if message.chat.username == OWNER:
@ -237,113 +250,115 @@ def settings_handler(client: "Client", message: "types.Message"):
[ # First row
InlineKeyboardButton("send as document", callback_data="document"),
InlineKeyboardButton("send as video", callback_data="video"),
InlineKeyboardButton("send as audio", callback_data="audio")
InlineKeyboardButton("send as audio", callback_data="audio"),
],
[ # second row
InlineKeyboardButton("High Quality", callback_data="high"),
InlineKeyboardButton("Medium Quality", callback_data="medium"),
InlineKeyboardButton("Low Quality", callback_data="low"),
],
extra
extra,
]
)
client.send_message(chat_id, bot_text.settings.format(data[1], data[2]) + mode_text, reply_markup=markup)
client.send_message(chat_id, BotText.settings.format(data[1], data[2]) + mode_text, reply_markup=markup)
@app.on_message(filters.command(["vip"]))
def vip_handler(client: "Client", message: "types.Message"):
@app.on_message(filters.command(["buy"]))
def buy_handler(client: "Client", message: "types.Message"):
# process as chat.id, not from_user.id
chat_id = message.chat.id
text = message.text.strip()
client.send_chat_action(chat_id, "typing")
if text == "/vip":
client.send_message(chat_id, bot_text.vip, disable_web_page_preview=True)
client.send_message(chat_id, BotText.buy, disable_web_page_preview=True)
# generate telegram invoice here
payload = f"{message.chat.id}-buy"
token_count = message.text.replace("/buy", "").strip()
# currency USD
if token_count.isdigit():
price = int(int(token_count) / TOKEN_PRICE * 100)
else:
bm: typing.Union["types.Message", "typing.Any"] = message.reply_text(bot_text.vip_pay, quote=True)
unique = text.replace("/vip", "").strip()
msg = verify_payment(chat_id, unique, client)
bm.edit_text(msg)
price = 100
invoice = generate_invoice(
price, f"Buy {TOKEN_PRICE} download tokens", "You can pay by Telegram payment or using link above", payload
)
app.send(
functions.messages.SendMedia(
peer=(raw_types.InputPeerUser(user_id=chat_id, access_hash=0)),
media=invoice,
random_id=app.rnd_id(),
message="Buy more download token",
)
)
@app.on_message(filters.command(["redeem"]))
def redeem_handler(client: "Client", message: "types.Message"):
chat_id = message.chat.id
text = message.text.strip()
unique = text.replace("/redeem", "").strip()
msg = payment.verify_payment(chat_id, unique)
message.reply_text(msg, quote=True)
def generate_invoice(amount: "int", title: "str", description: "str", payload: "str"):
invoice = raw_types.input_media_invoice.InputMediaInvoice(
invoice=(
raw_types.invoice.Invoice(currency="USD", prices=[raw_types.LabeledPrice(label="price", amount=amount)])),
invoice=raw_types.invoice.Invoice(
currency="USD", prices=[raw_types.LabeledPrice(label="price", amount=amount)]
),
title=title,
description=description,
provider=PROVIDER_TOKEN,
provider_data=raw_types.DataJSON(data="{}"),
payload=payload.encode(),
start_param=payload
start_param=payload,
)
return invoice
# payment related
@app.on_message(filters.command(["topup"]))
def topup_handler(client: "Client", message: "types.Message"):
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
invoice = generate_invoice(100, bot_text.topup_title, bot_text.topup_description,
f"{message.chat.id}-topup")
app.send(
functions.messages.SendMedia(
peer=(raw_types.InputPeerUser(user_id=chat_id, access_hash=0)),
media=invoice,
random_id=app.rnd_id(),
message="Please use your card to pay for more traffic"
)
)
@app.on_message(filters.command(["tgvip"]))
def tgvip_handler(client: "Client", message: "types.Message"):
chat_id = message.chat.id
client.send_chat_action(chat_id, "typing")
invoice = generate_invoice(1000, f"VIP1", f"pay USD${MULTIPLY} for VIP1", f"{message.chat.id}-vip1".encode())
app.send(
functions.messages.SendMedia(
peer=(raw_types.InputPeerUser(user_id=chat_id, access_hash=0)),
media=invoice,
random_id=app.rnd_id(),
message="Please use your card to pay for more traffic"
)
)
@app.on_message(filters.incoming & filters.text)
@private_use
def download_handler(client: "Client", message: "types.Message"):
# check remaining quota
red = Redis()
chat_id = message.from_user.id
client.send_chat_action(chat_id, 'typing')
red.user_count(chat_id)
client.send_chat_action(chat_id, "typing")
redis.user_count(chat_id)
url = re.sub(r'/ytdl\s*', '', message.text)
url = re.sub(r"/ytdl\s*", "", message.text)
logging.info("start %s", url)
# check url
if not re.findall(r"^https?://", url.lower()):
red.update_metrics("bad_request")
redis.update_metrics("bad_request")
message.reply_text("I think you should send me a link.", quote=True)
return
# disable by default
if not PLAYLIST_SUPPORT:
if re.findall(r"^https://www\.youtube\.com/channel/", VIP.extract_canonical_link(url)) or "list" in url:
message.reply_text("Channel/list download is disabled now. Please send me individual video link.", quote=True)
red.update_metrics("reject_channel")
if re.findall(r"^https://www\.youtube\.com/channel/", Channel.extract_canonical_link(url)) or "list" in url:
message.reply_text(
"The ability to download a channel or list has been disabled. "
"Kindly provide me with the specific link for each video instead.",
quote=True,
)
redis.update_metrics("reject_channel")
return
# non vip user, consume too many token
if (not VIP().check_vip(chat_id)) and (not lim.consume(str(chat_id).encode(), 1)):
red.update_metrics("rate_limit")
message.reply_text(bot_text.too_fast, quote=True)
return
red.update_metrics("video_request")
text = bot_text.get_receive_link_text()
# old user is not limited by token
if ENABLE_VIP and not payment.check_old_user(chat_id):
free, pay, reset = payment.get_token(chat_id)
if free + pay <= 0:
message.reply_text(
f"You don't have enough tokens. Please wait until {reset} or /buy more tokens.", quote=True
)
redis.update_metrics("reject_token")
return
else:
payment.use_token(chat_id)
redis.update_metrics("video_request")
text = BotText.get_receive_link_text()
try:
# raise pyrogram.errors.exceptions.FloodWait(10)
bot_msg: typing.Union["types.Message", "typing.Any"] = message.reply_text(text, quote=True)
@ -352,15 +367,16 @@ def download_handler(client: "Client", message: "types.Message"):
f.write(str(e).encode())
f.write(b"Your job will be done soon. Just wait! Don't rush.")
f.name = "Please don't flood me.txt"
bot_msg = message.reply_document(f, caption=f"Flood wait! Please wait {e.x} seconds...."
f"Your job will start automatically", quote=True)
bot_msg = message.reply_document(
f, caption=f"Flood wait! Please wait {e.x} seconds.... " f"Your job will start automatically.", quote=True
)
f.close()
client.send_message(OWNER, f"Flood wait! 🙁 {e.x} seconds....")
time.sleep(e.x)
client.send_chat_action(chat_id, 'upload_video')
client.send_chat_action(chat_id, "upload_video")
bot_msg.chat = message.chat
ytdl_download_entrance(bot_msg, client, url)
ytdl_download_entrance(client, bot_msg, url)
@app.on_callback_query(filters.regex(r"document|video|audio"))
@ -368,7 +384,7 @@ def send_method_callback(client: "Client", callback_query: types.CallbackQuery):
chat_id = callback_query.message.chat.id
data = callback_query.data
logging.info("Setting %s file type to %s", chat_id, data)
set_user_settings(chat_id, "method", data)
db.set_user_settings(chat_id, "method", data)
callback_query.answer(f"Your send type was set to {callback_query.data}")
@@ -377,7 +393,7 @@ def download_resolution_callback(client: "Client", callback_query: types.Callbac
chat_id = callback_query.message.chat.id
data = callback_query.data
logging.info("Setting %s file type to %s", chat_id, data)
set_user_settings(chat_id, "resolution", data)
db.set_user_settings(chat_id, "resolution", data)
callback_query.answer(f"Your default download quality was set to {callback_query.data}")
@@ -389,7 +405,7 @@ def audio_callback(client: "Client", callback_query: types.CallbackQuery):
return
callback_query.answer("Converting to audio... please wait patiently.")
Redis().update_metrics("audio_request")
redis.update_metrics("audio_request")
vmsg = callback_query.message
audio_entrance(vmsg, client)
@@ -397,24 +413,23 @@ def audio_callback(client: "Client", callback_query: types.CallbackQuery):
@app.on_callback_query(filters.regex(r"Local|Celery"))
def owner_local_callback(client: "Client", callback_query: types.CallbackQuery):
chat_id = callback_query.message.chat.id
set_user_settings(chat_id, "mode", callback_query.data)
db.set_user_settings(chat_id, "mode", callback_query.data)
callback_query.answer(f"Download mode was changed to {callback_query.data}")
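These callbacks persist per-chat preferences through `db.set_user_settings`, which this diff does not show. A dict-backed sketch of such a store (the valid value sets below are assumptions for illustration, drawn from the callback filters above):

```python
# allowed values per setting key, taken from the callback regexes above;
# the exact sets in the real database layer may differ
VALID = {
    "method": {"document", "video", "audio"},
    "resolution": {"high", "medium", "low"},
    "mode": {"Local", "Celery"},
}

_settings: dict[int, dict[str, str]] = {}


def set_user_settings(chat_id: int, key: str, value: str) -> None:
    # reject keys/values the callbacks above never send
    if value not in VALID.get(key, set()):
        raise ValueError(f"bad setting {key}={value}")
    _settings.setdefault(chat_id, {})[key] = value


def get_user_settings(chat_id: int, key: str, default: str) -> str:
    return _settings.get(chat_id, {}).get(key, default)


set_user_settings(123, "mode", "Celery")
print(get_user_settings(123, "mode", "Local"))  # Celery
```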
def periodic_sub_check():
vip = VIP()
exceptions = pyrogram.errors.exceptions
for cid, uids in vip.group_subscriber().items():
video_url = vip.has_newer_update(cid)
for cid, uids in channel.group_subscriber().items():
video_url = channel.has_newer_update(cid)
if video_url:
logging.info(f"periodic update:{video_url} - {uids}")
for uid in uids:
try:
bot_msg = app.send_message(uid, f"{video_url} is downloading...", disable_web_page_preview=True)
ytdl_download_entrance(app, bot_msg, video_url)
except(exceptions.bad_request_400.PeerIdInvalid, exceptions.bad_request_400.UserIsBlocked) as e:
except (exceptions.bad_request_400.PeerIdInvalid, exceptions.bad_request_400.UserIsBlocked) as e:
logging.warning("User is blocked or deleted. %s", e)
vip.deactivate_user_subscription(uid)
channel.deactivate_user_subscription(uid)
except Exception as e:
logging.error("Unknown error when sending message to user. %s", traceback.format_exc())
finally:
@@ -424,44 +439,29 @@ def periodic_sub_check():
@app.on_raw_update()
def raw_update(client: "Client", update, users, chats):
action = getattr(getattr(update, "message", None), "action", None)
if update.QUALNAME == 'types.UpdateBotPrecheckoutQuery':
if update.QUALNAME == "types.UpdateBotPrecheckoutQuery":
client.send(
functions.messages.SetBotPrecheckoutResults(
query_id=update.query_id,
success=True,
)
)
elif action and action.QUALNAME == 'types.MessageActionPaymentSentMe':
elif action and action.QUALNAME == "types.MessageActionPaymentSentMe":
logging.info("Payment received. %s", action)
uid = update.message.peer_id.user_id
vip = VIP()
amount = f"{action.total_amount / 100} {action.currency}"
if "vip" in action.payload.decode():
ud = {
"user_id": uid,
"username": users.get(uid).username,
"payment_amount": 10,
"payment_id": 0,
"level": 1,
"quota": QUOTA * 2
}
vip.direct_add_vip(ud)
client.send_message(uid, f"Thank you {uid}. VIP payment received: {amount}")
else:
vip.set_topup(uid)
client.send_message(uid, f"Thank you {uid}. Top up payment received: {amount}")
amount = action.total_amount / 100
payment.add_pay_user([uid, amount, action.charge.provider_charge_id, 0, amount * TOKEN_PRICE])
client.send_message(uid, f"Thank you {uid}. Payment received: {amount} {action.currency}")
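Telegram reports `total_amount` in the smallest unit of the currency, so the handler above divides by 100 before crediting `amount * TOKEN_PRICE` tokens. The arithmetic in isolation (the `TOKEN_PRICE` value here is an assumed placeholder):

```python
TOKEN_PRICE = 10  # assumed tokens granted per major currency unit


def tokens_for_payment(total_amount: int, price: int = TOKEN_PRICE) -> tuple[float, float]:
    # Telegram's total_amount is in the smallest currency unit (e.g. cents)
    amount = total_amount / 100
    return amount, amount * price


amount, tokens = tokens_for_payment(500)  # a 5.00 payment
print(amount, tokens)  # 5.0 50.0
```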
if __name__ == '__main__':
MySQL()
scheduler = BackgroundScheduler(timezone="Europe/Stockholm", job_defaults={'max_instances': 5})
scheduler.add_job(Redis().reset_today, 'cron', hour=0, minute=0)
scheduler.add_job(auto_restart, 'interval', seconds=60)
scheduler.add_job(clean_tempfile, 'interval', seconds=60)
scheduler.add_job(InfluxDB().collect_data, 'interval', seconds=60)
if __name__ == "__main__":
scheduler = BackgroundScheduler(timezone="Asia/Shanghai", job_defaults={"max_instances": 5})
scheduler.add_job(redis.reset_today, "cron", hour=0, minute=0)
scheduler.add_job(auto_restart, "interval", seconds=60)
scheduler.add_job(clean_tempfile, "interval", seconds=60)
scheduler.add_job(InfluxDB().collect_data, "interval", seconds=60)
# check subscribed channels for new uploads every hour
scheduler.add_job(periodic_sub_check, 'interval', seconds=60 * 60)
scheduler.add_job(periodic_sub_check, "interval", seconds=3600)
scheduler.start()
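The `cron` job above fires `redis.reset_today` at midnight in the scheduler's timezone. A standard-library sketch of when that trigger next fires (same `Asia/Shanghai` zone as above; the helper name is an illustration, not part of APScheduler):

```python
from datetime import datetime, time, timedelta
from zoneinfo import ZoneInfo


def next_midnight(now: datetime, tz: str = "Asia/Shanghai") -> datetime:
    # equivalent of the cron trigger `hour=0, minute=0` above
    local = now.astimezone(ZoneInfo(tz))
    return datetime.combine(local.date() + timedelta(days=1), time(0, 0), tzinfo=ZoneInfo(tz))


now = datetime(2023, 4, 6, 22, 33, tzinfo=ZoneInfo("Asia/Shanghai"))
print(next_midnight(now).isoformat())  # 2023-04-07T00:00:00+08:00
```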
banner = f"""
@@ -469,7 +469,7 @@ if __name__ == '__main__':
By @BennyThink, VIP mode: {ENABLE_VIP}, Distribution: {ENABLE_CELERY}
By @BennyThink, VIP mode: {ENABLE_VIP}, Celery Mode: {ENABLE_CELERY}
Version: {get_revision()}
"""
print(banner)