mirror of
https://github.com/Dineshkarthik/telegram_media_downloader.git
synced 2025-01-08 15:17:43 +08:00
348 lines
10 KiB
Python
348 lines
10 KiB
Python
"""Unittest module for media downloader."""
|
|
import os
|
|
import copy
|
|
import logging
|
|
import platform
|
|
import unittest
|
|
|
|
import mock
|
|
import pytest
|
|
import asyncio
|
|
|
|
from media_downloader import (
|
|
_get_media_meta,
|
|
download_media,
|
|
update_config,
|
|
begin_import,
|
|
process_messages,
|
|
)
|
|
|
|
MOCK_DIR: str = "/root/project"
|
|
if platform.system() == "Windows":
|
|
MOCK_DIR = "\\root\\project"
|
|
MOCK_CONF = {
|
|
"api_id": 123,
|
|
"api_hash": "hasw5Tgawsuj67",
|
|
"last_read_message_id": 0,
|
|
"chat_id": 8654123,
|
|
"media_types": ["audio", "voice"],
|
|
"file_formats": {"audio": ["all"], "voice": ["all"]},
|
|
}
|
|
|
|
|
|
def platform_generic_path(_path: str) -> str:
|
|
platform_specific_path: str = _path
|
|
if platform.system() == "Windows":
|
|
platform_specific_path = platform_specific_path.replace("/", "\\")
|
|
return platform_specific_path
|
|
|
|
|
|
class MockMessage:
|
|
def __init__(self, **kwargs):
|
|
self.message_id = kwargs.get("id")
|
|
self.media = kwargs.get("media")
|
|
self.audio = kwargs.get("audio", None)
|
|
self.document = kwargs.get("document", None)
|
|
self.photo = kwargs.get("photo", None)
|
|
self.video = kwargs.get("video", None)
|
|
self.voice = kwargs.get("voice", None)
|
|
|
|
|
|
class MockAudio:
|
|
def __init__(self, **kwargs):
|
|
self.file_ref = kwargs["file_ref"]
|
|
self.file_name = kwargs["file_name"]
|
|
self.mime_type = kwargs["mime_type"]
|
|
|
|
|
|
class MockDocument:
|
|
def __init__(self, **kwargs):
|
|
self.file_ref = kwargs["file_ref"]
|
|
self.file_name = kwargs["file_name"]
|
|
self.mime_type = kwargs["mime_type"]
|
|
|
|
|
|
class MockPhoto:
|
|
def __init__(self, **kwargs):
|
|
self.file_ref = kwargs["file_ref"]
|
|
self.date = kwargs["date"]
|
|
|
|
|
|
class MockVoice:
|
|
def __init__(self, **kwargs):
|
|
self.file_ref = kwargs["file_ref"]
|
|
self.mime_type = kwargs["mime_type"]
|
|
self.date = kwargs["date"]
|
|
|
|
|
|
class MockVideo:
|
|
def __init__(self, **kwargs):
|
|
self.file_ref = kwargs["file_ref"]
|
|
self.mime_type = kwargs["mime_type"]
|
|
|
|
|
|
async def async_get_media_meta(message_media, _type):
|
|
result = await _get_media_meta(message_media, _type)
|
|
return result
|
|
|
|
|
|
async def async_download_media(client, message, media_types, file_formats):
|
|
result = await download_media(client, message, media_types, file_formats)
|
|
return result
|
|
|
|
|
|
async def async_begin_import(conf, pagination_limit):
|
|
result = await begin_import(conf, pagination_limit)
|
|
return result
|
|
|
|
|
|
async def mock_process_message(*args, **kwargs):
|
|
return 5
|
|
|
|
|
|
async def async_process_messages(client, messages, media_types, file_formats):
|
|
result = await process_messages(
|
|
client, messages, media_types, file_formats
|
|
)
|
|
return result
|
|
|
|
|
|
class MockClient:
|
|
def __init__(self, *args, **kwargs):
|
|
pass
|
|
|
|
def __aiter__(self):
|
|
return self
|
|
|
|
async def start(self):
|
|
pass
|
|
|
|
async def stop(self):
|
|
pass
|
|
|
|
async def iter_history(self, *args, **kwargs):
|
|
items = [
|
|
MockMessage(
|
|
id=1213,
|
|
media=True,
|
|
voice=MockVoice(
|
|
file_ref="AwADBQADbwAD2oTRVeHe5eXRFftfAg",
|
|
mime_type="audio/ogg",
|
|
date=1564066430,
|
|
),
|
|
),
|
|
MockMessage(id=1214, media=False, text="test message 1",),
|
|
MockMessage(id=1215, media=False, text="test message 2",),
|
|
MockMessage(id=1216, media=False, text="test message 3",),
|
|
]
|
|
for item in items:
|
|
yield item
|
|
|
|
async def download_media(self, *args, **kwargs):
|
|
assert "AwADBQADbwAD2oTRVeHe5eXRFftfAg", args[0]
|
|
assert platform_generic_path(
|
|
"/root/project/voice/voice_2019-07-25T14:53:50.ogg"
|
|
), kwargs["file_name"]
|
|
return kwargs["file_name"]
|
|
|
|
|
|
class MediaDownloaderTestCase(unittest.TestCase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
cls.loop = asyncio.get_event_loop()
|
|
|
|
@mock.patch("media_downloader.THIS_DIR", new=MOCK_DIR)
|
|
def test_get_media_meta(self):
|
|
# Test Voice notes
|
|
message = MockMessage(
|
|
id=1,
|
|
media=True,
|
|
voice=MockVoice(
|
|
file_ref="AwADBQADbwAD2oTRVeHe5eXRFftfAg",
|
|
mime_type="audio/ogg",
|
|
date=1564066430,
|
|
),
|
|
)
|
|
result = self.loop.run_until_complete(
|
|
async_get_media_meta(message.voice, "voice")
|
|
)
|
|
|
|
self.assertEqual(
|
|
(
|
|
"AwADBQADbwAD2oTRVeHe5eXRFftfAg",
|
|
platform_generic_path(
|
|
"/root/project/voice/voice_2019-07-25T14:53:50.ogg"
|
|
),
|
|
"ogg",
|
|
),
|
|
result,
|
|
)
|
|
|
|
# Test photos
|
|
message = MockMessage(
|
|
id=2,
|
|
media=True,
|
|
photo=MockPhoto(
|
|
file_ref="AgADBQAD5KkxG_FPQValJzQsJPyzhHcC", date=1565015712
|
|
),
|
|
)
|
|
result = self.loop.run_until_complete(
|
|
async_get_media_meta(message.photo, "photo")
|
|
)
|
|
self.assertEqual(
|
|
(
|
|
"AgADBQAD5KkxG_FPQValJzQsJPyzhHcC",
|
|
platform_generic_path("/root/project/photo/"),
|
|
None,
|
|
),
|
|
result,
|
|
)
|
|
|
|
# Test Documents
|
|
message = MockMessage(
|
|
id=3,
|
|
media=True,
|
|
document=MockDocument(
|
|
file_ref="AQADAgADq7LfMgAEIdy5DwAE4w4AAwI",
|
|
file_name="sample_document.pdf",
|
|
mime_type="application/pdf",
|
|
),
|
|
)
|
|
result = self.loop.run_until_complete(
|
|
async_get_media_meta(message.document, "document")
|
|
)
|
|
self.assertEqual(
|
|
(
|
|
"AQADAgADq7LfMgAEIdy5DwAE4w4AAwI",
|
|
platform_generic_path(
|
|
"/root/project/document/sample_document.pdf"
|
|
),
|
|
"pdf",
|
|
),
|
|
result,
|
|
)
|
|
|
|
# Test audio
|
|
message = MockMessage(
|
|
id=4,
|
|
media=True,
|
|
audio=MockAudio(
|
|
file_ref="AQADAgADq7LfMgAEIdy5DwAE5Q4AAgEC",
|
|
file_name="sample_audio.mp3",
|
|
mime_type="audio/mp3",
|
|
),
|
|
)
|
|
result = self.loop.run_until_complete(
|
|
async_get_media_meta(message.audio, "audio")
|
|
)
|
|
self.assertEqual(
|
|
(
|
|
"AQADAgADq7LfMgAEIdy5DwAE5Q4AAgEC",
|
|
platform_generic_path("/root/project/audio/sample_audio.mp3"),
|
|
"mp3",
|
|
),
|
|
result,
|
|
)
|
|
|
|
# Test Video
|
|
message = MockMessage(
|
|
id=5,
|
|
media=True,
|
|
video=MockVideo(
|
|
file_ref="CQADBQADeQIAAlL60FUCNMBdK8OjlAI",
|
|
mime_type="video/mp4",
|
|
),
|
|
)
|
|
result = self.loop.run_until_complete(
|
|
async_get_media_meta(message.video, "video")
|
|
)
|
|
self.assertEqual(
|
|
(
|
|
"CQADBQADeQIAAlL60FUCNMBdK8OjlAI",
|
|
platform_generic_path("/root/project/video/"),
|
|
"mp4",
|
|
),
|
|
result,
|
|
)
|
|
|
|
@mock.patch("media_downloader.THIS_DIR", new=MOCK_DIR)
|
|
def test_download_media(self):
|
|
client = MockClient()
|
|
message = MockMessage(
|
|
id=5,
|
|
media=True,
|
|
video=MockVideo(
|
|
file_ref="CQADBQADeQIAAlL60FUCNMBdK8OjlAI",
|
|
file_name="sample_video.mp4",
|
|
mime_type="video/mp4",
|
|
),
|
|
)
|
|
result = self.loop.run_until_complete(
|
|
async_download_media(
|
|
client, message, ["video", "photo"], {"video": ["mp4"]}
|
|
)
|
|
)
|
|
self.assertEqual(5, result)
|
|
|
|
message_1 = MockMessage(
|
|
id=6,
|
|
media=True,
|
|
video=MockVideo(
|
|
file_ref="CQADBQADeQIAAlL60FUCNMBdK8OjlAI",
|
|
file_name="sample_video.mov",
|
|
mime_type="video/mov",
|
|
),
|
|
)
|
|
result = self.loop.run_until_complete(
|
|
async_download_media(
|
|
client, message_1, ["video", "photo"], {"video": ["all"]}
|
|
)
|
|
)
|
|
self.assertEqual(6, result)
|
|
|
|
@mock.patch("__main__.__builtins__.open", new_callable=mock.mock_open)
|
|
@mock.patch("media_downloader.yaml", autospec=True)
|
|
def test_update_config(self, mock_yaml, mock_open):
|
|
conf = {"api_id": 123, "api_hash": "hasw5Tgawsuj67"}
|
|
update_config(conf)
|
|
mock_open.assert_called_with("config.yaml", "w")
|
|
mock_yaml.dump.assert_called_with(
|
|
conf, mock.ANY, default_flow_style=False
|
|
)
|
|
|
|
@mock.patch("media_downloader.pyrogram.Client", new=MockClient)
|
|
@mock.patch("media_downloader.process_messages", new=mock_process_message)
|
|
def test_begin_import(self):
|
|
result = self.loop.run_until_complete(async_begin_import(MOCK_CONF, 3))
|
|
conf = copy.deepcopy(MOCK_CONF)
|
|
conf["last_read_message_id"] = 5
|
|
self.assertDictEqual(result, conf)
|
|
|
|
def test_process_message(self):
|
|
client = MockClient()
|
|
result = self.loop.run_until_complete(
|
|
async_process_messages(
|
|
client,
|
|
[
|
|
MockMessage(
|
|
id=1213,
|
|
media=True,
|
|
voice=MockVoice(
|
|
file_ref="AwADBQADbwAD2oTRVeHe5eXRFftfAg",
|
|
mime_type="audio/ogg",
|
|
date=1564066430,
|
|
),
|
|
),
|
|
MockMessage(id=1214, media=False, text="test message 1",),
|
|
MockMessage(id=1215, media=False, text="test message 2",),
|
|
MockMessage(id=1216, media=False, text="test message 3",),
|
|
],
|
|
["voice", "photo"],
|
|
{"audio": ["all"], "voice": ["all"]},
|
|
)
|
|
)
|
|
self.assertEqual(result, 1216)
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
cls.loop.close()
|