telegram_media_downloader/media_downloader.py

376 lines
11 KiB
Python
Raw Normal View History

2019-11-06 20:07:25 +08:00
"""Downloads media from telegram."""
import asyncio
2019-07-24 23:42:25 +08:00
import logging
import os
from typing import List, Optional, Tuple, Union
2019-07-24 23:42:25 +08:00
2019-11-06 20:07:25 +08:00
import pyrogram
2020-07-11 05:38:57 +08:00
import yaml
from pyrogram.types import Audio, Document, Photo, Video, VideoNote, Voice
from rich.logging import RichHandler
2020-07-22 01:36:03 +08:00
from utils.file_management import get_next_name, manage_duplicate_file
2020-12-31 21:27:41 +08:00
from utils.log import LogFilter
2020-12-30 19:15:27 +08:00
from utils.meta import print_meta
from utils.updates import check_for_updates
2019-07-24 23:42:25 +08:00
# Configure the root logger: INFO level, message-only format, and a single
# RichHandler (from the `rich` package) as the output handler.
logging.basicConfig(
    level=logging.INFO,
    format="%(message)s",
    datefmt="[%X]",
    handlers=[RichHandler()],
)
2020-12-31 21:27:41 +08:00
# Attach the project's LogFilter to two pyrogram loggers.
# NOTE(review): presumably this suppresses noisy pyrogram records - confirm
# against utils.log.LogFilter.
logging.getLogger("pyrogram.session.session").addFilter(LogFilter())
logging.getLogger("pyrogram.client").addFilter(LogFilter())
# Logger used by every function in this module.
logger = logging.getLogger("media_downloader")
2019-07-24 23:42:25 +08:00
# Absolute directory containing this file; used as the base for the
# per-media-type download folders and for locating config.yaml in main().
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
# Ids of messages whose download was given up on during this run.
FAILED_IDS: list = []
# Ids of messages that went through the download step during this run.
DOWNLOADED_IDS: list = []
2019-07-24 23:42:25 +08:00
def update_config(config: dict):
    """
    Update existing configuration file.

    The ``ids_to_retry`` entry is recomputed as the previously pending ids
    minus the ids downloaded during this run, plus the ids that failed
    during this run. The result is de-duplicated, so calling this function
    several times in one run does not pile up repeated ids.

    Parameters
    ----------
    config: dict
        Configuration to be written into config file. It is modified in
        place (``ids_to_retry`` is replaced) before being dumped.
    """
    # A missing or empty (None) ``ids_to_retry`` entry is treated as "nothing
    # pending" instead of crashing on ``set(None)``.
    still_pending = set(config.get("ids_to_retry") or []) - set(DOWNLOADED_IDS)
    # Set union keeps every failed id exactly once; sorting keeps the
    # written file stable between runs.
    config["ids_to_retry"] = sorted(still_pending | set(FAILED_IDS))
    # Write next to this module, i.e. the same file that main() reads, so the
    # saved state is picked up again regardless of the working directory.
    with open(os.path.join(THIS_DIR, "config.yaml"), "w") as yaml_file:
        yaml.dump(config, yaml_file, default_flow_style=False)
    logger.info("Updated last read message_id to config file")
2019-07-24 23:42:25 +08:00
def _can_download(_type: str, file_formats: dict, file_format: Optional[str]) -> bool:
2020-12-13 00:05:44 +08:00
"""
Check if the given file format can be downloaded.
Parameters
----------
_type: str
Type of media object.
file_formats: dict
Dictionary containing the list of file_formats
to be downloaded for `audio`, `document` & `video`
media types
file_format: str
Format of the current file to be downloaded.
Returns
-------
bool
True if the file format can be downloaded else False.
"""
2020-07-22 02:05:26 +08:00
if _type in ["audio", "document", "video"]:
allowed_formats: list = file_formats[_type]
if not file_format in allowed_formats and allowed_formats[0] != "all":
return False
return True
def _is_exist(file_path: str) -> bool:
2020-12-13 00:05:44 +08:00
"""
Check if a file exists and it is not a directory.
Parameters
----------
file_path: str
Absolute path of the file to be checked.
Returns
-------
bool
True if the file exists else False.
"""
2020-07-22 02:05:26 +08:00
return not os.path.isdir(file_path) and os.path.exists(file_path)
async def _get_media_meta(
    media_obj: Union[Audio, Document, Photo, Video, VideoNote, Voice],
    _type: str,
) -> Tuple[str, Optional[str]]:
    """Extract file name and file format from media object.

    Parameters
    ----------
    media_obj: Union[Audio, Document, Photo, Video, VideoNote, Voice]
        Media object to be extracted.
    _type: str
        Type of media object.

    Returns
    -------
    Tuple[str, Optional[str]]
        file_name, file_format

        ``file_name`` is a path under ``THIS_DIR/<_type>/``. For `voice` and
        `video_note` it is generated from the media type and the media date;
        for every other type it is the media's own ``file_name`` (or the bare
        directory path when the media has none).
        ``file_format`` is the part after the last ``/`` of the mime type, or
        None when the media type carries no format or the mime type is unset.
    """
    file_format: Optional[str] = None
    if _type in ("audio", "document", "video", "voice", "video_note"):
        # mime_type is optional on pyrogram media objects; leave the format
        # as None instead of failing on ``None.split``.
        mime_type = getattr(media_obj, "mime_type", None)
        if mime_type is not None:
            file_format = mime_type.split("/")[-1]

    if _type in ("voice", "video_note"):
        # These media types get a generated name: <type>_<ISO date>.<format>
        generated_name = "{}_{}".format(
            _type,
            media_obj.date.isoformat(),  # type: ignore
        )
        if file_format is not None:
            generated_name = "{}.{}".format(generated_name, file_format)
        file_name: str = os.path.join(THIS_DIR, _type, generated_name)
    else:
        # An empty last component yields the directory path itself when the
        # media object has no file name.
        file_name = os.path.join(
            THIS_DIR, _type, getattr(media_obj, "file_name", None) or ""
        )
    return file_name, file_format
2020-06-11 22:05:41 +08:00
async def download_media(
    client: pyrogram.client.Client,
    message: pyrogram.types.Message,
    media_types: List[str],
    file_formats: dict,
) -> int:
    """
    Download media from Telegram.

    The download of a message is attempted up to 3 times. An expired file
    reference (``BadRequest``) triggers a refetch of the message before the
    next attempt; a ``TypeError`` (logged as a timeout) is followed by a
    5 second pause. Any other exception aborts the attempts immediately.
    Message ids that could not be downloaded are appended to ``FAILED_IDS``,
    processed ids to ``DOWNLOADED_IDS``.

    Parameters
    ----------
    client: pyrogram.client.Client
        Client to interact with Telegram APIs.
    message: pyrogram.types.Message
        Message object retrieved from telegram.
    media_types: list
        List of strings of media types to be downloaded.
        Ex : `["audio", "photo"]`
        Supported formats:
            * audio
            * document
            * photo
            * video
            * voice
    file_formats: dict
        Dictionary containing the list of file_formats
        to be downloaded for `audio`, `document` & `video`
        media types.

    Returns
    -------
    int
        Current message id.
    """
    for retry in range(3):
        try:
            # Nothing to download for messages without media.
            if message.media is None:
                return message.id
            for _type in media_types:
                # The message exposes its media under an attribute named
                # after the media type; skip types this message lacks.
                _media = getattr(message, _type, None)
                if _media is None:
                    continue
                file_name, file_format = await _get_media_meta(_media, _type)
                if _can_download(_type, file_formats, file_format):
                    if _is_exist(file_name):
                        # Target already exists: download under the next
                        # free name, then let the helper handle duplicates.
                        file_name = get_next_name(file_name)
                        download_path = await client.download_media(
                            message, file_name=file_name
                        )
                        # pylint: disable = C0301
                        download_path = manage_duplicate_file(download_path)  # type: ignore
                    else:
                        download_path = await client.download_media(
                            message, file_name=file_name
                        )
                    if download_path:
                        logger.info("Media downloaded - %s", download_path)
                    DOWNLOADED_IDS.append(message.id)
            # Attempt finished without raising: stop retrying.
            break
        except pyrogram.errors.exceptions.bad_request_400.BadRequest:
            logger.warning(
                "Message[%d]: file reference expired, refetching...",
                message.id,
            )
            # Refetch the message so the next attempt uses a fresh object.
            message = await client.get_messages(  # type: ignore
                chat_id=message.chat.id,  # type: ignore
                message_ids=message.id,
            )
            # Last attempt: give up and record the id for a later run.
            if retry == 2:
                # pylint: disable = C0301
                logger.error(
                    "Message[%d]: file reference expired for 3 retries, download skipped.",
                    message.id,
                )
                FAILED_IDS.append(message.id)
        except TypeError:
            # NOTE(review): TypeError is handled as a download timeout here,
            # presumably because that is how the client surfaces it - confirm.
            # pylint: disable = C0301
            logger.warning(
                "Timeout Error occurred when downloading Message[%d], retrying after 5 seconds",
                message.id,
            )
            await asyncio.sleep(5)
            if retry == 2:
                logger.error(
                    "Message[%d]: Timing out after 3 reties, download skipped.",
                    message.id,
                )
                FAILED_IDS.append(message.id)
        except Exception as e:
            # Unexpected failure: log with traceback, record the id and do
            # not retry.
            # pylint: disable = C0301
            logger.error(
                "Message[%d]: could not be downloaded due to following exception:\n[%s].",
                message.id,
                e,
                exc_info=True,
            )
            FAILED_IDS.append(message.id)
            break
    return message.id
async def process_messages(
    client: pyrogram.client.Client,
    messages: List[pyrogram.types.Message],
    media_types: List[str],
    file_formats: dict,
) -> int:
    """
    Download the media of a batch of messages concurrently.

    Parameters
    ----------
    client: pyrogram.client.Client
        Client to interact with Telegram APIs.
    messages: list
        List of telegram messages.
    media_types: list
        List of strings of media types to be downloaded.
        Ex : `["audio", "photo"]`
        Supported formats:
            * audio
            * document
            * photo
            * video
            * voice
    file_formats: dict
        Dictionary containing the list of file_formats
        to be downloaded for `audio`, `document` & `video`
        media types.

    Returns
    -------
    int
        Max value of list of message ids.
    """
    # One download coroutine per message, awaited together.
    pending_downloads = (
        download_media(client, current, media_types, file_formats)
        for current in messages
    )
    processed_ids = await asyncio.gather(*pending_downloads)
    return max(processed_ids)
2019-07-24 23:42:25 +08:00
2020-12-13 00:05:44 +08:00
async def begin_import(config: dict, pagination_limit: int) -> dict:
    """
    Create pyrogram client and initiate download.

    The pyrogram client is created using the ``api_id``, ``api_hash``
    from the config and iter through message offset on the
    ``last_message_id`` and the requested file_formats.
    Messages listed under ``ids_to_retry`` are fetched first and become part
    of the first batch. After every full batch the config file is updated
    with the progress made so far.

    Parameters
    ----------
    config: dict
        Dict containing the config to create pyrogram client.
    pagination_limit: int
        Number of message to download asynchronously as a batch.

    Returns
    -------
    dict
        Updated configuration to be written into config file.
    """
    client = pyrogram.Client(
        "media_downloader",
        api_id=config["api_id"],
        api_hash=config["api_hash"],
    )
    await client.start()
    # The client is stopped in ``finally`` so a failure while fetching or
    # downloading does not leave the session running.
    try:
        last_read_message_id: int = config["last_read_message_id"]
        messages_iter = client.get_chat_history(
            config["chat_id"], offset_id=last_read_message_id, reverse=True
        )
        messages_list: list = []

        if config["ids_to_retry"]:
            logger.info("Downloading files failed during last run...")
            skipped_messages: list = await client.get_messages(  # type: ignore
                chat_id=config["chat_id"], message_ids=config["ids_to_retry"]
            )
            messages_list.extend(skipped_messages)

        async for message in messages_iter:  # type: ignore
            # Flush once the pending batch is full. Using the list length
            # keeps every batch at the limit and still flushes when the
            # retried messages alone already exceed it.
            if messages_list and len(messages_list) >= pagination_limit:
                batch_last_id = await process_messages(
                    client,
                    messages_list,
                    config["media_types"],
                    config["file_formats"],
                )
                # Retried messages carry older ids; never move the marker
                # backwards because of them.
                last_read_message_id = max(last_read_message_id, batch_last_id)
                messages_list = []
                config["last_read_message_id"] = last_read_message_id
                update_config(config)
            messages_list.append(message)

        # Process whatever is left after the history is exhausted.
        if messages_list:
            batch_last_id = await process_messages(
                client,
                messages_list,
                config["media_types"],
                config["file_formats"],
            )
            last_read_message_id = max(last_read_message_id, batch_last_id)
    finally:
        await client.stop()

    config["last_read_message_id"] = last_read_message_id
    return config
2019-07-24 23:42:25 +08:00
def main():
    """Load the config, run the import, then persist the resulting config."""
    config_path = os.path.join(THIS_DIR, "config.yaml")
    with open(config_path) as config_file:
        config = yaml.safe_load(config_file)

    event_loop = asyncio.get_event_loop()
    updated_config = event_loop.run_until_complete(
        begin_import(config, pagination_limit=100)
    )

    if FAILED_IDS:
        # Report distinct failures only; an id may be recorded more than once.
        failed_total = len(set(FAILED_IDS))
        logger.info(
            "Downloading of %d files failed. "
            "Failed message ids are added to config file.\n"
            "These files will be downloaded on the next run.",
            failed_total,
        )

    update_config(updated_config)
    check_for_updates()
# Script entry point: print the project meta information through the module
# logger, then run the downloader.
if __name__ == "__main__":
    print_meta(logger)
    main()