From 218b1d5ec969e90aee30680659f05bd0fb5a901f Mon Sep 17 00:00:00 2001 From: Benny Date: Thu, 6 Apr 2023 22:33:24 +0200 Subject: [PATCH] token mode instead of vip mode --- .gitignore | 8 +- FAQ.md | 8 +- README.md | 303 +++++------------------------ k8s.md | 200 +++++++++++++++++++ requirements.txt | 11 +- scripts/migrate_to_mysql.py | 1 + scripts/start.sh | 4 - ytdlbot/broadcast.py | 62 ------ ytdlbot/channel.py | 177 +++++++++++++++++ ytdlbot/config.py | 29 ++- ytdlbot/constant.py | 120 +++++------- ytdlbot/{db.py => database.py} | 132 ++++++++----- ytdlbot/downloader.py | 149 ++++---------- ytdlbot/fakemysql.py | 58 ------ ytdlbot/limit.py | 343 ++++++++------------------------- ytdlbot/migration.sql | 3 - ytdlbot/tasks.py | 231 +++++++++++----------- ytdlbot/utils.py | 55 ++---- ytdlbot/ytdl_bot.py | 308 ++++++++++++++--------------- 19 files changed, 1001 insertions(+), 1201 deletions(-) create mode 100644 k8s.md delete mode 100644 ytdlbot/broadcast.py create mode 100644 ytdlbot/channel.py rename ytdlbot/{db.py => database.py} (71%) delete mode 100644 ytdlbot/fakemysql.py delete mode 100644 ytdlbot/migration.sql diff --git a/.gitignore b/.gitignore index ea7f521..1157294 100644 --- a/.gitignore +++ b/.gitignore @@ -159,4 +159,10 @@ ytdlbot/ytdl.session data/* upgrade_worker.sh ytdl.session -reinforcement/* \ No newline at end of file +reinforcement/* +/ytdlbot/session/celery.session +/.idea/prettier.xml +/.idea/watcherTasks.xml +/ytdlbot/session/ytdl.session-journal +/ytdlbot/unknown_errors.txt +/ytdlbot/ytdl.session-journal diff --git a/FAQ.md b/FAQ.md index 0039e28..4a0f7b5 100644 --- a/FAQ.md +++ b/FAQ.md @@ -1,10 +1,11 @@ # Project status -This project is in zombie mode now. +This project is currently inactive and is no longer being maintained. -It is not maintained anymore. No features will be added, no bugs will be fixed. +No new features will be added, and no bugs will be fixed. -Existing code and bot will continue to work without any guarantees. +While the existing code and bot will continue to work, there are no guarantees regarding their reliability or +functionality. # Can you support downloading from XXX? @@ -14,7 +15,6 @@ Please reach out to [yt-dlp](https://github.com/yt-dlp/yt-dlp) Just wait. It is a free service, and it is not guaranteed to work. -See [Grafana Dashboard](https://grafana.dmesg.app/d/9yXGmc1nk/youtube-download-celery?orgId=2) for current server status # Why is the bot not responding? diff --git a/README.md b/README.md index 3469ac5..7c04526 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ YouTube Download Bot๐Ÿš€ -Download videos from YouTube and other platforms through a Telegram Bot +This Telegram bot allows you to download videos from YouTube and other supported platforms. ----- **READ [FAQ](FAQ.md) FIRST IF YOU ENCOUNTER ANY ISSUES.** @@ -12,7 +12,7 @@ Download videos from YouTube and other platforms through a Telegram Bot ----- [![Deploy](https://www.herokucdn.com/deploy/button.svg)](https://heroku.com/deploy) -Can't deploy? Fork to your personal account and deploy it there! +If you are having trouble deploying, you can fork the project to your personal account and deploy it from there. **Starting November 28, 2022, free Heroku Dynos, free Heroku Postgres, and free Heroku Data for Redisยฎ plans will no longer be available.** @@ -27,14 +27,13 @@ Websites [supported by youtube-dl](https://ytdl-org.github.io/youtube-dl/support # Limitations of my bot -I don't have unlimited servers and bandwidth, so I have to make some restrictions. +Due to limitations on servers and bandwidth, there are some restrictions on this service. -* 5 GiB one-way traffic per 24 hours for each user -* maximum 5 minutes streaming conversion support -* maximum 3 subscriptions -* limited request in certain time range +* Each user is limited to 5 free downloads per 24-hour period +* there is a maximum of three subscriptions allowed for YouTube channels. -You can choose to become 'VIP' if you really need large traffic. And also, you could always deploy your own bot. +If you require more downloads, you can purchase additional tokens. Additionally, you have the option of deploying your +own bot. # Features @@ -45,7 +44,7 @@ You can choose to become 'VIP' if you really need large traffic. And also, you c 3. support progress bar 4. audio conversion 5. playlist support -6. VIP support +6. payment support 7. support different video resolutions 8. support sending as file or streaming as video 9. supports celery worker distribution - faster than before. @@ -56,25 +55,23 @@ You can choose to become 'VIP' if you really need large traffic. And also, you c # How to deploy? -You can deploy this bot on any platform that supports Python. - -## Heroku - -Use the button above! It should work like a magic but with limited functionalities. +This bot can be deployed on any platform that supports Python. ## Run natively on your machine -1. clone code -2. install ffmpeg -3. install Python 3.6+ -4. install aria2 and add to PATH -5. pip3 install -r requirements.txt -6. set environment variables `TOKEN`, `APP_ID` and `APP_HASH`, and more if you like. -7. `python3 ytdl_bot.py` +To deploy this bot, follow these steps: + +1. Clone the code from the repository. +2. Install FFmpeg. +3. Install Python 3.6 or a later version. +4. Install Aria2 and add it to the PATH. +5. Install the required packages by running `pip3 install -r requirements.txt`. +6. Set the environment variables `TOKEN`, `APP_ID`, `APP_HASH`, and any others that you may need. +7. Run `python3 ytdl_bot.py`. ## Docker -Some functions, such as VIP, ping will be disabled. +This bot has a simple one-line code and some functions, such as VIP and ping, are disabled. ```shell docker run -e APP_ID=111 -e APP_HASH=111 -e TOKEN=370FXI bennythink/ytdlbot @@ -104,42 +101,35 @@ mkdir env vim env/ytdl.env ``` -you can configure all the following environment variables: +You can configure all the following environment variables: -* PYRO_WORKERS: number of workers for pyrogram, default is 100 * WORKERS: workers count for celery +* PYRO_WORKERS: number of workers for pyrogram, default is 100 * APP_ID: **REQUIRED**, get it from https://core.telegram.org/ * APP_HASH: **REQUIRED** * TOKEN: **REQUIRED** * REDIS: **REQUIRED if you need VIP mode and cache** โš ๏ธ Don't publish your redis server on the internet. โš ๏ธ - +* EXPIRE: token expire time, default: 1 day +* ENABLE_VIP: enable VIP mode * OWNER: owner username -* QUOTA: quota in bytes -* EX: quota expire time -* MULTIPLY: vip quota comparing to normal quota -* USD2CNY: exchange rate -* VIP: VIP mode, default: disable -* AFD_LINK -* COFFEE_LINK -* COFFEE_TOKEN -* AFD_TOKEN -* AFD_USER_ID - -* AUTHORIZED_USER: users that could use this bot, user_id, separated with `,` -* REQUIRED_MEMBERSHIP: group or channel username, user must join this group to use the bot. Could be use with - above `AUTHORIZED_USER` - -* ENABLE_CELERY: Distribution mode, default: disable. You'll can setup workers in different locations. -* ENABLE_FFMPEG: enable ffmpeg so Telegram can stream -* MYSQL_HOST: you'll have to setup MySQL if you enable VIP mode -* MYSQL_USER -* MYSQL_PASS +* AUTHORIZED_USER: only authorized users can use the bot +* REQUIRED_MEMBERSHIP: group or channel username, user must join this group to use the bot +* ENABLE_CELERY: celery mode, default: disable +* ENABLE_QUEUE: celery queue +* BROKER: celery broker, should be redis://redis:6379/0 +* MYSQL_HOST:MySQL host +* MYSQL_USER: MySQL username +* MYSQL_PASS: MySQL password +* AUDIO_FORMAT: default audio format +* ARCHIVE_ID: forward all downloads to this group/channel +* IPv6 = os.getenv("IPv6", False) +* ENABLE_FFMPEG = os.getenv("ENABLE_FFMPEG", False) +* PROVIDER_TOKEN: stripe token on Telegram payment +* PLAYLIST_SUPPORT: download playlist support +* ENABLE_ARIA2: enable aria2c download +* FREE_DOWNLOAD: free download count per day +* TOKEN_PRICE: token price per 1 USD * GOOGLE_API_KEY: YouTube API key, required for YouTube video subscription. -* AUDIO_FORMAT: audio format, default is m4a. You can set to any known and supported format for ffmpeg. For - example,`mp3`, `flac`, etc. โš ๏ธ m4a is the fastest. Other formats may affect performance. -* ARCHIVE_ID: group or channel id/username. All downloads will send to this group first and then forward to end user. -* PLAYLIST_SUPPORT: `True` or `False`, Ability to enable or disable downloads of playlist / channels by bot. Default: `False`. - **Inline button will be lost during the forwarding.** ## 3.2 Set up init data @@ -187,7 +177,7 @@ vim data/instagram.com_cookies.txt In `flower` service section, you may want to change your basic authentication username password and publish port. -You can also limit CPU and RAM usage by adding an `deploy' key: +You can also limit CPU and RAM usage by adding a `deploy' key: ```docker deploy: @@ -231,208 +221,12 @@ On the other machine: docker-compose -f worker.yml up -d ``` -**โš ๏ธ Bear in mind don't publish redis directly on the internet! You can use WireGuard to wrap it up.** +**โš ๏ธ Please bear in mind that you should not publish Redis directly on the internet. +Instead, you can use WireGuard to wrap it up for added security.** -## Kubernetes +## kubernetes -Kubernetes, also known as K8s, is an open-source system for automating deployment, scaling, and management of -containerized applications - -# Complete deployment guide for k8s deloyment - -* contains every functionality -* compatible with amd64, arm64 and armv7l - -## First. Get all file in k8s folder - -Download `k8s` file to a directory on your k8s server and go to this folder - -## 1. Create Redis deloyment - -```shell -kubectl apply -f 01.redis.yml -``` - -This command will create ytdl namespace, redis pod and redis service - -## 2. Creat MariaDB deloyment - -```shell -kubectl apply -f 02.mariadb.yml -``` - -This deloyment will claim 10GB storage from storageClassName: longhorn. Please replace longhorn with your -storageClassName before apply. - -## 3. Set environment variables - -Create configMap for env - -### 3.1 Edit configmap.yml - -```shell -vim 03.configmap.yml -``` - -you can configure all the following environment variables: - -* PYRO_WORKERS: number of workers for pyrogram, default is 100 -* WORKERS: workers count for celery -* APP_ID: **REQUIRED**, get it from https://core.telegram.org/ -* APP_HASH: **REQUIRED** -* TOKEN: **REQUIRED** -* REDIS: **REQUIRED if you need VIP mode and cache** โš ๏ธ Don't publish your redis server on the internet. โš ๏ธ - -* OWNER: owner username -* QUOTA: quota in bytes -* EX: quota expire time -* MULTIPLY: vip quota comparing to normal quota -* USD2CNY: exchange rate -* VIP: VIP mode, default: disable -* AFD_LINK -* COFFEE_LINK -* COFFEE_TOKEN -* AFD_TOKEN -* AFD_USER_ID - -* AUTHORIZED_USER: users that could use this bot, user_id, separated with `,` -* REQUIRED_MEMBERSHIP: group or channel username, user must join this group to use the bot. Could be use with - above `AUTHORIZED_USER` - -* ENABLE_CELERY: Distribution mode, default: disable. You'll can setup workers in different locations. -* ENABLE_FFMPEG: enable ffmpeg so Telegram can stream -* MYSQL_HOST: you'll have to setup MySQL if you enable VIP mode -* MYSQL_USER -* MYSQL_PASS -* GOOGLE_API_KEY: YouTube API key, required for YouTube video subscription. -* AUDIO_FORMAT: audio format, default is m4a. You can set to any known and supported format for ffmpeg. For - example,`mp3`, `flac`, etc. โš ๏ธ m4a is the fastest. Other formats may affect performance. -* ARCHIVE_ID: group or channel id/username. All downloads will send to this group first and then forward to end user. - **Inline button will be lost during the forwarding.** - -### 3.2 Apply configMap for environment variables - -```shell -kubectl apply -f 03.configmap.yml -``` - -## 4. Run Master Celery - -```shell -kubectl apply -f 04.ytdl-master.yml -``` - -This deloyment will create ytdl-pvc PersistentVolumeClaim on storageClassName: longhorn. This clain will contain vnstat, -cookies folder and flower database. Please replace longhorn with your storageClassName before apply - -### 4.1 Setup instagram cookies - -Required if you want to support instagram. - -You can use this extension -[Get cookies.txt](https://chrome.google.com/webstore/detail/get-cookiestxt/bgaddhkoddajcdgocldbbfleckgcbcid) -to get instagram cookies - -Get pod running ytdl master: - -```shell -kubectl get pods --namespace ytdl -``` - -Name should be ytdl-xxxxxxxx - -Access to pod - -```shell -kubectl --namespace=ytdl exec --stdin --tty ytdl-xxx -- sh -``` - -(replace ytdl-xxx by your pod name) - -Go to ytdl-pvc mounted folder - -```shell -cd /ytdlbot/ytdlbot/data/ -vim instagram.com_cookies.txt -# paste your cookies -``` - -## 5. Run Worker Celery - -```shell -kubectl apply -f 05.ytdl-worker.yml -``` - -## 6. Run Flower image (OPTIONAL) - -### 6.1 Setup flower db - -Get pod running ytdl master: - -```shell -kubectl get pods --namespace ytdl -``` - -Name should be ytdl-xxxxxxxx - -Access to pod - -```shell -kubectl --namespace=ytdl exec --stdin --tty ytdl-xxx -- sh -``` - -(replace ytdl-xxx by your pod name) - -Go to ytdl-pvc mounted folder - -```shel -cd /var/lib/vnstat/ -``` - -Create flower database file - -```shell -{} ~ python3 -Python 3.9.9 (main, Nov 21 2021, 03:22:47) -[Clang 12.0.0 (clang-1200.0.32.29)] on darwin -Type "help", "copyright", "credits" or "license" for more information. ->>> import dbm;dbm.open("flower","n");exit() -``` - -### 6.2 Config Flower Ingress - -This step need config ingress from line 51 of file 06.flower.yml with your ingress service. Need for access from -internet. -YML file should be adjusted depending on your load balancing, ingress and network system - -For active SSL - -```yml -cert-manager.io/cluster-issuer: letsencrypt-prod -``` - -Replace nginx by your ingress service - -```yml -ingressClassName: nginx -``` - -Add your domain, example - -```yml -tls: - - hosts: - - flower.benny.com - secretName: flower-tls - rules: - - host: flower.benny.com -``` - -### 6.3 Apply Flower deloyment - -```shell -kubectl apply -f 06.flower.yml -``` +refer guide here [kubernetes](k8s.md) # Command @@ -450,8 +244,6 @@ unsub - Unsubscribe from YouTube Channel sub_count - Check subscription status, owner only. uncache - Delete cache for this link, owner only. purge - Delete all tasks , owner only. -topup - Top up quota -tgvip - Using Telegram payment to pay for VIP ``` # Test data @@ -481,8 +273,9 @@ https://twitter.com/BennyThinks/status/1475836588542341124 ## Stripe -You can choose to donate via Stripe. Please click the button below to donate via Stripe. -Choose the currency and payment method that suits you. +You can choose to donate via Stripe by clicking the button below. + +Select the currency and payment method that suits you. | USD(Card, Apple Pay and Google Pay) | SEK(Card, Apple Pay and Google Pay) | CNY(Card, Apple Pay, Google Pay and Alipay) | |--------------------------------------------------|--------------------------------------------------|--------------------------------------------------| diff --git a/k8s.md b/k8s.md new file mode 100644 index 0000000..61b0146 --- /dev/null +++ b/k8s.md @@ -0,0 +1,200 @@ +## Kubernetes + +Kubernetes, also known as K8s, is an open-source system for automating deployment, scaling, and management of +containerized applications + +# Complete deployment guide for k8s deloyment + +* contains every functionality +* compatible with amd64, arm64 and armv7l + +## First. Get all file in k8s folder + +Download `k8s` file to a directory on your k8s server and go to this folder + +## 1. Create Redis deloyment + +```shell +kubectl apply -f 01.redis.yml +``` + +This command will create ytdl namespace, redis pod and redis service + +## 2. Creat MariaDB deloyment + +```shell +kubectl apply -f 02.mariadb.yml +``` + +This deloyment will claim 10GB storage from storageClassName: longhorn. Please replace longhorn with your +storageClassName before apply. + +## 3. Set environment variables + +Create configMap for env + +### 3.1 Edit configmap.yml + +```shell +vim 03.configmap.yml +``` + +you can configure all the following environment variables: + +* PYRO_WORKERS: number of workers for pyrogram, default is 100 +* WORKERS: workers count for celery +* APP_ID: **REQUIRED**, get it from https://core.telegram.org/ +* APP_HASH: **REQUIRED** +* TOKEN: **REQUIRED** +* REDIS: **REQUIRED if you need VIP mode and cache** โš ๏ธ Don't publish your redis server on the internet. โš ๏ธ + +* OWNER: owner username +* QUOTA: quota in bytes +* EX: quota expire time +* MULTIPLY: vip quota comparing to normal quota +* USD2CNY: exchange rate +* VIP: VIP mode, default: disable +* AFD_LINK +* COFFEE_LINK +* COFFEE_TOKEN +* AFD_TOKEN +* AFD_USER_ID + +* AUTHORIZED_USER: users that could use this bot, user_id, separated with `,` +* REQUIRED_MEMBERSHIP: group or channel username, user must join this group to use the bot. Could be use with + above `AUTHORIZED_USER` + +* ENABLE_CELERY: Distribution mode, default: disable. You'll can setup workers in different locations. +* ENABLE_FFMPEG: enable ffmpeg so Telegram can stream +* MYSQL_HOST: you'll have to setup MySQL if you enable VIP mode +* MYSQL_USER +* MYSQL_PASS +* GOOGLE_API_KEY: YouTube API key, required for YouTube video subscription. +* AUDIO_FORMAT: audio format, default is m4a. You can set to any known and supported format for ffmpeg. For + example,`mp3`, `flac`, etc. โš ๏ธ m4a is the fastest. Other formats may affect performance. +* ARCHIVE_ID: group or channel id/username. All downloads will send to this group first and then forward to end user. + **Inline button will be lost during the forwarding.** + +### 3.2 Apply configMap for environment variables + +```shell +kubectl apply -f 03.configmap.yml +``` + +## 4. Run Master Celery + +```shell +kubectl apply -f 04.ytdl-master.yml +``` + +This deloyment will create ytdl-pvc PersistentVolumeClaim on storageClassName: longhorn. This clain will contain vnstat, +cookies folder and flower database. Please replace longhorn with your storageClassName before apply + +### 4.1 Setup instagram cookies + +Required if you want to support instagram. + +You can use this extension +[Get cookies.txt](https://chrome.google.com/webstore/detail/get-cookiestxt/bgaddhkoddajcdgocldbbfleckgcbcid) +to get instagram cookies + +Get pod running ytdl master: + +```shell +kubectl get pods --namespace ytdl +``` + +Name should be ytdl-xxxxxxxx + +Access to pod + +```shell +kubectl --namespace=ytdl exec --stdin --tty ytdl-xxx -- sh +``` + +(replace ytdl-xxx by your pod name) + +Go to ytdl-pvc mounted folder + +```shell +cd /ytdlbot/ytdlbot/data/ +vim instagram.com_cookies.txt +# paste your cookies +``` + +## 5. Run Worker Celery + +```shell +kubectl apply -f 05.ytdl-worker.yml +``` + +## 6. Run Flower image (OPTIONAL) + +### 6.1 Setup flower db + +Get pod running ytdl master: + +```shell +kubectl get pods --namespace ytdl +``` + +Name should be ytdl-xxxxxxxx + +Access to pod + +```shell +kubectl --namespace=ytdl exec --stdin --tty ytdl-xxx -- sh +``` + +(replace ytdl-xxx by your pod name) + +Go to ytdl-pvc mounted folder + +```shel +cd /var/lib/vnstat/ +``` + +Create flower database file + +```shell +{} ~ python3 +Python 3.9.9 (main, Nov 21 2021, 03:22:47) +[Clang 12.0.0 (clang-1200.0.32.29)] on darwin +Type "help", "copyright", "credits" or "license" for more information. +>>> import dbm;dbm.open("flower","n");exit() +``` + +### 6.2 Config Flower Ingress + +This step need config ingress from line 51 of file 06.flower.yml with your ingress service. Need for access from +internet. +YML file should be adjusted depending on your load balancing, ingress and network system + +For active SSL + +```yml +cert-manager.io/cluster-issuer: letsencrypt-prod +``` + +Replace nginx by your ingress service + +```yml +ingressClassName: nginx +``` + +Add your domain, example + +```yml +tls: + - hosts: + - flower.benny.com + secretName: flower-tls + rules: + - host: flower.benny.com +``` + +### 6.3 Apply Flower deloyment + +```shell +kubectl apply -f 06.flower.yml +``` diff --git a/requirements.txt b/requirements.txt index 480c545..be225b9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ pyrogram==1.4.16 tgcrypto==1.2.5 -yt-dlp==2023.3.3 -APScheduler==3.10.0 +yt-dlp==2023.3.4 +APScheduler==3.10.1 beautifultable==1.1.0 ffmpeg-python==0.2.0 PyMySQL==1.0.2 @@ -10,14 +10,15 @@ filetype==1.2.0 flower==1.2.0 psutil==5.9.4 influxdb==5.3.1 -beautifulsoup4==4.11.2 -fakeredis==2.9.2 +beautifulsoup4==4.12.1 +fakeredis==2.10.3 supervisor==4.2.5 tgbot-ping==1.0.7 redis==4.3.3 requests==2.28.2 -tqdm==4.64.1 +tqdm==4.65.0 requests-toolbelt==0.10.1 ffpb==0.4.1 youtube-search-python==1.6.6 token-bucket==0.3.0 +coloredlogs==15.0.1 diff --git a/scripts/migrate_to_mysql.py b/scripts/migrate_to_mysql.py index 30b5f4f..436fb78 100644 --- a/scripts/migrate_to_mysql.py +++ b/scripts/migrate_to_mysql.py @@ -8,6 +8,7 @@ __author__ = "Benny " import sqlite3 + import pymysql mysql_con = pymysql.connect(host='localhost', user='root', passwd='root', db='vip', charset='utf8mb4') diff --git a/scripts/start.sh b/scripts/start.sh index acb78ec..242f207 100644 --- a/scripts/start.sh +++ b/scripts/start.sh @@ -9,9 +9,5 @@ docker run -d --restart unless-stopped --name ytdl \ -e WORKERS=4 \ -e VIP=True \ -e CUSTOM_TEXT=#StandWithUkraine \ - -e WORKER_NAME=Pi \ - -v "$(pwd)"/session:/ytdlbot/ytdlbot/session \ - -v "$(pwd)"/ramdisk:/tmp/ \ - --log-driver none \ bennythink/ytdlbot \ /usr/local/bin/supervisord -c "/ytdlbot/conf/supervisor_worker.conf" diff --git a/ytdlbot/broadcast.py b/ytdlbot/broadcast.py deleted file mode 100644 index 91d9a4b..0000000 --- a/ytdlbot/broadcast.py +++ /dev/null @@ -1,62 +0,0 @@ -#!/usr/local/bin/python3 -# coding: utf-8 - -# ytdlbot - broadcast.py -# 8/25/21 16:11 -# - -__author__ = "Benny " - -import argparse -import contextlib -import logging -import random -import sys -import tempfile -import time - -from tqdm import tqdm - -from db import Redis -from ytdl_bot import create_app - -parser = argparse.ArgumentParser(description='Broadcast to users') -parser.add_argument('-m', help='message', required=True) -parser.add_argument('-p', help='picture', default=None) -parser.add_argument('--notify', help='notify all users?', action="store_false") -parser.add_argument('-u', help='user_id', type=int) -logging.basicConfig(level=logging.INFO) -args = parser.parse_args() - -r = Redis().r -keys = r.keys("*") -user_ids = set() -for key in keys: - if key.isdigit(): - user_ids.add(key) - -metrics = r.hgetall("metrics") - -for key in metrics: - if key.isdigit(): - user_ids.add(key) - -if args.u: - user_ids = [args.u] - -if "YES" != input("Are you sure you want to send broadcast message to %s users?\n>" % len(user_ids)): - logging.info("Abort") - sys.exit(1) - -with tempfile.NamedTemporaryFile() as tmp: - app = create_app(tmp.name, 1) - app.start() - for user_id in tqdm(user_ids): - time.sleep(random.random() * 5) - if args.p: - with contextlib.suppress(Exception): - app.send_photo(user_id, args.p, caption=args.m, disable_notification=args.notify) - else: - with contextlib.suppress(Exception): - app.send_message(user_id, args.m, disable_notification=args.notify) - app.stop() diff --git a/ytdlbot/channel.py b/ytdlbot/channel.py new file mode 100644 index 0000000..56e79a5 --- /dev/null +++ b/ytdlbot/channel.py @@ -0,0 +1,177 @@ +#!/usr/bin/env python3 +# coding: utf-8 +import http +import logging +import os +import re + +import requests +from bs4 import BeautifulSoup + +from config import ENABLE_VIP +from limit import Payment + + +class Channel(Payment): + def subscribe_channel(self, user_id: "int", share_link: "str"): + if not re.findall(r"youtube\.com|youtu\.be", share_link): + raise ValueError("Is this a valid YouTube Channel link?") + if ENABLE_VIP: + self.cur.execute("select count(user_id) from subscribe where user_id=%s", (user_id,)) + usage = int(self.cur.fetchone()[0]) + if usage >= 5: + # TODO: 5 tokens for one more subscription? + logging.warning("User %s has subscribed %s channels", user_id, usage) + return "You have subscribed too many channels. Maximum 5 channels." + + data = self.get_channel_info(share_link) + channel_id = data["channel_id"] + + self.cur.execute("select user_id from subscribe where user_id=%s and channel_id=%s", (user_id, channel_id)) + if self.cur.fetchall(): + raise ValueError("You have already subscribed this channel.") + + self.cur.execute( + "INSERT IGNORE INTO channel values" + "(%(link)s,%(title)s,%(description)s,%(channel_id)s,%(playlist)s,%(last_video)s)", + data, + ) + self.cur.execute("INSERT INTO subscribe values(%s,%s, NULL)", (user_id, channel_id)) + self.con.commit() + logging.info("User %s subscribed channel %s", user_id, data["title"]) + return "Subscribed to {}".format(data["title"]) + + def unsubscribe_channel(self, user_id: "int", channel_id: "str"): + affected_rows = self.cur.execute( + "DELETE FROM subscribe WHERE user_id=%s AND channel_id=%s", (user_id, channel_id) + ) + self.con.commit() + logging.info("User %s tried to unsubscribe channel %s", user_id, channel_id) + return affected_rows + + @staticmethod + def extract_canonical_link(url): + # canonic link works for many websites. It will strip out unnecessary stuff + props = ["canonical", "alternate", "shortlinkUrl"] + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.163 Safari/537.36" + } + cookie = {"CONSENT": "YES+cb.20210328-17-p0.en+FX+999"} + # send head request first + r = requests.head(url, headers=headers, allow_redirects=True, cookies=cookie) + if r.status_code != http.HTTPStatus.METHOD_NOT_ALLOWED and "text/html" not in r.headers.get("content-type"): + # get content-type, if it's not text/html, there's no need to issue a GET request + logging.warning("%s Content-type is not text/html, no need to GET for extract_canonical_link", url) + return url + + html_doc = requests.get(url, headers=headers, cookies=cookie, timeout=5).text + soup = BeautifulSoup(html_doc, "html.parser") + for prop in props: + element = soup.find("link", rel=prop) + try: + href = element["href"] + if href not in ["null", "", None]: + return href + except Exception: + logging.warning("Canonical exception %s", url) + + return url + + def get_channel_info(self, url: "str"): + api_key = os.getenv("GOOGLE_API_KEY") + canonical_link = self.extract_canonical_link(url) + try: + channel_id = canonical_link.split("youtube.com/channel/")[1] + except IndexError: + channel_id = canonical_link.split("/")[-1] + channel_api = ( + f"https://www.googleapis.com/youtube/v3/channels?part=snippet,contentDetails&id={channel_id}&key={api_key}" + ) + + data = requests.get(channel_api).json() + snippet = data["items"][0]["snippet"] + title = snippet["title"] + description = snippet["description"] + playlist = data["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"] + + return { + "link": url, + "title": title, + "description": description, + "channel_id": channel_id, + "playlist": playlist, + "last_video": self.get_latest_video(playlist), + } + + @staticmethod + def get_latest_video(playlist_id: "str"): + api_key = os.getenv("GOOGLE_API_KEY") + video_api = ( + f"https://www.googleapis.com/youtube/v3/playlistItems?part=snippet&maxResults=1&" + f"playlistId={playlist_id}&key={api_key}" + ) + data = requests.get(video_api).json() + video_id = data["items"][0]["snippet"]["resourceId"]["videoId"] + logging.info(f"Latest video %s from %s", video_id, data["items"][0]["snippet"]["channelTitle"]) + return f"https://www.youtube.com/watch?v={video_id}" + + def has_newer_update(self, channel_id: "str"): + self.cur.execute("SELECT playlist,latest_video FROM channel WHERE channel_id=%s", (channel_id,)) + data = self.cur.fetchone() + playlist_id = data[0] + old_video = data[1] + newest_video = self.get_latest_video(playlist_id) + if old_video != newest_video: + logging.info("Newer update found for %s %s", channel_id, newest_video) + self.cur.execute("UPDATE channel SET latest_video=%s WHERE channel_id=%s", (newest_video, channel_id)) + self.con.commit() + return newest_video + + def get_user_subscription(self, user_id: "int"): + self.cur.execute( + """ + select title, link, channel.channel_id from channel, subscribe + where subscribe.user_id = %s and channel.channel_id = subscribe.channel_id + """, + (user_id,), + ) + data = self.cur.fetchall() + text = "" + for item in data: + text += "[{}]({}) `{}\n`".format(*item) + return text + + def group_subscriber(self): + # {"channel_id": [user_id, user_id, ...]} + self.cur.execute("select * from subscribe where is_valid=1") + data = self.cur.fetchall() + group = {} + for item in data: + group.setdefault(item[1], []).append(item[0]) + logging.info("Checking periodic subscriber...") + return group + + def deactivate_user_subscription(self, user_id: "int"): + self.cur.execute("UPDATE subscribe set is_valid=0 WHERE user_id=%s", (user_id,)) + self.con.commit() + + def sub_count(self): + sql = """ + select user_id, channel.title, channel.link + from subscribe, channel where subscribe.channel_id = channel.channel_id + """ + self.cur.execute(sql) + data = self.cur.fetchall() + text = f"Total {len(data)} subscriptions found.\n\n" + for item in data: + text += "{} ==> [{}]({})\n".format(*item) + return text + + def del_cache(self, user_link: "str"): + unique = self.extract_canonical_link(user_link) + caches = self.r.hgetall("cache") + count = 0 + for key in caches: + if key.startswith(unique): + count += self.del_send_cache(key) + return count diff --git a/ytdlbot/config.py b/ytdlbot/config.py index 9183b11..eebd7f4 100644 --- a/ytdlbot/config.py +++ b/ytdlbot/config.py @@ -12,26 +12,18 @@ import os # general settings WORKERS: "int" = int(os.getenv("WORKERS", 100)) PYRO_WORKERS: "int" = int(os.getenv("PYRO_WORKERS", 100)) -APP_ID: "int" = int(os.getenv("APP_ID", 111)) -APP_HASH = os.getenv("APP_HASH", "111") -TOKEN = os.getenv("TOKEN", "3703WLI") +APP_ID: "int" = int(os.getenv("APP_ID", 198214)) +APP_HASH = os.getenv("APP_HASH", "1234b90") +TOKEN = os.getenv("TOKEN", "1234") -REDIS = os.getenv("REDIS") - -# quota settings -QUOTA = int(os.getenv("QUOTA", 5 * 1024 * 1024 * 1024)) # 5G -if os.uname().sysname == "Darwin": - QUOTA = 10 * 1024 * 1024 # 10M +REDIS = os.getenv("REDIS", "redis") TG_MAX_SIZE = 2 * 1024 * 1024 * 1024 * 0.99 # TG_MAX_SIZE = 10 * 1024 * 1024 -EX = int(os.getenv("EX", 24 * 3600)) -MULTIPLY = os.getenv("MULTIPLY", 10) # VIP1 is 5*5-25G, VIP2 is 50G -USD2CNY = os.getenv("USD2CNY", 6) # $5 --> ยฅ30 +EXPIRE = 24 * 3600 ENABLE_VIP = os.getenv("VIP", False) -MAX_DURATION = int(os.getenv("MAX_DURATION", 60)) AFD_LINK = os.getenv("AFD_LINK", "https://afdian.net/@BennyThink") COFFEE_LINK = os.getenv("COFFEE_LINK", "https://www.buymeacoffee.com/bennythink") COFFEE_TOKEN = os.getenv("COFFEE_TOKEN") @@ -49,7 +41,7 @@ ENABLE_CELERY = os.getenv("ENABLE_CELERY", False) ENABLE_QUEUE = os.getenv("ENABLE_QUEUE", False) BROKER = os.getenv("BROKER", f"redis://{REDIS}:6379/4") -MYSQL_HOST = os.getenv("MYSQL_HOST") +MYSQL_HOST = os.getenv("MYSQL_HOST", "mysql") MYSQL_USER = os.getenv("MYSQL_USER", "root") MYSQL_PASS = os.getenv("MYSQL_PASS", "root") @@ -58,9 +50,12 @@ ARCHIVE_ID = os.getenv("ARCHIVE_ID") IPv6 = os.getenv("IPv6", False) ENABLE_FFMPEG = os.getenv("ENABLE_FFMPEG", False) -# 0.01 means basically no limit -RATE = float(os.getenv("RATE", 0.01)) -BURST = int(os.getenv("BURST", 3)) + +# Stripe setting PROVIDER_TOKEN = os.getenv("PROVIDER_TOKEN") or "1234" + PLAYLIST_SUPPORT = os.getenv("PLAYLIST_SUPPORT", False) ENABLE_ARIA2 = os.getenv("ENABLE_ARIA2", False) + +FREE_DOWNLOAD = os.getenv("FREE_DOWNLOAD", 5) +TOKEN_PRICE = os.getenv("BUY_UNIT", 20) # one USD=20 credits diff --git a/ytdlbot/constant.py b/ytdlbot/constant.py index 92d0a02..a970287 100644 --- a/ytdlbot/constant.py +++ b/ytdlbot/constant.py @@ -8,13 +8,16 @@ __author__ = "Benny " import os -import time -from config import (AFD_LINK, BURST, COFFEE_LINK, ENABLE_CELERY, ENABLE_VIP, - EX, MULTIPLY, RATE, REQUIRED_MEMBERSHIP, USD2CNY) -from db import InfluxDB -from downloader import sizeof_fmt -from limit import QUOTA, VIP +from config import ( + AFD_LINK, + COFFEE_LINK, + ENABLE_CELERY, + FREE_DOWNLOAD, + REQUIRED_MEMBERSHIP, + TOKEN_PRICE, +) +from database import InfluxDB from utils import get_func_queue @@ -22,96 +25,64 @@ class BotText: start = "Welcome to YouTube Download bot. Type /help for more information." help = f""" -1. This bot should works at all times. If it doesn't, wait for a few minutes, try to send the link again. +1. This bot should work at all times. If it doesn't, please wait for a few minutes and try sending the link again. -2. At this time of writing, this bot consumes more than 100GB of network traffic per day. -In order to avoid being abused, -every one can use this bot within **{sizeof_fmt(QUOTA)} of quota for every {int(EX / 3600)} hours.** +2. At the time of writing, this bot consumes more than 100GB of network traffic per day. +To prevent abuse, each user is limited to 5 downloads per 24 hours. -3. You can optionally choose to become 'VIP' user if you need more traffic. Type /vip for more information. +3. You have the option to buy more tokens. Type /buy for more information. -4. Source code for this bot will always stay open, here-> https://github.com/tgbot-collection/ytdlbot +4. The source code for this bot will always remain open and can be found here: https://github.com/tgbot-collection/ytdlbot + """ -5. Request limit is applied for everyone, excluding VIP users. - """ if ENABLE_VIP else "Help text" + about = "YouTube Downloader by @BennyThink.\n\nOpen source on GitHub: https://github.com/tgbot-collection/ytdlbot" - about = "YouTube-DL by @BennyThink. Open source on GitHub: https://github.com/tgbot-collection/ytdlbot" - - vip = f""" + buy = f""" **Terms:** -1. You can use this service, free of charge, {sizeof_fmt(QUOTA)} per {int(EX / 3600)} hours. -2. The above traffic, is counted one-way. For example, if you download a video of 1GB, your will use 1GB instead of 2GB. -3. Streaming support is limited due to high costs of conversion. -4. I won't gather any personal information, which means I don't know how many and what videos did you download. -5. No rate limit for VIP users. -6. Possible to refund, but you'll have to bear with process fee. -7. I'll record your unique ID after a successful payment, usually it's payment ID or email address. -8. VIP identity won't expire. -9. Please try not to abuse this service. It's a open source project, you can always deploy your own bot. +1. You can use this service free of charge for up to {FREE_DOWNLOAD} downloads within a 24-hour period, regardless of whether the download is successful or not. -**Pay Tier:** -1. Everyone: {sizeof_fmt(QUOTA)} per {int(EX / 3600)} hours -2. VIP1: ${MULTIPLY} or ยฅ{MULTIPLY * USD2CNY}, {sizeof_fmt(QUOTA * 2)} per {int(EX / 3600)} hours -3. VIP2: ${MULTIPLY * 2} or ยฅ{MULTIPLY * USD2CNY * 2}, {sizeof_fmt(QUOTA * 2 * 2)} per {int(EX / 3600)} hours -4. VIP4....VIPn. +2. You can purchase additional download tokens, which will be valid indefinitely. -**Temporary top up** -Just want more traffic for a short period of time? Don't worry, you can use /topup command to top up your quota. -It's valid permanently, until you use it up. +3. I will not gather any personal information, so I won't know how many or which videos you have downloaded. -**Payment method:** -1. (afdian) Mainland China: {AFD_LINK} -2. (buy me a coffee) Other countries or regions: {COFFEE_LINK} -3. Telegram Payment(stripe), please directly using /tgvip command. +4. Refunds are possible, but you will be responsible for the processing fee charged by the payment provider (Stripe, Buy Me a Coffee, etc.). + +5. I will record your unique ID after a successful payment, which is usually your payment ID or email address. + + +**Download token price:** +1. Everyone: {FREE_DOWNLOAD} tokens per 24 hours, free of charge. +2. 1 USD == {TOKEN_PRICE} tokens, valid indefinitely. +3. 7 CNY == {TOKEN_PRICE} tokens, valid indefinitely. + +**Payment option:** +1. AFDIAN(AliPay, WeChat Pay and PayPal): {AFD_LINK} +2. Buy me a coffee: {COFFEE_LINK} +3. Telegram Payment(Stripe), see following invoice. **After payment:** -1. afdian: with your order number `/vip 123456` -2. buy me a coffee: with your email `/vip someone@else.com` -3. Telegram Payment: automatically activated - """ if ENABLE_VIP else "VIP is not enabled." - vip_pay = "Processing your payments...If it's not responding after one minute, please contact @BennyThink." +1. Afdian: Provide your order number with the /redeem command (e.g., `/redeem 123456`). +2. Buy Me a Coffee: Provide your email with the /redeem command (e.g., `/redeem some@one.com`). **Use different email each time.** +3. Telegram Payment: Your payment will be automatically activated. +Want to buy more token at once? Let's say 100? Here you go! `/buy 123` + """ private = "This bot is for private use" membership_require = f"You need to join this group or channel to use this bot\n\nhttps://t.me/{REQUIRED_MEMBERSHIP}" settings = """ -Select sending format and video quality. **Only applies to YouTube** -High quality is recommended; Medium quality is aimed as 720P while low quality is aimed as 480P. - -Remember if you choose to send as document, there will be no streaming. +Please choose the desired format and video quality for your video. Note that these settings only **apply to YouTube videos**. + +High quality is recommended. Medium quality is 720P, while low quality is 480P. + +Please keep in mind that if you choose to send the video as a document, it will not be possible to stream it. Your current settings: Video quality: **{0}** Sending format: **{1}** """ custom_text = os.getenv("CUSTOM_TEXT", "") - topup_description = f"US$1 will give you {sizeof_fmt(QUOTA)} traffic permanently" - topup_title = "Pay US$1 for more traffic!" - - def remaining_quota_caption(self, chat_id): - if not ENABLE_VIP: - return "" - used, total, ttl = self.return_remaining_quota(chat_id) - refresh_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ttl + time.time())) - caption = f"Remaining quota: **{sizeof_fmt(used)}/{sizeof_fmt(total)}**, " \ - f"refresh at {refresh_time}\n" - return caption - - @staticmethod - def return_remaining_quota(chat_id): - used, total, ttl = VIP().check_remaining_quota(chat_id) - return used, total, ttl - - @staticmethod - def get_vip_greeting(chat_id): - if not ENABLE_VIP: - return "" - v = VIP().check_vip(chat_id) - if v: - return f"Hello {v[1]}, VIP{v[-2]}โ˜บ๏ธ\n\n" - else: - return "" @staticmethod def get_receive_link_text(): @@ -126,6 +97,7 @@ Sending format: **{1}** @staticmethod def ping_worker(): from tasks import app as celery_app + workers = InfluxDB().extract_dashboard_data() # [{'celery@BennyใฎMBP': 'abc'}, {'celery@BennyใฎMBP': 'abc'}] response = celery_app.control.broadcast("ping_revision", reply=True) @@ -144,5 +116,3 @@ Sending format: **{1}** text += f"{status}{hostname} **{active}** {load} {rev}\n" return text - - too_fast = f"You have reached rate limit. Current rate limit is 1 request per {RATE} seconds, {BURST - 1} bursts." diff --git a/ytdlbot/db.py b/ytdlbot/database.py similarity index 71% rename from ytdlbot/db.py rename to ytdlbot/database.py index 15e2a01..1075f22 100644 --- a/ytdlbot/db.py +++ b/ytdlbot/database.py @@ -1,7 +1,7 @@ #!/usr/local/bin/python3 # coding: utf-8 -# ytdlbot - db.py +# ytdlbot - database.py # 12/7/21 16:57 # @@ -13,6 +13,7 @@ import datetime import logging import os import re +import sqlite3 import subprocess import time from io import BytesIO @@ -24,13 +25,49 @@ import requests from beautifultable import BeautifulTable from influxdb import InfluxDBClient -from config import MYSQL_HOST, MYSQL_PASS, MYSQL_USER, QUOTA, REDIS -from fakemysql import FakeMySQL +from config import MYSQL_HOST, MYSQL_PASS, MYSQL_USER, REDIS +from utils import sizeof_fmt + +init_con = sqlite3.connect(":memory:", check_same_thread=False) + + +class FakeMySQL: + @staticmethod + def cursor() -> "Cursor": + return Cursor() + + def commit(self): + pass + + def close(self): + pass + + +class Cursor: + def __init__(self): + self.con = init_con + self.cur = self.con.cursor() + + def execute(self, *args, **kwargs): + sql = self.sub(args[0]) + new_args = (sql,) + args[1:] + return self.cur.execute(*new_args, **kwargs) + + def fetchall(self): + return self.cur.fetchall() + + def fetchone(self): + return self.cur.fetchone() + + @staticmethod + def sub(sql): + sql = re.sub(r"CHARSET.*|charset.*", "", sql, re.IGNORECASE) + sql = sql.replace("%s", "?") + return sql class Redis: def __init__(self): - super(Redis, self).__init__() if REDIS: self.r = redis.StrictRedis(host=REDIS, db=0, decode_responses=True) else: @@ -61,6 +98,7 @@ class Redis: {usage_banner} %s """ + super().__init__() def __del__(self): self.r.close() @@ -82,14 +120,13 @@ class Redis: return table def show_usage(self): - from downloader import sizeof_fmt db = MySQL() - db.cur.execute("select * from vip") + db.cur.execute("select user_id,payment_amount,old_user,token from payment") data = db.cur.fetchall() fd = [] for item in data: - fd.append([item[0], item[1], sizeof_fmt(item[-1])]) - db_text = self.generate_table(["ID", "username", "quota"], fd) + fd.append([item[0], item[1], item[2], item[3]]) + db_text = self.generate_table(["ID", "pay amount", "old user", "token"], fd) fd = [] hash_keys = self.r.hgetall("metrics") @@ -117,10 +154,10 @@ class Redis: # vnstat if os.uname().sysname == "Darwin": - cmd = "/usr/local/bin/vnstat -i en0".split() + cmd = "/opt/homebrew/bin/vnstat -i en0".split() else: cmd = "/usr/bin/vnstat -i eth0".split() - vnstat_text = subprocess.check_output(cmd).decode('u8') + vnstat_text = subprocess.check_output(cmd).decode("u8") return self.final_text % (db_text, vnstat_text, quota_text, metrics_text, usage_text) def reset_today(self): @@ -152,18 +189,16 @@ class Redis: class MySQL: vip_sql = """ - create table if not exists vip + CREATE TABLE if not exists `payment` ( - user_id bigint not null, - username varchar(256) null, - payment_amount int null, - payment_id varchar(256) null, - level int default 1 null, - quota bigint default %s null, - constraint VIP_pk - primary key (user_id) - ); - """ % QUOTA + `user_id` bigint NOT NULL, + `payment_amount` float DEFAULT NULL, + `payment_id` varchar(256) DEFAULT NULL, + `old_user` tinyint(1) DEFAULT NULL, + `token` int DEFAULT NULL, + UNIQUE KEY `payment_id` (`payment_id`) + ) CHARSET = utf8mb4 + """ settings_sql = """ create table if not exists settings @@ -202,13 +237,15 @@ class MySQL: def __init__(self): if MYSQL_HOST: - self.con = pymysql.connect(host=MYSQL_HOST, user=MYSQL_USER, passwd=MYSQL_PASS, db="ytdl", - charset="utf8mb4") + self.con = pymysql.connect( + host=MYSQL_HOST, user=MYSQL_USER, passwd=MYSQL_PASS, db="ytdl", charset="utf8mb4" + ) else: self.con = FakeMySQL() self.cur = self.con.cursor() self.init_db() + super().__init__() def init_db(self): self.cur.execute(self.vip_sql) @@ -220,6 +257,31 @@ class MySQL: def __del__(self): self.con.close() + def get_user_settings(self, user_id: "str") -> "tuple": + cur = self.con.cursor() + cur.execute("SELECT * FROM settings WHERE user_id = %s", (user_id,)) + data = cur.fetchone() + if data is None: + return 100, "high", "video", "Celery" + return data + + def set_user_settings(self, user_id: int, field: "str", value: "str"): + cur = self.con.cursor() + cur.execute("SELECT * FROM settings WHERE user_id = %s", (user_id,)) + data = cur.fetchone() + if data is None: + resolution = method = "" + if field == "resolution": + method = "video" + resolution = value + if field == "method": + method = value + resolution = "high" + cur.execute("INSERT INTO settings VALUES (%s,%s,%s,%s)", (user_id, resolution, method, "Celery")) + else: + cur.execute(f"UPDATE settings SET {field} =%s WHERE user_id = %s", (value, user_id)) + self.con.commit() + class InfluxDB: def __init__(self): @@ -231,8 +293,8 @@ class InfluxDB: @staticmethod def get_worker_data(): - password = os.getenv("FLOWER_PASSWORD", "123456abc") username = os.getenv("FLOWER_USERNAME", "benny") + password = os.getenv("FLOWER_PASSWORD", "123456abc") token = base64.b64encode(f"{username}:{password}".encode()).decode() headers = {"Authorization": f"Basic {token}"} r = requests.get("https://celery.dmesg.app/dashboard?json=1", headers=headers) @@ -250,7 +312,6 @@ class InfluxDB: "tags": { "hostname": worker["hostname"], }, - "time": datetime.datetime.utcnow(), "fields": { "task-received": worker.get("task-received", 0), @@ -262,7 +323,7 @@ class InfluxDB: "load1": load1, "load5": load5, "load15": load15, - } + }, } json_body.append(t) return json_body @@ -273,26 +334,11 @@ class InfluxDB: def __fill_overall_data(self): active = sum([i["active"] for i in self.data["data"]]) - json_body = [ - { - "measurement": "active", - "time": datetime.datetime.utcnow(), - "fields": { - "active": active - } - } - ] + json_body = [{"measurement": "active", "time": datetime.datetime.utcnow(), "fields": {"active": active}}] self.client.write_points(json_body) def __fill_redis_metrics(self): - json_body = [ - { - "measurement": "metrics", - "time": datetime.datetime.utcnow(), - "fields": { - } - } - ] + json_body = [{"measurement": "metrics", "time": datetime.datetime.utcnow(), "fields": {}}] r = Redis().r hash_keys = r.hgetall("metrics") for key, value in hash_keys.items(): diff --git a/ytdlbot/downloader.py b/ytdlbot/downloader.py index 9b33c1f..e524a1b 100644 --- a/ytdlbot/downloader.py +++ b/ytdlbot/downloader.py @@ -24,23 +24,14 @@ import filetype import yt_dlp as ytdl from tqdm import tqdm -from config import (AUDIO_FORMAT, ENABLE_FFMPEG, ENABLE_VIP, MAX_DURATION, ENABLE_ARIA2, - TG_MAX_SIZE, IPv6) -from db import Redis -from limit import VIP -from utils import (adjust_formats, apply_log_formatter, current_time, - get_user_settings) +from config import AUDIO_FORMAT, ENABLE_ARIA2, ENABLE_FFMPEG, IPv6 +from limit import Payment +from utils import adjust_formats, apply_log_formatter, current_time +from ytdlbot.config import TG_MAX_SIZE r = fakeredis.FakeStrictRedis() apply_log_formatter() - - -def sizeof_fmt(num: int, suffix='B'): - for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: - if abs(num) < 1024.0: - return "%3.1f%s%s" % (num, unit, suffix) - num /= 1024.0 - return "%.1f%s%s" % (num, 'Yi', suffix) +payment = Payment() def edit_text(bot_msg, text): @@ -60,9 +51,15 @@ def tqdm_progress(desc, total, finished, speed="", eta=""): return "" f = StringIO() - tqdm(total=total, initial=finished, file=f, ascii=False, unit_scale=True, ncols=30, - bar_format="{l_bar}{bar} |{n_fmt}/{total_fmt} " - ) + tqdm( + total=total, + initial=finished, + file=f, + ascii=False, + unit_scale=True, + ncols=30, + bar_format="{l_bar}{bar} |{n_fmt}/{total_fmt} ", + ) raw_output = f.getvalue() tqdm_output = raw_output.split("|") progress = f"`[{tqdm_output[1]}]`" @@ -80,7 +77,7 @@ def tqdm_progress(desc, total, finished, speed="", eta=""): def remove_bash_color(text): - return re.sub(r'\u001b|\[0;94m|\u001b\[0m|\[0;32m|\[0m|\[0;33m', "", text) + return re.sub(r"\u001b|\[0;94m|\u001b\[0m|\[0;32m|\[0m|\[0;33m", "", text) def download_hook(d: dict, bot_msg): @@ -90,16 +87,12 @@ def download_hook(d: dict, bot_msg): original_url = d["info_dict"]["original_url"] key = f"{bot_msg.chat.id}-{original_url}" - if d['status'] == 'downloading': + if d["status"] == "downloading": downloaded = d.get("downloaded_bytes", 0) total = d.get("total_bytes") or d.get("total_bytes_estimate", 0) # percent = remove_bash_color(d.get("_percent_str", "N/A")) speed = remove_bash_color(d.get("_speed_str", "N/A")) - if ENABLE_VIP and not r.exists(key): - result, err_msg = check_quota(total, bot_msg.chat.id) - if result is False: - raise ValueError(err_msg) eta = remove_bash_color(d.get("_eta_str", d.get("eta"))) text = tqdm_progress("Downloading...", total, downloaded, speed, eta) edit_text(bot_msg, text) @@ -107,25 +100,10 @@ def download_hook(d: dict, bot_msg): def upload_hook(current, total, bot_msg): - # filesize = sizeof_fmt(total) text = tqdm_progress("Uploading...", total, current) edit_text(bot_msg, text) -def check_quota(file_size, chat_id) -> ("bool", "str"): - remain, _, ttl = VIP().check_remaining_quota(chat_id) - if file_size > remain: - refresh_time = current_time(ttl + time.time()) - err = f"Quota exceed, you have {sizeof_fmt(remain)} remaining, " \ - f"but you want to download a video with {sizeof_fmt(file_size)} in size. \n" \ - f"Try again in {ttl} seconds({refresh_time})" - logging.warning(err) - Redis().update_metrics("quota_exceed") - return False, err - else: - return True, "" - - def convert_to_mp4(resp: dict, bot_msg): default_type = ["video/x-flv", "video/webm"] if resp["status"]: @@ -136,7 +114,9 @@ def convert_to_mp4(resp: dict, bot_msg): if mime in default_type: if not can_convert_mp4(path, bot_msg.chat.id): logging.warning("Conversion abort for %s", bot_msg.chat.id) - bot_msg._client.send_message(bot_msg.chat.id, "Can't convert your video to streaming format.") + bot_msg._client.send_message( + bot_msg.chat.id, "Can't convert your video to streaming format. ffmpeg has been disabled." + ) break edit_text(bot_msg, f"{current_time()}: Converting {path.name} to mp4. Please wait.") new_file_path = path.with_suffix(".mp4") @@ -170,18 +150,7 @@ def run_ffmpeg(cmd_list, bm): def can_convert_mp4(video_path, uid): if not ENABLE_FFMPEG: return False - if not ENABLE_VIP: - return True - video_streams = ffmpeg.probe(video_path, select_streams="v") - try: - duration = int(float(video_streams["format"]["duration"])) - except Exception: - duration = 0 - if duration > MAX_DURATION and not VIP().check_vip(uid): - logging.info("Video duration: %s, not vip, can't convert", duration) - return False - else: - return True + return True def ytdl_download(url, tempdir, bm, **kwargs) -> dict: @@ -190,33 +159,27 @@ def ytdl_download(url, tempdir, bm, **kwargs) -> dict: response = {"status": True, "error": "", "filepath": []} output = pathlib.Path(tempdir, "%(title).70s.%(ext)s").as_posix() ydl_opts = { - 'progress_hooks': [lambda d: download_hook(d, bm)], - 'outtmpl': output, - 'restrictfilenames': False, - 'quiet': True, - "proxy": os.getenv("YTDL_PROXY") + "progress_hooks": [lambda d: download_hook(d, bm)], + "outtmpl": output, + "restrictfilenames": False, + "quiet": True, + "proxy": os.getenv("YTDL_PROXY"), } if ENABLE_ARIA2: ydl_opts["external_downloader"] = "aria2c" - ydl_opts["external_downloader_args"] = ['--min-split-size=1M', - '--max-connection-per-server=16', - '--max-concurrent-downloads=16', - '--split=16' - ] + ydl_opts["external_downloader_args"] = [ + "--min-split-size=1M", + "--max-connection-per-server=16", + "--max-concurrent-downloads=16", + "--split=16", + ] formats = [ "bestvideo[ext=mp4]+bestaudio[ext=m4a]/bestvideo+bestaudio", "bestvideo[vcodec^=avc]+bestaudio[acodec^=mp4a]/best[vcodec^=avc]/best", - None + None, ] adjust_formats(chat_id, url, formats, hijack) add_instagram_cookies(url, ydl_opts) - # check quota before download - if ENABLE_VIP: - # check quota after download - remain, _, ttl = VIP().check_remaining_quota(chat_id) - result, err_msg = check_quota(detect_filesize(url), chat_id) - if not result: - return {"status": False, "error": err_msg, "filepath": []} address = ["::", "0.0.0.0"] if IPv6 else [None] for format_ in formats: @@ -230,6 +193,7 @@ def ytdl_download(url, tempdir, bm, **kwargs) -> dict: ydl.download([url]) response["status"] = True response["error"] = "" + response["filepath"] = list(pathlib.Path(tempdir).glob("*")) break except Exception as e: logging.error("Download failed for %s ", url) @@ -243,32 +207,15 @@ def ytdl_download(url, tempdir, bm, **kwargs) -> dict: if response["status"] is False: return response - for i in os.listdir(tempdir): - p = pathlib.Path(tempdir, i) - file_size = os.stat(p).st_size - if ENABLE_VIP: - # check quota after download - remain, _, ttl = VIP().check_remaining_quota(chat_id) - result, err_msg = check_quota(file_size, chat_id) - else: - result, err_msg = True, "" - if result is False: - response["status"] = False - response["error"] = err_msg - else: - VIP().use_quota(bm.chat.id, file_size) - response["status"] = True - response["filepath"].append(p) - # convert format if necessary - settings = get_user_settings(str(chat_id)) + settings = payment.get_user_settings(str(chat_id)) if settings[2] == "video" or isinstance(settings[2], MagicMock): # only convert if send type is video convert_to_mp4(response, bm) if settings[2] == "audio" or hijack == "bestaudio[ext=m4a]": convert_audio_format(response, bm) - # enable it for now - split_large_video(response) + # disable it for now + # split_large_video(response) return response @@ -280,9 +227,7 @@ def convert_audio_format(resp: "dict", bm): path: "pathlib.Path" for path in resp["filepath"]: streams = ffmpeg.probe(path)["streams"] - if (AUDIO_FORMAT is None and - len(streams) == 1 and - streams[0]["codec_type"] == "audio"): + if AUDIO_FORMAT is None and len(streams) == 1 and streams[0]["codec_type"] == "audio": logging.info("%s is audio, default format, no need to convert", path) elif AUDIO_FORMAT is None and len(streams) >= 2: logging.info("%s is video, default format, need to extract audio", path) @@ -311,11 +256,6 @@ def add_instagram_cookies(url: "str", opt: "dict"): opt["cookiefile"] = pathlib.Path(__file__).parent.joinpath("instagram.com_cookies.txt").as_posix() -def run_splitter(video_path: "str"): - subprocess.check_output(f"sh split-video.sh {video_path} {TG_MAX_SIZE * 0.95} ".split()) - os.remove(video_path) - - def split_large_video(response: "dict"): original_video = None split = False @@ -324,19 +264,8 @@ def split_large_video(response: "dict"): if size > TG_MAX_SIZE: split = True logging.warning("file is too large %s, splitting...", size) - run_splitter(original_video) + subprocess.check_output(f"sh split-video.sh {original_video} {TG_MAX_SIZE * 0.95} ".split()) + os.remove(original_video) if split and original_video: response["filepath"] = [i.as_posix() for i in pathlib.Path(original_video).parent.glob("*")] - - -def detect_filesize(url: "str") -> "int": - # find the largest file size - with ytdl.YoutubeDL() as ydl: - info_dict = ydl.extract_info(url, download=False) - max_size = 0 - max_size_list = [i.get("filesize", 0) for i in info_dict["formats"] if i.get("filesize")] - if max_size_list: - max_size = max(max_size_list) - logging.info("%s max size is %s", url, max_size) - return max_size diff --git a/ytdlbot/fakemysql.py b/ytdlbot/fakemysql.py deleted file mode 100644 index eb96a34..0000000 --- a/ytdlbot/fakemysql.py +++ /dev/null @@ -1,58 +0,0 @@ -#!/usr/local/bin/python3 -# coding: utf-8 - -# ytdlbot - fakemysql.py -# 2/20/22 20:08 -# - -__author__ = "Benny " - -import re -import sqlite3 - -init_con = sqlite3.connect(":memory:", check_same_thread=False) - - -class FakeMySQL: - @staticmethod - def cursor() -> "Cursor": - return Cursor() - - def commit(self): - pass - - def close(self): - pass - - -class Cursor: - def __init__(self): - self.con = init_con - self.cur = self.con.cursor() - - def execute(self, *args, **kwargs): - sql = self.sub(args[0]) - new_args = (sql,) + args[1:] - return self.cur.execute(*new_args, **kwargs) - - def fetchall(self): - return self.cur.fetchall() - - def fetchone(self): - return self.cur.fetchone() - - @staticmethod - def sub(sql): - sql = re.sub(r"CHARSET.*|charset.*", "", sql, re.IGNORECASE) - sql = sql.replace("%s", "?") - return sql - - -if __name__ == '__main__': - con = FakeMySQL() - cur = con.cursor() - cur.execute("create table user(id int, name varchar(20))") - cur.execute("insert into user values(%s,%s)", (1, "benny")) - cur.execute("select * from user") - data = cur.fetchall() - print(data) diff --git a/ytdlbot/limit.py b/ytdlbot/limit.py index 3e9f04c..6e6128a 100644 --- a/ytdlbot/limit.py +++ b/ytdlbot/limit.py @@ -8,239 +8,26 @@ __author__ = "Benny " import hashlib -import http import logging -import math -import os -import re import time -from unittest.mock import MagicMock import requests -from bs4 import BeautifulSoup -from config import (AFD_TOKEN, AFD_USER_ID, COFFEE_TOKEN, ENABLE_VIP, EX, - MULTIPLY, OWNER, QUOTA, USD2CNY) -from db import MySQL, Redis -from utils import apply_log_formatter +from config import ( + AFD_TOKEN, + AFD_USER_ID, + COFFEE_TOKEN, + EXPIRE, + FREE_DOWNLOAD, + OWNER, + TOKEN_PRICE, +) +from database import MySQL, Redis +from utils import apply_log_formatter, current_time apply_log_formatter() -class VIP(Redis, MySQL): - - def check_vip(self, user_id: "int") -> "tuple": - self.cur.execute("SELECT * FROM vip WHERE user_id=%s", (user_id,)) - data = self.cur.fetchone() - return data - - def __add_vip(self, user_data: "dict"): - sql = "INSERT INTO vip VALUES (%s,%s,%s,%s,%s,%s);" - self.cur.execute(sql, list(user_data.values())) - self.con.commit() - # also remove redis cache - self.r.delete(user_data["user_id"]) - - def add_vip(self, user_data: "dict") -> "str": - # first select - self.cur.execute("SELECT * FROM vip WHERE payment_id=%s", (user_data["payment_id"],)) - is_exist = self.cur.fetchone() - if is_exist: - return "Failed. {} is being used by user {}".format(user_data["payment_id"], is_exist[0]) - self.__add_vip(user_data) - return "Success! You are VIP{} now!".format(user_data["level"]) - - def direct_add_vip(self, user_data: "dict") -> ("bool", "str"): - self.__add_vip(user_data) - return "Success payment from Telegram! You are VIP{} now!".format(user_data["level"]) - - def remove_vip(self, user_id: "int"): - raise NotImplementedError() - - def get_user_quota(self, user_id: "int") -> int: - # even VIP have certain quota - q = self.check_vip(user_id) - topup = self.r.hget("topup", user_id) - if q: - return q[-1] - elif topup: - return int(topup) + QUOTA - else: - return QUOTA - - def set_topup(self, user_id: "int"): - self.r.hset("topup", user_id, QUOTA) - - def check_remaining_quota(self, user_id: "int"): - user_quota = self.get_user_quota(user_id) - ttl = self.r.ttl(user_id) - q = int(self.r.get(user_id)) if self.r.exists(user_id) else user_quota - if q <= 0: - q = 0 - return q, user_quota, ttl - - def use_quota(self, user_id: "int", traffic: "int"): - user_quota = self.get_user_quota(user_id) - # fix for standard mode - if isinstance(user_quota, MagicMock): - user_quota = 2 ** 32 - if self.r.exists(user_id): - self.r.decr(user_id, traffic) - else: - self.r.set(user_id, user_quota - traffic, ex=EX) - - def subscribe_channel(self, user_id: "int", share_link: "str"): - if not re.findall(r"youtube\.com|youtu\.be", share_link): - raise ValueError("Is this a valid YouTube Channel link?") - if ENABLE_VIP: - self.cur.execute("select count(user_id) from subscribe where user_id=%s", (user_id,)) - usage = int(self.cur.fetchone()[0]) - if usage >= 5 and not self.check_vip(user_id): - logging.warning("User %s is not VIP but has subscribed %s channels", user_id, usage) - return "You have subscribed too many channels. Please upgrade to VIP to subscribe more channels." - - data = self.get_channel_info(share_link) - channel_id = data["channel_id"] - - self.cur.execute("select user_id from subscribe where user_id=%s and channel_id=%s", (user_id, channel_id)) - if self.cur.fetchall(): - raise ValueError("You have already subscribed this channel.") - - self.cur.execute("INSERT IGNORE INTO channel values" - "(%(link)s,%(title)s,%(description)s,%(channel_id)s,%(playlist)s,%(last_video)s)", data) - self.cur.execute("INSERT INTO subscribe values(%s,%s, NULL)", (user_id, channel_id)) - self.con.commit() - logging.info("User %s subscribed channel %s", user_id, data["title"]) - return "Subscribed to {}".format(data["title"]) - - def unsubscribe_channel(self, user_id: "int", channel_id: "str"): - affected_rows = self.cur.execute("DELETE FROM subscribe WHERE user_id=%s AND channel_id=%s", - (user_id, channel_id)) - self.con.commit() - logging.info("User %s tried to unsubscribe channel %s", user_id, channel_id) - return affected_rows - - @staticmethod - def extract_canonical_link(url): - # canonic link works for many websites. It will strip out unnecessary stuff - proxy = {"https": os.getenv("YTDL_PROXY")} if os.getenv("YTDL_PROXY") else None - props = ["canonical", "alternate", "shortlinkUrl"] - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.163 Safari/537.36"} - # send head request first - r = requests.head(url, headers=headers, proxies=proxy) - if r.status_code != http.HTTPStatus.METHOD_NOT_ALLOWED and "text/html" not in r.headers.get("content-type"): - # get content-type, if it's not text/html, there's no need to issue a GET request - logging.warning("%s Content-type is not text/html, no need to GET for extract_canonical_link", url) - return url - - html_doc = requests.get(url, headers=headers, timeout=5, proxies=proxy).text - soup = BeautifulSoup(html_doc, "html.parser") - for prop in props: - element = soup.find("link", rel=prop) - try: - href = element["href"] - if href not in ["null", "", None]: - return href - except Exception: - logging.warning("Canonical exception %s", url) - - return url - - def get_channel_info(self, url: "str"): - api_key = os.getenv("GOOGLE_API_KEY") - canonical_link = self.extract_canonical_link(url) - try: - channel_id = canonical_link.split("https://www.youtube.com/channel/")[1] - except IndexError: - channel_id = canonical_link.split("https://youtube.com/channel/")[1] - channel_api = f"https://www.googleapis.com/youtube/v3/channels?part=snippet,contentDetails&" \ - f"id={channel_id}&key={api_key}" - data = requests.get(channel_api).json() - snippet = data['items'][0]['snippet'] - title = snippet['title'] - description = snippet['description'] - playlist = data['items'][0]['contentDetails']['relatedPlaylists']['uploads'] - - return { - "link": url, - "title": title, - "description": description, - "channel_id": channel_id, - "playlist": playlist, - "last_video": VIP.get_latest_video(playlist) - } - - @staticmethod - def get_latest_video(playlist_id: "str"): - api_key = os.getenv("GOOGLE_API_KEY") - video_api = f"https://www.googleapis.com/youtube/v3/playlistItems?part=snippet&maxResults=1&" \ - f"playlistId={playlist_id}&key={api_key}" - data = requests.get(video_api).json() - video_id = data['items'][0]['snippet']['resourceId']['videoId'] - logging.info(f"Latest video %s from %s", video_id, data['items'][0]['snippet']['channelTitle']) - return f"https://www.youtube.com/watch?v={video_id}" - - def has_newer_update(self, channel_id: "str"): - self.cur.execute("SELECT playlist,latest_video FROM channel WHERE channel_id=%s", (channel_id,)) - data = self.cur.fetchone() - playlist_id = data[0] - old_video = data[1] - newest_video = VIP.get_latest_video(playlist_id) - if old_video != newest_video: - logging.info("Newer update found for %s %s", channel_id, newest_video) - self.cur.execute("UPDATE channel SET latest_video=%s WHERE channel_id=%s", (newest_video, channel_id)) - self.con.commit() - return newest_video - - def get_user_subscription(self, user_id: "int"): - self.cur.execute( - """ - select title, link, channel.channel_id from channel, subscribe - where subscribe.user_id = %s and channel.channel_id = subscribe.channel_id - """, (user_id,)) - data = self.cur.fetchall() - text = "" - for item in data: - text += "[{}]({}) `{}\n`".format(*item) - return text - - def group_subscriber(self): - # {"channel_id": [user_id, user_id, ...]} - self.cur.execute("select * from subscribe where is_valid=1") - data = self.cur.fetchall() - group = {} - for item in data: - group.setdefault(item[1], []).append(item[0]) - logging.info("Checking periodic subscriber...") - return group - - def deactivate_user_subscription(self, user_id: "int"): - self.cur.execute("UPDATE subscribe set is_valid=0 WHERE user_id=%s", (user_id,)) - self.con.commit() - - def sub_count(self): - sql = """ - select user_id, channel.title, channel.link - from subscribe, channel where subscribe.channel_id = channel.channel_id - """ - self.cur.execute(sql) - data = self.cur.fetchall() - text = f"Total {len(data)} subscriptions found.\n\n" - for item in data: - text += "{} ==> [{}]({})\n".format(*item) - return text - - def del_cache(self, user_link: "str"): - unique = self.extract_canonical_link(user_link) - caches = self.r.hgetall("cache") - count = 0 - for key in caches: - if key.startswith(unique): - count += self.del_send_cache(key) - return count - - class BuyMeACoffee: def __init__(self): self._token = COFFEE_TOKEN @@ -266,8 +53,7 @@ class BuyMeACoffee: price = float(order.get("support_coffee_price", 0)) cups = float(order.get("support_coffees", 1)) amount = price * cups - level = math.floor(amount / MULTIPLY) - return level, amount, email + return amount, email class Afdian: @@ -279,11 +65,11 @@ class Afdian: def _generate_signature(self): data = { "user_id": self._user_id, - "params": "{\"x\":0}", + "params": '{"x":0}', "ts": int(time.time()), } sign_text = "{token}params{params}ts{ts}user_id{user_id}".format( - token=self._token, params=data['params'], ts=data["ts"], user_id=data["user_id"] + token=self._token, params=data["params"], ts=data["ts"], user_id=data["user_id"] ) md5 = hashlib.md5(sign_text.encode("u8")) @@ -305,44 +91,75 @@ class Afdian: def get_user_payment(self, trade_no: "str") -> ("int", "float", "str"): order = self._get_afdian_status(trade_no) amount = float(order.get("show_amount", 0)) - level = math.floor(amount / (MULTIPLY * USD2CNY)) - return level, amount, trade_no + # convert to USD + return amount / 7, trade_no -def verify_payment(user_id, unique, client) -> "str": - if not ENABLE_VIP: - return "VIP is not enabled." - logging.info("Verifying payment for %s - %s", user_id, unique) - if "@" in unique: - pay = BuyMeACoffee() - else: - pay = Afdian() +class Payment(Redis, MySQL): + # TODO transaction issues + def check_old_user(self, user_id: "int") -> "tuple": + self.cur.execute("SELECT * FROM payment WHERE user_id=%s AND old_user=1", (user_id,)) + data = self.cur.fetchone() + return data - level, amount, pay_id = pay.get_user_payment(unique) - if amount == 0: - return f"You pay amount is {amount}. Did you input wrong order ID or email? " \ - f"Talk to @{OWNER} if you need any assistant." - if not level: - return f"You pay amount {amount} is below minimum ${MULTIPLY}. " \ - f"Talk to @{OWNER} if you need any assistant." - else: - vip = VIP() - ud = { - "user_id": user_id, - "username": client.get_chat(user_id).first_name, - "payment_amount": amount, - "payment_id": pay_id, - "level": level, - "quota": QUOTA * level * MULTIPLY - } + def get_pay_token(self, user_id: "int") -> int: + self.cur.execute("SELECT token FROM payment WHERE user_id=%s", (user_id,)) + data = self.cur.fetchall() or [(0,)] + return sum([i[0] for i in data if i[0]]) - message = vip.add_vip(ud) - return message + def get_free_token(self, user_id: "int") -> int: + if self.r.exists(user_id): + return int(self.r.get(user_id)) + else: + # set and return + self.r.set(user_id, FREE_DOWNLOAD, ex=EXPIRE) + return FREE_DOWNLOAD + def get_token(self, user_id): + ttl = self.r.ttl(user_id) + return self.get_free_token(user_id), self.get_pay_token(user_id), current_time(time.time() + ttl) -def subscribe_query(): - vip = VIP() - for cid, uid in vip.group_subscriber().items(): - has = vip.has_newer_update(cid) - if has: - print(f"{has} - {uid}") + def use_free_token(self, user_id: "int"): + if self.r.exists(user_id): + self.r.decr(user_id, 1) + else: + # first time download + self.r.set(user_id, 5 - 1, ex=EXPIRE) + + def use_pay_token(self, user_id: int): + # a user may pay multiple times, so we'll need to filter the first payment with valid token + self.cur.execute("SELECT payment_id FROM payment WHERE user_id=%s AND token>0", (user_id,)) + data = self.cur.fetchone() + payment_id = data[0] + logging.info("User %s use pay token with payment_id %s", user_id, payment_id) + self.cur.execute("UPDATE payment SET token=token-1 WHERE payment_id=%s", (payment_id,)) + self.con.commit() + + def use_token(self, user_id): + free = self.get_free_token(user_id) + if free > 0: + self.use_free_token(user_id) + else: + self.use_pay_token(user_id) + + def add_pay_user(self, pay_data: "list"): + self.cur.execute("INSERT INTO payment VALUES (%s,%s,%s,%s,%s)", pay_data) + self.con.commit() + + def verify_payment(self, user_id: "int", unique: "str") -> "str": + pay = BuyMeACoffee() if "@" in unique else Afdian() + self.cur.execute("SELECT * FROM payment WHERE payment_id=%s ", (unique,)) + data = self.cur.fetchone() + if data: + # TODO what if a user pay twice with the same email address? + return ( + f"Failed. Payment has been verified by other users. Please contact @{OWNER} if you have any questions." + ) + + amount, pay_id = pay.get_user_payment(unique) + logging.info("User %s paid %s, identifier is %s", user_id, amount, unique) + # amount is already in USD + if amount == 0: + return "Payment not found. Please check your payment ID or email address" + self.add_pay_user([user_id, amount, pay_id, 0, amount * TOKEN_PRICE]) + return "Thanks! Your payment has been verified. /start to get your token details" diff --git a/ytdlbot/migration.sql b/ytdlbot/migration.sql deleted file mode 100644 index 15fc385..0000000 --- a/ytdlbot/migration.sql +++ /dev/null @@ -1,3 +0,0 @@ -alter table settings - add mode varchar(32) default 'Celery' null; - diff --git a/ytdlbot/tasks.py b/ytdlbot/tasks.py index 82448a2..29392e1 100644 --- a/ytdlbot/tasks.py +++ b/ytdlbot/tasks.py @@ -31,26 +31,33 @@ from pyrogram import idle from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Message from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor +from channel import Channel from client_init import create_app -from config import (ARCHIVE_ID, BROKER, ENABLE_CELERY, ENABLE_QUEUE, - ENABLE_VIP, TG_MAX_SIZE, WORKERS) +from config import ARCHIVE_ID, BROKER, ENABLE_CELERY, ENABLE_QUEUE, TG_MAX_SIZE, WORKERS, ENABLE_VIP from constant import BotText -from db import Redis -from downloader import (edit_text, sizeof_fmt, tqdm_progress, upload_hook, - ytdl_download) -from limit import VIP -from utils import (apply_log_formatter, auto_restart, customize_logger, - get_metadata, get_revision, get_user_settings) +from database import Redis +from downloader import edit_text, tqdm_progress, upload_hook, ytdl_download +from limit import Payment +from utils import ( + apply_log_formatter, + auto_restart, + customize_logger, + get_metadata, + get_revision, + sizeof_fmt, +) customize_logger(["pyrogram.client", "pyrogram.session.session", "pyrogram.connection.connection"]) apply_log_formatter() bot_text = BotText() -logging.getLogger('apscheduler.executors.default').propagate = False +logging.getLogger("apscheduler.executors.default").propagate = False # celery -A tasks worker --loglevel=info --pool=solo # app = Celery('celery', broker=BROKER, accept_content=['pickle'], task_serializer='pickle') -app = Celery('tasks', broker=BROKER) - +app = Celery("tasks", broker=BROKER) +redis = Redis() +payment = Payment() +channel = Channel() celery_client = create_app(":memory:") @@ -80,9 +87,10 @@ def audio_task(chat_id, message_id): def get_unique_clink(original_url, user_id): - settings = get_user_settings(str(user_id)) - clink = VIP().extract_canonical_link(original_url) + settings = payment.get_user_settings(str(user_id)) + clink = channel.extract_canonical_link(original_url) try: + # different user may have different resolution settings unique = "{}?p={}{}".format(clink, *settings[1:]) except IndexError: unique = clink @@ -97,45 +105,34 @@ def direct_download_task(chat_id, message_id, url): logging.info("Direct download celery tasks ended.") -def forward_video(url, client, bot_msg): +def forward_video(client, bot_msg, url): chat_id = bot_msg.chat.id - red = Redis() - vip = VIP() unique = get_unique_clink(url, chat_id) - - cached_fid = red.get_send_cache(unique) + cached_fid = redis.get_send_cache(unique) if not cached_fid: return False try: res_msg: "Message" = upload_processor(client, bot_msg, url, cached_fid) - if not res_msg: - raise ValueError("Failed to forward message") - obj = res_msg.document or res_msg.video or res_msg.audio - if ENABLE_VIP: - file_size = getattr(obj, "file_size", None) \ - or getattr(obj, "file_size", None) \ - or getattr(obj, "file_size", 10) - # TODO: forward file size may exceed the limit - vip.use_quota(chat_id, file_size) + obj = res_msg.document or res_msg.video or res_msg.audio or res_msg.animation + caption, _ = gen_cap(bot_msg, url, obj) res_msg.edit_text(caption, reply_markup=gen_video_markup()) bot_msg.edit_text(f"Download success!โœ…โœ…โœ…") - red.update_metrics("cache_hit") + redis.update_metrics("cache_hit") return True - except Exception as e: traceback.print_exc() logging.error("Failed to forward message %s", e) - red.del_send_cache(unique) - red.update_metrics("cache_miss") + redis.del_send_cache(unique) + redis.update_metrics("cache_miss") -def ytdl_download_entrance(bot_msg, client, url): +def ytdl_download_entrance(client, bot_msg, url): chat_id = bot_msg.chat.id - if forward_video(url, client, bot_msg): + if forward_video(client, bot_msg, url): return - mode = get_user_settings(str(chat_id))[-1] + mode = payment.get_user_settings(str(chat_id))[-1] if ENABLE_CELERY and mode in [None, "Celery"]: async_task(ytdl_download_task, chat_id, bot_msg.message_id, url) # ytdl_download_task.delay(chat_id, bot_msg.message_id, url) @@ -163,19 +160,10 @@ def audio_entrance(bot_msg, client): def direct_normal_download(bot_msg, client, url): chat_id = bot_msg.chat.id headers = { - "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36"} - vip = VIP() + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36" + } + vip = Payment() length = 0 - if ENABLE_VIP: - remain, _, _ = vip.check_remaining_quota(chat_id) - try: - head_req = requests.head(url, headers=headers) - length = int(head_req.headers.get("content-length")) - except (TypeError, requests.exceptions.RequestException): - length = 0 - if remain < length: - bot_msg.reply_text(f"Sorry, you have reached your quota.\n") - return req = None try: @@ -203,13 +191,15 @@ def direct_normal_download(bot_msg, client, url): downloaded += len(chunk) logging.info("Downloaded file %s", filename) st_size = os.stat(filepath).st_size - if ENABLE_VIP: - vip.use_quota(chat_id, st_size) + client.send_chat_action(chat_id, "upload_document") - client.send_document(bot_msg.chat.id, filepath, - caption=f"filesize: {sizeof_fmt(st_size)}", - progress=upload_hook, progress_args=(bot_msg,), - ) + client.send_document( + bot_msg.chat.id, + filepath, + caption=f"filesize: {sizeof_fmt(st_size)}", + progress=upload_hook, + progress_args=(bot_msg,), + ) bot_msg.edit_text("Download success!โœ…") @@ -219,11 +209,11 @@ def normal_audio(bot_msg, client): status_msg = bot_msg.reply_text("Converting to audio...please wait patiently", quote=True) orig_url: "str" = re.findall(r"https?://.*", bot_msg.caption)[0] with tempfile.TemporaryDirectory(prefix="ytdl-") as tmp: - client.send_chat_action(chat_id, 'record_audio') + client.send_chat_action(chat_id, "record_audio") # just try to download the audio using yt-dlp resp = ytdl_download(orig_url, tmp, status_msg, hijack="bestaudio[ext=m4a]") status_msg.edit_text("Sending audio now...") - client.send_chat_action(chat_id, 'upload_audio') + client.send_chat_action(chat_id, "upload_audio") for f in resp["filepath"]: client.send_audio(chat_id, f) status_msg.edit_text("โœ… Conversion complete.") @@ -240,7 +230,7 @@ def get_dl_source(): def upload_transfer_sh(bm, paths: list) -> "str": d = {p.name: (md5(p.name.encode("utf8")).hexdigest() + p.suffix, p.open("rb")) for p in paths} monitor = MultipartEncoderMonitor(MultipartEncoder(fields=d), lambda x: upload_hook(x.bytes_read, x.len, bm)) - headers = {'Content-Type': monitor.content_type} + headers = {"Content-Type": monitor.content_type} try: req = requests.post("https://transfer.sh", data=monitor, headers=headers) bm.edit_text(f"Download success!โœ…") @@ -256,9 +246,9 @@ def ytdl_normal_download(bot_msg, client, url): result = ytdl_download(url, temp_dir.name, bot_msg) logging.info("Download complete.") if result["status"]: - client.send_chat_action(chat_id, 'upload_document') + client.send_chat_action(chat_id, "upload_document") video_paths = result["filepath"] - bot_msg.edit_text('Download complete. Sending now...') + bot_msg.edit_text("Download complete. Sending now...") for video_path in video_paths: # normally there's only one video in that path... st_size = os.stat(video_path).st_size @@ -268,9 +258,9 @@ def ytdl_normal_download(bot_msg, client, url): # client.send_message(chat_id, upload_transfer_sh(bot_msg, video_paths)) continue upload_processor(client, bot_msg, url, video_path) - bot_msg.edit_text('Download success!โœ…') + bot_msg.edit_text("Download success!โœ…") else: - client.send_chat_action(chat_id, 'typing') + client.send_chat_action(chat_id, "typing") tb = result["error"][0:4000] bot_msg.edit_text(f"Download failed!โŒ\n\n```{tb}```", disable_web_page_preview=True) @@ -279,52 +269,75 @@ def ytdl_normal_download(bot_msg, client, url): def upload_processor(client, bot_msg, url, vp_or_fid: "typing.Any[str, pathlib.Path]"): chat_id = bot_msg.chat.id - red = Redis() markup = gen_video_markup() cap, meta = gen_cap(bot_msg, url, vp_or_fid) - settings = get_user_settings(str(chat_id)) + settings = payment.get_user_settings(str(chat_id)) if ARCHIVE_ID and isinstance(vp_or_fid, pathlib.Path): chat_id = ARCHIVE_ID if settings[2] == "document": logging.info("Sending as document") try: # send as document could be sent as video even if it's a document - res_msg = client.send_document(chat_id, vp_or_fid, - caption=cap, - progress=upload_hook, progress_args=(bot_msg,), - reply_markup=markup, - thumb=meta["thumb"], - force_document=True - ) + res_msg = client.send_document( + chat_id, + vp_or_fid, + caption=cap, + progress=upload_hook, + progress_args=(bot_msg,), + reply_markup=markup, + thumb=meta["thumb"], + force_document=True, + ) except ValueError: logging.error("Retry to send as video") - res_msg = client.send_video(chat_id, vp_or_fid, - supports_streaming=True, - caption=cap, - progress=upload_hook, progress_args=(bot_msg,), - reply_markup=markup, - **meta - ) + res_msg = client.send_video( + chat_id, + vp_or_fid, + supports_streaming=True, + caption=cap, + progress=upload_hook, + progress_args=(bot_msg,), + reply_markup=markup, + **meta, + ) elif settings[2] == "audio": logging.info("Sending as audio") - res_msg = client.send_audio(chat_id, vp_or_fid, - caption=cap, - progress=upload_hook, progress_args=(bot_msg,), - ) + res_msg = client.send_audio( + chat_id, + vp_or_fid, + caption=cap, + progress=upload_hook, + progress_args=(bot_msg,), + ) else: logging.info("Sending as video") - res_msg = client.send_video(chat_id, vp_or_fid, - supports_streaming=True, - caption=cap, - progress=upload_hook, progress_args=(bot_msg,), - reply_markup=markup, - **meta - ) + try: + res_msg = client.send_video( + chat_id, + vp_or_fid, + supports_streaming=True, + caption=cap, + progress=upload_hook, + progress_args=(bot_msg,), + reply_markup=markup, + **meta, + ) + except ValueError: + logging.info("Retry to send as animation") + res_msg = client.send_animation( + chat_id, + vp_or_fid, + caption=cap, + progress=upload_hook, + progress_args=(bot_msg,), + reply_markup=markup, + **meta, + ) unique = get_unique_clink(url, bot_msg.chat.id) - obj = res_msg.document or res_msg.video or res_msg.audio - red.add_send_cache(unique, getattr(obj, "file_id", None)) - red.update_metrics("video_success") + obj = res_msg.document or res_msg.video or res_msg.audio or res_msg.animation + redis.add_send_cache(unique, getattr(obj, "file_id", None)) + redis.update_metrics("video_success") if ARCHIVE_ID and isinstance(vp_or_fid, pathlib.Path): client.forward_messages(bot_msg.chat.id, ARCHIVE_ID, res_msg.message_id) return res_msg @@ -334,11 +347,7 @@ def gen_cap(bm, url, video_path): chat_id = bm.chat.id user = bm.chat try: - user_info = "@{}({})-{}".format( - user.username or "N/A", - user.first_name or "" + user.last_name or "", - user.id - ) + user_info = "@{}({})-{}".format(user.username or "N/A", user.first_name or "" + user.last_name or "", user.id) except Exception: user_info = "" @@ -348,17 +357,24 @@ def gen_cap(bm, url, video_path): file_size = sizeof_fmt(os.stat(video_path).st_size) else: file_name = getattr(video_path, "file_name", "") - file_size = sizeof_fmt(getattr(video_path, "file_size", (2 << 6) - (2 << 4) - (2 << 2) + (0 ^ 1) + (2 << 5))) + file_size = sizeof_fmt(getattr(video_path, "file_size", (2 << 2) + ((2 << 2) + 1) + (2 << 5))) meta = dict( width=getattr(video_path, "width", 0), height=getattr(video_path, "height", 0), duration=getattr(video_path, "duration", 0), thumb=getattr(video_path, "thumb", None), ) - remain = bot_text.remaining_quota_caption(chat_id) + free = payment.get_free_token(chat_id) + pay = payment.get_pay_token(chat_id) + if ENABLE_VIP: + remain = f"Download token count: free {free}, pay {pay}" + else: + remain = "" worker = get_dl_source() - cap = f"{user_info}\n{file_name}\n\n{url}\n\nInfo: {meta['width']}x{meta['height']} {file_size}\t" \ - f"{meta['duration']}s\n{remain}\n{worker}\n{bot_text.custom_text}" + cap = ( + f"{user_info}\n{file_name}\n\n{url}\n\nInfo: {meta['width']}x{meta['height']} {file_size}\t" + f"{meta['duration']}s\n{remain}\n{worker}\n{bot_text.custom_text}" + ) return cap, meta @@ -367,8 +383,7 @@ def gen_video_markup(): [ [ # First row InlineKeyboardButton( # Generates a callback query when pressed - "convert to audio", - callback_data="convert" + "convert to audio", callback_data="convert" ) ] ] @@ -412,10 +427,10 @@ def async_task(task_name, *args): inspect = app.control.inspect() worker_stats = inspect.stats() route_queues = [] - padding = math.ceil(sum([i['pool']['max-concurrency'] for i in worker_stats.values()]) / len(worker_stats)) + padding = math.ceil(sum([i["pool"]["max-concurrency"] for i in worker_stats.values()]) / len(worker_stats)) for worker_name, stats in worker_stats.items(): - route = worker_name.split('@')[1] - concurrency = stats['pool']['max-concurrency'] + route = worker_name.split("@")[1] + concurrency = stats["pool"]["max-concurrency"] route_queues.extend([route] * (concurrency + padding)) destination = random.choice(route_queues) logging.info("Selecting worker %s from %s in %.2fs", destination, route_queues, time.time() - t0) @@ -424,11 +439,7 @@ def async_task(task_name, *args): def run_celery(): worker_name = os.getenv("WORKER_NAME", "") - argv = [ - "-A", "tasks", 'worker', '--loglevel=info', - "--pool=threads", f"--concurrency={WORKERS}", - "-n", worker_name - ] + argv = ["-A", "tasks", "worker", "--loglevel=info", "--pool=threads", f"--concurrency={WORKERS}", "-n", worker_name] if ENABLE_QUEUE: argv.extend(["-Q", worker_name]) app.worker_main(argv) @@ -439,14 +450,14 @@ def purge_tasks(): return f"purged {count} tasks." -if __name__ == '__main__': +if __name__ == "__main__": celery_client.start() print("Bootstrapping Celery worker now.....") time.sleep(5) threading.Thread(target=run_celery, daemon=True).start() scheduler = BackgroundScheduler(timezone="Asia/Shanghai") - scheduler.add_job(auto_restart, 'interval', seconds=5) + scheduler.add_job(auto_restart, "interval", seconds=10) scheduler.start() idle() diff --git a/ytdlbot/utils.py b/ytdlbot/utils.py index eea2fc6..a1c2b1b 100644 --- a/ytdlbot/utils.py +++ b/ytdlbot/utils.py @@ -18,20 +18,20 @@ import tempfile import time import uuid +import coloredlogs import ffmpeg import psutil -from db import MySQL from flower_tasks import app inspect = app.control.inspect() def apply_log_formatter(): - logging.basicConfig( + coloredlogs.install( level=logging.INFO, - format='[%(asctime)s %(filename)s:%(lineno)d %(levelname).1s] %(message)s', - datefmt="%Y-%m-%d %H:%M:%S" + fmt="[%(asctime)s %(filename)s:%(lineno)d %(levelname).1s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", ) @@ -41,33 +41,12 @@ def customize_logger(logger: "list"): logging.getLogger(log).setLevel(level=logging.INFO) -def get_user_settings(user_id: "str") -> "tuple": - db = MySQL() - cur = db.cur - cur.execute("SELECT * FROM settings WHERE user_id = %s", (user_id,)) - data = cur.fetchone() - if data is None: - return 100, "high", "video", "Celery" - return data - - -def set_user_settings(user_id: int, field: "str", value: "str"): - db = MySQL() - cur = db.cur - cur.execute("SELECT * FROM settings WHERE user_id = %s", (user_id,)) - data = cur.fetchone() - if data is None: - resolution = method = "" - if field == "resolution": - method = "video" - resolution = value - if field == "method": - method = value - resolution = "high" - cur.execute("INSERT INTO settings VALUES (%s,%s,%s,%s)", (user_id, resolution, method, "Celery")) - else: - cur.execute(f"UPDATE settings SET {field} =%s WHERE user_id = %s", (value, user_id)) - db.con.commit() +def sizeof_fmt(num: int, suffix="B"): + for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: + if abs(num) < 1024.0: + return "%3.1f%s%s" % (num, unit, suffix) + num /= 1024.0 + return "%.1f%s%s" % (num, "Yi", suffix) def is_youtube(url: "str"): @@ -76,6 +55,8 @@ def is_youtube(url: "str"): def adjust_formats(user_id: "str", url: "str", formats: "list", hijack=None): + from database import MySQL + # high: best quality 1080P, 2K, 4K, 8K # medium: 720P # low: 480P @@ -84,7 +65,7 @@ def adjust_formats(user_id: "str", url: "str", formats: "list", hijack=None): return mapping = {"high": [], "medium": [720], "low": [480]} - settings = get_user_settings(user_id) + settings = MySQL().get_user_settings(user_id) if settings and is_youtube(url): for m in mapping.get(settings[1], []): formats.insert(0, f"bestvideo[ext=mp4][height={m}]+bestaudio[ext=m4a]") @@ -106,7 +87,7 @@ def get_metadata(video_path): logging.error(e) try: thumb = pathlib.Path(video_path).parent.joinpath(f"{uuid.uuid4().hex}-thunmnail.png").as_posix() - ffmpeg.input(video_path, ss=duration / 2).filter('scale', width, -1).output(thumb, vframes=1).run() + ffmpeg.input(video_path, ss=duration / 2).filter("scale", width, -1).output(thumb, vframes=1).run() except ffmpeg._run.Error: thumb = None @@ -134,9 +115,9 @@ def get_func_queue(func) -> int: return 0 -def tail(f, lines=1, _buffer=4098): +def tail_log(f, lines=1, _buffer=4098): """Tail a file and get X lines from the end""" - # place holder for the lines found + # placeholder for the lines found lines_found = [] # block counter will be multiplied by buffer @@ -214,7 +195,7 @@ def auto_restart(): if not os.path.exists(log_path): return with open(log_path) as f: - logs = "".join(tail(f, lines=10)) + logs = "".join(tail_log(f, lines=10)) det = Detector(logs) method_list = [getattr(det, func) for func in dir(det) if func.endswith("_detector")] @@ -233,5 +214,5 @@ def clean_tempfile(): shutil.rmtree(item, ignore_errors=True) -if __name__ == '__main__': +if __name__ == "__main__": auto_restart() diff --git a/ytdlbot/ytdl_bot.py b/ytdlbot/ytdl_bot.py index f1298a9..3bcf540 100644 --- a/ytdlbot/ytdl_bot.py +++ b/ytdlbot/ytdl_bot.py @@ -24,33 +24,42 @@ from pyrogram.raw import functions from pyrogram.raw import types as raw_types from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup from tgbot_ping import get_runtime -from token_bucket import Limiter, MemoryStorage +from channel import Channel from client_init import create_app -from config import (AUTHORIZED_USER, BURST, ENABLE_CELERY, ENABLE_FFMPEG, - ENABLE_VIP, MULTIPLY, OWNER, PROVIDER_TOKEN, QUOTA, RATE, - REQUIRED_MEMBERSHIP, PLAYLIST_SUPPORT) +from config import ( + AUTHORIZED_USER, + ENABLE_CELERY, + ENABLE_FFMPEG, + ENABLE_VIP, + OWNER, + PLAYLIST_SUPPORT, + PROVIDER_TOKEN, + REQUIRED_MEMBERSHIP, + TOKEN_PRICE, +) from constant import BotText -from db import InfluxDB, MySQL, Redis -from limit import VIP, verify_payment +from database import InfluxDB, MySQL, Redis +from limit import Payment from tasks import app as celery_app -from tasks import (audio_entrance, direct_download_entrance, hot_patch, - purge_tasks, ytdl_download_entrance) -from utils import (auto_restart, clean_tempfile, customize_logger, - get_revision, get_user_settings, set_user_settings) +from tasks import ( + audio_entrance, + direct_download_entrance, + hot_patch, + purge_tasks, + ytdl_download_entrance, +) +from utils import auto_restart, clean_tempfile, customize_logger, get_revision customize_logger(["pyrogram.client", "pyrogram.session.session", "pyrogram.connection.connection"]) -logging.getLogger('apscheduler.executors.default').propagate = False +logging.getLogger("apscheduler.executors.default").propagate = False app = create_app() -bot_text = BotText() - +db = MySQL() logging.info("Authorized users are %s", AUTHORIZED_USER) - -# rate, capacity -mem = MemoryStorage() -# 5 minutes, 2 bursts -lim = Limiter(1 / RATE, BURST, mem) +redis = Redis() +payment = Payment() +channel = Channel() def private_use(func): @@ -69,10 +78,10 @@ def private_use(func): users = [] if users and chat_id and chat_id not in users: - message.reply_text(bot_text.private, quote=True) + message.reply_text(BotText.private, quote=True) return - # membership check + # TODO bug fix # 198 membership check if REQUIRED_MEMBERSHIP: try: if app.get_chat_member(REQUIRED_MEMBERSHIP, chat_id).status != "member": @@ -81,7 +90,7 @@ def private_use(func): logging.info("user %s check passed for group/channel %s.", chat_id, REQUIRED_MEMBERSHIP) except UserNotParticipant: logging.warning("user %s is not a member of group/channel %s", chat_id, REQUIRED_MEMBERSHIP) - message.reply_text(bot_text.membership_require, quote=True) + message.reply_text(BotText.membership_require, quote=True) return return func(client, message) @@ -94,32 +103,44 @@ def start_handler(client: "Client", message: "types.Message"): from_id = message.from_user.id logging.info("Welcome to youtube-dl bot!") client.send_chat_action(from_id, "typing") - greeting = bot_text.get_vip_greeting(from_id) - quota = bot_text.remaining_quota_caption(from_id) - custom_text = bot_text.custom_text - text = f"{greeting}{bot_text.start}\n\n{quota}\n{custom_text}" - + is_old_user = payment.check_old_user(from_id) + if is_old_user: + info = "" + elif ENABLE_VIP: + free_token, pay_token, reset = payment.get_token(from_id) + info = f"Free token: {free_token}, Pay token: {pay_token}, Reset: {reset}" + else: + info = "" + text = f"{BotText.start}\n\n{info}\n{BotText.custom_text}" client.send_message(message.chat.id, text) + # add to settings table + db.set_user_settings(from_id, "resolution", "high") @app.on_message(filters.command(["help"])) def help_handler(client: "Client", message: "types.Message"): chat_id = message.chat.id client.send_chat_action(chat_id, "typing") - client.send_message(chat_id, bot_text.help, disable_web_page_preview=True) + client.send_message(chat_id, BotText.help, disable_web_page_preview=True) + + +@app.on_message(filters.command(["about"])) +def about_handler(client: "Client", message: "types.Message"): + chat_id = message.chat.id + client.send_chat_action(chat_id, "typing") + client.send_message(chat_id, BotText.about) @app.on_message(filters.command(["sub"])) def subscribe_handler(client: "Client", message: "types.Message"): - vip = VIP() chat_id = message.chat.id client.send_chat_action(chat_id, "typing") if message.text == "/sub": - result = vip.get_user_subscription(chat_id) + result = channel.get_user_subscription(chat_id) else: link = message.text.split()[1] try: - result = vip.subscribe_channel(chat_id, link) + result = channel.subscribe_channel(chat_id, link) except (IndexError, ValueError): result = f"Error: \n{traceback.format_exc()}" client.send_message(chat_id, result or "You have no subscription.", disable_web_page_preview=True) @@ -127,15 +148,14 @@ def subscribe_handler(client: "Client", message: "types.Message"): @app.on_message(filters.command(["unsub"])) def unsubscribe_handler(client: "Client", message: "types.Message"): - vip = VIP() chat_id = message.chat.id client.send_chat_action(chat_id, "typing") text = message.text.split(" ") if len(text) == 1: - client.send_message(chat_id, "/unsubscribe channel_id", disable_web_page_preview=True) + client.send_message(chat_id, "/unsub channel_id", disable_web_page_preview=True) return - rows = vip.unsubscribe_channel(chat_id, text[1]) + rows = channel.unsubscribe_channel(chat_id, text[1]) if rows: text = f"Unsubscribed from {text[1]}" else: @@ -159,7 +179,7 @@ def uncache_handler(client: "Client", message: "types.Message"): username = message.from_user.username link = message.text.split()[1] if username == OWNER: - count = VIP().del_cache(link) + count = channel.del_cache(link) message.reply_text(f"{count} cache(s) deleted.", quote=True) @@ -179,26 +199,19 @@ def ping_handler(client: "Client", message: "types.Message"): else: bot_info = get_runtime("ytdlbot_ytdl_1", "YouTube-dl") if message.chat.username == OWNER: - stats = bot_text.ping_worker()[:1000] - client.send_document(chat_id, Redis().generate_file(), caption=f"{bot_info}\n\n{stats}") + stats = BotText.ping_worker()[:1000] + client.send_document(chat_id, redis.generate_file(), caption=f"{bot_info}\n\n{stats}") else: client.send_message(chat_id, f"{bot_info}") -@app.on_message(filters.command(["about"])) -def about_handler(client: "Client", message: "types.Message"): - chat_id = message.chat.id - client.send_chat_action(chat_id, "typing") - client.send_message(chat_id, bot_text.about) - - @app.on_message(filters.command(["sub_count"])) def sub_count_handler(client: "Client", message: "types.Message"): username = message.from_user.username chat_id = message.chat.id if username == OWNER: with BytesIO() as f: - f.write(VIP().sub_count().encode("u8")) + f.write(channel.sub_count().encode("u8")) f.name = "subscription count.txt" client.send_document(chat_id, f) @@ -207,15 +220,15 @@ def sub_count_handler(client: "Client", message: "types.Message"): def direct_handler(client: "Client", message: "types.Message"): chat_id = message.from_user.id client.send_chat_action(chat_id, "typing") - url = re.sub(r'/direct\s*', '', message.text) + url = re.sub(r"/direct\s*", "", message.text) logging.info("direct start %s", url) if not re.findall(r"^https?://", url.lower()): - Redis().update_metrics("bad_request") + redis.update_metrics("bad_request") message.reply_text("Send me a DIRECT LINK.", quote=True) return bot_msg = message.reply_text("Request received.", quote=True) - Redis().update_metrics("direct_request") + redis.update_metrics("direct_request") direct_download_entrance(bot_msg, client, url) @@ -223,8 +236,8 @@ def direct_handler(client: "Client", message: "types.Message"): def settings_handler(client: "Client", message: "types.Message"): chat_id = message.chat.id client.send_chat_action(chat_id, "typing") - data = get_user_settings(str(chat_id)) - set_mode = (data[-1]) + data = db.get_user_settings(str(chat_id)) + set_mode = data[-1] text = {"Local": "Celery", "Celery": "Local"}.get(set_mode, "Local") mode_text = f"Download mode: **{set_mode}**" if message.chat.username == OWNER: @@ -237,113 +250,115 @@ def settings_handler(client: "Client", message: "types.Message"): [ # First row InlineKeyboardButton("send as document", callback_data="document"), InlineKeyboardButton("send as video", callback_data="video"), - InlineKeyboardButton("send as audio", callback_data="audio") + InlineKeyboardButton("send as audio", callback_data="audio"), ], [ # second row InlineKeyboardButton("High Quality", callback_data="high"), InlineKeyboardButton("Medium Quality", callback_data="medium"), InlineKeyboardButton("Low Quality", callback_data="low"), ], - extra + extra, ] ) - client.send_message(chat_id, bot_text.settings.format(data[1], data[2]) + mode_text, reply_markup=markup) + client.send_message(chat_id, BotText.settings.format(data[1], data[2]) + mode_text, reply_markup=markup) -@app.on_message(filters.command(["vip"])) -def vip_handler(client: "Client", message: "types.Message"): +@app.on_message(filters.command(["buy"])) +def buy_handler(client: "Client", message: "types.Message"): # process as chat.id, not from_user.id chat_id = message.chat.id text = message.text.strip() client.send_chat_action(chat_id, "typing") - if text == "/vip": - client.send_message(chat_id, bot_text.vip, disable_web_page_preview=True) + client.send_message(chat_id, BotText.buy, disable_web_page_preview=True) + # generate telegram invoice here + payload = f"{message.chat.id}-buy" + token_count = message.text.replace("/buy", "").strip() + # currency USD + if token_count.isdigit(): + price = int(int(token_count) / TOKEN_PRICE * 100) else: - bm: typing.Union["types.Message", "typing.Any"] = message.reply_text(bot_text.vip_pay, quote=True) - unique = text.replace("/vip", "").strip() - msg = verify_payment(chat_id, unique, client) - bm.edit_text(msg) + price = 100 + invoice = generate_invoice( + price, f"Buy {TOKEN_PRICE} download tokens", "You can pay by Telegram payment or using link above", payload + ) + + app.send( + functions.messages.SendMedia( + peer=(raw_types.InputPeerUser(user_id=chat_id, access_hash=0)), + media=invoice, + random_id=app.rnd_id(), + message="Buy more download token", + ) + ) + + +@app.on_message(filters.command(["redeem"])) +def redeem_handler(client: "Client", message: "types.Message"): + chat_id = message.chat.id + text = message.text.strip() + unique = text.replace("/redeem", "").strip() + msg = payment.verify_payment(chat_id, unique) + message.reply_text(msg, quote=True) def generate_invoice(amount: "int", title: "str", description: "str", payload: "str"): invoice = raw_types.input_media_invoice.InputMediaInvoice( - invoice=( - raw_types.invoice.Invoice(currency="USD", prices=[raw_types.LabeledPrice(label="price", amount=amount)])), + invoice=raw_types.invoice.Invoice( + currency="USD", prices=[raw_types.LabeledPrice(label="price", amount=amount)] + ), title=title, description=description, provider=PROVIDER_TOKEN, provider_data=raw_types.DataJSON(data="{}"), payload=payload.encode(), - start_param=payload + start_param=payload, ) return invoice -# payment related -@app.on_message(filters.command(["topup"])) -def topup_handler(client: "Client", message: "types.Message"): - chat_id = message.chat.id - client.send_chat_action(chat_id, "typing") - invoice = generate_invoice(100, bot_text.topup_title, bot_text.topup_description, - f"{message.chat.id}-topup") - - app.send( - functions.messages.SendMedia( - peer=(raw_types.InputPeerUser(user_id=chat_id, access_hash=0)), - media=invoice, - random_id=app.rnd_id(), - message="Please use your card to pay for more traffic" - ) - ) - - -@app.on_message(filters.command(["tgvip"])) -def tgvip_handler(client: "Client", message: "types.Message"): - chat_id = message.chat.id - client.send_chat_action(chat_id, "typing") - invoice = generate_invoice(1000, f"VIP1", f"pay USD${MULTIPLY} for VIP1", f"{message.chat.id}-vip1".encode()) - - app.send( - functions.messages.SendMedia( - peer=(raw_types.InputPeerUser(user_id=chat_id, access_hash=0)), - media=invoice, - random_id=app.rnd_id(), - message="Please use your card to pay for more traffic" - ) - ) - - @app.on_message(filters.incoming & filters.text) @private_use def download_handler(client: "Client", message: "types.Message"): - # check remaining quota - red = Redis() chat_id = message.from_user.id - client.send_chat_action(chat_id, 'typing') - red.user_count(chat_id) + client.send_chat_action(chat_id, "typing") + redis.user_count(chat_id) - url = re.sub(r'/ytdl\s*', '', message.text) + url = re.sub(r"/ytdl\s*", "", message.text) logging.info("start %s", url) + # check url if not re.findall(r"^https?://", url.lower()): - red.update_metrics("bad_request") + redis.update_metrics("bad_request") message.reply_text("I think you should send me a link.", quote=True) return + # disable by default if not PLAYLIST_SUPPORT: - if re.findall(r"^https://www\.youtube\.com/channel/", VIP.extract_canonical_link(url)) or "list" in url: - message.reply_text("Channel/list download is disabled now. Please send me individual video link.", quote=True) - red.update_metrics("reject_channel") + if re.findall(r"^https://www\.youtube\.com/channel/", Channel.extract_canonical_link(url)) or "list" in url: + message.reply_text( + "The ability to download a channel or list has been disabled." + "Kindly provide me with the specific link for each video instead.", + quote=True, + ) + redis.update_metrics("reject_channel") return - # non vip user, consume too many token - if (not VIP().check_vip(chat_id)) and (not lim.consume(str(chat_id).encode(), 1)): - red.update_metrics("rate_limit") - message.reply_text(bot_text.too_fast, quote=True) - return - red.update_metrics("video_request") - text = bot_text.get_receive_link_text() + # old user is not limited by token + if ENABLE_VIP and not payment.check_old_user(chat_id): + free, pay, reset = payment.get_token(chat_id) + if free + pay <= 0: + message.reply_text( + f"You don't have enough token. Please wait until {reset} or /buy more token.", quote=True + ) + redis.update_metrics("reject_token") + return + else: + payment.use_token(chat_id) + + redis.update_metrics("video_request") + + text = BotText.get_receive_link_text() try: # raise pyrogram.errors.exceptions.FloodWait(10) bot_msg: typing.Union["types.Message", "typing.Any"] = message.reply_text(text, quote=True) @@ -352,15 +367,16 @@ def download_handler(client: "Client", message: "types.Message"): f.write(str(e).encode()) f.write(b"Your job will be done soon. Just wait! Don't rush.") f.name = "Please don't flood me.txt" - bot_msg = message.reply_document(f, caption=f"Flood wait! Please wait {e.x} seconds...." - f"Your job will start automatically", quote=True) + bot_msg = message.reply_document( + f, caption=f"Flood wait! Please wait {e.x} seconds...." f"Your job will start automatically", quote=True + ) f.close() client.send_message(OWNER, f"Flood wait! ๐Ÿ™ {e.x} seconds....") time.sleep(e.x) - client.send_chat_action(chat_id, 'upload_video') + client.send_chat_action(chat_id, "upload_video") bot_msg.chat = message.chat - ytdl_download_entrance(bot_msg, client, url) + ytdl_download_entrance(client, bot_msg, url) @app.on_callback_query(filters.regex(r"document|video|audio")) @@ -368,7 +384,7 @@ def send_method_callback(client: "Client", callback_query: types.CallbackQuery): chat_id = callback_query.message.chat.id data = callback_query.data logging.info("Setting %s file type to %s", chat_id, data) - set_user_settings(chat_id, "method", data) + db.set_user_settings(chat_id, "method", data) callback_query.answer(f"Your send type was set to {callback_query.data}") @@ -377,7 +393,7 @@ def download_resolution_callback(client: "Client", callback_query: types.Callbac chat_id = callback_query.message.chat.id data = callback_query.data logging.info("Setting %s file type to %s", chat_id, data) - set_user_settings(chat_id, "resolution", data) + db.set_user_settings(chat_id, "resolution", data) callback_query.answer(f"Your default download quality was set to {callback_query.data}") @@ -389,7 +405,7 @@ def audio_callback(client: "Client", callback_query: types.CallbackQuery): return callback_query.answer(f"Converting to audio...please wait patiently") - Redis().update_metrics("audio_request") + redis.update_metrics("audio_request") vmsg = callback_query.message audio_entrance(vmsg, client) @@ -397,24 +413,23 @@ def audio_callback(client: "Client", callback_query: types.CallbackQuery): @app.on_callback_query(filters.regex(r"Local|Celery")) def owner_local_callback(client: "Client", callback_query: types.CallbackQuery): chat_id = callback_query.message.chat.id - set_user_settings(chat_id, "mode", callback_query.data) + db.set_user_settings(chat_id, "mode", callback_query.data) callback_query.answer(f"Download mode was changed to {callback_query.data}") def periodic_sub_check(): - vip = VIP() exceptions = pyrogram.errors.exceptions - for cid, uids in vip.group_subscriber().items(): - video_url = vip.has_newer_update(cid) + for cid, uids in channel.group_subscriber().items(): + video_url = channel.has_newer_update(cid) if video_url: logging.info(f"periodic update:{video_url} - {uids}") for uid in uids: try: bot_msg = app.send_message(uid, f"{video_url} is downloading...", disable_web_page_preview=True) ytdl_download_entrance(bot_msg, app, video_url) - except(exceptions.bad_request_400.PeerIdInvalid, exceptions.bad_request_400.UserIsBlocked) as e: + except (exceptions.bad_request_400.PeerIdInvalid, exceptions.bad_request_400.UserIsBlocked) as e: logging.warning("User is blocked or deleted. %s", e) - vip.deactivate_user_subscription(uid) + channel.deactivate_user_subscription(uid) except Exception as e: logging.error("Unknown error when sending message to user. %s", traceback.format_exc()) finally: @@ -424,44 +439,29 @@ def periodic_sub_check(): @app.on_raw_update() def raw_update(client: "Client", update, users, chats): action = getattr(getattr(update, "message", None), "action", None) - if update.QUALNAME == 'types.UpdateBotPrecheckoutQuery': + if update.QUALNAME == "types.UpdateBotPrecheckoutQuery": client.send( functions.messages.SetBotPrecheckoutResults( query_id=update.query_id, success=True, ) ) - elif action and action.QUALNAME == 'types.MessageActionPaymentSentMe': + elif action and action.QUALNAME == "types.MessageActionPaymentSentMe": logging.info("Payment received. %s", action) uid = update.message.peer_id.user_id - vip = VIP() - amount = f"{action.total_amount / 100} {action.currency}" - if "vip" in action.payload.decode(): - ud = { - "user_id": uid, - "username": users.get(uid).username, - "payment_amount": 10, - "payment_id": 0, - "level": 1, - "quota": QUOTA * 2 - } - vip.direct_add_vip(ud) - client.send_message(uid, f"Thank you {uid}. VIP payment received: {amount}") - - else: - vip.set_topup(uid) - client.send_message(uid, f"Thank you {uid}. Top up payment received: {amount}") + amount = action.total_amount / 100 + payment.add_pay_user([uid, amount, action.charge.provider_charge_id, 0, amount * TOKEN_PRICE]) + client.send_message(uid, f"Thank you {uid}. Payment received: {amount} {action.currency}") -if __name__ == '__main__': - MySQL() - scheduler = BackgroundScheduler(timezone="Europe/Stockholm", job_defaults={'max_instances': 5}) - scheduler.add_job(Redis().reset_today, 'cron', hour=0, minute=0) - scheduler.add_job(auto_restart, 'interval', seconds=60) - scheduler.add_job(clean_tempfile, 'interval', seconds=60) - scheduler.add_job(InfluxDB().collect_data, 'interval', seconds=60) +if __name__ == "__main__": + scheduler = BackgroundScheduler(timezone="Asia/Shanghai", job_defaults={"max_instances": 5}) + scheduler.add_job(redis.reset_today, "cron", hour=0, minute=0) + scheduler.add_job(auto_restart, "interval", seconds=60) + scheduler.add_job(clean_tempfile, "interval", seconds=60) + scheduler.add_job(InfluxDB().collect_data, "interval", seconds=60) # default quota allocation of 10,000 units per day - scheduler.add_job(periodic_sub_check, 'interval', seconds=60 * 60) + scheduler.add_job(periodic_sub_check, "interval", seconds=3600) scheduler.start() banner = f""" โ–Œ โ–Œ โ–€โ–›โ–˜ โ–Œ โ–›โ–€โ–– โ–œ โ–Œ @@ -469,7 +469,7 @@ if __name__ == '__main__': โ–Œ โ–Œ โ–Œ โ–Œ โ–Œ โ–Œ โ–Œ โ–Œ โ–Œ โ–Œ โ–›โ–€ โ–Œ โ–Œ โ–Œ โ–Œ โ–โ–โ– โ–Œ โ–Œ โ– โ–Œ โ–Œ โ–žโ–€โ–Œ โ–Œ โ–Œ โ–˜ โ–โ–€ โ–โ–€โ–˜ โ–˜ โ–โ–€โ–˜ โ–€โ–€ โ–โ–€โ–˜ โ–€โ–€ โ–โ–€ โ–˜โ–˜ โ–˜ โ–˜ โ–˜ โ–โ–€ โ–โ–€โ–˜ โ–โ–€โ–˜ -By @BennyThink, VIP mode: {ENABLE_VIP}, Distribution: {ENABLE_CELERY} +By @BennyThink, VIP mode: {ENABLE_VIP}, Celery Mode: {ENABLE_CELERY} Version: {get_revision()} """ print(banner)