telegram_media_downloader/tests/test_media_downloader.py

416 lines
12 KiB
Python
Raw Normal View History

2019-08-06 22:09:42 +08:00
"""Unittest module for media downloader."""
import os
import copy
import logging
2020-07-11 15:23:38 +08:00
import platform
2019-08-06 22:09:42 +08:00
import unittest
import mock
import pytest
2020-05-25 20:24:20 +08:00
import asyncio
2019-08-06 22:09:42 +08:00
from media_downloader import (
_get_media_meta,
2020-07-22 02:05:26 +08:00
_can_download,
_is_exist,
2019-08-06 22:09:42 +08:00
download_media,
update_config,
begin_import,
2020-05-25 20:24:20 +08:00
process_messages,
2019-08-06 22:09:42 +08:00
)
2020-07-11 15:23:38 +08:00
MOCK_DIR: str = "/root/project"
if platform.system() == "Windows":
MOCK_DIR = "\\root\\project"
2019-08-06 22:09:42 +08:00
MOCK_CONF = {
"api_id": 123,
"api_hash": "hasw5Tgawsuj67",
"last_read_message_id": 0,
"chat_id": 8654123,
"media_types": ["audio", "voice"],
2020-06-11 23:33:22 +08:00
"file_formats": {"audio": ["all"], "voice": ["all"]},
2019-08-06 22:09:42 +08:00
}
2020-07-11 15:23:38 +08:00
def platform_generic_path(_path: str) -> str:
platform_specific_path: str = _path
if platform.system() == "Windows":
platform_specific_path = platform_specific_path.replace("/", "\\")
return platform_specific_path
2020-07-11 14:40:17 +08:00
2020-07-22 02:05:26 +08:00
def mock_manage_duplicate_file(file_path: str) -> str:
return file_path
2019-08-06 22:09:42 +08:00
class MockMessage:
def __init__(self, **kwargs):
self.message_id = kwargs.get("id")
self.media = kwargs.get("media")
self.audio = kwargs.get("audio", None)
self.document = kwargs.get("document", None)
self.photo = kwargs.get("photo", None)
self.video = kwargs.get("video", None)
self.voice = kwargs.get("voice", None)
class MockAudio:
def __init__(self, **kwargs):
2019-11-08 00:56:33 +08:00
self.file_ref = kwargs["file_ref"]
2019-08-06 22:09:42 +08:00
self.file_name = kwargs["file_name"]
2020-06-11 23:33:22 +08:00
self.mime_type = kwargs["mime_type"]
2019-08-06 22:09:42 +08:00
class MockDocument:
def __init__(self, **kwargs):
2019-11-08 00:56:33 +08:00
self.file_ref = kwargs["file_ref"]
2019-08-06 22:09:42 +08:00
self.file_name = kwargs["file_name"]
2020-06-11 23:33:22 +08:00
self.mime_type = kwargs["mime_type"]
2019-08-06 22:09:42 +08:00
class MockPhoto:
def __init__(self, **kwargs):
2019-11-08 00:56:33 +08:00
self.file_ref = kwargs["file_ref"]
2019-08-06 22:09:42 +08:00
self.date = kwargs["date"]
class MockVoice:
def __init__(self, **kwargs):
2019-11-08 00:56:33 +08:00
self.file_ref = kwargs["file_ref"]
2019-08-06 22:09:42 +08:00
self.mime_type = kwargs["mime_type"]
self.date = kwargs["date"]
class MockVideo:
def __init__(self, **kwargs):
2019-11-08 00:56:33 +08:00
self.file_ref = kwargs["file_ref"]
2020-06-11 23:33:22 +08:00
self.mime_type = kwargs["mime_type"]
2019-08-06 22:09:42 +08:00
2020-05-25 20:24:20 +08:00
async def async_get_media_meta(message_media, _type):
result = await _get_media_meta(message_media, _type)
return result
2020-06-11 23:33:22 +08:00
async def async_download_media(client, message, media_types, file_formats):
result = await download_media(client, message, media_types, file_formats)
2020-05-25 20:24:20 +08:00
return result
2020-07-10 20:08:06 +08:00
async def async_begin_import(conf, pagination_limit):
result = await begin_import(conf, pagination_limit)
2020-05-25 20:24:20 +08:00
return result
async def mock_process_message(*args, **kwargs):
return 5
2020-07-10 20:08:06 +08:00
async def async_process_messages(client, messages, media_types, file_formats):
2020-05-25 20:24:20 +08:00
result = await process_messages(
2020-07-10 16:43:49 +08:00
client, messages, media_types, file_formats
2020-05-25 20:24:20 +08:00
)
return result
2019-08-06 22:09:42 +08:00
class MockClient:
def __init__(self, *args, **kwargs):
pass
2020-05-25 20:24:20 +08:00
def __aiter__(self):
return self
async def start(self):
pass
async def stop(self):
pass
async def iter_history(self, *args, **kwargs):
items = [
2019-08-06 22:09:42 +08:00
MockMessage(
id=1213,
media=True,
voice=MockVoice(
2019-11-08 00:56:33 +08:00
file_ref="AwADBQADbwAD2oTRVeHe5eXRFftfAg",
2019-08-06 22:09:42 +08:00
mime_type="audio/ogg",
date=1564066430,
),
2020-05-25 20:24:20 +08:00
),
MockMessage(id=1214, media=False, text="test message 1",),
MockMessage(id=1215, media=False, text="test message 2",),
MockMessage(id=1216, media=False, text="test message 3",),
2019-08-06 22:09:42 +08:00
]
2020-05-25 20:24:20 +08:00
for item in items:
yield item
2019-08-06 22:09:42 +08:00
2020-05-25 20:24:20 +08:00
async def download_media(self, *args, **kwargs):
2019-08-06 22:09:42 +08:00
assert "AwADBQADbwAD2oTRVeHe5eXRFftfAg", args[0]
2020-07-11 14:40:17 +08:00
assert platform_generic_path(
"/root/project/voice/voice_2019-07-25T14:53:50.ogg"
), kwargs["file_name"]
2019-08-06 22:09:42 +08:00
return kwargs["file_name"]
class MediaDownloaderTestCase(unittest.TestCase):
2020-05-25 20:24:20 +08:00
@classmethod
def setUpClass(cls):
cls.loop = asyncio.get_event_loop()
2019-08-06 22:09:42 +08:00
@mock.patch("media_downloader.THIS_DIR", new=MOCK_DIR)
def test_get_media_meta(self):
# Test Voice notes
message = MockMessage(
id=1,
media=True,
voice=MockVoice(
2019-11-08 00:56:33 +08:00
file_ref="AwADBQADbwAD2oTRVeHe5eXRFftfAg",
2019-08-06 22:09:42 +08:00
mime_type="audio/ogg",
date=1564066430,
),
)
2020-05-25 20:24:20 +08:00
result = self.loop.run_until_complete(
async_get_media_meta(message.voice, "voice")
)
2019-08-06 22:09:42 +08:00
self.assertEqual(
(
"AwADBQADbwAD2oTRVeHe5eXRFftfAg",
2020-07-11 14:40:17 +08:00
platform_generic_path(
"/root/project/voice/voice_2019-07-25T14:53:50.ogg"
),
2020-06-11 23:33:22 +08:00
"ogg",
2019-08-06 22:09:42 +08:00
),
result,
)
# Test photos
message = MockMessage(
id=2,
media=True,
photo=MockPhoto(
2019-11-08 00:56:33 +08:00
file_ref="AgADBQAD5KkxG_FPQValJzQsJPyzhHcC", date=1565015712
2019-08-06 22:09:42 +08:00
),
)
2020-05-25 20:24:20 +08:00
result = self.loop.run_until_complete(
async_get_media_meta(message.photo, "photo")
)
2019-08-06 22:09:42 +08:00
self.assertEqual(
2020-07-11 14:40:17 +08:00
(
"AgADBQAD5KkxG_FPQValJzQsJPyzhHcC",
platform_generic_path("/root/project/photo/"),
None,
),
2019-08-06 22:09:42 +08:00
result,
)
# Test Documents
message = MockMessage(
id=3,
media=True,
document=MockDocument(
2019-11-08 00:56:33 +08:00
file_ref="AQADAgADq7LfMgAEIdy5DwAE4w4AAwI",
2019-08-06 22:09:42 +08:00
file_name="sample_document.pdf",
2020-06-11 23:33:22 +08:00
mime_type="application/pdf",
2019-08-06 22:09:42 +08:00
),
)
2020-05-25 20:24:20 +08:00
result = self.loop.run_until_complete(
async_get_media_meta(message.document, "document")
)
2019-08-06 22:09:42 +08:00
self.assertEqual(
(
"AQADAgADq7LfMgAEIdy5DwAE4w4AAwI",
2020-07-11 14:40:17 +08:00
platform_generic_path(
"/root/project/document/sample_document.pdf"
),
2020-06-11 23:33:22 +08:00
"pdf",
2019-08-06 22:09:42 +08:00
),
result,
)
# Test audio
message = MockMessage(
id=4,
media=True,
audio=MockAudio(
2019-11-08 00:56:33 +08:00
file_ref="AQADAgADq7LfMgAEIdy5DwAE5Q4AAgEC",
2019-08-06 22:09:42 +08:00
file_name="sample_audio.mp3",
2020-06-11 23:33:22 +08:00
mime_type="audio/mp3",
2019-08-06 22:09:42 +08:00
),
)
2020-05-25 20:24:20 +08:00
result = self.loop.run_until_complete(
async_get_media_meta(message.audio, "audio")
)
2019-08-06 22:09:42 +08:00
self.assertEqual(
(
"AQADAgADq7LfMgAEIdy5DwAE5Q4AAgEC",
2020-07-11 14:40:17 +08:00
platform_generic_path("/root/project/audio/sample_audio.mp3"),
2020-06-11 23:33:22 +08:00
"mp3",
2019-08-06 22:09:42 +08:00
),
result,
)
# Test Video
message = MockMessage(
id=5,
media=True,
2020-06-11 23:33:22 +08:00
video=MockVideo(
file_ref="CQADBQADeQIAAlL60FUCNMBdK8OjlAI",
mime_type="video/mp4",
),
2019-08-06 22:09:42 +08:00
)
2020-05-25 20:24:20 +08:00
result = self.loop.run_until_complete(
async_get_media_meta(message.video, "video")
)
2019-08-06 22:09:42 +08:00
self.assertEqual(
2020-07-11 14:40:17 +08:00
(
"CQADBQADeQIAAlL60FUCNMBdK8OjlAI",
platform_generic_path("/root/project/video/"),
"mp4",
),
2019-08-06 22:09:42 +08:00
result,
)
@mock.patch("media_downloader.THIS_DIR", new=MOCK_DIR)
def test_download_media(self):
client = MockClient()
2020-05-25 20:24:20 +08:00
message = MockMessage(
id=5,
media=True,
video=MockVideo(
file_ref="CQADBQADeQIAAlL60FUCNMBdK8OjlAI",
file_name="sample_video.mp4",
2020-06-11 23:33:22 +08:00
mime_type="video/mp4",
2020-05-25 20:24:20 +08:00
),
)
result = self.loop.run_until_complete(
2020-06-11 23:33:22 +08:00
async_download_media(
client, message, ["video", "photo"], {"video": ["mp4"]}
)
2020-05-25 20:24:20 +08:00
)
self.assertEqual(5, result)
2019-08-06 22:09:42 +08:00
2020-06-11 23:33:22 +08:00
message_1 = MockMessage(
id=6,
media=True,
video=MockVideo(
file_ref="CQADBQADeQIAAlL60FUCNMBdK8OjlAI",
file_name="sample_video.mov",
mime_type="video/mov",
),
)
result = self.loop.run_until_complete(
async_download_media(
client, message_1, ["video", "photo"], {"video": ["all"]}
)
)
self.assertEqual(6, result)
2019-08-06 22:09:42 +08:00
@mock.patch("__main__.__builtins__.open", new_callable=mock.mock_open)
@mock.patch("media_downloader.yaml", autospec=True)
def test_update_config(self, mock_yaml, mock_open):
conf = {"api_id": 123, "api_hash": "hasw5Tgawsuj67"}
update_config(conf)
mock_open.assert_called_with("config.yaml", "w")
mock_yaml.dump.assert_called_with(
conf, mock.ANY, default_flow_style=False
)
2020-05-25 20:24:20 +08:00
@mock.patch("media_downloader.pyrogram.Client", new=MockClient)
@mock.patch("media_downloader.process_messages", new=mock_process_message)
def test_begin_import(self):
2020-07-10 20:08:06 +08:00
result = self.loop.run_until_complete(async_begin_import(MOCK_CONF, 3))
2019-08-06 22:09:42 +08:00
conf = copy.deepcopy(MOCK_CONF)
2020-07-10 16:43:49 +08:00
conf["last_read_message_id"] = 5
2020-05-25 20:24:20 +08:00
self.assertDictEqual(result, conf)
def test_process_message(self):
client = MockClient()
result = self.loop.run_until_complete(
async_process_messages(
2020-06-11 23:33:22 +08:00
client,
2020-07-10 16:43:49 +08:00
[
MockMessage(
id=1213,
media=True,
voice=MockVoice(
file_ref="AwADBQADbwAD2oTRVeHe5eXRFftfAg",
mime_type="audio/ogg",
2020-07-22 02:05:26 +08:00
date=1564066340,
),
),
MockMessage(id=1214, media=False, text="test message 1",),
MockMessage(id=1215, media=False, text="test message 2",),
MockMessage(id=1216, media=False, text="test message 3",),
],
["voice", "photo"],
{"audio": ["all"], "voice": ["all"]},
)
)
self.assertEqual(result, 1216)
@mock.patch("media_downloader._is_exist", return_value=True)
@mock.patch(
"media_downloader.manage_duplicate_file",
new=mock_manage_duplicate_file,
)
def test_process_message_when_file_exists(self, mock_is_exist):
client = MockClient()
result = self.loop.run_until_complete(
async_process_messages(
client,
[
MockMessage(
id=1213,
media=True,
voice=MockVoice(
file_ref="AwADBQADbwAD2oTRVeHe5eXRFftfAg",
mime_type="audio/ogg",
date=1564066340,
2020-07-10 16:43:49 +08:00
),
),
MockMessage(id=1214, media=False, text="test message 1",),
MockMessage(id=1215, media=False, text="test message 2",),
MockMessage(id=1216, media=False, text="test message 3",),
],
2020-06-11 23:33:22 +08:00
["voice", "photo"],
{"audio": ["all"], "voice": ["all"]},
2020-05-25 20:24:20 +08:00
)
)
self.assertEqual(result, 1216)
2020-07-22 02:05:26 +08:00
def test_can_download(self):
file_formats = {
"audio": ["mp3"],
"video": ["mp4"],
"document": ["all"],
}
result = _can_download("audio", file_formats, "mp3")
self.assertEqual(result, True)
result1 = _can_download("audio", file_formats, "ogg")
self.assertEqual(result1, False)
result2 = _can_download("document", file_formats, "pdf")
self.assertEqual(result2, True)
result3 = _can_download("document", file_formats, "epub")
self.assertEqual(result3, True)
def test_is_exist(self):
this_dir = os.path.dirname(os.path.abspath(__file__))
result = _is_exist(os.path.join(this_dir, "__init__.py"))
self.assertEqual(result, True)
result1 = _is_exist(os.path.join(this_dir, "init.py"))
self.assertEqual(result1, False)
result2 = _is_exist(this_dir)
self.assertEqual(result2, False)
2020-05-25 20:24:20 +08:00
@classmethod
def tearDownClass(cls):
cls.loop.close()