Version 1.0. Details in /.releases/release_1.0.md

This commit is contained in:
Taras Terletskyi 2023-02-25 19:25:39 +02:00
parent 274aa03428
commit 3513a0077a
33 changed files with 813 additions and 388 deletions

2
.env
View file

@ -1 +1 @@
COMPOSE_PROJECT_NAME=YT
COMPOSE_PROJECT_NAME=yt

6
.gitignore vendored
View file

@ -1,9 +1,11 @@
# Byte-compiled / optimized / DLL files
*tmp*
*.session
*.session-journal
config.yml
config.yaml
config*.yml
config*.yaml
!app_bot/config-example.yml
.ruff_cache
__pycache__/

24
.releases/release_1.0.md Normal file
View file

@ -0,0 +1,24 @@
# Release info
Version: 1.0
Release date: February 25, 2023
# Important
1. Changed the content of the yt-dlp options in `app_worker/DEFAULT_YTDL_OPTS/default.py`
2. Added two new user config options in `app_bot/config-example.yml`:
1. `download_media_type`: What to download - audio (mp3), video, or both. Values can
be `AUDIO`, `VIDEO`, or `AUDIO_VIDEO`.
2. `save_to_storage`: Moved from `envs/.env_worker`
3. Creating a task via the API now requires the two previously mentioned fields to be
sent in the payload.
# New features
1. The bot can now download audio (mp3), video (default), or both. Just configure the
preferred mode for the particular user/group.
# Misc
N/A

View file

@ -2,15 +2,15 @@
Simple and reliable YouTube Download Telegram Bot.
Version: 0.9. [Release details](.releases/release_0.9.md).
Version: 1.0. [Release details](.releases/release_1.0.md).
![frames](.assets/download_success.png)
## 😂 Features
* Download videos from any [yt-dlp](https://github.com/yt-dlp/yt-dlp) supported website
* Download audio and videos from any [yt-dlp](https://github.com/yt-dlp/yt-dlp) supported website
to your storage
* Upload downloaded videos to the Telegram chat
* Upload downloaded audio and videos to the Telegram chat
* Trigger video download by sending link to an API
* Track download tasks via API
@ -26,7 +26,7 @@ Version: 0.9. [Release details](.releases/release_0.9.md).
and change or remove `forward_group_id` value (if you want to forward the video to
some group when upload is enabled)
7. Check the default environment variables in `envs/.env_common` and change if needed
8. Video storage path (`STORAGE_PATH` environment variable) is located in
8. Video `STORAGE_PATH` environment variable is located in
the `envs/.env_worker` file. By default, it's `/filestorage` path inside the
container. What you want is to map the real path to this inside
the `docker-compose.yml` file for `worker` service, e.g. if you're on Windows, next

View file

@ -1,8 +1,8 @@
import uuid
from datetime import datetime
from pydantic import StrictFloat, StrictInt, StrictStr
from yt_shared.enums import TaskSource, TaskStatus
from pydantic import StrictBool, StrictFloat, StrictInt, StrictStr
from yt_shared.enums import DownMediaType, TaskSource, TaskStatus
from yt_shared.schemas.base import RealBaseModel
from api.api.api_v1.schemas.base import BaseOrmModel
@ -47,15 +47,17 @@ class TaskSimpleSchema(BaseOrmModel):
message_id: StrictInt | None
yt_dlp_version: StrictStr | None
error: StrictStr | None
file: FileSimpleSchema | None
files: list[FileSimpleSchema]
class TaskSchema(TaskSimpleSchema):
file: FileSchema | None
files: list[FileSchema]
class CreateTaskIn(RealBaseModel):
url: StrictStr
download_media_type: DownMediaType
save_to_storage: StrictBool
class CreateTaskOut(RealBaseModel):

View file

@ -24,7 +24,7 @@ class DatabaseRepository:
stmt = (
select(Task)
.options(
joinedload(Task.file)
joinedload(Task.files)
.load_only(*self._get_load_file_cols(include_meta))
.options(joinedload(File.cache))
)
@ -37,6 +37,7 @@ class DatabaseRepository:
stmt = stmt.filter(Task.status.in_(status))
tasks = await self._db.execute(stmt)
tasks.unique()
return tasks.scalars().all()
@staticmethod
@ -60,13 +61,14 @@ class DatabaseRepository:
stmt = (
select(Task)
.options(
joinedload(Task.file)
joinedload(Task.files)
.load_only(*self._get_load_file_cols(include_meta))
.options(joinedload(File.cache))
)
.filter(Task.id == id)
)
task = await self._db.execute(stmt)
task.unique()
return task.scalar_one()
async def get_latest_task(self, include_meta: bool) -> Task:
@ -74,13 +76,14 @@ class DatabaseRepository:
select(Task)
.order_by(desc(Task.created))
.options(
joinedload(Task.file)
joinedload(Task.files)
.load_only(*self._get_load_file_cols(include_meta))
.options(joinedload(File.cache))
)
.limit(1)
)
task = await self._db.execute(stmt)
task.unique()
return task.scalar_one()
async def delete_task(self, id: str | uuid.UUID) -> None:

View file

@ -6,7 +6,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from yt_shared.enums import TaskSource, TaskStatus
from yt_shared.models import Task
from yt_shared.rabbit.publisher import Publisher
from yt_shared.schemas.video import VideoPayload
from yt_shared.schemas.media import IncomingMediaPayload
from api.api.api_v1.schemas.task import (
CreateTaskIn,
@ -66,8 +66,13 @@ class TaskService:
task_id = uuid.uuid4()
source = TaskSource.API
added_at = datetime.now(timezone.utc)
payload = VideoPayload(
id=task_id, url=task.url, added_at=added_at, source=source
payload = IncomingMediaPayload(
id=task_id,
url=task.url,
added_at=added_at,
source=source,
download_media_type=task.download_media_type,
save_to_storage=task.save_to_storage,
)
if not await publisher.send_for_download(payload):
raise TaskServiceError('Failed to create task')

View file

@ -30,7 +30,7 @@ class TelegramCallback:
async def on_message(self, client: VideoBot, message: Message) -> None:
"""Receive video URL and send to the download worker."""
self._log.debug(message)
urls = self._get_urls(message)
urls = self._parse_urls(message)
await self._url_service.process_urls(urls=urls)
await self._send_acknowledge_message(message=message, urls=urls)
@ -48,8 +48,9 @@ class TelegramCallback:
reply_to_message_id=message.id,
)
@staticmethod
def _get_urls(message: Message) -> list[URL]:
def _parse_urls(self, message: Message) -> list[URL]:
bot: VideoBot = message._client # noqa
user = bot.allowed_users[message.from_user.id]
return [
URL(
url=url,
@ -57,6 +58,8 @@ class TelegramCallback:
from_chat_type=TelegramChatType(message.chat.type.value),
from_user_id=message.from_user.id,
message_id=message.id,
save_to_storage=user.save_to_storage,
download_media_type=user.download_media_type,
)
for url in message.text.splitlines()
]

View file

@ -5,6 +5,7 @@ from pydantic import (
constr,
validator,
)
from yt_shared.enums import DownMediaType
from yt_shared.schemas.base import RealBaseModel
_LANG_CODE_LEN = 2
@ -36,6 +37,8 @@ class UserUploadSchema(RealBaseModel):
class UserSchema(BaseUserSchema):
send_startup_message: StrictBool
download_media_type: DownMediaType
save_to_storage: StrictBool
upload: UserUploadSchema
@property

View file

@ -4,19 +4,24 @@ import traceback
from pyrogram.enums import ParseMode
from yt_shared.emoji import SUCCESS_EMOJI
from yt_shared.enums import TaskSource
from yt_shared.enums import MediaFileType, TaskSource
from yt_shared.rabbit.publisher import Publisher
from yt_shared.schemas.error import ErrorGeneralPayload
from yt_shared.schemas.media import Audio, Video
from yt_shared.schemas.success import SuccessPayload
from yt_shared.utils.file import remove_dir
from yt_shared.utils.tasks.tasks import create_task
from bot.core.handlers.abstract import AbstractHandler
from bot.core.tasks.upload import UploadTask
from bot.core.tasks.upload import AudioUploadTask, VideoUploadTask
class SuccessHandler(AbstractHandler):
_body: SuccessPayload
_UPLOAD_TASK_MAP = {
MediaFileType.AUDIO: AudioUploadTask,
MediaFileType.VIDEO: VideoUploadTask,
}
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
@ -24,9 +29,12 @@ class SuccessHandler(AbstractHandler):
async def handle(self) -> None:
try:
await self._handle()
except Exception as err:
await self._publish_error_message(err)
coro_tasks = []
for media_object in self._body.media.get_media_objects():
coro_tasks.append(self._handle(media_object))
await asyncio.gather(*coro_tasks)
finally:
self._cleanup()
async def _publish_error_message(self, err: Exception) -> None:
err_payload = ErrorGeneralPayload(
@ -44,44 +52,43 @@ class SuccessHandler(AbstractHandler):
)
await self._publisher.send_download_error(err_payload)
async def _handle(self) -> None:
await self._send_success_text()
async def _handle(self, media_object: Audio | Video) -> None:
try:
await self._send_success_text(media_object)
if self._upload_is_enabled():
self._validate_file_size_for_upload()
await self._create_upload_task()
self._validate_file_size_for_upload(media_object)
await self._create_upload_task(media_object)
else:
self._log.warning(
'File %s will not be uploaded due to upload configuration',
self._body.filepath,
media_object.filepath,
)
except Exception:
self._log.exception(
'Upload of "%s" failed, performing cleanup', self._body.filepath
)
raise
finally:
self._cleanup()
except Exception as err:
self._log.exception('Upload of "%s" failed', media_object.filepath)
await self._publish_error_message(err)
def _cleanup(self) -> None:
root_path = self._body.media.root_path
self._log.info(
'Final task "%s" cleanup. Removing download content directory "%s" with files %s',
self._body.task_id,
self._body.root_path,
os.listdir(self._body.root_path),
root_path,
os.listdir(root_path),
)
remove_dir(self._body.root_path)
remove_dir(root_path)
async def _create_upload_task(self) -> None:
async def _create_upload_task(self, media_object: Audio | Video) -> None:
"""Upload video to Telegram chat."""
semaphore = asyncio.Semaphore(value=self._bot.conf.telegram.max_upload_tasks)
task_name = UploadTask.__class__.__name__
upload_task_cls = self._UPLOAD_TASK_MAP[media_object.file_type]
task_name = upload_task_cls.__class__.__name__
await create_task(
UploadTask(
body=self._body,
upload_task_cls(
media_object=media_object,
users=self._receiving_users,
bot=self._bot,
semaphore=semaphore,
context=self._body,
).run(),
task_name=task_name,
logger=self._log,
@ -89,8 +96,8 @@ class SuccessHandler(AbstractHandler):
exception_message_args=(task_name,),
)
async def _send_success_text(self) -> None:
text = f'{SUCCESS_EMOJI} <b>Downloaded</b> {self._body.filename}'
async def _send_success_text(self, media_object: Audio | Video) -> None:
text = f'{SUCCESS_EMOJI} <b>Downloaded</b> {media_object.filename}'
for user in self._receiving_users:
kwargs = {
'chat_id': user.id,
@ -109,21 +116,23 @@ class SuccessHandler(AbstractHandler):
user = self._bot.allowed_users[self._get_sender_id()]
return user.upload.upload_video_file
def _validate_file_size_for_upload(self) -> None:
def _validate_file_size_for_upload(self, media_object: Audio | Video) -> None:
if self._body.context.source is TaskSource.API:
max_file_size = self._bot.conf.telegram.api.upload_video_max_file_size
else:
user = self._bot.allowed_users[self._get_sender_id()]
max_file_size = user.upload.upload_video_max_file_size
if not os.path.exists(self._body.filepath):
raise ValueError(f'Video {self._body.filepath} not found')
if not os.path.exists(media_object.filepath):
raise ValueError(
f'{media_object.file_type} {media_object.filepath} not found'
)
file_size = os.stat(self._body.filepath).st_size
file_size = os.stat(media_object.filepath).st_size
if file_size > max_file_size:
err_msg = (
f'Video file size {file_size} bytes bigger then allowed {max_file_size}'
f' bytes. Will not upload'
f'{media_object.file_type} file size {file_size} bytes bigger than '
f'allowed {max_file_size} bytes. Will not upload'
)
self._log.warning(err_msg)
raise ValueError(err_msg)

View file

@ -2,8 +2,8 @@ import logging
from yt_shared.enums import TaskSource
from yt_shared.rabbit.publisher import Publisher
from yt_shared.schemas.media import IncomingMediaPayload
from yt_shared.schemas.url import URL
from yt_shared.schemas.video import VideoPayload
class URLService:
@ -11,21 +11,20 @@ class URLService:
self._log = logging.getLogger(self.__class__.__name__)
self._publisher = Publisher()
async def process_url(self, url: URL) -> bool:
return await self._send_to_worker(url)
async def process_urls(self, urls: list[URL]) -> None:
for url in urls:
await self._send_to_worker(url)
async def _send_to_worker(self, url: URL) -> bool:
payload = VideoPayload(
payload = IncomingMediaPayload(
url=url.url,
message_id=url.message_id,
from_user_id=url.from_user_id,
from_chat_id=url.from_chat_id,
from_chat_type=url.from_chat_type,
source=TaskSource.BOT,
save_to_storage=url.save_to_storage,
download_media_type=url.download_media_type,
)
is_sent = await self._publisher.send_for_download(payload)
if not is_sent:

View file

@ -1,15 +1,19 @@
import abc
import asyncio
from itertools import chain
from typing import TYPE_CHECKING, Coroutine
from pydantic import StrictFloat, StrictInt, StrictStr
from pyrogram.enums import ChatAction, MessageMediaType, ParseMode
from pyrogram.types import Animation, Message, Video
from pyrogram.types import Animation, Message
from pyrogram.types import Audio as _Audio
from pyrogram.types import Video as _Video
from tenacity import retry, stop_after_attempt, wait_fixed
from yt_shared.db.session import get_db
from yt_shared.repositories.task import TaskRepository
from yt_shared.schemas.base import RealBaseModel
from yt_shared.schemas.cache import CacheSchema
from yt_shared.schemas.media import Audio, Video
from yt_shared.schemas.success import SuccessPayload
from yt_shared.utils.tasks.abstract import AbstractTask
from yt_shared.utils.tasks.tasks import create_task
@ -22,55 +26,64 @@ if TYPE_CHECKING:
from bot.core.bot import VideoBot
class VideoContext(RealBaseModel):
class AbstractUploadContext(RealBaseModel):
caption: StrictStr
file_name: StrictStr
video_path: StrictStr
filename: StrictStr
filepath: StrictStr
duration: StrictFloat
height: StrictInt
width: StrictInt
thumb: StrictStr
type: MessageMediaType
class UploadTask(AbstractTask):
class VideoUploadContext(AbstractUploadContext):
height: StrictInt
width: StrictInt
thumb: StrictStr
class AudioUploadContext(AbstractUploadContext):
pass
class AbstractUploadTask(AbstractTask, metaclass=abc.ABCMeta):
_UPLOAD_ACTION: ChatAction
def __init__(
self,
body: SuccessPayload,
media_object: Audio | Video,
users: list[BaseUserSchema | UserSchema],
bot: 'VideoBot',
semaphore: asyncio.Semaphore,
context: SuccessPayload,
) -> None:
super().__init__()
self._config = get_main_config()
self._body = body
self.filename = body.filename
self.filepath = body.filepath
self.thumb_path = body.thumb_path
self._media_object = media_object
self._filename = media_object.filename
self._filepath = media_object.filepath
self._bot = bot
self._users = users
self._semaphore = semaphore
self._ctx = context
self._media_ctx = self._create_media_context()
self._upload_chat_ids, self._forward_chat_ids = self._get_upload_chat_ids()
self._video_ctx = self._create_video_context()
self._cached_message: Message | None = None
async def run(self) -> None:
async with self._semaphore:
self._log.debug('Semaphore for "%s" acquired', self.filename)
self._log.debug('Semaphore for "%s" acquired', self._filename)
await self._run()
self._log.debug('Semaphore for "%s" released', self.filename)
self._log.debug('Semaphore for "%s" released', self._filename)
async def _run(self) -> None:
try:
await asyncio.gather(*(self._send_upload_text(), self._upload_video_file()))
await asyncio.gather(*(self._send_upload_text(), self._upload_file()))
except Exception:
self._log.exception('Exception in upload task for "%s"', self.filename)
self._log.exception('Exception in upload task for "%s"', self._filename)
raise
async def _send_upload_text(self) -> None:
text = f'⬆️ {bold("Uploading")} {self.filename}'
text = f'⬆️ {bold("Uploading")} {self._filename}'
coros = []
for chat_id in self._upload_chat_ids:
kwargs = {
@ -78,8 +91,8 @@ class UploadTask(AbstractTask):
'text': text,
'parse_mode': ParseMode.HTML,
}
if self._body.message_id:
kwargs['reply_to_message_id'] = self._body.message_id
if self._ctx.message_id:
kwargs['reply_to_message_id'] = self._ctx.message_id
coros.append(self._bot.send_message(**kwargs))
await asyncio.gather(*coros)
@ -93,16 +106,29 @@ class UploadTask(AbstractTask):
forward_chat_ids.append(user.upload.forward_group_id)
return chat_ids, forward_chat_ids
async def _upload_video_file(self) -> None:
@retry(wait=wait_fixed(3), stop=stop_after_attempt(3), reraise=True)
async def __upload(self, chat_id: int) -> Message | None:
self._log.debug('Uploading to "%d" with context: %s', chat_id, self._media_ctx)
return await self._generate_send_media_coroutine(chat_id)
@abc.abstractmethod
def _generate_send_media_coroutine(self, chat_id: int) -> Coroutine:
pass
@abc.abstractmethod
def _create_media_context(self) -> AudioUploadContext | VideoUploadContext:
pass
async def _upload_file(self) -> None:
for chat_id in chain(self._upload_chat_ids, self._forward_chat_ids):
self._log.info('Uploading "%s" to chat id "%d"', self.filename, chat_id)
await self._bot.send_chat_action(chat_id, action=ChatAction.UPLOAD_VIDEO)
self._log.info('Uploading "%s" to chat id "%d"', self._filename, chat_id)
await self._bot.send_chat_action(chat_id, action=self._UPLOAD_ACTION)
try:
message = await self.__upload(chat_id=chat_id)
except Exception:
self._log.error(
'Failed to upload "%s" to "%d"',
self._video_ctx.video_path,
self._media_ctx.filepath,
chat_id,
)
raise
@ -111,27 +137,130 @@ class UploadTask(AbstractTask):
if not self._cached_message and message:
self._cache_data(message)
@retry(wait=wait_fixed(3), stop=stop_after_attempt(3), reraise=True)
async def __upload(self, chat_id: int) -> Message | None:
self._log.debug('Uploading to "%d" with context: %s', chat_id, self._video_ctx)
return await self._generate_send_media_coroutine(chat_id)
# Schedule a background task that persists the Telegram cache data of an
# uploaded media object by running `_save_cache_to_db(cache_object)`.
#
# `cache_object` is the pyrogram Audio/Video/Animation object taken from the
# message returned by the upload call.
#
# NOTE(review): `create_task` is the project helper imported from
# `yt_shared.utils.tasks.tasks`; it is not awaited here, while
# `SuccessHandler._create_upload_task` awaits it - confirm which usage is
# the intended one.
def _create_cache_task(self, cache_object: _Audio | _Video | Animation) -> None:
self._log.info('Creating cache task for %s', cache_object)
db_cache_task_name = 'Save cache to DB'
create_task(
self._save_cache_to_db(cache_object),
task_name=db_cache_task_name,
logger=self._log,
# The message is a %-style template filled with `exception_message_args`.
exception_message='Task "%s" raised an exception',
exception_message_args=(db_cache_task_name,),
)
@abc.abstractmethod
def _cache_data(self, message: Message) -> None:
pass
# Persist the Telegram cache identifiers of an uploaded file to the database.
#
# Builds a `CacheSchema` from the pyrogram media object's `file_id`,
# `file_unique_id`, `file_size` and `date`, then stores it through
# `TaskRepository.save_file_cache`, keyed by the ORM file id carried on the
# media object (`self._media_object.orm_file_id`).
async def _save_cache_to_db(self, file: _Audio | _Video | Animation) -> None:
cache = CacheSchema(
cache_id=file.file_id,
cache_unique_id=file.file_unique_id,
file_size=file.file_size,
date_timestamp=file.date,
)
# `get_db()` is consumed as an async generator; the save runs once per
# yielded session (presumably exactly one - verify against `get_db`).
async for db in get_db():
await TaskRepository().save_file_cache(
cache=cache,
file_id=self._media_object.orm_file_id,
db=db,
)
class AudioUploadTask(AbstractUploadTask):
_UPLOAD_ACTION = ChatAction.UPLOAD_AUDIO
_media_ctx: AudioUploadContext
def _generate_send_media_coroutine(self, chat_id: int) -> Coroutine:
kwargs = {
'chat_id': chat_id,
'caption': self._video_ctx.caption,
'file_name': self._video_ctx.file_name,
'duration': int(self._video_ctx.duration),
'height': self._video_ctx.height,
'width': self._video_ctx.width,
'thumb': self._video_ctx.thumb,
'audio': self._media_ctx.filepath,
'caption': self._media_ctx.caption,
'file_name': self._media_ctx.filename,
'duration': int(self._media_ctx.duration),
}
if self._video_ctx.type is MessageMediaType.ANIMATION:
return self._bot.send_audio(**kwargs)
def _create_media_context(self) -> AudioUploadContext:
return AudioUploadContext(
caption=self._generate_file_caption(),
filename=self._filename,
filepath=self._filepath,
duration=self._media_object.duration or 0.0,
type=MessageMediaType.AUDIO,
)
def _cache_data(self, message: Message) -> None:
self._log.info('Saving telegram file cache')
audio = message.audio
if not audio:
err_msg = 'Telegram message response does not contain audio'
self._log.error('%s: %s', err_msg, message)
raise RuntimeError(err_msg)
self._media_ctx.type = message.media
self._media_ctx.filepath = audio.file_id
self._media_ctx.duration = audio.duration
self._cached_message = message
self._create_cache_task(cache_object=audio)
def _generate_file_caption(self) -> str:
caption_items = []
caption_items.append(f'{bold("Title:")} {self._media_object.title}')
caption_items.append(f'{bold("Filename:")} {self._media_object.filename}')
caption_items.append(f'{bold("URL:")} {self._ctx.context.url}')
return '\n'.join(caption_items)
class VideoUploadTask(AbstractUploadTask):
_UPLOAD_ACTION = ChatAction.UPLOAD_VIDEO
_media_ctx: VideoUploadContext
def _create_media_context(self) -> VideoUploadContext:
return VideoUploadContext(
caption=self._generate_file_caption(),
filename=self._filename,
filepath=self._filepath,
duration=self._media_object.duration or 0.0,
height=self._media_object.height or 0,
width=self._media_object.width or 0,
thumb=self._media_object.thumb_path,
type=MessageMediaType.VIDEO,
)
def _generate_file_caption(self) -> str:
caption_items = []
if self._users[0].is_base_user:
caption_conf = self._bot.conf.telegram.api.video_caption
else:
caption_conf = self._users[0].upload.video_caption
if caption_conf.include_title:
caption_items.append(f'{bold("Title:")} {self._media_object.title}')
if caption_conf.include_filename:
caption_items.append(f'{bold("Filename:")} {self._media_object.filename}')
if caption_conf.include_link:
caption_items.append(f'{bold("URL:")} {self._ctx.context.url}')
return '\n'.join(caption_items)
def _generate_send_media_coroutine(self, chat_id: int) -> Coroutine:
kwargs = {
'chat_id': chat_id,
'caption': self._media_ctx.caption,
'file_name': self._media_ctx.filename,
'duration': int(self._media_ctx.duration),
'height': self._media_ctx.height,
'width': self._media_ctx.width,
'thumb': self._media_ctx.thumb,
}
if self._media_ctx.type is MessageMediaType.ANIMATION:
send_func_name = 'send_animation'
kwargs['animation'] = self._video_ctx.video_path
kwargs['animation'] = self._media_ctx.filepath
else:
send_func_name = 'send_video'
kwargs['video'] = self._video_ctx.video_path
kwargs['video'] = self._media_ctx.filepath
kwargs['supports_streaming'] = True
return getattr(self._bot, send_func_name)(**kwargs)
@ -143,56 +272,9 @@ class UploadTask(AbstractTask):
self._log.error('%s: %s', err_msg, message)
raise RuntimeError(err_msg)
self._video_ctx.type = message.media
self._video_ctx.video_path = video.file_id
self._video_ctx.thumb = video.thumbs[0].file_id
self._media_ctx.type = message.media
self._media_ctx.filepath = video.file_id
self._media_ctx.thumb = video.thumbs[0].file_id
self._cached_message = message
db_cache_task_name = 'Save cache to DB'
create_task(
self._save_cache_to_db(video),
task_name=db_cache_task_name,
logger=self._log,
exception_message='Task "%s" raised an exception',
exception_message_args=(db_cache_task_name,),
)
def _create_video_context(self) -> VideoContext:
return VideoContext(
caption=self._generate_video_caption(),
file_name=self.filename,
video_path=self.filepath,
duration=self._body.duration or 0,
height=self._body.height or 0,
width=self._body.width or 0,
thumb=self.thumb_path,
type=MessageMediaType.VIDEO,
)
def _generate_video_caption(self) -> str:
caption_items = []
if self._users[0].is_base_user:
caption_conf = self._bot.conf.telegram.api.video_caption
else:
caption_conf = self._users[0].upload.video_caption
if caption_conf.include_title:
caption_items.append(f'{bold("Title:")} {self._body.title}')
if caption_conf.include_filename:
caption_items.append(f'{bold("Filename:")} {self._body.filename}')
if caption_conf.include_link:
caption_items.append(f'{bold("URL:")} {self._body.context.url}')
return '\n'.join(caption_items)
async def _save_cache_to_db(self, video: Video | Animation) -> None:
cache = CacheSchema(
cache_id=video.file_id,
cache_unique_id=video.file_unique_id,
file_size=video.file_size,
date_timestamp=video.date,
)
async for db in get_db():
await TaskRepository().save_file_cache(
cache=cache, task_id=self._body.task_id, db=db
)
self._create_cache_task(cache_object=video)

View file

@ -1 +1 @@
__version__ = '0.9'
__version__ = '1.0'

View file

@ -7,6 +7,8 @@ telegram:
allowed_users:
- id: 00000000000
send_startup_message: !!bool True
download_media_type: !!str "VIDEO"
save_to_storage: !!bool False
upload:
upload_video_file: !!bool False
upload_video_max_file_size: 2147483648

View file

@ -0,0 +1,28 @@
"""empty message
Revision ID: 10ab08fc321b
Revises: ba7716dca30a
Create Date: 2023-02-25 15:47:37.542906
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = '10ab08fc321b'
down_revision = 'ba7716dca30a'
branch_labels = None
depends_on = None
# Apply revision 10ab08fc321b: drop the existing `ix_file_task_id` index on
# the `file` table and recreate it on the `task_id` column as a non-unique
# index, with the name passed through `op.f()`.
#
# NOTE(review): presumably the previous index was unique and is relaxed here
# so that one task can own several files - confirm against revision
# ba7716dca30a.
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index('ix_file_task_id', table_name='file')
op.create_index(op.f('ix_file_task_id'), 'file', ['task_id'], unique=False)
# ### end Alembic commands ###
# Revert revision 10ab08fc321b: drop the `op.f()`-named `ix_file_task_id`
# index on the `file` table and recreate it under the plain name on the
# `task_id` column.
#
# NOTE(review): the index is recreated with `unique=False`, the same as in
# `upgrade()`. If the index was unique before this revision, the downgrade
# does not restore that constraint - confirm against revision ba7716dca30a.
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_file_task_id'), table_name='file')
op.create_index('ix_file_task_id', 'file', ['task_id'], unique=False)
# ### end Alembic commands ###

View file

@ -1,13 +1,13 @@
import logging
from aio_pika import IncomingMessage
from yt_shared.schemas.video import VideoPayload
from yt_shared.schemas.media import IncomingMediaPayload
from worker.core.payload_handler import PayloadHandler
class _RMQCallbacks:
"""RabbitMQ callbacks."""
"""RabbitMQ's callbacks."""
def __init__(self) -> None:
self._log = logging.getLogger(self.__class__.__name__)
@ -22,20 +22,22 @@ class _RMQCallbacks:
async def _process_incoming_message(self, message: IncomingMessage) -> None:
self._log.info('[x] Received message %s', message.body)
video_payload = self._deserialize_message(message)
if not video_payload:
media_payload = self._deserialize_message(message)
if not media_payload:
await self._reject_invalid_message(message)
return
await self._payload_handler.handle(video_payload=video_payload)
await self._payload_handler.handle(media_payload=media_payload)
await message.ack()
self._log.info('Processing done with payload: %s', video_payload)
self._log.info('Processing done with payload: %s', media_payload)
def _deserialize_message(self, message: IncomingMessage) -> VideoPayload | None:
def _deserialize_message(
self, message: IncomingMessage
) -> IncomingMediaPayload | None:
try:
return VideoPayload.parse_raw(message.body)
return IncomingMediaPayload.parse_raw(message.body)
except Exception:
self._log.exception('Failed to deserialize message body')
self._log.exception('Failed to deserialize message body: %s', message.body)
return None
async def _reject_invalid_message(self, message: IncomingMessage) -> None:

View file

@ -4,7 +4,6 @@ from yt_shared.config import Settings
class WorkerSettings(Settings):
APPLICATION_NAME: str
MAX_SIMULTANEOUS_DOWNLOADS: int
SAVE_VIDEO_FILE: bool
STORAGE_PATH: str
THUMBNAIL_FRAME_SECOND: float

View file

@ -6,102 +6,209 @@ from copy import deepcopy
from tempfile import TemporaryDirectory
import yt_dlp
from yt_shared.schemas.video import DownVideo
from yt_shared.utils.common import random_string
from worker.core.config import settings
from worker.utils import cli_to_api
from yt_shared.enums import DownMediaType
from yt_shared.schemas.media import Audio, DownMedia, Video
from yt_shared.utils.common import random_string
try:
from ytdl_opts.user import YTDL_OPTS
from ytdl_opts.user import (
DEFAULT_YTDL_OPTS,
AUDIO_YTDL_OPTS,
AUDIO_FORMAT_YTDL_OPTS,
VIDEO_YTDL_OPTS,
FINAL_AUDIO_FORMAT,
FINAL_THUMBNAIL_FORMAT,
)
except ImportError:
from ytdl_opts.default import YTDL_OPTS
from ytdl_opts.default import (
DEFAULT_YTDL_OPTS,
AUDIO_YTDL_OPTS,
AUDIO_FORMAT_YTDL_OPTS,
VIDEO_YTDL_OPTS,
FINAL_AUDIO_FORMAT,
FINAL_THUMBNAIL_FORMAT,
)
class VideoDownloader:
class MediaDownloader:
_PLAYLIST_TYPE = 'playlist'
_DESTINATION_TMP_DIR_NAME_LEN = 4
_KEEP_VIDEO_OPTION = '--keep-video'
def __init__(self) -> None:
self._log = logging.getLogger(self.__class__.__name__)
self._ytdl_opts = cli_to_api(YTDL_OPTS)
def download_video(self, url: str) -> DownVideo:
def download(self, url: str, media_type: DownMediaType) -> DownMedia:
try:
return self._download(url)
return self._download(url=url, media_type=media_type)
except Exception:
self._log.exception('Failed to download %s', url)
raise
def _download(self, url: str) -> DownVideo:
# Build the yt-dlp options dict for one download.
#
# Starts from a deep copy of `DEFAULT_YTDL_OPTS` (so the shared defaults are
# never mutated), adds the option lists for the requested `media_type`,
# converts the result with `cli_to_api`, and finally rewrites the default
# output template so files land inside `curr_tmp_dir`.
#
# The `match` has no default case: an unrecognised `media_type` silently
# yields the default options only.
def _configure_ytdl_opts(
self, media_type: DownMediaType, curr_tmp_dir: str
) -> dict:
ytdl_opts = deepcopy(DEFAULT_YTDL_OPTS)
match media_type: # noqa: E999
case DownMediaType.AUDIO:
ytdl_opts.extend(AUDIO_YTDL_OPTS)
ytdl_opts.extend(AUDIO_FORMAT_YTDL_OPTS)
case DownMediaType.VIDEO:
ytdl_opts.extend(VIDEO_YTDL_OPTS)
case DownMediaType.AUDIO_VIDEO:
ytdl_opts.extend(AUDIO_YTDL_OPTS)
ytdl_opts.extend(VIDEO_YTDL_OPTS)
# '--keep-video' is added only when both audio and video are requested;
# presumably it stops yt-dlp from deleting the video after audio
# extraction - verify against the yt-dlp option documentation.
ytdl_opts.append(self._KEEP_VIDEO_OPTION)
# Up to here `ytdl_opts` is a list of options; `cli_to_api` turns it into
# the dict that is indexed below and returned.
ytdl_opts = cli_to_api(ytdl_opts)
ytdl_opts['outtmpl']['default'] = os.path.join(
curr_tmp_dir,
ytdl_opts['outtmpl']['default'],
)
return ytdl_opts
def _download(self, url: str, media_type: DownMediaType) -> DownMedia:
self._log.info('Downloading %s, media_type %s', url, media_type)
tmp_down_path = os.path.join(
settings.TMP_DOWNLOAD_ROOT_PATH, settings.TMP_DOWNLOAD_DIR
)
with TemporaryDirectory(prefix='tmp_video_dir-', dir=tmp_down_path) as tmp_dir:
with TemporaryDirectory(prefix='tmp_media_dir-', dir=tmp_down_path) as tmp_dir:
curr_tmp_dir = os.path.join(tmp_down_path, tmp_dir)
ytdl_opts = deepcopy(self._ytdl_opts)
ytdl_opts['outtmpl']['default'] = os.path.join(
curr_tmp_dir,
ytdl_opts['outtmpl']['default'],
ytdl_opts = self._configure_ytdl_opts(
media_type=media_type, curr_tmp_dir=curr_tmp_dir
)
with yt_dlp.YoutubeDL(ytdl_opts) as ytdl:
self._log.info('Downloading %s', url)
self._log.info('Downloading to %s', curr_tmp_dir)
self._log.debug('Downloading with options %s', ytdl_opts)
self._log.info('Downloading with options %s', ytdl_opts)
meta = ytdl.extract_info(url, download=True)
meta_sanitized = ytdl.sanitize_info(meta)
self._log.info('Finished downloading %s', url)
self._log.debug('Downloaded "%s" meta: %s', url, meta_sanitized)
self._log.info('Downloaded "%s" meta: %s', url, meta_sanitized)
self._log.info(
'Content of "%s": %s', curr_tmp_dir, os.listdir(curr_tmp_dir)
)
filename = self._get_filename(meta)
filepath = os.path.join(curr_tmp_dir, filename)
destination_dir = os.path.join(
os.path.join(
settings.TMP_DOWNLOAD_ROOT_PATH, settings.TMP_DOWNLOADED_DIR
),
random_string(number=self._DESTINATION_TMP_DIR_NAME_LEN),
)
self._log.info('Moving "%s" to "%s"', filepath, destination_dir)
os.mkdir(destination_dir)
shutil.move(filepath, destination_dir)
thumb_path: str | None = None
thumb_name = self._find_downloaded_thumbnail(curr_tmp_dir)
if thumb_name:
_thumb_path = os.path.join(curr_tmp_dir, thumb_name)
shutil.move(_thumb_path, destination_dir)
thumb_path = os.path.join(destination_dir, thumb_name)
audio, video = self._create_media_dtos(
media_type=media_type,
meta=meta,
curr_tmp_dir=curr_tmp_dir,
destination_dir=destination_dir,
)
self._log.info(
'Removing temporary download directory "%s" with leftover files %s',
curr_tmp_dir,
os.listdir(curr_tmp_dir),
)
return DownMedia(
media_type=media_type,
audio=audio,
video=video,
meta=meta_sanitized,
root_path=destination_dir,
)
def _create_media_dtos(
    self,
    media_type: DownMediaType,
    meta: dict,
    curr_tmp_dir: str,
    destination_dir: str,
) -> tuple[Audio | None, Video | None]:
    """Create the audio and/or video DTOs for the requested media type.

    Returns an ``(audio, video)`` pair; the element that was not requested
    is ``None``. When both are requested the audio DTO is created first,
    then the video DTO.

    Raises:
        RuntimeError: If ``media_type`` is not a known ``DownMediaType``.
    """
    # Both DTO factories take exactly the same keyword arguments.
    dto_kwargs = {
        'meta': meta,
        'curr_tmp_dir': curr_tmp_dir,
        'destination_dir': destination_dir,
    }

    if media_type == DownMediaType.AUDIO:
        return self._create_audio_dto(**dto_kwargs), None

    if media_type == DownMediaType.VIDEO:
        return None, self._create_video_dto(**dto_kwargs)

    if media_type == DownMediaType.AUDIO_VIDEO:
        audio_dto = self._create_audio_dto(**dto_kwargs)
        video_dto = self._create_video_dto(**dto_kwargs)
        return audio_dto, video_dto

    raise RuntimeError(f'Unknown media type "{media_type}"')
def _create_video_dto(
self,
meta: dict,
curr_tmp_dir: str,
destination_dir: str,
) -> Video:
video_filename = self._get_video_filename(meta)
video_filepath = os.path.join(curr_tmp_dir, video_filename)
self._log.info('Moving "%s" to "%s"', video_filepath, destination_dir)
shutil.move(video_filepath, destination_dir)
thumb_path: str | None = None
thumb_name = self._find_downloaded_file(
root_path=curr_tmp_dir,
extension=FINAL_THUMBNAIL_FORMAT,
)
if thumb_name:
_thumb_path = os.path.join(curr_tmp_dir, thumb_name)
shutil.move(_thumb_path, destination_dir)
thumb_path = os.path.join(destination_dir, thumb_name)
duration, width, height = self._get_video_context(meta)
return DownVideo(
return Video(
title=meta['title'],
name=filename,
filename=video_filename,
duration=duration,
width=width,
height=height,
meta=meta_sanitized,
filepath=os.path.join(destination_dir, filename),
root_path=destination_dir,
filepath=os.path.join(destination_dir, video_filename),
thumb_path=thumb_path,
thumb_name=thumb_name,
)
def _find_downloaded_thumbnail(self, root_path: str) -> str | None:
"""Try to find downloaded thumbnail jpg."""
for file_name in glob.glob("*.jpg", root_dir=root_path):
self._log.info('Found downloaded thumbnail "%s"', file_name)
def _create_audio_dto(
    self,
    meta: dict,
    curr_tmp_dir: str,
    destination_dir: str,
) -> Audio:
    """Move the downloaded audio file to ``destination_dir`` and describe it.

    The audio file is looked up in ``curr_tmp_dir`` by the
    ``FINAL_AUDIO_FORMAT`` extension.

    Args:
        meta: yt-dlp info dict; only ``meta['title']`` is read here.
        curr_tmp_dir: Directory the download was written to.
        destination_dir: Directory the audio file is moved into.

    Returns:
        ``Audio`` DTO pointing at the file inside ``destination_dir``.

    Raises:
        FileNotFoundError: No file with the ``FINAL_AUDIO_FORMAT`` extension
            exists in ``curr_tmp_dir``.
    """
    audio_filename = self._find_downloaded_file(
        root_path=curr_tmp_dir,
        extension=FINAL_AUDIO_FORMAT,
    )
    if not audio_filename:
        # Fail with a descriptive error instead of the opaque TypeError that
        # os.path.join() raises when it is handed None.
        raise FileNotFoundError(
            f'Downloaded {FINAL_AUDIO_FORMAT} file not found in "{curr_tmp_dir}"'
        )

    audio_filepath = os.path.join(curr_tmp_dir, audio_filename)
    self._log.info('Moving "%s" to "%s"', audio_filepath, destination_dir)
    shutil.move(audio_filepath, destination_dir)
    return Audio(
        title=meta['title'],
        filename=audio_filename,
        duration=None,
        filepath=os.path.join(destination_dir, audio_filename),
    )
def _find_downloaded_file(self, root_path: str, extension: str) -> str | None:
"""Try to find downloaded audio or thumbnail file."""
for file_name in glob.glob(f"*.{extension}", root_dir=root_path):
self._log.info('Found downloaded %s "%s"', extension, file_name)
return file_name
self._log.info('Downloaded thumb not found in "%s"', root_path)
self._log.info('Downloaded %s not found in "%s"', extension, root_path)
return None
def _get_video_context(
@ -113,18 +220,37 @@ class VideoDownloader:
'Item said to be downloaded but no entries to process.'
)
entry: dict = meta['entries'][0]
requested_video: dict = entry['requested_downloads'][0]
requested_video = self._get_requested_video(entry['requested_downloads'])
return (
self._to_float(entry.get('duration')),
requested_video.get('width'),
requested_video.get('height'),
)
requested_video = self._get_requested_video(meta['requested_downloads'])
return (
self._to_float(meta.get('duration')),
meta['requested_downloads'][0].get('width'),
meta['requested_downloads'][0].get('height'),
requested_video.get('width'),
requested_video.get('height'),
)
def _get_requested_video(self, requested_downloads: list[dict]) -> dict | None:
    """Pick the download entry that describes the video file.

    The first entry whose ``ext`` differs from ``FINAL_AUDIO_FORMAT`` wins.
    Otherwise the first entry whose ``ext`` differs from the extension of
    its ``_filename`` is returned as a copy with ``filepath`` replaced by
    ``_filename``. ``None`` is returned when neither rule matches.
    """
    first_non_audio = next(
        (
            candidate
            for candidate in requested_downloads
            if candidate['ext'] != FINAL_AUDIO_FORMAT
        ),
        None,
    )
    if first_non_audio is not None:
        return first_non_audio

    # When video was converted to audio but video kept.
    for candidate in requested_downloads:
        filename_ext = candidate['_filename'].rsplit('.', 1)[-1]
        if candidate['ext'] == filename_ext:
            continue
        # Work on a copy so the caller's meta entry is left unmodified.
        patched = candidate.copy()
        self._log.info(
            'Replacing video path in meta "%s" with "%s"',
            patched['filepath'],
            patched['_filename'],
        )
        patched['filepath'] = patched['_filename']
        return patched
    return None
@staticmethod
def _to_float(duration: int | float | None) -> float | None:
try:
@ -132,10 +258,20 @@ class VideoDownloader:
except TypeError:
return duration
def _get_filename(self, meta: dict) -> str:
return self._get_filepath(meta).rsplit('/', maxsplit=1)[-1]
def _get_video_filename(self, meta: dict) -> str:
return self._get_video_filepath(meta).rsplit('/', maxsplit=1)[-1]
def _get_filepath(self, meta: dict) -> str:
def _get_video_filepath(self, meta: dict) -> str:
if meta['_type'] == self._PLAYLIST_TYPE:
return meta['entries'][0]['requested_downloads'][0]['filepath']
return meta['requested_downloads'][0]['filepath']
requested_downloads: list[dict] = meta['entries'][0]['requested_downloads']
requested_video = self._get_requested_video(requested_downloads)
else:
requested_downloads = meta['requested_downloads']
requested_video = self._get_requested_video(requested_downloads)
try:
return requested_video['filepath']
except (AttributeError, KeyError):
err_msg = 'Video filepath not found'
self._log.exception('%s, meta: %s', err_msg, meta)
raise ValueError(err_msg)

View file

@ -4,69 +4,101 @@ import os
import shutil
from sqlalchemy.ext.asyncio import AsyncSession
from yt_shared.enums import TaskStatus
from yt_shared.models import Task
from yt_shared.repositories.task import TaskRepository
from yt_shared.schemas.video import DownVideo, VideoPayload
from yt_shared.utils.file import remove_dir
from yt_shared.utils.tasks.tasks import create_task
from worker.core.config import settings
from worker.core.downloader import VideoDownloader
from worker.core.downloader import MediaDownloader
from worker.core.exceptions import DownloadVideoServiceError
from worker.core.tasks.ffprobe_context import GetFfprobeContextTask
from worker.core.tasks.thumbnail import MakeThumbnailTask
from yt_shared.enums import DownMediaType, TaskStatus
from yt_shared.models import Task
from yt_shared.repositories.task import TaskRepository
from yt_shared.schemas.media import Audio, DownMedia, IncomingMediaPayload, Video
from yt_shared.utils.file import remove_dir
from yt_shared.utils.tasks.tasks import create_task
class VideoService:
class MediaService:
def __init__(self) -> None:
self._log = logging.getLogger(self.__class__.__name__)
self._downloader = VideoDownloader()
self._downloader = MediaDownloader()
self._repository = TaskRepository()
async def process(
self, video_payload: VideoPayload, db: AsyncSession
) -> tuple[DownVideo | None, Task | None]:
task = await self._repository.get_or_create_task(db, video_payload)
self, media_payload: IncomingMediaPayload, db: AsyncSession
) -> tuple[DownMedia | None, Task | None]:
task = await self._repository.get_or_create_task(db, media_payload)
if task.status != TaskStatus.PENDING.value:
return None, None
return (
await self._process(video_payload=video_payload, task=task, db=db),
await self._process(media_payload=media_payload, task=task, db=db),
task,
)
async def _process(
self, video_payload: VideoPayload, task: Task, db: AsyncSession
) -> DownVideo:
self, media_payload: IncomingMediaPayload, task: Task, db: AsyncSession
) -> DownMedia:
await self._repository.save_as_processing(db, task)
downloaded_video = await self._start_download(task, video_payload, db)
media = await self._start_download(task, media_payload, db)
try:
await self._post_process_file(downloaded_video, task, db)
await self._post_process_media(media, task, media_payload, db)
except Exception:
self._log.exception('Failed to post-process file %s', downloaded_video)
self._err_file_cleanup(downloaded_video)
self._log.exception('Failed to post-process media %s', media)
self._err_file_cleanup(media)
raise
return downloaded_video
return media
async def _start_download(
self, task: Task, video_payload: VideoPayload, db: AsyncSession
) -> DownVideo:
self, task: Task, media_payload: IncomingMediaPayload, db: AsyncSession
) -> DownMedia:
try:
return await asyncio.get_running_loop().run_in_executor(
None, lambda: self._downloader.download_video(task.url)
None,
lambda: self._downloader.download(
task.url, media_type=media_payload.download_media_type
),
)
except Exception as err:
self._log.exception('Failed to download video. Context: %s', video_payload)
self._log.exception('Failed to download media. Context: %s', media_payload)
await self._handle_download_exception(err, task, db)
raise DownloadVideoServiceError(message=str(err), task=task)
async def _post_process_file(
async def _post_process_media(
self,
video: DownVideo,
media: DownMedia,
task: Task,
media_payload: IncomingMediaPayload,
db: AsyncSession,
) -> None:
post_process_audio = lambda: self._post_process_audio(
media=media,
media_payload=media_payload,
task=task,
db=db,
)
post_process_video = lambda: self._post_process_video(
media=media, media_payload=media_payload, task=task, db=db
)
match media.media_type: # noqa: E999
case DownMediaType.AUDIO:
await post_process_audio()
case DownMediaType.VIDEO:
await post_process_video()
case DownMediaType.AUDIO_VIDEO:
await asyncio.gather(*(post_process_audio(), post_process_video()))
await self._repository.save_as_done(db, task)
async def _post_process_video(
self,
media: DownMedia,
media_payload: IncomingMediaPayload,
task: Task,
db: AsyncSession,
) -> None:
"""Post-process downloaded media files, e.g. make thumbnail and copy to storage."""
video = media.video
# yt-dlp's 'info-meta' may not contain all needed video metadata.
if not all([video.duration, video.height, video.width]):
# TODO: Move to higher level and re-raise as DownloadVideoServiceError with task,
@ -78,7 +110,7 @@ class VideoService:
coro_tasks = []
if not video.thumb_path:
thumb_path = os.path.join(video.root_path, video.thumb_name)
thumb_path = os.path.join(media.root_path, video.thumb_name)
video.thumb_path = thumb_path
coro_tasks.append(
self._create_thumb_task(
@ -87,15 +119,30 @@ class VideoService:
duration=video.duration,
)
)
if settings.SAVE_VIDEO_FILE:
if media_payload.save_to_storage:
coro_tasks.append(self._create_copy_file_task(video))
await asyncio.gather(*coro_tasks)
final_coros = [self._repository.save_as_done(db, task, video)]
await asyncio.gather(*final_coros)
file = await self._repository.save_file(db, task, media.video, media.meta)
video.orm_file_id = file.id
async def _post_process_audio(
    self,
    media: DownMedia,
    media_payload: IncomingMediaPayload,
    task: Task,
    db: AsyncSession,
) -> None:
    """Save the audio file record and, if requested, copy the file to storage.

    The id of the saved file record is written back to
    ``media.audio.orm_file_id``.
    """
    audio = media.audio
    pending = [self._repository.save_file(db, task, audio, media.meta)]
    if media_payload.save_to_storage:
        pending.append(self._create_copy_file_task(audio))

    # save_file() is always the first awaitable, so its result comes first.
    saved_file, *_ = await asyncio.gather(*pending)
    audio.orm_file_id = saved_file.id
@staticmethod
async def _set_probe_ctx(video: DownVideo) -> None:
async def _set_probe_ctx(video: Video) -> None:
probe_ctx = await GetFfprobeContextTask(video.filepath).run()
if not probe_ctx:
return
@ -106,10 +153,10 @@ class VideoService:
video.width = video_streams[0]['width']
video.height = video_streams[0]['height']
def _create_copy_file_task(self, video: DownVideo) -> asyncio.Task:
task_name = 'Copy file to storage task'
def _create_copy_file_task(self, file: Audio | Video) -> asyncio.Task:
task_name = f'Copy {file.file_type} file to storage task'
return create_task(
self._copy_file_to_storage(video),
self._copy_file_to_storage(file),
task_name=task_name,
logger=self._log,
exception_message='Task "%s" raised an exception',
@ -128,11 +175,11 @@ class VideoService:
)
@staticmethod
async def _copy_file_to_storage(video: DownVideo) -> None:
dst = os.path.join(settings.STORAGE_PATH, video.name)
await asyncio.to_thread(shutil.copy2, video.filepath, dst)
async def _copy_file_to_storage(file: Audio | Video) -> None:
dst = os.path.join(settings.STORAGE_PATH, file.filename)
await asyncio.to_thread(shutil.copy2, file.filepath, dst)
def _err_file_cleanup(self, video: DownVideo) -> None:
def _err_file_cleanup(self, video: DownMedia) -> None:
"""Cleanup any downloaded/created data if post-processing failed."""
self._log.info('Performing error cleanup. Removing %s', video.root_path)
remove_dir(video.root_path)

View file

@ -6,61 +6,53 @@ from yt_shared.db.session import get_db
from yt_shared.models import Task
from yt_shared.rabbit.publisher import Publisher
from yt_shared.schemas.error import ErrorDownloadPayload, ErrorGeneralPayload
from yt_shared.schemas.media import DownMedia, IncomingMediaPayload
from yt_shared.schemas.success import SuccessPayload
from yt_shared.schemas.video import DownVideo, VideoPayload
from worker.core.exceptions import DownloadVideoServiceError, GeneralVideoServiceError
from worker.core.video_service import VideoService
from worker.core.media_service import MediaService
class PayloadHandler:
def __init__(self) -> None:
self._log = logging.getLogger(self.__class__.__name__)
self._video_service = VideoService()
self._media_service = MediaService()
self._publisher = Publisher()
async def handle(self, video_payload: VideoPayload) -> None:
async def handle(self, media_payload: IncomingMediaPayload) -> None:
try:
await self._handle(video_payload)
await self._handle(media_payload)
except Exception as err:
await self._send_general_error(err, video_payload)
await self._send_general_error(err, media_payload)
async def _handle(self, video_payload: VideoPayload) -> None:
async def _handle(self, media_payload: IncomingMediaPayload) -> None:
async for session in get_db():
try:
video, task = await self._video_service.process(video_payload, session)
media, task = await self._media_service.process(media_payload, session)
except DownloadVideoServiceError as err:
await self._send_failed_video_download_task(err, video_payload)
await self._send_failed_video_download_task(err, media_payload)
return
if not video or not task:
if not media or not task:
err_msg = (
f'Video or task is None, cannot proceed: '
f'video - {video}, task - {task}'
f'Media or task is None, cannot proceed: '
f'media - {media}, task - {task}'
)
self._log.error(err_msg)
raise RuntimeError(err_msg)
await self._send_finished_task(task, video, video_payload)
await self._send_finished_task(task, media, media_payload)
async def _send_finished_task(
self, task: Task, video: DownVideo, video_payload: VideoPayload
self, task: Task, media: DownMedia, media_payload: IncomingMediaPayload
) -> None:
success_payload = SuccessPayload(
task_id=task.id,
title=video.title,
filename=video.name,
thumb_name=video.thumb_name,
filepath=video.filepath,
thumb_path=video.thumb_path,
root_path=video.root_path,
duration=video.duration,
width=video.width,
height=video.height,
media=media,
message_id=task.message_id,
from_chat_id=video_payload.from_chat_id,
from_chat_type=video_payload.from_chat_type,
from_chat_id=media_payload.from_chat_id,
from_chat_type=media_payload.from_chat_type,
from_user_id=task.from_user_id,
context=video_payload,
context=media_payload,
yt_dlp_version=ytdlp_version.__version__,
)
await self._publisher.send_download_finished(success_payload)
@ -68,18 +60,18 @@ class PayloadHandler:
async def _send_failed_video_download_task(
self,
err: DownloadVideoServiceError,
video_payload: VideoPayload,
media_payload: IncomingMediaPayload,
) -> None:
task = err.task
err_payload = ErrorDownloadPayload(
task_id=task.id,
message_id=task.message_id,
from_chat_id=video_payload.from_chat_id,
from_chat_type=video_payload.from_chat_type,
from_user_id=video_payload.from_user_id,
from_chat_id=media_payload.from_chat_id,
from_chat_type=media_payload.from_chat_type,
from_user_id=media_payload.from_user_id,
message='Download error',
url=video_payload.url,
context=video_payload,
url=media_payload.url,
context=media_payload,
yt_dlp_version=ytdlp_version.__version__,
exception_msg=str(err),
exception_type=err.__class__.__name__,
@ -89,18 +81,18 @@ class PayloadHandler:
async def _send_general_error(
self,
err: GeneralVideoServiceError | Exception,
video_payload: VideoPayload,
media_payload: IncomingMediaPayload,
) -> None:
task: Task | None = getattr(err, 'task', None)
err_payload = ErrorGeneralPayload(
task_id=task.id if task else 'N/A',
message_id=video_payload.message_id,
from_chat_id=video_payload.from_chat_id,
from_chat_type=video_payload.from_chat_type,
from_user_id=video_payload.from_user_id,
message_id=media_payload.message_id,
from_chat_id=media_payload.from_chat_id,
from_chat_type=media_payload.from_chat_type,
from_user_id=media_payload.from_user_id,
message='General worker error',
url=video_payload.url,
context=video_payload,
url=media_payload.url,
context=media_payload,
yt_dlp_version=ytdlp_version.__version__,
exception_msg=traceback.format_exc(),
exception_type=err.__class__.__name__,

View file

@ -1,16 +1,40 @@
# More here https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/options.py or 'yt-dlp --help'
YTDL_OPTS = [
"""yt-dlp download CLI options.
Only CLI options are allowed to be stored as configuration. They are later converted to internal API options.
More here https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/options.py or 'yt-dlp --help'
"""
FINAL_AUDIO_FORMAT = 'mp3'
FINAL_THUMBNAIL_FORMAT = 'jpg'
DEFAULT_YTDL_OPTS = [
'--output',
'%(title).200B.%(ext)s',
'--format',
'bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4',
'--no-playlist',
'--playlist-items',
'1:1',
'--write-thumbnail',
'--convert-thumbnails',
'jpg',
'--concurrent-fragments',
'5',
'--verbose',
]
AUDIO_YTDL_OPTS = [
'--extract-audio',
'--audio-quality',
'0',
'--audio-format',
FINAL_AUDIO_FORMAT,
]
AUDIO_FORMAT_YTDL_OPTS = [
'--format',
'bestaudio/best',
]
VIDEO_YTDL_OPTS = [
'--format',
'bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4',
'--write-thumbnail',
'--convert-thumbnails',
FINAL_THUMBNAIL_FORMAT,
]

View file

@ -1,5 +1,4 @@
APPLICATION_NAME=yt_worker
SAVE_VIDEO_FILE=False
MAX_SIMULTANEOUS_DOWNLOADS=2
STORAGE_PATH=/filestorage
THUMBNAIL_FRAME_SECOND=10.0

View file

@ -4,8 +4,8 @@ from typing import AsyncGenerator
import sqlalchemy as sa
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import declarative_base, declared_attr, sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import declarative_base, declared_attr
from sqlalchemy_utils import UUIDType
from yt_shared.config import settings
@ -18,9 +18,8 @@ engine = create_async_engine(
metadata = MetaData()
metadata.bind = engine
AsyncSessionLocal = sessionmaker(
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=settings.SQLALCHEMY_EXPIRE_ON_COMMIT,
)

View file

@ -1,34 +1,52 @@
from enum import Enum, auto, unique
from enum import Enum, unique
@unique
class ChoiceEnum(Enum):
class StrChoiceEnum(str, Enum):
@classmethod
def choices(cls) -> tuple[str, ...]:
return tuple(x.value for x in cls)
class TaskStatus(str, ChoiceEnum):
class TaskStatus(StrChoiceEnum):
PENDING = 'PENDING'
PROCESSING = 'PROCESSING'
FAILED = 'FAILED'
DONE = 'DONE'
class TaskSource(str, ChoiceEnum):
class TaskSource(StrChoiceEnum):
API = 'API'
BOT = 'BOT'
class RabbitPayloadType(ChoiceEnum):
DOWNLOAD_ERROR = auto()
GENERAL_ERROR = auto()
SUCCESS = auto()
class RabbitPayloadType(StrChoiceEnum):
DOWNLOAD_ERROR = 'DOWNLOAD_ERROR'
GENERAL_ERROR = 'GENERAL_ERROR'
SUCCESS = 'SUCCESS'
class TelegramChatType(ChoiceEnum):
class TelegramChatType(StrChoiceEnum):
PRIVATE = 'private'
BOT = 'bot'
GROUP = 'group'
SUPERGROUP = 'supergroup'
CHANNEL = 'channel'
class DownMediaType(StrChoiceEnum):
"""Media can be audio, video or both.
1. Only download/extract audio
2. Video with muxed audio
3. Both 1) and 2)
"""
AUDIO = 'AUDIO'
VIDEO = 'VIDEO'
AUDIO_VIDEO = 'AUDIO_VIDEO'
class MediaFileType(StrChoiceEnum):
AUDIO = 'AUDIO'
VIDEO = 'VIDEO'

View file

@ -27,10 +27,9 @@ class Task(Base, Timestamp):
nullable=False,
index=True,
)
file = relationship(
files = relationship(
'File',
backref='task',
uselist=False,
cascade='all, delete-orphan',
)
added_at = sa.Column(sa.DateTime, nullable=False)
@ -66,7 +65,7 @@ class File(Base, Timestamp):
UUIDType(binary=False),
sa.ForeignKey('task.id', ondelete='CASCADE'),
nullable=False,
unique=True,
unique=False,
index=True,
)
cache = relationship(

View file

@ -14,8 +14,8 @@ from yt_shared.rabbit.rabbit_config import (
SUCCESS_QUEUE,
)
from yt_shared.schemas.error import ErrorDownloadPayload, ErrorGeneralPayload
from yt_shared.schemas.media import IncomingMediaPayload
from yt_shared.schemas.success import SuccessPayload
from yt_shared.schemas.video import VideoPayload
from yt_shared.utils.common import Singleton
@ -28,8 +28,8 @@ class Publisher(metaclass=Singleton):
def _is_sent(confirm: ConfirmationFrameType | None) -> bool:
return isinstance(confirm, Basic.Ack)
async def send_for_download(self, video_payload: VideoPayload) -> bool:
message = aio_pika.Message(body=video_payload.json().encode())
async def send_for_download(self, media_payload: IncomingMediaPayload) -> bool:
message = aio_pika.Message(body=media_payload.json().encode())
exchange = self._rabbit_mq.exchanges[INPUT_EXCHANGE]
confirm = await exchange.publish(
message, routing_key=INPUT_QUEUE, mandatory=True

View file

@ -7,7 +7,8 @@ from sqlalchemy.ext.asyncio import AsyncSession
from yt_shared.enums import TaskStatus
from yt_shared.models import Cache, File, Task
from yt_shared.schemas.cache import CacheSchema
from yt_shared.schemas.video import DownVideo, VideoPayload
from yt_shared.schemas.media import Audio, IncomingMediaPayload, Video
from yt_shared.utils.common import async_lock
class TaskRepository:
@ -15,27 +16,29 @@ class TaskRepository:
self._log = logging.getLogger(self.__class__.__name__)
async def get_or_create_task(
self, db: AsyncSession, video_payload: VideoPayload
self, db: AsyncSession, media_payload: IncomingMediaPayload
) -> Task:
if video_payload.id is None:
return await self._create_task(db, video_payload)
if media_payload.id is None:
return await self._create_task(db, media_payload)
stmt = select(Task).filter_by(id=video_payload.id)
stmt = select(Task).filter_by(id=media_payload.id)
task = await db.execute(stmt)
try:
return task.scalar_one()
except NoResultFound:
return await self._create_task(db, video_payload)
return await self._create_task(db, media_payload)
@staticmethod
async def _create_task(db: AsyncSession, video_payload: VideoPayload) -> Task:
async def _create_task(
db: AsyncSession, media_payload: IncomingMediaPayload
) -> Task:
task = Task(
id=video_payload.id,
url=video_payload.url,
source=video_payload.source,
from_user_id=video_payload.from_user_id,
message_id=video_payload.message_id,
added_at=video_payload.added_at,
id=media_payload.id,
url=media_payload.url,
source=media_payload.source,
from_user_id=media_payload.from_user_id,
message_id=media_payload.message_id,
added_at=media_payload.added_at,
)
db.add(task)
await db.commit()
@ -43,31 +46,42 @@ class TaskRepository:
@staticmethod
async def save_file_cache(
db: AsyncSession, task_id: str | UUID, cache: CacheSchema
db: AsyncSession, file_id: str | UUID, cache: CacheSchema
) -> None:
stmt = insert(Cache).values(
cache_id=cache.cache_id,
cache_unique_id=cache.cache_unique_id,
file_size=cache.file_size,
date_timestamp=cache.date_timestamp,
file_id=(select(File.id).filter_by(task_id=task_id).scalar_subquery()),
file_id=file_id,
)
await db.execute(stmt)
await db.commit()
@staticmethod
async def save_as_done(
db: AsyncSession, task: Task, downloaded_video: DownVideo
) -> None:
task.file = File(
title=downloaded_video.title,
name=downloaded_video.name,
thumb_name=downloaded_video.thumb_name,
duration=downloaded_video.duration,
width=downloaded_video.width,
height=downloaded_video.height,
meta=downloaded_video.meta,
async def save_file(
    db: AsyncSession, task: Task, media: Audio | Video, meta: dict
) -> File:
    """Add a ``File`` row for ``media`` to the session and flush it.

    Video-only attributes (width, height, thumbnail name) are set only when
    ``media`` is a ``Video``. The row is flushed, not committed.
    """
    columns = {
        'title': media.title,
        'name': media.filename,
        'duration': media.duration,
        'meta': meta,
        'task_id': task.id,
    }
    # TODO: Rework this.
    if isinstance(media, Video):
        columns['width'] = media.width
        columns['height'] = media.height
        columns['thumb_name'] = media.thumb_name

    file = File(**columns)
    db.add(file)
    # The flush runs while holding the module-level ``async_lock``.
    async with async_lock:
        await db.flush([file])
    return file
@staticmethod
async def save_as_done(db: AsyncSession, task: Task) -> None:
task.status = TaskStatus.DONE
await db.commit()

View file

@ -5,7 +5,7 @@ from pydantic import StrictInt, StrictStr
from yt_shared.enums import RabbitPayloadType, TelegramChatType
from yt_shared.schemas.base import BaseRabbitPayloadModel
from yt_shared.schemas.video import VideoPayload
from yt_shared.schemas.media import IncomingMediaPayload
class ErrorGeneralPayload(BaseRabbitPayloadModel):
@ -19,7 +19,7 @@ class ErrorGeneralPayload(BaseRabbitPayloadModel):
message_id: StrictInt | None
message: StrictStr
url: StrictStr
context: VideoPayload
context: IncomingMediaPayload
exception_msg: StrictStr
exception_type: StrictStr
yt_dlp_version: StrictStr | None

View file

@ -0,0 +1,77 @@
import uuid
from datetime import datetime, timezone
from typing import Literal
from pydantic import (
Field,
StrictBool,
StrictFloat,
StrictInt,
StrictStr,
root_validator,
)
from yt_shared.enums import DownMediaType, MediaFileType, TaskSource, TelegramChatType
from yt_shared.schemas.base import RealBaseModel
class IncomingMediaPayload(RealBaseModel):
id: uuid.UUID | None
from_chat_id: StrictInt | None
from_chat_type: TelegramChatType | None
from_user_id: StrictInt | None
message_id: StrictInt | None
url: StrictStr
source: TaskSource
save_to_storage: StrictBool
download_media_type: DownMediaType
added_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class Audio(RealBaseModel):
file_type: Literal[MediaFileType.AUDIO] = MediaFileType.AUDIO
title: StrictStr
filename: StrictStr
filepath: StrictStr
duration: StrictFloat | None = None
orm_file_id: uuid.UUID | None = None
class Video(RealBaseModel):
    """Downloaded video file context.

    When ``thumb_name`` is not supplied, a default of
    ``<filename>-thumb.jpg`` is derived by the root validator.
    """

    file_type: Literal[MediaFileType.VIDEO] = MediaFileType.VIDEO
    title: StrictStr
    filename: StrictStr
    filepath: StrictStr
    duration: StrictFloat | None = None
    thumb_name: StrictStr | None = None
    width: int | None = None
    height: int | None = None
    thumb_path: StrictStr | None = None
    orm_file_id: uuid.UUID | None = None

    @root_validator(pre=False)
    def _set_fields(cls, values: dict) -> dict:
        """Derive a default thumbnail name from the video filename."""
        # This model has no ``name`` field; the file name lives in ``filename``.
        # ``.get`` is used because a field that failed its own validation is
        # expected to be missing from ``values`` here; in that case no default
        # is derived and the field error is left to surface on its own.
        filename = values.get('filename')
        if not values.get('thumb_name') and filename:
            values['thumb_name'] = f'{filename}-thumb.jpg'
        return values
class DownMedia(RealBaseModel):
    """Downloaded media (audio, video with muxed audio or both) object context."""

    audio: Audio | None
    video: Video | None
    media_type: DownMediaType
    root_path: StrictStr
    meta: dict

    @root_validator(pre=True)
    def _validate(cls, values: dict) -> dict:
        """Reject input that carries neither an audio nor a video object."""
        # This runs on the raw input (pre=True), where the optional keys may
        # be absent altogether; ``.get`` treats a missing key like ``None``
        # instead of raising KeyError.
        if values.get('audio') is None and values.get('video') is None:
            raise ValueError('Provide audio, video or both.')
        return values

    def get_media_objects(self) -> tuple[Audio | Video, ...]:
        """Return the media objects that are present, audio first."""
        return tuple(
            media_obj for media_obj in (self.audio, self.video) if media_obj
        )

View file

@ -2,11 +2,10 @@ import uuid
from typing import ClassVar
from pydantic import StrictInt, StrictStr
from pydantic.types import StrictFloat
from yt_shared.enums import RabbitPayloadType, TelegramChatType
from yt_shared.schemas.base import BaseRabbitPayloadModel
from yt_shared.schemas.video import VideoPayload
from yt_shared.schemas.media import DownMedia, IncomingMediaPayload
class SuccessPayload(BaseRabbitPayloadModel):
@ -18,14 +17,6 @@ class SuccessPayload(BaseRabbitPayloadModel):
from_chat_type: TelegramChatType | None
from_user_id: StrictInt | None
message_id: StrictInt | None
title: StrictStr
filename: StrictStr
thumb_name: StrictStr
filepath: StrictStr
thumb_path: StrictStr | None = None
root_path: StrictStr
duration: StrictFloat | None
width: StrictInt | None
height: StrictInt | None
context: VideoPayload
media: DownMedia
context: IncomingMediaPayload
yt_dlp_version: StrictStr | None

View file

@ -1,6 +1,6 @@
from pydantic import StrictInt, StrictStr
from pydantic import StrictBool, StrictInt, StrictStr
from yt_shared.enums import TelegramChatType
from yt_shared.enums import DownMediaType, TelegramChatType
from yt_shared.schemas.base import RealBaseModel
@ -10,3 +10,5 @@ class URL(RealBaseModel):
from_chat_type: TelegramChatType
from_user_id: StrictInt
message_id: StrictInt
save_to_storage: StrictBool
download_media_type: DownMediaType

View file

@ -1,39 +0,0 @@
import uuid
from datetime import datetime, timezone
from pydantic import Field, StrictFloat, StrictInt, StrictStr, root_validator
from yt_shared.enums import TaskSource, TelegramChatType
from yt_shared.schemas.base import RealBaseModel
class VideoPayload(RealBaseModel):
id: uuid.UUID | None
from_chat_id: StrictInt | None
from_chat_type: TelegramChatType | None
from_user_id: StrictInt | None
message_id: StrictInt | None
url: StrictStr
source: TaskSource
added_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class DownVideo(RealBaseModel):
"""Downloaded video object context."""
title: StrictStr
name: StrictStr
thumb_name: StrictStr | None = None
duration: StrictFloat | None = None
width: int | None = None
height: int | None = None
meta: dict
filepath: StrictStr
thumb_path: StrictStr | None = None
root_path: StrictStr
@root_validator(pre=False)
def _set_fields(cls, values: dict) -> dict:
if not values['thumb_name']:
values['thumb_name'] = f'{values["name"]}-thumb.jpg'
return values

View file

@ -39,3 +39,6 @@ def wrap(func):
def random_string(number: int) -> str:
    """Return a string of ``number`` random lowercase ASCII letters.

    Uses the non-cryptographic ``random`` module, so the result must not be
    used as a secret or token. A ``number`` of zero or less yields ``''``.
    """
    return ''.join(random.choices(ascii_lowercase, k=number))
async_lock = asyncio.Lock()