telegram_media_downloader/tests/test_media_downloader.py
2020-07-11 08:40:17 +02:00

342 lines
9.8 KiB
Python

"""Unittest module for media downloader."""
import os
import copy
import logging
import unittest
import mock
import pytest
import asyncio
from media_downloader import (
_get_media_meta,
download_media,
update_config,
begin_import,
process_messages,
)
# Directory patched over ``media_downloader.THIS_DIR`` so that the expected
# download paths in the tests do not depend on where the suite is run from.
MOCK_DIR = "/root/project"

# Configuration dictionary handed to ``begin_import`` by ``test_begin_import``.
MOCK_CONF = {
    "api_id": 123,
    "api_hash": "hasw5Tgawsuj67",
    "last_read_message_id": 0,
    "chat_id": 8654123,
    "media_types": ["audio", "voice"],
    "file_formats": {"audio": ["all"], "voice": ["all"]},
}
def platform_generic_path(path: str) -> str:
    """Re-join a ``/``-separated path using the host platform's separator.

    The input is split on ``/`` and the pieces are passed to
    ``os.path.join`` underneath a leading ``/`` component.
    """
    segments = path.split("/")
    return os.path.join("/", *segments)
class MockMessage:
    """Minimal stand-in for a Telegram message object used by the tests."""

    # Media attributes that default to ``None`` when not supplied.
    _MEDIA_ATTRIBUTES = ("audio", "document", "photo", "video", "voice")

    def __init__(self, **kwargs):
        """Populate the message from keyword arguments.

        ``id`` is stored as ``message_id`` and ``media`` as ``media``; each
        media attribute is taken from the keyword of the same name.  Any
        other keyword (for example ``text``) is ignored.
        """
        self.message_id = kwargs.get("id")
        self.media = kwargs.get("media")
        for attribute in self._MEDIA_ATTRIBUTES:
            setattr(self, attribute, kwargs.get(attribute))
class MockAudio:
    """Stand-in for an audio attachment; all three keywords are required."""

    def __init__(self, **kwargs):
        """Copy ``file_ref``, ``file_name`` and ``mime_type`` from *kwargs*.

        A missing keyword raises ``KeyError``; extra keywords are ignored.
        """
        for field in ("file_ref", "file_name", "mime_type"):
            setattr(self, field, kwargs[field])
class MockDocument:
    """Stand-in for a document attachment; all three keywords are required."""

    def __init__(self, **kwargs):
        """Copy ``file_ref``, ``file_name`` and ``mime_type`` from *kwargs*.

        A missing keyword raises ``KeyError``; extra keywords are ignored.
        """
        for field in ("file_ref", "file_name", "mime_type"):
            setattr(self, field, kwargs[field])
class MockPhoto:
    """Stand-in for a photo attachment; both keywords are required."""

    def __init__(self, **kwargs):
        """Copy ``file_ref`` and ``date`` from *kwargs*.

        A missing keyword raises ``KeyError``; extra keywords are ignored.
        """
        for field in ("file_ref", "date"):
            setattr(self, field, kwargs[field])
class MockVoice:
    """Stand-in for a voice-note attachment; all three keywords are required."""

    def __init__(self, **kwargs):
        """Copy ``file_ref``, ``mime_type`` and ``date`` from *kwargs*.

        A missing keyword raises ``KeyError``; extra keywords are ignored.
        """
        for field in ("file_ref", "mime_type", "date"):
            setattr(self, field, kwargs[field])
class MockVideo:
    """Stand-in for a video attachment; both keywords are required."""

    def __init__(self, **kwargs):
        """Copy ``file_ref`` and ``mime_type`` from *kwargs*.

        A missing keyword raises ``KeyError``.  Extra keywords, such as the
        ``file_name`` some tests pass, are ignored.
        """
        for field in ("file_ref", "mime_type"):
            setattr(self, field, kwargs[field])
async def async_get_media_meta(message_media, _type):
    """Await ``_get_media_meta`` and hand back whatever it returns."""
    return await _get_media_meta(message_media, _type)
async def async_download_media(client, message, media_types, file_formats):
    """Await ``download_media`` and hand back whatever it returns."""
    return await download_media(client, message, media_types, file_formats)
async def async_begin_import(conf, pagination_limit):
    """Await ``begin_import`` and hand back whatever it returns."""
    return await begin_import(conf, pagination_limit)
async def mock_process_message(*_args, **_kwargs):
    """Stub patched over ``media_downloader.process_messages``.

    Accepts and ignores any arguments and always resolves to ``5``.
    """
    return 5
async def async_process_messages(client, messages, media_types, file_formats):
    """Await ``process_messages`` and hand back whatever it returns."""
    return await process_messages(client, messages, media_types, file_formats)
class MockClient:
    """In-memory replacement for ``pyrogram.Client`` used by the tests."""

    def __init__(self, *args, **kwargs):
        """Accept and discard any constructor arguments."""

    def __aiter__(self):
        return self

    async def start(self):
        """No-op counterpart of the real client's ``start``."""

    async def stop(self):
        """No-op counterpart of the real client's ``stop``."""

    async def iter_history(self, *args, **kwargs):
        """Yield one voice message (id 1213) followed by three text messages.

        The text messages carry ids 1214-1216 and ``media=False``.  All
        arguments are ignored.
        """
        yield MockMessage(
            id=1213,
            media=True,
            voice=MockVoice(
                file_ref="AwADBQADbwAD2oTRVeHe5eXRFftfAg",
                mime_type="audio/ogg",
                date=1564066430,
            ),
        )
        for index in range(1, 4):
            yield MockMessage(
                id=1213 + index, media=False, text=f"test message {index}"
            )

    async def download_media(self, *args, **kwargs):
        """Return the ``file_name`` keyword argument unchanged.

        Raises ``KeyError`` when ``file_name`` is not supplied.
        """
        # NOTE(review): both asserts below test a non-empty string, which is
        # always truthy, so they can never fail; the expression after the
        # comma is only the failure message.  They look like they were meant
        # to be equality checks -- confirm against how media_downloader calls
        # ``client.download_media`` before tightening them.
        assert "AwADBQADbwAD2oTRVeHe5eXRFftfAg", args[0]
        assert platform_generic_path(
            "/root/project/voice/voice_2019-07-25T14:53:50.ogg"
        ), kwargs["file_name"]
        return kwargs["file_name"]
class MediaDownloaderTestCase(unittest.TestCase):
    """Run the ``media_downloader`` coroutines against in-memory mocks."""

    @classmethod
    def setUpClass(cls):
        """Create a private event loop shared by every test in the class."""
        # A dedicated loop is used instead of ``asyncio.get_event_loop()`` so
        # that closing it in ``tearDownClass`` does not leave the thread's
        # current event loop closed for whatever runs after this class.
        cls.loop = asyncio.new_event_loop()

    @classmethod
    def tearDownClass(cls):
        """Close the loop created in ``setUpClass``."""
        cls.loop.close()

    def _run(self, coro):
        """Drive *coro* to completion on the class loop and return its result."""
        return self.loop.run_until_complete(coro)

    @mock.patch("media_downloader.THIS_DIR", new=MOCK_DIR)
    def test_get_media_meta(self):
        """Check the tuple ``_get_media_meta`` returns for each media type."""
        # Test Voice notes
        message = MockMessage(
            id=1,
            media=True,
            voice=MockVoice(
                file_ref="AwADBQADbwAD2oTRVeHe5eXRFftfAg",
                mime_type="audio/ogg",
                date=1564066430,
            ),
        )
        result = self._run(async_get_media_meta(message.voice, "voice"))
        self.assertEqual(
            (
                "AwADBQADbwAD2oTRVeHe5eXRFftfAg",
                platform_generic_path(
                    "/root/project/voice/voice_2019-07-25T14:53:50.ogg"
                ),
                "ogg",
            ),
            result,
        )

        # Test photos
        message = MockMessage(
            id=2,
            media=True,
            photo=MockPhoto(
                file_ref="AgADBQAD5KkxG_FPQValJzQsJPyzhHcC", date=1565015712
            ),
        )
        result = self._run(async_get_media_meta(message.photo, "photo"))
        self.assertEqual(
            (
                "AgADBQAD5KkxG_FPQValJzQsJPyzhHcC",
                platform_generic_path("/root/project/photo/"),
                None,
            ),
            result,
        )

        # Test Documents
        message = MockMessage(
            id=3,
            media=True,
            document=MockDocument(
                file_ref="AQADAgADq7LfMgAEIdy5DwAE4w4AAwI",
                file_name="sample_document.pdf",
                mime_type="application/pdf",
            ),
        )
        result = self._run(async_get_media_meta(message.document, "document"))
        self.assertEqual(
            (
                "AQADAgADq7LfMgAEIdy5DwAE4w4AAwI",
                platform_generic_path(
                    "/root/project/document/sample_document.pdf"
                ),
                "pdf",
            ),
            result,
        )

        # Test audio
        message = MockMessage(
            id=4,
            media=True,
            audio=MockAudio(
                file_ref="AQADAgADq7LfMgAEIdy5DwAE5Q4AAgEC",
                file_name="sample_audio.mp3",
                mime_type="audio/mp3",
            ),
        )
        result = self._run(async_get_media_meta(message.audio, "audio"))
        self.assertEqual(
            (
                "AQADAgADq7LfMgAEIdy5DwAE5Q4AAgEC",
                platform_generic_path("/root/project/audio/sample_audio.mp3"),
                "mp3",
            ),
            result,
        )

        # Test Video
        message = MockMessage(
            id=5,
            media=True,
            video=MockVideo(
                file_ref="CQADBQADeQIAAlL60FUCNMBdK8OjlAI",
                mime_type="video/mp4",
            ),
        )
        result = self._run(async_get_media_meta(message.video, "video"))
        self.assertEqual(
            (
                "CQADBQADeQIAAlL60FUCNMBdK8OjlAI",
                platform_generic_path("/root/project/video/"),
                "mp4",
            ),
            result,
        )

    @mock.patch("media_downloader.THIS_DIR", new=MOCK_DIR)
    def test_download_media(self):
        """``download_media`` should return the id of the message it was given."""
        client = MockClient()

        # Video whose format is listed explicitly in ``file_formats``.
        message = MockMessage(
            id=5,
            media=True,
            video=MockVideo(
                file_ref="CQADBQADeQIAAlL60FUCNMBdK8OjlAI",
                file_name="sample_video.mp4",
                mime_type="video/mp4",
            ),
        )
        result = self._run(
            async_download_media(
                client, message, ["video", "photo"], {"video": ["mp4"]}
            )
        )
        self.assertEqual(5, result)

        # Video covered by the ``"all"`` wildcard in ``file_formats``.
        message_1 = MockMessage(
            id=6,
            media=True,
            video=MockVideo(
                file_ref="CQADBQADeQIAAlL60FUCNMBdK8OjlAI",
                file_name="sample_video.mov",
                mime_type="video/mov",
            ),
        )
        result = self._run(
            async_download_media(
                client, message_1, ["video", "photo"], {"video": ["all"]}
            )
        )
        self.assertEqual(6, result)

    # ``builtins.open`` is the documented patch target for ``mock_open``; the
    # previous ``__main__.__builtins__.open`` only worked because CPython
    # happens to expose ``__builtins__`` as a module inside ``__main__``.
    @mock.patch("builtins.open", new_callable=mock.mock_open)
    @mock.patch("media_downloader.yaml", autospec=True)
    def test_update_config(self, mock_yaml, mock_open):
        """``update_config`` should dump the config into ``config.yaml``."""
        conf = {"api_id": 123, "api_hash": "hasw5Tgawsuj67"}
        update_config(conf)
        mock_open.assert_called_with("config.yaml", "w")
        mock_yaml.dump.assert_called_with(
            conf, mock.ANY, default_flow_style=False
        )

    @mock.patch("media_downloader.pyrogram.Client", new=MockClient)
    @mock.patch("media_downloader.process_messages", new=mock_process_message)
    def test_begin_import(self):
        """``begin_import`` should return the config with the new last-read id.

        ``process_messages`` is stubbed to return 5, which is the value
        expected in ``last_read_message_id``.
        """
        result = self._run(async_begin_import(MOCK_CONF, 3))
        conf = copy.deepcopy(MOCK_CONF)
        conf["last_read_message_id"] = 5
        self.assertDictEqual(result, conf)

    def test_process_message(self):
        """``process_messages`` should return the id of the last message (1216)."""
        client = MockClient()
        messages = [
            MockMessage(
                id=1213,
                media=True,
                voice=MockVoice(
                    file_ref="AwADBQADbwAD2oTRVeHe5eXRFftfAg",
                    mime_type="audio/ogg",
                    date=1564066430,
                ),
            ),
            MockMessage(id=1214, media=False, text="test message 1",),
            MockMessage(id=1215, media=False, text="test message 2",),
            MockMessage(id=1216, media=False, text="test message 3",),
        ]
        result = self._run(
            async_process_messages(
                client,
                messages,
                ["voice", "photo"],
                {"audio": ["all"], "voice": ["all"]},
            )
        )
        self.assertEqual(result, 1216)