"""Downloads media from telegram.""" import os import logging from typing import List, Tuple, Optional from datetime import datetime as dt import asyncio import pyrogram import yaml from utils.file_management import get_next_name, manage_duplicate_file logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) THIS_DIR = os.path.dirname(os.path.abspath(__file__)) def update_config(config: dict): """Update exisitng configuration file. Parameters ---------- config: dictionary Configuraiton to be written into config file. """ with open("config.yaml", "w") as yaml_file: yaml.dump(config, yaml_file, default_flow_style=False) logger.info("Updated last read message_id to config file") def _can_download( _type: str, file_formats: dict, file_format: Optional[str] ) -> bool: """Check if the given file format can be downloaded""" if _type in ["audio", "document", "video"]: allowed_formats: list = file_formats[_type] if not file_format in allowed_formats and allowed_formats[0] != "all": return False return True def _is_exist(file_path: str) -> bool: """Check if a file exists and it is not a directory""" return not os.path.isdir(file_path) and os.path.exists(file_path) async def _get_media_meta( media_obj: pyrogram.types.messages_and_media, _type: str ) -> Tuple[str, str, Optional[str]]: """Extract file name and file id. Parameters ---------- media_obj: pyrogram.types.messages_and_media Media object to be extracted. _type: string Type of media object. Returns ------- tuple file_ref, file_name, file_format """ file_ref: str = media_obj.file_ref if _type in ["audio", "document", "video"]: file_format: Optional[str] = media_obj.mime_type.split("/")[-1] else: file_format = None if _type == "voice": file_format = media_obj.mime_type.split("/")[-1] file_name: str = os.path.join( THIS_DIR, _type, "voice_{}.{}".format( dt.utcfromtimestamp(media_obj.date).isoformat(), file_format ), ) else: file_name = os.path.join( THIS_DIR, _type, getattr(media_obj, "file_name", None) or "" ) return file_ref, file_name, file_format async def download_media( client: pyrogram.client.Client, message: pyrogram.types.Message, media_types: List[str], file_formats: dict, ): """Download media from Telegram. Parameters ---------- client: pyrogram.client.Client Client to interact with Telegram APIs. message: pyrogram.types.Message Message object retrived from telegram. media_types: list List of strings of media types to be downloaded. Ex : `["audio", "photo"]` Supported formats: * audio * document * photo * video * voice file_formats: dict Dictionary containing the list of file_formats to be downloaded for `audio`, `document` & `video` media types Returns ------- integer message_id """ if message.media: for _type in media_types: _media = getattr(message, _type, None) if _media: file_ref, file_name, file_format = await _get_media_meta( _media, _type ) if _can_download(_type, file_formats, file_format): if _is_exist(file_name): file_name = get_next_name(file_name) download_path = await client.download_media( message, file_ref=file_ref, file_name=file_name ) download_path = manage_duplicate_file(download_path) else: download_path = await client.download_media( message, file_ref=file_ref, file_name=file_name ) logger.info("Media downloaded - %s", download_path) return message.message_id async def process_messages( client: pyrogram.client.Client, messages: List[pyrogram.types.Message], media_types: List[str], file_formats: dict, ) -> int: """Download media from Telegram. Parameters ---------- client: pyrogram.client.Client Client to interact with Telegram APIs. messages: list List of telegram messages. media_types: list List of strings of media types to be downloaded. Ex : `["audio", "photo"]` Supported formats: * audio * document * photo * video * voice file_formats: dict Dictionary containing the list of file_formats to be downloaded for `audio`, `document` & `video` media types Returns ------- integer last_message_id """ message_ids = await asyncio.gather( *[ download_media(client, message, media_types, file_formats) for message in messages ] ) last_message_id = max(message_ids) return last_message_id async def begin_import(config: dict, pagination_limit: int): """Skeleton fucntion that creates client and import, write config""" client = pyrogram.Client( "media_downloader", api_id=config["api_id"], api_hash=config["api_hash"], ) await client.start() last_read_message_id: int = config["last_read_message_id"] messages_iter = client.iter_history( config["chat_id"], offset_id=last_read_message_id, reverse=True, ) pagination_count: int = 0 messages_list: list = [] async for message in messages_iter: if pagination_count != pagination_limit: pagination_count += 1 messages_list.append(message) else: last_read_message_id = await process_messages( client, messages_list, config["media_types"], config["file_formats"], ) pagination_count = 0 messages_list = [] messages_list.append(message) if messages_list: last_read_message_id = await process_messages( client, messages_list, config["media_types"], config["file_formats"], ) await client.stop() config["last_read_message_id"] = last_read_message_id return config if __name__ == "__main__": f = open(os.path.join(THIS_DIR, "config.yaml")) config = yaml.safe_load(f) f.close() updated_config = asyncio.get_event_loop().run_until_complete( begin_import(config, pagination_limit=100) ) update_config(updated_config)