#!/usr/bin/env python3 # Telegram Download Daemon # Author: Alfonso E.M. # You need to install telethon (and cryptg to speed up downloads) from os import getenv, path from shutil import move import subprocess import math import time import random import string import os.path from mimetypes import guess_extension from sessionManager import getSession, saveSession from telethon import TelegramClient, events, __version__ from telethon.tl.types import PeerChannel, DocumentAttributeFilename, DocumentAttributeVideo import logging logging.basicConfig(format='[%(levelname) 5s/%(asctime)s]%(name)s:%(message)s', level=logging.WARNING) import multiprocessing import argparse import asyncio TDD_VERSION="1.14" TELEGRAM_DAEMON_API_ID = getenv("TELEGRAM_DAEMON_API_ID") TELEGRAM_DAEMON_API_HASH = getenv("TELEGRAM_DAEMON_API_HASH") TELEGRAM_DAEMON_CHANNEL = getenv("TELEGRAM_DAEMON_CHANNEL") TELEGRAM_DAEMON_SESSION_PATH = getenv("TELEGRAM_DAEMON_SESSION_PATH") TELEGRAM_DAEMON_DEST=getenv("TELEGRAM_DAEMON_DEST", "/telegram-downloads") TELEGRAM_DAEMON_TEMP=getenv("TELEGRAM_DAEMON_TEMP", "") TELEGRAM_DAEMON_DUPLICATES=getenv("TELEGRAM_DAEMON_DUPLICATES", "rename") TELEGRAM_DAEMON_TEMP_SUFFIX="tdd" TELEGRAM_DAEMON_WORKERS=getenv("TELEGRAM_DAEMON_WORKERS", multiprocessing.cpu_count()) parser = argparse.ArgumentParser( description="Script to download files from a Telegram Channel.") parser.add_argument( "--api-id", required=TELEGRAM_DAEMON_API_ID == None, type=int, default=TELEGRAM_DAEMON_API_ID, help= 'api_id from https://core.telegram.org/api/obtaining_api_id (default is TELEGRAM_DAEMON_API_ID env var)' ) parser.add_argument( "--api-hash", required=TELEGRAM_DAEMON_API_HASH == None, type=str, default=TELEGRAM_DAEMON_API_HASH, help= 'api_hash from https://core.telegram.org/api/obtaining_api_id (default is TELEGRAM_DAEMON_API_HASH env var)' ) parser.add_argument( "--dest", type=str, default=TELEGRAM_DAEMON_DEST, help= 'Destination path for downloaded files (default is /telegram-downloads).') parser.add_argument( "--temp", type=str, default=TELEGRAM_DAEMON_TEMP, help= 'Destination path for temporary files (default is using the same downloaded files directory).') parser.add_argument( "--channel", required=TELEGRAM_DAEMON_CHANNEL == None, type=int, default=TELEGRAM_DAEMON_CHANNEL, help= 'Channel id to download from it (default is TELEGRAM_DAEMON_CHANNEL env var' ) parser.add_argument( "--duplicates", choices=["ignore", "rename", "overwrite"], type=str, default=TELEGRAM_DAEMON_DUPLICATES, help= '"ignore"=do not download duplicated files, "rename"=add a random suffix, "overwrite"=redownload and overwrite.' ) parser.add_argument( "--workers", type=int, default=TELEGRAM_DAEMON_WORKERS, help= 'number of simultaneous downloads' ) args = parser.parse_args() api_id = args.api_id api_hash = args.api_hash channel_id = args.channel downloadFolder = args.dest tempFolder = args.temp duplicates=args.duplicates worker_count = args.workers updateFrequency = 10 lastUpdate = 0 if not tempFolder: tempFolder = downloadFolder # Edit these lines: proxy = None # End of interesting parameters async def sendHelloMessage(client, peerChannel): entity = await client.get_entity(peerChannel) print("Telegram Download Daemon "+TDD_VERSION+" using Telethon "+__version__) print(" Simultaneous downloads:"+str(worker_count)) await client.send_message(entity, "Telegram Download Daemon "+TDD_VERSION+" using Telethon "+__version__) await client.send_message(entity, "Hi! Ready for your files!") async def log_reply(message, reply): print(reply) await message.edit(reply) def getRandomId(len): chars=string.ascii_lowercase + string.digits return ''.join(random.choice(chars) for x in range(len)) def getFilename(event: events.NewMessage.Event): mediaFileName = "unknown" if hasattr(event.media, 'photo'): mediaFileName = str(event.media.photo.id)+".jpeg" elif hasattr(event.media, 'document'): for attribute in event.media.document.attributes: if isinstance(attribute, DocumentAttributeFilename): mediaFileName=attribute.file_name break if isinstance(attribute, DocumentAttributeVideo): if event.original_update.message.message != '': mediaFileName = event.original_update.message.message else: mediaFileName = str(event.message.media.document.id) mediaFileName+=guess_extension(event.message.media.document.mime_type) mediaFileName="".join(c for c in mediaFileName if c.isalnum() or c in "()._- ") return mediaFileName in_progress={} async def set_progress(filename, message, received, total): global lastUpdate global updateFrequency if received >= total: try: in_progress.pop(filename) except: pass return percentage = math.trunc(received / total * 10000) / 100 progress_message= "{0} % ({1} / {2})".format(percentage, received, total) in_progress[filename] = progress_message currentTime=time.time() if (currentTime - lastUpdate) > updateFrequency: await log_reply(message, progress_message) lastUpdate=currentTime with TelegramClient(getSession(), api_id, api_hash, proxy=proxy).start() as client: saveSession(client.session) queue = asyncio.Queue() peerChannel = PeerChannel(channel_id) @client.on(events.NewMessage()) async def handler(event): if event.to_id != peerChannel: return print(event) try: if not event.media and event.message: command = event.message.message command = command.lower() output = "Unknown command" if command == "list": output = subprocess.run(["ls -l "+downloadFolder], shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.decode('utf-8') elif command == "status": try: output = "".join([ "{0}: {1}\n".format(key,value) for (key, value) in in_progress.items()]) if output: output = "Active downloads:\n\n" + output else: output = "No active downloads" except: output = "Some error occured while checking the status. Retry." elif command == "clean": output = "Cleaning "+tempFolder+"\n" output+=subprocess.run(["rm "+tempFolder+"/*."+TELEGRAM_DAEMON_TEMP_SUFFIX], shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout elif command == "queue": try: files_in_queue = [] for q in queue.__dict__['_queue']: files_in_queue.append(getFilename(q[0])) output = "".join([ "{0}\n".format(filename) for (filename) in files_in_queue]) if output: output = "Files in queue:\n\n" + output else: output = "Queue is empty" except: output = "Some error occured while checking the queue. Retry." else: output = "Available commands: list, status, clean, queue" await log_reply(event, output) if event.media: if hasattr(event.media, 'document') or hasattr(event.media,'photo'): filename=getFilename(event) if ( path.exists("{0}/{1}.{2}".format(tempFolder,filename,TELEGRAM_DAEMON_TEMP_SUFFIX)) or path.exists("{0}/{1}".format(downloadFolder,filename)) ) and duplicates == "ignore": message=await event.reply("{0} already exists. Ignoring it.".format(filename)) else: message=await event.reply("{0} added to queue".format(filename)) await queue.put([event, message]) else: message=await event.reply("That is not downloadable. Try to send it as a file.") except Exception as e: print('Events handler error: ', e) async def worker(): while True: try: element = await queue.get() event=element[0] message=element[1] filename=getFilename(event) fileName, fileExtension = os.path.splitext(filename) tempfilename=fileName+"-"+getRandomId(8)+fileExtension if path.exists("{0}/{1}.{2}".format(tempFolder,tempfilename,TELEGRAM_DAEMON_TEMP_SUFFIX)) or path.exists("{0}/{1}".format(downloadFolder,filename)): if duplicates == "rename": filename=tempfilename if hasattr(event.media, 'photo'): size = 0 else: size=event.media.document.size await log_reply( message, "Downloading file {0} ({1} bytes)".format(filename,size) ) download_callback = lambda received, total: set_progress(filename, message, received, total) await client.download_media(event.message, "{0}/{1}.{2}".format(tempFolder,filename,TELEGRAM_DAEMON_TEMP_SUFFIX), progress_callback = download_callback) set_progress(filename, message, 100, 100) move("{0}/{1}.{2}".format(tempFolder,filename,TELEGRAM_DAEMON_TEMP_SUFFIX), "{0}/{1}".format(downloadFolder,filename)) await log_reply(message, "{0} ready".format(filename)) queue.task_done() except Exception as e: try: await log_reply(message, "Error: {}".format(str(e))) # If it failed, inform the user about it. except: pass print('Queue worker error: ', e) async def start(): tasks = [] loop = asyncio.get_event_loop() for i in range(worker_count): task = loop.create_task(worker()) tasks.append(task) await sendHelloMessage(client, peerChannel) await client.run_until_disconnected() for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True) client.loop.run_until_complete(start())