From 3513a0077a327dc7d91039e5dead21bc5e01a5d4 Mon Sep 17 00:00:00 2001 From: Taras Terletskyi <888784+tropicoo@users.noreply.github.com> Date: Sat, 25 Feb 2023 19:25:39 +0200 Subject: [PATCH] Version 1.0. Details in /.releases/release_1.0.md --- .env | 2 +- .gitignore | 6 +- .releases/release_1.0.md | 24 ++ README.md | 8 +- app_api/api/api/api_v1/schemas/task.py | 10 +- app_api/api/core/repository.py | 9 +- app_api/api/core/services/task.py | 11 +- app_bot/bot/core/callbacks.py | 9 +- app_bot/bot/core/config/schema.py | 3 + app_bot/bot/core/handlers/success.py | 73 ++--- app_bot/bot/core/service.py | 9 +- app_bot/bot/core/tasks/upload.py | 262 ++++++++++++------ app_bot/bot/version.py | 2 +- app_bot/config-example.yml | 2 + app_worker/alembic/versions/10ab08fc321b_.py | 28 ++ app_worker/worker/core/callbacks.py | 20 +- app_worker/worker/core/config.py | 1 - app_worker/worker/core/downloader.py | 226 ++++++++++++--- .../{video_service.py => media_service.py} | 123 +++++--- app_worker/worker/core/payload_handler.py | 70 +++-- app_worker/ytdl_opts/default.py | 38 ++- envs/.env_worker | 1 - yt_shared/yt_shared/db/session.py | 7 +- yt_shared/yt_shared/enums.py | 36 ++- yt_shared/yt_shared/models/task.py | 5 +- yt_shared/yt_shared/rabbit/publisher.py | 6 +- yt_shared/yt_shared/repositories/task.py | 66 +++-- yt_shared/yt_shared/schemas/error.py | 4 +- yt_shared/yt_shared/schemas/media.py | 77 +++++ yt_shared/yt_shared/schemas/success.py | 15 +- yt_shared/yt_shared/schemas/url.py | 6 +- yt_shared/yt_shared/schemas/video.py | 39 --- yt_shared/yt_shared/utils/common.py | 3 + 33 files changed, 813 insertions(+), 388 deletions(-) create mode 100644 .releases/release_1.0.md create mode 100644 app_worker/alembic/versions/10ab08fc321b_.py rename app_worker/worker/core/{video_service.py => media_service.py} (52%) create mode 100644 yt_shared/yt_shared/schemas/media.py delete mode 100644 yt_shared/yt_shared/schemas/video.py diff --git a/.env b/.env index b50fe24..cee8834 100644 --- a/.env +++ b/.env @@ -1 +1 @@ -COMPOSE_PROJECT_NAME=YT +COMPOSE_PROJECT_NAME=yt diff --git a/.gitignore b/.gitignore index 13f5823..773de65 100644 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,11 @@ # Byte-compiled / optimized / DLL files +*tmp* *.session *.session-journal -config.yml -config.yaml +config*.yml +config*.yaml +!app_bot/config-example.yml .ruff_cache __pycache__/ diff --git a/.releases/release_1.0.md b/.releases/release_1.0.md new file mode 100644 index 0000000..f153cbc --- /dev/null +++ b/.releases/release_1.0.md @@ -0,0 +1,24 @@ +# Release info + +Version: 1.0 + +Release date: February 25, 2023 + +# Important + +1. Changed content yt-dlp options in `app_worker/DEFAULT_YTDL_OPTS/default.py` +2. Added two new user config options in `app_bot/config-example.yml`: + 1. `download_media_type`: What do download - audio (mp3), video or both. Values can + be `AUDIO`, `VIDEO`, `AUDIO_VIDEO`. + 2. `save_to_storage`: Moved from `envs/.env_worker` +3. Creating task on API now requires previously mentioned two fields in payload to be + sent. + +# New features + +1. Now bot can download audio (mp3), video (default), or both. Just configure the + preferred mode for the particular user/group. + +# Misc + +N/A diff --git a/README.md b/README.md index c3c1721..ac976b8 100644 --- a/README.md +++ b/README.md @@ -2,15 +2,15 @@ Simple and reliable YouTube Download Telegram Bot. -Version: 0.9. [Release details](.releases/release_0.9.md). +Version: 1.0. [Release details](.releases/release_1.0.md). ![frames](.assets/download_success.png) ## 😂 Features -* Download videos from any [yt-dlp](https://github.com/yt-dlp/yt-dlp) supported website +* Download audio and videos from any [yt-dlp](https://github.com/yt-dlp/yt-dlp) supported website to your storage -* Upload downloaded videos to the Telegram chat +* Upload downloaded audio and videos to the Telegram chat * Trigger video download by sending link to an API * Track download tasks via API @@ -26,7 +26,7 @@ Version: 0.9. [Release details](.releases/release_0.9.md). and change or remove `forward_group_id` value (if you want to forward the video to some group when upload is enabled) 7. Check the default environment variables in `envs/.env_common` and change if needed -8. Video storage path (`STORAGE_PATH` environment variable) is located in +8. Video `STORAGE_PATH` environment variable is located in the `envs/.env_worker` file. By default, it's `/filestorage` path inside the container. What you want is to map the real path to this inside the `docker-compose.yml` file for `worker` service, e.g. if you're on Windows, next diff --git a/app_api/api/api/api_v1/schemas/task.py b/app_api/api/api/api_v1/schemas/task.py index 6b4e066..c618f3b 100644 --- a/app_api/api/api/api_v1/schemas/task.py +++ b/app_api/api/api/api_v1/schemas/task.py @@ -1,8 +1,8 @@ import uuid from datetime import datetime -from pydantic import StrictFloat, StrictInt, StrictStr -from yt_shared.enums import TaskSource, TaskStatus +from pydantic import StrictBool, StrictFloat, StrictInt, StrictStr +from yt_shared.enums import DownMediaType, TaskSource, TaskStatus from yt_shared.schemas.base import RealBaseModel from api.api.api_v1.schemas.base import BaseOrmModel @@ -47,15 +47,17 @@ class TaskSimpleSchema(BaseOrmModel): message_id: StrictInt | None yt_dlp_version: StrictStr | None error: StrictStr | None - file: FileSimpleSchema | None + files: list[FileSimpleSchema] class TaskSchema(TaskSimpleSchema): - file: FileSchema | None + files: list[FileSchema] class CreateTaskIn(RealBaseModel): url: StrictStr + download_media_type: DownMediaType + save_to_storage: StrictBool class CreateTaskOut(RealBaseModel): diff --git a/app_api/api/core/repository.py b/app_api/api/core/repository.py index 6a04192..acebb65 100644 --- a/app_api/api/core/repository.py +++ b/app_api/api/core/repository.py @@ -24,7 +24,7 @@ class DatabaseRepository: stmt = ( select(Task) .options( - joinedload(Task.file) + joinedload(Task.files) .load_only(*self._get_load_file_cols(include_meta)) .options(joinedload(File.cache)) ) @@ -37,6 +37,7 @@ class DatabaseRepository: stmt = stmt.filter(Task.status.in_(status)) tasks = await self._db.execute(stmt) + tasks.unique() return tasks.scalars().all() @staticmethod @@ -60,13 +61,14 @@ class DatabaseRepository: stmt = ( select(Task) .options( - joinedload(Task.file) + joinedload(Task.files) .load_only(*self._get_load_file_cols(include_meta)) .options(joinedload(File.cache)) ) .filter(Task.id == id) ) task = await self._db.execute(stmt) + task.unique() return task.scalar_one() async def get_latest_task(self, include_meta: bool) -> Task: @@ -74,13 +76,14 @@ class DatabaseRepository: select(Task) .order_by(desc(Task.created)) .options( - joinedload(Task.file) + joinedload(Task.files) .load_only(*self._get_load_file_cols(include_meta)) .options(joinedload(File.cache)) ) .limit(1) ) task = await self._db.execute(stmt) + task.unique() return task.scalar_one() async def delete_task(self, id: str | uuid.UUID) -> None: diff --git a/app_api/api/core/services/task.py b/app_api/api/core/services/task.py index a9beedc..b108a33 100644 --- a/app_api/api/core/services/task.py +++ b/app_api/api/core/services/task.py @@ -6,7 +6,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from yt_shared.enums import TaskSource, TaskStatus from yt_shared.models import Task from yt_shared.rabbit.publisher import Publisher -from yt_shared.schemas.video import VideoPayload +from yt_shared.schemas.media import IncomingMediaPayload from api.api.api_v1.schemas.task import ( CreateTaskIn, @@ -66,8 +66,13 @@ class TaskService: task_id = uuid.uuid4() source = TaskSource.API added_at = datetime.now(timezone.utc) - payload = VideoPayload( - id=task_id, url=task.url, added_at=added_at, source=source + payload = IncomingMediaPayload( + id=task_id, + url=task.url, + added_at=added_at, + source=source, + download_media_type=task.download_media_type, + save_to_storage=task.save_to_storage, ) if not await publisher.send_for_download(payload): raise TaskServiceError('Failed to create task') diff --git a/app_bot/bot/core/callbacks.py b/app_bot/bot/core/callbacks.py index 5041e7b..002752b 100644 --- a/app_bot/bot/core/callbacks.py +++ b/app_bot/bot/core/callbacks.py @@ -30,7 +30,7 @@ class TelegramCallback: async def on_message(self, client: VideoBot, message: Message) -> None: """Receive video URL and send to the download worker.""" self._log.debug(message) - urls = self._get_urls(message) + urls = self._parse_urls(message) await self._url_service.process_urls(urls=urls) await self._send_acknowledge_message(message=message, urls=urls) @@ -48,8 +48,9 @@ class TelegramCallback: reply_to_message_id=message.id, ) - @staticmethod - def _get_urls(message: Message) -> list[URL]: + def _parse_urls(self, message: Message) -> list[URL]: + bot: VideoBot = message._client # noqa + user = bot.allowed_users[message.from_user.id] return [ URL( url=url, @@ -57,6 +58,8 @@ class TelegramCallback: from_chat_type=TelegramChatType(message.chat.type.value), from_user_id=message.from_user.id, message_id=message.id, + save_to_storage=user.save_to_storage, + download_media_type=user.download_media_type, ) for url in message.text.splitlines() ] diff --git a/app_bot/bot/core/config/schema.py b/app_bot/bot/core/config/schema.py index 425f229..2736024 100644 --- a/app_bot/bot/core/config/schema.py +++ b/app_bot/bot/core/config/schema.py @@ -5,6 +5,7 @@ from pydantic import ( constr, validator, ) +from yt_shared.enums import DownMediaType from yt_shared.schemas.base import RealBaseModel _LANG_CODE_LEN = 2 @@ -36,6 +37,8 @@ class UserUploadSchema(RealBaseModel): class UserSchema(BaseUserSchema): send_startup_message: StrictBool + download_media_type: DownMediaType + save_to_storage: StrictBool upload: UserUploadSchema @property diff --git a/app_bot/bot/core/handlers/success.py b/app_bot/bot/core/handlers/success.py index 3232d19..4f3ee5d 100644 --- a/app_bot/bot/core/handlers/success.py +++ b/app_bot/bot/core/handlers/success.py @@ -4,19 +4,24 @@ import traceback from pyrogram.enums import ParseMode from yt_shared.emoji import SUCCESS_EMOJI -from yt_shared.enums import TaskSource +from yt_shared.enums import MediaFileType, TaskSource from yt_shared.rabbit.publisher import Publisher from yt_shared.schemas.error import ErrorGeneralPayload +from yt_shared.schemas.media import Audio, Video from yt_shared.schemas.success import SuccessPayload from yt_shared.utils.file import remove_dir from yt_shared.utils.tasks.tasks import create_task from bot.core.handlers.abstract import AbstractHandler -from bot.core.tasks.upload import UploadTask +from bot.core.tasks.upload import AudioUploadTask, VideoUploadTask class SuccessHandler(AbstractHandler): _body: SuccessPayload + _UPLOAD_TASK_MAP = { + MediaFileType.AUDIO: AudioUploadTask, + MediaFileType.VIDEO: VideoUploadTask, + } def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) @@ -24,9 +29,12 @@ class SuccessHandler(AbstractHandler): async def handle(self) -> None: try: - await self._handle() - except Exception as err: - await self._publish_error_message(err) + coro_tasks = [] + for media_object in self._body.media.get_media_objects(): + coro_tasks.append(self._handle(media_object)) + await asyncio.gather(*coro_tasks) + finally: + self._cleanup() async def _publish_error_message(self, err: Exception) -> None: err_payload = ErrorGeneralPayload( @@ -44,44 +52,43 @@ class SuccessHandler(AbstractHandler): ) await self._publisher.send_download_error(err_payload) - async def _handle(self) -> None: - await self._send_success_text() + async def _handle(self, media_object: Audio | Video) -> None: try: + await self._send_success_text(media_object) if self._upload_is_enabled(): - self._validate_file_size_for_upload() - await self._create_upload_task() + self._validate_file_size_for_upload(media_object) + await self._create_upload_task(media_object) else: self._log.warning( 'File %s will not be uploaded due to upload configuration', - self._body.filepath, + media_object.filepath, ) - except Exception: - self._log.exception( - 'Upload of "%s" failed, performing cleanup', self._body.filepath - ) - raise - finally: - self._cleanup() + except Exception as err: + self._log.exception('Upload of "%s" failed', media_object.filepath) + await self._publish_error_message(err) def _cleanup(self) -> None: + root_path = self._body.media.root_path self._log.info( 'Final task "%s" cleanup. Removing download content directory "%s" with files %s', self._body.task_id, - self._body.root_path, - os.listdir(self._body.root_path), + root_path, + os.listdir(root_path), ) - remove_dir(self._body.root_path) + remove_dir(root_path) - async def _create_upload_task(self) -> None: + async def _create_upload_task(self, media_object: Audio | Video) -> None: """Upload video to Telegram chat.""" semaphore = asyncio.Semaphore(value=self._bot.conf.telegram.max_upload_tasks) - task_name = UploadTask.__class__.__name__ + upload_task_cls = self._UPLOAD_TASK_MAP[media_object.file_type] + task_name = upload_task_cls.__class__.__name__ await create_task( - UploadTask( - body=self._body, + upload_task_cls( + media_object=media_object, users=self._receiving_users, bot=self._bot, semaphore=semaphore, + context=self._body, ).run(), task_name=task_name, logger=self._log, @@ -89,8 +96,8 @@ class SuccessHandler(AbstractHandler): exception_message_args=(task_name,), ) - async def _send_success_text(self) -> None: - text = f'{SUCCESS_EMOJI} Downloaded {self._body.filename}' + async def _send_success_text(self, media_object: Audio | Video) -> None: + text = f'{SUCCESS_EMOJI} Downloaded {media_object.filename}' for user in self._receiving_users: kwargs = { 'chat_id': user.id, @@ -109,21 +116,23 @@ class SuccessHandler(AbstractHandler): user = self._bot.allowed_users[self._get_sender_id()] return user.upload.upload_video_file - def _validate_file_size_for_upload(self) -> None: + def _validate_file_size_for_upload(self, media_object: Audio | Video) -> None: if self._body.context.source is TaskSource.API: max_file_size = self._bot.conf.telegram.api.upload_video_max_file_size else: user = self._bot.allowed_users[self._get_sender_id()] max_file_size = user.upload.upload_video_max_file_size - if not os.path.exists(self._body.filepath): - raise ValueError(f'Video {self._body.filepath} not found') + if not os.path.exists(media_object.filepath): + raise ValueError( + f'{media_object.file_type} {media_object.filepath} not found' + ) - file_size = os.stat(self._body.filepath).st_size + file_size = os.stat(media_object.filepath).st_size if file_size > max_file_size: err_msg = ( - f'Video file size {file_size} bytes bigger then allowed {max_file_size}' - f' bytes. Will not upload' + f'{media_object.file_type} file size {file_size} bytes bigger than ' + f'allowed {max_file_size} bytes. Will not upload' ) self._log.warning(err_msg) raise ValueError(err_msg) diff --git a/app_bot/bot/core/service.py b/app_bot/bot/core/service.py index 5140af8..327bba8 100644 --- a/app_bot/bot/core/service.py +++ b/app_bot/bot/core/service.py @@ -2,8 +2,8 @@ import logging from yt_shared.enums import TaskSource from yt_shared.rabbit.publisher import Publisher +from yt_shared.schemas.media import IncomingMediaPayload from yt_shared.schemas.url import URL -from yt_shared.schemas.video import VideoPayload class URLService: @@ -11,21 +11,20 @@ class URLService: self._log = logging.getLogger(self.__class__.__name__) self._publisher = Publisher() - async def process_url(self, url: URL) -> bool: - return await self._send_to_worker(url) - async def process_urls(self, urls: list[URL]) -> None: for url in urls: await self._send_to_worker(url) async def _send_to_worker(self, url: URL) -> bool: - payload = VideoPayload( + payload = IncomingMediaPayload( url=url.url, message_id=url.message_id, from_user_id=url.from_user_id, from_chat_id=url.from_chat_id, from_chat_type=url.from_chat_type, source=TaskSource.BOT, + save_to_storage=url.save_to_storage, + download_media_type=url.download_media_type, ) is_sent = await self._publisher.send_for_download(payload) if not is_sent: diff --git a/app_bot/bot/core/tasks/upload.py b/app_bot/bot/core/tasks/upload.py index 4283304..4abffe7 100644 --- a/app_bot/bot/core/tasks/upload.py +++ b/app_bot/bot/core/tasks/upload.py @@ -1,15 +1,19 @@ +import abc import asyncio from itertools import chain from typing import TYPE_CHECKING, Coroutine from pydantic import StrictFloat, StrictInt, StrictStr from pyrogram.enums import ChatAction, MessageMediaType, ParseMode -from pyrogram.types import Animation, Message, Video +from pyrogram.types import Animation, Message +from pyrogram.types import Audio as _Audio +from pyrogram.types import Video as _Video from tenacity import retry, stop_after_attempt, wait_fixed from yt_shared.db.session import get_db from yt_shared.repositories.task import TaskRepository from yt_shared.schemas.base import RealBaseModel from yt_shared.schemas.cache import CacheSchema +from yt_shared.schemas.media import Audio, Video from yt_shared.schemas.success import SuccessPayload from yt_shared.utils.tasks.abstract import AbstractTask from yt_shared.utils.tasks.tasks import create_task @@ -22,55 +26,64 @@ if TYPE_CHECKING: from bot.core.bot import VideoBot -class VideoContext(RealBaseModel): +class AbstractUploadContext(RealBaseModel): caption: StrictStr - file_name: StrictStr - video_path: StrictStr + filename: StrictStr + filepath: StrictStr duration: StrictFloat - height: StrictInt - width: StrictInt - thumb: StrictStr type: MessageMediaType -class UploadTask(AbstractTask): +class VideoUploadContext(AbstractUploadContext): + height: StrictInt + width: StrictInt + thumb: StrictStr + + +class AudioUploadContext(AbstractUploadContext): + pass + + +class AbstractUploadTask(AbstractTask, metaclass=abc.ABCMeta): + _UPLOAD_ACTION: ChatAction + def __init__( self, - body: SuccessPayload, + media_object: Audio | Video, users: list[BaseUserSchema | UserSchema], bot: 'VideoBot', semaphore: asyncio.Semaphore, + context: SuccessPayload, ) -> None: super().__init__() self._config = get_main_config() - self._body = body - self.filename = body.filename - self.filepath = body.filepath - self.thumb_path = body.thumb_path + self._media_object = media_object + self._filename = media_object.filename + self._filepath = media_object.filepath self._bot = bot self._users = users self._semaphore = semaphore + self._ctx = context + self._media_ctx = self._create_media_context() self._upload_chat_ids, self._forward_chat_ids = self._get_upload_chat_ids() - - self._video_ctx = self._create_video_context() self._cached_message: Message | None = None async def run(self) -> None: async with self._semaphore: - self._log.debug('Semaphore for "%s" acquired', self.filename) + self._log.debug('Semaphore for "%s" acquired', self._filename) await self._run() - self._log.debug('Semaphore for "%s" released', self.filename) + self._log.debug('Semaphore for "%s" released', self._filename) async def _run(self) -> None: try: - await asyncio.gather(*(self._send_upload_text(), self._upload_video_file())) + await asyncio.gather(*(self._send_upload_text(), self._upload_file())) except Exception: - self._log.exception('Exception in upload task for "%s"', self.filename) + self._log.exception('Exception in upload task for "%s"', self._filename) raise async def _send_upload_text(self) -> None: - text = f'⬆️ {bold("Uploading")} {self.filename}' + text = f'⬆️ {bold("Uploading")} {self._filename}' coros = [] for chat_id in self._upload_chat_ids: kwargs = { @@ -78,8 +91,8 @@ class UploadTask(AbstractTask): 'text': text, 'parse_mode': ParseMode.HTML, } - if self._body.message_id: - kwargs['reply_to_message_id'] = self._body.message_id + if self._ctx.message_id: + kwargs['reply_to_message_id'] = self._ctx.message_id coros.append(self._bot.send_message(**kwargs)) await asyncio.gather(*coros) @@ -93,16 +106,29 @@ class UploadTask(AbstractTask): forward_chat_ids.append(user.upload.forward_group_id) return chat_ids, forward_chat_ids - async def _upload_video_file(self) -> None: + @retry(wait=wait_fixed(3), stop=stop_after_attempt(3), reraise=True) + async def __upload(self, chat_id: int) -> Message | None: + self._log.debug('Uploading to "%d" with context: %s', chat_id, self._media_ctx) + return await self._generate_send_media_coroutine(chat_id) + + @abc.abstractmethod + def _generate_send_media_coroutine(self, chat_id: int) -> Coroutine: + pass + + @abc.abstractmethod + def _create_media_context(self) -> AudioUploadContext | VideoUploadContext: + pass + + async def _upload_file(self) -> None: for chat_id in chain(self._upload_chat_ids, self._forward_chat_ids): - self._log.info('Uploading "%s" to chat id "%d"', self.filename, chat_id) - await self._bot.send_chat_action(chat_id, action=ChatAction.UPLOAD_VIDEO) + self._log.info('Uploading "%s" to chat id "%d"', self._filename, chat_id) + await self._bot.send_chat_action(chat_id, action=self._UPLOAD_ACTION) try: message = await self.__upload(chat_id=chat_id) except Exception: self._log.error( 'Failed to upload "%s" to "%d"', - self._video_ctx.video_path, + self._media_ctx.filepath, chat_id, ) raise @@ -111,27 +137,130 @@ class UploadTask(AbstractTask): if not self._cached_message and message: self._cache_data(message) - @retry(wait=wait_fixed(3), stop=stop_after_attempt(3), reraise=True) - async def __upload(self, chat_id: int) -> Message | None: - self._log.debug('Uploading to "%d" with context: %s', chat_id, self._video_ctx) - return await self._generate_send_media_coroutine(chat_id) + def _create_cache_task(self, cache_object: _Audio | _Video | Animation) -> None: + self._log.info('Creating cache task for %s', cache_object) + db_cache_task_name = 'Save cache to DB' + create_task( + self._save_cache_to_db(cache_object), + task_name=db_cache_task_name, + logger=self._log, + exception_message='Task "%s" raised an exception', + exception_message_args=(db_cache_task_name,), + ) + + @abc.abstractmethod + def _cache_data(self, message: Message) -> None: + pass + + async def _save_cache_to_db(self, file: _Audio | _Video | Animation) -> None: + cache = CacheSchema( + cache_id=file.file_id, + cache_unique_id=file.file_unique_id, + file_size=file.file_size, + date_timestamp=file.date, + ) + + async for db in get_db(): + await TaskRepository().save_file_cache( + cache=cache, + file_id=self._media_object.orm_file_id, + db=db, + ) + + +class AudioUploadTask(AbstractUploadTask): + _UPLOAD_ACTION = ChatAction.UPLOAD_AUDIO + _media_ctx: AudioUploadContext def _generate_send_media_coroutine(self, chat_id: int) -> Coroutine: kwargs = { 'chat_id': chat_id, - 'caption': self._video_ctx.caption, - 'file_name': self._video_ctx.file_name, - 'duration': int(self._video_ctx.duration), - 'height': self._video_ctx.height, - 'width': self._video_ctx.width, - 'thumb': self._video_ctx.thumb, + 'audio': self._media_ctx.filepath, + 'caption': self._media_ctx.caption, + 'file_name': self._media_ctx.filename, + 'duration': int(self._media_ctx.duration), } - if self._video_ctx.type is MessageMediaType.ANIMATION: + return self._bot.send_audio(**kwargs) + + def _create_media_context(self) -> AudioUploadContext: + return AudioUploadContext( + caption=self._generate_file_caption(), + filename=self._filename, + filepath=self._filepath, + duration=self._media_object.duration or 0.0, + type=MessageMediaType.AUDIO, + ) + + def _cache_data(self, message: Message) -> None: + self._log.info('Saving telegram file cache') + audio = message.audio + if not audio: + err_msg = 'Telegram message response does not contain audio' + self._log.error('%s: %s', err_msg, message) + raise RuntimeError(err_msg) + + self._media_ctx.type = message.media + self._media_ctx.filepath = audio.file_id + self._media_ctx.duration = audio.duration + self._cached_message = message + + self._create_cache_task(cache_object=audio) + + def _generate_file_caption(self) -> str: + caption_items = [] + caption_items.append(f'{bold("Title:")} {self._media_object.title}') + caption_items.append(f'{bold("Filename:")} {self._media_object.filename}') + caption_items.append(f'{bold("URL:")} {self._ctx.context.url}') + return '\n'.join(caption_items) + + +class VideoUploadTask(AbstractUploadTask): + _UPLOAD_ACTION = ChatAction.UPLOAD_VIDEO + _media_ctx: VideoUploadContext + + def _create_media_context(self) -> VideoUploadContext: + return VideoUploadContext( + caption=self._generate_file_caption(), + filename=self._filename, + filepath=self._filepath, + duration=self._media_object.duration or 0.0, + height=self._media_object.height or 0, + width=self._media_object.width or 0, + thumb=self._media_object.thumb_path, + type=MessageMediaType.VIDEO, + ) + + def _generate_file_caption(self) -> str: + caption_items = [] + if self._users[0].is_base_user: + caption_conf = self._bot.conf.telegram.api.video_caption + else: + caption_conf = self._users[0].upload.video_caption + + if caption_conf.include_title: + caption_items.append(f'{bold("Title:")} {self._media_object.title}') + if caption_conf.include_filename: + caption_items.append(f'{bold("Filename:")} {self._media_object.filename}') + if caption_conf.include_link: + caption_items.append(f'{bold("URL:")} {self._ctx.context.url}') + return '\n'.join(caption_items) + + def _generate_send_media_coroutine(self, chat_id: int) -> Coroutine: + kwargs = { + 'chat_id': chat_id, + 'caption': self._media_ctx.caption, + 'file_name': self._media_ctx.filename, + 'duration': int(self._media_ctx.duration), + 'height': self._media_ctx.height, + 'width': self._media_ctx.width, + 'thumb': self._media_ctx.thumb, + } + if self._media_ctx.type is MessageMediaType.ANIMATION: send_func_name = 'send_animation' - kwargs['animation'] = self._video_ctx.video_path + kwargs['animation'] = self._media_ctx.filepath else: send_func_name = 'send_video' - kwargs['video'] = self._video_ctx.video_path + kwargs['video'] = self._media_ctx.filepath kwargs['supports_streaming'] = True return getattr(self._bot, send_func_name)(**kwargs) @@ -143,56 +272,9 @@ class UploadTask(AbstractTask): self._log.error('%s: %s', err_msg, message) raise RuntimeError(err_msg) - self._video_ctx.type = message.media - self._video_ctx.video_path = video.file_id - self._video_ctx.thumb = video.thumbs[0].file_id + self._media_ctx.type = message.media + self._media_ctx.filepath = video.file_id + self._media_ctx.thumb = video.thumbs[0].file_id self._cached_message = message - db_cache_task_name = 'Save cache to DB' - create_task( - self._save_cache_to_db(video), - task_name=db_cache_task_name, - logger=self._log, - exception_message='Task "%s" raised an exception', - exception_message_args=(db_cache_task_name,), - ) - - def _create_video_context(self) -> VideoContext: - return VideoContext( - caption=self._generate_video_caption(), - file_name=self.filename, - video_path=self.filepath, - duration=self._body.duration or 0, - height=self._body.height or 0, - width=self._body.width or 0, - thumb=self.thumb_path, - type=MessageMediaType.VIDEO, - ) - - def _generate_video_caption(self) -> str: - caption_items = [] - if self._users[0].is_base_user: - caption_conf = self._bot.conf.telegram.api.video_caption - else: - caption_conf = self._users[0].upload.video_caption - - if caption_conf.include_title: - caption_items.append(f'{bold("Title:")} {self._body.title}') - if caption_conf.include_filename: - caption_items.append(f'{bold("Filename:")} {self._body.filename}') - if caption_conf.include_link: - caption_items.append(f'{bold("URL:")} {self._body.context.url}') - return '\n'.join(caption_items) - - async def _save_cache_to_db(self, video: Video | Animation) -> None: - cache = CacheSchema( - cache_id=video.file_id, - cache_unique_id=video.file_unique_id, - file_size=video.file_size, - date_timestamp=video.date, - ) - - async for db in get_db(): - await TaskRepository().save_file_cache( - cache=cache, task_id=self._body.task_id, db=db - ) + self._create_cache_task(cache_object=video) diff --git a/app_bot/bot/version.py b/app_bot/bot/version.py index e46aee1..7e49527 100644 --- a/app_bot/bot/version.py +++ b/app_bot/bot/version.py @@ -1 +1 @@ -__version__ = '0.9' +__version__ = '1.0' diff --git a/app_bot/config-example.yml b/app_bot/config-example.yml index 62e4750..f9a7651 100644 --- a/app_bot/config-example.yml +++ b/app_bot/config-example.yml @@ -7,6 +7,8 @@ telegram: allowed_users: - id: 00000000000 send_startup_message: !!bool True + download_media_type: !!str "VIDEO" + save_to_storage: !!bool False upload: upload_video_file: !!bool False upload_video_max_file_size: 2147483648 diff --git a/app_worker/alembic/versions/10ab08fc321b_.py b/app_worker/alembic/versions/10ab08fc321b_.py new file mode 100644 index 0000000..d4b2fa2 --- /dev/null +++ b/app_worker/alembic/versions/10ab08fc321b_.py @@ -0,0 +1,28 @@ +"""empty message + +Revision ID: 10ab08fc321b +Revises: ba7716dca30a +Create Date: 2023-02-25 15:47:37.542906 + +""" +from alembic import op + +# revision identifiers, used by Alembic. +revision = '10ab08fc321b' +down_revision = 'ba7716dca30a' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index('ix_file_task_id', table_name='file') + op.create_index(op.f('ix_file_task_id'), 'file', ['task_id'], unique=False) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_file_task_id'), table_name='file') + op.create_index('ix_file_task_id', 'file', ['task_id'], unique=False) + # ### end Alembic commands ### diff --git a/app_worker/worker/core/callbacks.py b/app_worker/worker/core/callbacks.py index 634704f..186fd68 100644 --- a/app_worker/worker/core/callbacks.py +++ b/app_worker/worker/core/callbacks.py @@ -1,13 +1,13 @@ import logging from aio_pika import IncomingMessage -from yt_shared.schemas.video import VideoPayload +from yt_shared.schemas.media import IncomingMediaPayload from worker.core.payload_handler import PayloadHandler class _RMQCallbacks: - """RabbitMQ callbacks.""" + """RabbitMQ's callbacks.""" def __init__(self) -> None: self._log = logging.getLogger(self.__class__.__name__) @@ -22,20 +22,22 @@ class _RMQCallbacks: async def _process_incoming_message(self, message: IncomingMessage) -> None: self._log.info('[x] Received message %s', message.body) - video_payload = self._deserialize_message(message) - if not video_payload: + media_payload = self._deserialize_message(message) + if not media_payload: await self._reject_invalid_message(message) return - await self._payload_handler.handle(video_payload=video_payload) + await self._payload_handler.handle(media_payload=media_payload) await message.ack() - self._log.info('Processing done with payload: %s', video_payload) + self._log.info('Processing done with payload: %s', media_payload) - def _deserialize_message(self, message: IncomingMessage) -> VideoPayload | None: + def _deserialize_message( + self, message: IncomingMessage + ) -> IncomingMediaPayload | None: try: - return VideoPayload.parse_raw(message.body) + return IncomingMediaPayload.parse_raw(message.body) except Exception: - self._log.exception('Failed to deserialize message body') + self._log.exception('Failed to deserialize message body: %s', message.body) return None async def _reject_invalid_message(self, message: IncomingMessage) -> None: diff --git a/app_worker/worker/core/config.py b/app_worker/worker/core/config.py index e837b47..3be2781 100644 --- a/app_worker/worker/core/config.py +++ b/app_worker/worker/core/config.py @@ -4,7 +4,6 @@ from yt_shared.config import Settings class WorkerSettings(Settings): APPLICATION_NAME: str MAX_SIMULTANEOUS_DOWNLOADS: int - SAVE_VIDEO_FILE: bool STORAGE_PATH: str THUMBNAIL_FRAME_SECOND: float diff --git a/app_worker/worker/core/downloader.py b/app_worker/worker/core/downloader.py index bc12c8f..2497fe4 100644 --- a/app_worker/worker/core/downloader.py +++ b/app_worker/worker/core/downloader.py @@ -6,102 +6,209 @@ from copy import deepcopy from tempfile import TemporaryDirectory import yt_dlp -from yt_shared.schemas.video import DownVideo -from yt_shared.utils.common import random_string from worker.core.config import settings from worker.utils import cli_to_api +from yt_shared.enums import DownMediaType +from yt_shared.schemas.media import Audio, DownMedia, Video +from yt_shared.utils.common import random_string try: - from ytdl_opts.user import YTDL_OPTS + from ytdl_opts.user import ( + DEFAULT_YTDL_OPTS, + AUDIO_YTDL_OPTS, + AUDIO_FORMAT_YTDL_OPTS, + VIDEO_YTDL_OPTS, + FINAL_AUDIO_FORMAT, + FINAL_THUMBNAIL_FORMAT, + ) except ImportError: - from ytdl_opts.default import YTDL_OPTS + from ytdl_opts.default import ( + DEFAULT_YTDL_OPTS, + AUDIO_YTDL_OPTS, + AUDIO_FORMAT_YTDL_OPTS, + VIDEO_YTDL_OPTS, + FINAL_AUDIO_FORMAT, + FINAL_THUMBNAIL_FORMAT, + ) -class VideoDownloader: +class MediaDownloader: _PLAYLIST_TYPE = 'playlist' _DESTINATION_TMP_DIR_NAME_LEN = 4 + _KEEP_VIDEO_OPTION = '--keep-video' def __init__(self) -> None: self._log = logging.getLogger(self.__class__.__name__) - self._ytdl_opts = cli_to_api(YTDL_OPTS) - def download_video(self, url: str) -> DownVideo: + def download(self, url: str, media_type: DownMediaType) -> DownMedia: try: - return self._download(url) + return self._download(url=url, media_type=media_type) except Exception: self._log.exception('Failed to download %s', url) raise - def _download(self, url: str) -> DownVideo: + def _configure_ytdl_opts( + self, media_type: DownMediaType, curr_tmp_dir: str + ) -> dict: + ytdl_opts = deepcopy(DEFAULT_YTDL_OPTS) + match media_type: # noqa: E999 + case DownMediaType.AUDIO: + ytdl_opts.extend(AUDIO_YTDL_OPTS) + ytdl_opts.extend(AUDIO_FORMAT_YTDL_OPTS) + case DownMediaType.VIDEO: + ytdl_opts.extend(VIDEO_YTDL_OPTS) + case DownMediaType.AUDIO_VIDEO: + ytdl_opts.extend(AUDIO_YTDL_OPTS) + ytdl_opts.extend(VIDEO_YTDL_OPTS) + ytdl_opts.append(self._KEEP_VIDEO_OPTION) + + ytdl_opts = cli_to_api(ytdl_opts) + ytdl_opts['outtmpl']['default'] = os.path.join( + curr_tmp_dir, + ytdl_opts['outtmpl']['default'], + ) + return ytdl_opts + + def _download(self, url: str, media_type: DownMediaType) -> DownMedia: + self._log.info('Downloading %s, media_type %s', url, media_type) tmp_down_path = os.path.join( settings.TMP_DOWNLOAD_ROOT_PATH, settings.TMP_DOWNLOAD_DIR ) - with TemporaryDirectory(prefix='tmp_video_dir-', dir=tmp_down_path) as tmp_dir: + with TemporaryDirectory(prefix='tmp_media_dir-', dir=tmp_down_path) as tmp_dir: curr_tmp_dir = os.path.join(tmp_down_path, tmp_dir) - ytdl_opts = deepcopy(self._ytdl_opts) - ytdl_opts['outtmpl']['default'] = os.path.join( - curr_tmp_dir, - ytdl_opts['outtmpl']['default'], + ytdl_opts = self._configure_ytdl_opts( + media_type=media_type, curr_tmp_dir=curr_tmp_dir ) with yt_dlp.YoutubeDL(ytdl_opts) as ytdl: self._log.info('Downloading %s', url) self._log.info('Downloading to %s', curr_tmp_dir) - self._log.debug('Downloading with options %s', ytdl_opts) + self._log.info('Downloading with options %s', ytdl_opts) meta = ytdl.extract_info(url, download=True) meta_sanitized = ytdl.sanitize_info(meta) self._log.info('Finished downloading %s', url) - self._log.debug('Downloaded "%s" meta: %s', url, meta_sanitized) + self._log.info('Downloaded "%s" meta: %s', url, meta_sanitized) self._log.info( 'Content of "%s": %s', curr_tmp_dir, os.listdir(curr_tmp_dir) ) - filename = self._get_filename(meta) - filepath = os.path.join(curr_tmp_dir, filename) destination_dir = os.path.join( os.path.join( settings.TMP_DOWNLOAD_ROOT_PATH, settings.TMP_DOWNLOADED_DIR ), random_string(number=self._DESTINATION_TMP_DIR_NAME_LEN), ) - self._log.info('Moving "%s" to "%s"', filepath, destination_dir) os.mkdir(destination_dir) - shutil.move(filepath, destination_dir) - - thumb_path: str | None = None - thumb_name = self._find_downloaded_thumbnail(curr_tmp_dir) - if thumb_name: - _thumb_path = os.path.join(curr_tmp_dir, thumb_name) - shutil.move(_thumb_path, destination_dir) - thumb_path = os.path.join(destination_dir, thumb_name) + audio, video = self._create_media_dtos( + media_type=media_type, + meta=meta, + curr_tmp_dir=curr_tmp_dir, + destination_dir=destination_dir, + ) self._log.info( 'Removing temporary download directory "%s" with leftover files %s', curr_tmp_dir, os.listdir(curr_tmp_dir), ) + return DownMedia( + media_type=media_type, + audio=audio, + video=video, + meta=meta_sanitized, + root_path=destination_dir, + ) + + def _create_media_dtos( + self, + media_type: DownMediaType, + meta: dict, + curr_tmp_dir: str, + destination_dir: str, + ) -> tuple[Audio | None, Video | None]: + get_audio = lambda: self._create_audio_dto( + meta=meta, + curr_tmp_dir=curr_tmp_dir, + destination_dir=destination_dir, + ) + get_video = lambda: self._create_video_dto( + meta=meta, + curr_tmp_dir=curr_tmp_dir, + destination_dir=destination_dir, + ) + + match media_type: # noqa: E999 + case DownMediaType.AUDIO: + return get_audio(), None + case DownMediaType.VIDEO: + return None, get_video() + case DownMediaType.AUDIO_VIDEO: + return get_audio(), get_video() + case _: + raise RuntimeError(f'Unknown media type "{media_type}"') + + def _create_video_dto( + self, + meta: dict, + curr_tmp_dir: str, + destination_dir: str, + ) -> Video: + video_filename = self._get_video_filename(meta) + video_filepath = os.path.join(curr_tmp_dir, video_filename) + + self._log.info('Moving "%s" to "%s"', video_filepath, destination_dir) + shutil.move(video_filepath, destination_dir) + + thumb_path: str | None = None + thumb_name = self._find_downloaded_file( + root_path=curr_tmp_dir, + extension=FINAL_THUMBNAIL_FORMAT, + ) + if thumb_name: + _thumb_path = os.path.join(curr_tmp_dir, thumb_name) + shutil.move(_thumb_path, destination_dir) + thumb_path = os.path.join(destination_dir, thumb_name) + duration, width, height = self._get_video_context(meta) - return DownVideo( + return Video( title=meta['title'], - name=filename, + filename=video_filename, duration=duration, width=width, height=height, - meta=meta_sanitized, - filepath=os.path.join(destination_dir, filename), - root_path=destination_dir, + filepath=os.path.join(destination_dir, video_filename), thumb_path=thumb_path, thumb_name=thumb_name, ) - def _find_downloaded_thumbnail(self, root_path: str) -> str | None: - """Try to find downloaded thumbnail jpg.""" - for file_name in glob.glob("*.jpg", root_dir=root_path): - self._log.info('Found downloaded thumbnail "%s"', file_name) + def _create_audio_dto( + self, + meta: dict, + curr_tmp_dir: str, + destination_dir: str, + ) -> Audio: + audio_filename = self._find_downloaded_file( + root_path=curr_tmp_dir, + extension=FINAL_AUDIO_FORMAT, + ) + audio_filepath = os.path.join(curr_tmp_dir, audio_filename) + self._log.info('Moving "%s" to "%s"', audio_filepath, destination_dir) + shutil.move(audio_filepath, destination_dir) + return Audio( + title=meta['title'], + filename=audio_filename, + duration=None, + filepath=os.path.join(destination_dir, audio_filename), + ) + + def _find_downloaded_file(self, root_path: str, extension: str) -> str | None: + """Try to find downloaded audio or thumbnail file.""" + for file_name in glob.glob(f"*.{extension}", root_dir=root_path): + self._log.info('Found downloaded %s "%s"', extension, file_name) return file_name - self._log.info('Downloaded thumb not found in "%s"', root_path) + self._log.info('Downloaded %s not found in "%s"', extension, root_path) return None def _get_video_context( @@ -113,18 +220,37 @@ class VideoDownloader: 'Item said to be downloaded but no entries to process.' ) entry: dict = meta['entries'][0] - requested_video: dict = entry['requested_downloads'][0] + requested_video = self._get_requested_video(entry['requested_downloads']) return ( self._to_float(entry.get('duration')), requested_video.get('width'), requested_video.get('height'), ) + requested_video = self._get_requested_video(meta['requested_downloads']) return ( self._to_float(meta.get('duration')), - meta['requested_downloads'][0].get('width'), - meta['requested_downloads'][0].get('height'), + requested_video.get('width'), + requested_video.get('height'), ) + def _get_requested_video(self, requested_downloads: list[dict]) -> dict | None: + for download_obj in requested_downloads: + if download_obj['ext'] != FINAL_AUDIO_FORMAT: + return download_obj + + # When video was converted to audio but video kept. + for download_obj in requested_downloads: + if download_obj['ext'] != download_obj['_filename'].rsplit('.', 1)[-1]: + download_obj = download_obj.copy() + self._log.info( + 'Replacing video path in meta "%s" with "%s"', + download_obj['filepath'], + download_obj['_filename'], + ) + download_obj['filepath'] = download_obj['_filename'] + return download_obj + return None + @staticmethod def _to_float(duration: int | float | None) -> float | None: try: @@ -132,10 +258,20 @@ class VideoDownloader: except TypeError: return duration - def _get_filename(self, meta: dict) -> str: - return self._get_filepath(meta).rsplit('/', maxsplit=1)[-1] + def _get_video_filename(self, meta: dict) -> str: + return self._get_video_filepath(meta).rsplit('/', maxsplit=1)[-1] - def _get_filepath(self, meta: dict) -> str: + def _get_video_filepath(self, meta: dict) -> str: if meta['_type'] == self._PLAYLIST_TYPE: - return meta['entries'][0]['requested_downloads'][0]['filepath'] - return meta['requested_downloads'][0]['filepath'] + requested_downloads: list[dict] = meta['entries'][0]['requested_downloads'] + requested_video = self._get_requested_video(requested_downloads) + else: + requested_downloads = meta['requested_downloads'] + requested_video = self._get_requested_video(requested_downloads) + + try: + return requested_video['filepath'] + except (AttributeError, KeyError): + err_msg = 'Video filepath not found' + self._log.exception('%s, meta: %s', err_msg, meta) + raise ValueError(err_msg) diff --git a/app_worker/worker/core/video_service.py b/app_worker/worker/core/media_service.py similarity index 52% rename from app_worker/worker/core/video_service.py rename to app_worker/worker/core/media_service.py index c9b3609..14124f8 100644 --- a/app_worker/worker/core/video_service.py +++ b/app_worker/worker/core/media_service.py @@ -4,69 +4,101 @@ import os import shutil from sqlalchemy.ext.asyncio import AsyncSession -from yt_shared.enums import TaskStatus -from yt_shared.models import Task -from yt_shared.repositories.task import TaskRepository -from yt_shared.schemas.video import DownVideo, VideoPayload -from yt_shared.utils.file import remove_dir -from yt_shared.utils.tasks.tasks import create_task from worker.core.config import settings -from worker.core.downloader import VideoDownloader +from worker.core.downloader import MediaDownloader from worker.core.exceptions import DownloadVideoServiceError from worker.core.tasks.ffprobe_context import GetFfprobeContextTask from worker.core.tasks.thumbnail import MakeThumbnailTask +from yt_shared.enums import DownMediaType, TaskStatus +from yt_shared.models import Task +from yt_shared.repositories.task import TaskRepository +from yt_shared.schemas.media import Audio, DownMedia, IncomingMediaPayload, Video +from yt_shared.utils.file import remove_dir +from yt_shared.utils.tasks.tasks import create_task -class VideoService: +class MediaService: def __init__(self) -> None: self._log = logging.getLogger(self.__class__.__name__) - self._downloader = VideoDownloader() + self._downloader = MediaDownloader() self._repository = TaskRepository() async def process( - self, video_payload: VideoPayload, db: AsyncSession - ) -> tuple[DownVideo | None, Task | None]: - task = await self._repository.get_or_create_task(db, video_payload) + self, media_payload: IncomingMediaPayload, db: AsyncSession + ) -> tuple[DownMedia | None, Task | None]: + task = await self._repository.get_or_create_task(db, media_payload) if task.status != TaskStatus.PENDING.value: return None, None return ( - await self._process(video_payload=video_payload, task=task, db=db), + await self._process(media_payload=media_payload, task=task, db=db), task, ) async def _process( - self, video_payload: VideoPayload, task: Task, db: AsyncSession - ) -> DownVideo: + self, media_payload: IncomingMediaPayload, task: Task, db: AsyncSession + ) -> DownMedia: await self._repository.save_as_processing(db, task) - downloaded_video = await self._start_download(task, video_payload, db) + media = await self._start_download(task, media_payload, db) try: - await self._post_process_file(downloaded_video, task, db) + await self._post_process_media(media, task, media_payload, db) except Exception: - self._log.exception('Failed to post-process file %s', downloaded_video) - self._err_file_cleanup(downloaded_video) + self._log.exception('Failed to post-process media %s', media) + self._err_file_cleanup(media) raise - return downloaded_video + return media async def _start_download( - self, task: Task, video_payload: VideoPayload, db: AsyncSession - ) -> DownVideo: + self, task: Task, media_payload: IncomingMediaPayload, db: AsyncSession + ) -> DownMedia: try: return await asyncio.get_running_loop().run_in_executor( - None, lambda: self._downloader.download_video(task.url) + None, + lambda: self._downloader.download( + task.url, media_type=media_payload.download_media_type + ), ) except Exception as err: - self._log.exception('Failed to download video. Context: %s', video_payload) + self._log.exception('Failed to download media. Context: %s', media_payload) await self._handle_download_exception(err, task, db) raise DownloadVideoServiceError(message=str(err), task=task) - async def _post_process_file( + async def _post_process_media( self, - video: DownVideo, + media: DownMedia, + task: Task, + media_payload: IncomingMediaPayload, + db: AsyncSession, + ) -> None: + post_process_audio = lambda: self._post_process_audio( + media=media, + media_payload=media_payload, + task=task, + db=db, + ) + post_process_video = lambda: self._post_process_video( + media=media, media_payload=media_payload, task=task, db=db + ) + + match media.media_type: # noqa: E999 + case DownMediaType.AUDIO: + await post_process_audio() + case DownMediaType.VIDEO: + await post_process_video() + case DownMediaType.AUDIO_VIDEO: + await asyncio.gather(*(post_process_audio(), post_process_video())) + + await self._repository.save_as_done(db, task) + + async def _post_process_video( + self, + media: DownMedia, + media_payload: IncomingMediaPayload, task: Task, db: AsyncSession, ) -> None: """Post-process downloaded media files, e.g. make thumbnail and copy to storage.""" + video = media.video # yt-dlp's 'info-meta' may not contain all needed video metadata. if not all([video.duration, video.height, video.width]): # TODO: Move to higher level and re-raise as DownloadVideoServiceError with task, @@ -78,7 +110,7 @@ class VideoService: coro_tasks = [] if not video.thumb_path: - thumb_path = os.path.join(video.root_path, video.thumb_name) + thumb_path = os.path.join(media.root_path, video.thumb_name) video.thumb_path = thumb_path coro_tasks.append( self._create_thumb_task( @@ -87,15 +119,30 @@ class VideoService: duration=video.duration, ) ) - if settings.SAVE_VIDEO_FILE: + + if media_payload.save_to_storage: coro_tasks.append(self._create_copy_file_task(video)) await asyncio.gather(*coro_tasks) - final_coros = [self._repository.save_as_done(db, task, video)] - await asyncio.gather(*final_coros) + file = await self._repository.save_file(db, task, media.video, media.meta) + video.orm_file_id = file.id + + async def _post_process_audio( + self, + media: DownMedia, + media_payload: IncomingMediaPayload, + task: Task, + db: AsyncSession, + ) -> None: + coro_tasks = [self._repository.save_file(db, task, media.audio, media.meta)] + if media_payload.save_to_storage: + coro_tasks.append(self._create_copy_file_task(media.audio)) + results = await asyncio.gather(*coro_tasks) + file = results[0] + media.audio.orm_file_id = file.id @staticmethod - async def _set_probe_ctx(video: DownVideo) -> None: + async def _set_probe_ctx(video: Video) -> None: probe_ctx = await GetFfprobeContextTask(video.filepath).run() if not probe_ctx: return @@ -106,10 +153,10 @@ class VideoService: video.width = video_streams[0]['width'] video.height = video_streams[0]['height'] - def _create_copy_file_task(self, video: DownVideo) -> asyncio.Task: - task_name = 'Copy file to storage task' + def _create_copy_file_task(self, file: Audio | Video) -> asyncio.Task: + task_name = f'Copy {file.file_type} file to storage task' return create_task( - self._copy_file_to_storage(video), + self._copy_file_to_storage(file), task_name=task_name, logger=self._log, exception_message='Task "%s" raised an exception', @@ -128,11 +175,11 @@ class VideoService: ) @staticmethod - async def _copy_file_to_storage(video: DownVideo) -> None: - dst = os.path.join(settings.STORAGE_PATH, video.name) - await asyncio.to_thread(shutil.copy2, video.filepath, dst) + async def _copy_file_to_storage(file: Audio | Video) -> None: + dst = os.path.join(settings.STORAGE_PATH, file.filename) + await asyncio.to_thread(shutil.copy2, file.filepath, dst) - def _err_file_cleanup(self, video: DownVideo) -> None: + def _err_file_cleanup(self, video: DownMedia) -> None: """Cleanup any downloaded/created data if post-processing failed.""" self._log.info('Performing error cleanup. Removing %s', video.root_path) remove_dir(video.root_path) diff --git a/app_worker/worker/core/payload_handler.py b/app_worker/worker/core/payload_handler.py index 8bcf180..ad7cef3 100644 --- a/app_worker/worker/core/payload_handler.py +++ b/app_worker/worker/core/payload_handler.py @@ -6,61 +6,53 @@ from yt_shared.db.session import get_db from yt_shared.models import Task from yt_shared.rabbit.publisher import Publisher from yt_shared.schemas.error import ErrorDownloadPayload, ErrorGeneralPayload +from yt_shared.schemas.media import DownMedia, IncomingMediaPayload from yt_shared.schemas.success import SuccessPayload -from yt_shared.schemas.video import DownVideo, VideoPayload from worker.core.exceptions import DownloadVideoServiceError, GeneralVideoServiceError -from worker.core.video_service import VideoService +from worker.core.media_service import MediaService class PayloadHandler: def __init__(self) -> None: self._log = logging.getLogger(self.__class__.__name__) - self._video_service = VideoService() + self._media_service = MediaService() self._publisher = Publisher() - async def handle(self, video_payload: VideoPayload) -> None: + async def handle(self, media_payload: IncomingMediaPayload) -> None: try: - await self._handle(video_payload) + await self._handle(media_payload) except Exception as err: - await self._send_general_error(err, video_payload) + await self._send_general_error(err, media_payload) - async def _handle(self, video_payload: VideoPayload) -> None: + async def _handle(self, media_payload: IncomingMediaPayload) -> None: async for session in get_db(): try: - video, task = await self._video_service.process(video_payload, session) + media, task = await self._media_service.process(media_payload, session) except DownloadVideoServiceError as err: - await self._send_failed_video_download_task(err, video_payload) + await self._send_failed_video_download_task(err, media_payload) return - if not video or not task: + if not media or not task: err_msg = ( - f'Video or task is None, cannot proceed: ' - f'video - {video}, task - {task}' + f'Media or task is None, cannot proceed: ' + f'media - {media}, task - {task}' ) self._log.error(err_msg) raise RuntimeError(err_msg) - await self._send_finished_task(task, video, video_payload) + await self._send_finished_task(task, media, media_payload) async def _send_finished_task( - self, task: Task, video: DownVideo, video_payload: VideoPayload + self, task: Task, media: DownMedia, media_payload: IncomingMediaPayload ) -> None: success_payload = SuccessPayload( task_id=task.id, - title=video.title, - filename=video.name, - thumb_name=video.thumb_name, - filepath=video.filepath, - thumb_path=video.thumb_path, - root_path=video.root_path, - duration=video.duration, - width=video.width, - height=video.height, + media=media, message_id=task.message_id, - from_chat_id=video_payload.from_chat_id, - from_chat_type=video_payload.from_chat_type, + from_chat_id=media_payload.from_chat_id, + from_chat_type=media_payload.from_chat_type, from_user_id=task.from_user_id, - context=video_payload, + context=media_payload, yt_dlp_version=ytdlp_version.__version__, ) await self._publisher.send_download_finished(success_payload) @@ -68,18 +60,18 @@ class PayloadHandler: async def _send_failed_video_download_task( self, err: DownloadVideoServiceError, - video_payload: VideoPayload, + media_payload: IncomingMediaPayload, ) -> None: task = err.task err_payload = ErrorDownloadPayload( task_id=task.id, message_id=task.message_id, - from_chat_id=video_payload.from_chat_id, - from_chat_type=video_payload.from_chat_type, - from_user_id=video_payload.from_user_id, + from_chat_id=media_payload.from_chat_id, + from_chat_type=media_payload.from_chat_type, + from_user_id=media_payload.from_user_id, message='Download error', - url=video_payload.url, - context=video_payload, + url=media_payload.url, + context=media_payload, yt_dlp_version=ytdlp_version.__version__, exception_msg=str(err), exception_type=err.__class__.__name__, @@ -89,18 +81,18 @@ class PayloadHandler: async def _send_general_error( self, err: GeneralVideoServiceError | Exception, - video_payload: VideoPayload, + media_payload: IncomingMediaPayload, ) -> None: task: Task | None = getattr(err, 'task', None) err_payload = ErrorGeneralPayload( task_id=task.id if task else 'N/A', - message_id=video_payload.message_id, - from_chat_id=video_payload.from_chat_id, - from_chat_type=video_payload.from_chat_type, - from_user_id=video_payload.from_user_id, + message_id=media_payload.message_id, + from_chat_id=media_payload.from_chat_id, + from_chat_type=media_payload.from_chat_type, + from_user_id=media_payload.from_user_id, message='General worker error', - url=video_payload.url, - context=video_payload, + url=media_payload.url, + context=media_payload, yt_dlp_version=ytdlp_version.__version__, exception_msg=traceback.format_exc(), exception_type=err.__class__.__name__, diff --git a/app_worker/ytdl_opts/default.py b/app_worker/ytdl_opts/default.py index 0eeacb3..6c21ac9 100644 --- a/app_worker/ytdl_opts/default.py +++ b/app_worker/ytdl_opts/default.py @@ -1,16 +1,40 @@ -# More here https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/options.py or 'yt-dlp --help' -YTDL_OPTS = [ +"""yt-dlp download CLI options. + +Only CLI options are allowed to be stored as configuration. They are later converted to internal API options. +More here https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/options.py or 'yt-dlp --help' +""" + +FINAL_AUDIO_FORMAT = 'mp3' +FINAL_THUMBNAIL_FORMAT = 'jpg' + +DEFAULT_YTDL_OPTS = [ '--output', '%(title).200B.%(ext)s', - '--format', - 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4', '--no-playlist', '--playlist-items', '1:1', - '--write-thumbnail', - '--convert-thumbnails', - 'jpg', '--concurrent-fragments', '5', '--verbose', ] + +AUDIO_YTDL_OPTS = [ + '--extract-audio', + '--audio-quality', + '0', + '--audio-format', + FINAL_AUDIO_FORMAT, +] + +AUDIO_FORMAT_YTDL_OPTS = [ + '--format', + 'bestaudio/best', +] + +VIDEO_YTDL_OPTS = [ + '--format', + 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4', + '--write-thumbnail', + '--convert-thumbnails', + FINAL_THUMBNAIL_FORMAT, +] diff --git a/envs/.env_worker b/envs/.env_worker index 9fe4067..97ed6d1 100644 --- a/envs/.env_worker +++ b/envs/.env_worker @@ -1,5 +1,4 @@ APPLICATION_NAME=yt_worker -SAVE_VIDEO_FILE=False MAX_SIMULTANEOUS_DOWNLOADS=2 STORAGE_PATH=/filestorage THUMBNAIL_FRAME_SECOND=10.0 diff --git a/yt_shared/yt_shared/db/session.py b/yt_shared/yt_shared/db/session.py index ec8212e..6ffb657 100644 --- a/yt_shared/yt_shared/db/session.py +++ b/yt_shared/yt_shared/db/session.py @@ -4,8 +4,8 @@ from typing import AsyncGenerator import sqlalchemy as sa from sqlalchemy import MetaData -from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine -from sqlalchemy.orm import declarative_base, declared_attr, sessionmaker +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine +from sqlalchemy.orm import declarative_base, declared_attr from sqlalchemy_utils import UUIDType from yt_shared.config import settings @@ -18,9 +18,8 @@ engine = create_async_engine( metadata = MetaData() metadata.bind = engine -AsyncSessionLocal = sessionmaker( +AsyncSessionLocal = async_sessionmaker( engine, - class_=AsyncSession, expire_on_commit=settings.SQLALCHEMY_EXPIRE_ON_COMMIT, ) diff --git a/yt_shared/yt_shared/enums.py b/yt_shared/yt_shared/enums.py index 3ae5692..b9e185d 100644 --- a/yt_shared/yt_shared/enums.py +++ b/yt_shared/yt_shared/enums.py @@ -1,34 +1,52 @@ -from enum import Enum, auto, unique +from enum import Enum, unique @unique -class ChoiceEnum(Enum): +class StrChoiceEnum(str, Enum): @classmethod def choices(cls) -> tuple[str, ...]: return tuple(x.value for x in cls) -class TaskStatus(str, ChoiceEnum): +class TaskStatus(StrChoiceEnum): PENDING = 'PENDING' PROCESSING = 'PROCESSING' FAILED = 'FAILED' DONE = 'DONE' -class TaskSource(str, ChoiceEnum): +class TaskSource(StrChoiceEnum): API = 'API' BOT = 'BOT' -class RabbitPayloadType(ChoiceEnum): - DOWNLOAD_ERROR = auto() - GENERAL_ERROR = auto() - SUCCESS = auto() +class RabbitPayloadType(StrChoiceEnum): + DOWNLOAD_ERROR = 'DOWNLOAD_ERROR' + GENERAL_ERROR = 'GENERAL_ERROR' + SUCCESS = 'SUCCESS' -class TelegramChatType(ChoiceEnum): +class TelegramChatType(StrChoiceEnum): PRIVATE = 'private' BOT = 'bot' GROUP = 'group' SUPERGROUP = 'supergroup' CHANNEL = 'channel' + + +class DownMediaType(StrChoiceEnum): + """Media can be audio, video or both. + + 1. Only download/extract audio + 2. Video with muxed audio + 3. Both 1) and 2) + """ + + AUDIO = 'AUDIO' + VIDEO = 'VIDEO' + AUDIO_VIDEO = 'AUDIO_VIDEO' + + +class MediaFileType(StrChoiceEnum): + AUDIO = 'AUDIO' + VIDEO = 'VIDEO' diff --git a/yt_shared/yt_shared/models/task.py b/yt_shared/yt_shared/models/task.py index db37051..d26c753 100644 --- a/yt_shared/yt_shared/models/task.py +++ b/yt_shared/yt_shared/models/task.py @@ -27,10 +27,9 @@ class Task(Base, Timestamp): nullable=False, index=True, ) - file = relationship( + files = relationship( 'File', backref='task', - uselist=False, cascade='all, delete-orphan', ) added_at = sa.Column(sa.DateTime, nullable=False) @@ -66,7 +65,7 @@ class File(Base, Timestamp): UUIDType(binary=False), sa.ForeignKey('task.id', ondelete='CASCADE'), nullable=False, - unique=True, + unique=False, index=True, ) cache = relationship( diff --git a/yt_shared/yt_shared/rabbit/publisher.py b/yt_shared/yt_shared/rabbit/publisher.py index c77669b..a44f533 100644 --- a/yt_shared/yt_shared/rabbit/publisher.py +++ b/yt_shared/yt_shared/rabbit/publisher.py @@ -14,8 +14,8 @@ from yt_shared.rabbit.rabbit_config import ( SUCCESS_QUEUE, ) from yt_shared.schemas.error import ErrorDownloadPayload, ErrorGeneralPayload +from yt_shared.schemas.media import IncomingMediaPayload from yt_shared.schemas.success import SuccessPayload -from yt_shared.schemas.video import VideoPayload from yt_shared.utils.common import Singleton @@ -28,8 +28,8 @@ class Publisher(metaclass=Singleton): def _is_sent(confirm: ConfirmationFrameType | None) -> bool: return isinstance(confirm, Basic.Ack) - async def send_for_download(self, video_payload: VideoPayload) -> bool: - message = aio_pika.Message(body=video_payload.json().encode()) + async def send_for_download(self, media_payload: IncomingMediaPayload) -> bool: + message = aio_pika.Message(body=media_payload.json().encode()) exchange = self._rabbit_mq.exchanges[INPUT_EXCHANGE] confirm = await exchange.publish( message, routing_key=INPUT_QUEUE, mandatory=True diff --git a/yt_shared/yt_shared/repositories/task.py b/yt_shared/yt_shared/repositories/task.py index 3bbe64d..d2a4cda 100644 --- a/yt_shared/yt_shared/repositories/task.py +++ b/yt_shared/yt_shared/repositories/task.py @@ -7,7 +7,8 @@ from sqlalchemy.ext.asyncio import AsyncSession from yt_shared.enums import TaskStatus from yt_shared.models import Cache, File, Task from yt_shared.schemas.cache import CacheSchema -from yt_shared.schemas.video import DownVideo, VideoPayload +from yt_shared.schemas.media import Audio, IncomingMediaPayload, Video +from yt_shared.utils.common import async_lock class TaskRepository: @@ -15,27 +16,29 @@ class TaskRepository: self._log = logging.getLogger(self.__class__.__name__) async def get_or_create_task( - self, db: AsyncSession, video_payload: VideoPayload + self, db: AsyncSession, media_payload: IncomingMediaPayload ) -> Task: - if video_payload.id is None: - return await self._create_task(db, video_payload) + if media_payload.id is None: + return await self._create_task(db, media_payload) - stmt = select(Task).filter_by(id=video_payload.id) + stmt = select(Task).filter_by(id=media_payload.id) task = await db.execute(stmt) try: return task.scalar_one() except NoResultFound: - return await self._create_task(db, video_payload) + return await self._create_task(db, media_payload) @staticmethod - async def _create_task(db: AsyncSession, video_payload: VideoPayload) -> Task: + async def _create_task( + db: AsyncSession, media_payload: IncomingMediaPayload + ) -> Task: task = Task( - id=video_payload.id, - url=video_payload.url, - source=video_payload.source, - from_user_id=video_payload.from_user_id, - message_id=video_payload.message_id, - added_at=video_payload.added_at, + id=media_payload.id, + url=media_payload.url, + source=media_payload.source, + from_user_id=media_payload.from_user_id, + message_id=media_payload.message_id, + added_at=media_payload.added_at, ) db.add(task) await db.commit() @@ -43,31 +46,42 @@ class TaskRepository: @staticmethod async def save_file_cache( - db: AsyncSession, task_id: str | UUID, cache: CacheSchema + db: AsyncSession, file_id: str | UUID, cache: CacheSchema ) -> None: stmt = insert(Cache).values( cache_id=cache.cache_id, cache_unique_id=cache.cache_unique_id, file_size=cache.file_size, date_timestamp=cache.date_timestamp, - file_id=(select(File.id).filter_by(task_id=task_id).scalar_subquery()), + file_id=file_id, ) await db.execute(stmt) await db.commit() @staticmethod - async def save_as_done( - db: AsyncSession, task: Task, downloaded_video: DownVideo - ) -> None: - task.file = File( - title=downloaded_video.title, - name=downloaded_video.name, - thumb_name=downloaded_video.thumb_name, - duration=downloaded_video.duration, - width=downloaded_video.width, - height=downloaded_video.height, - meta=downloaded_video.meta, + async def save_file( + db: AsyncSession, task: Task, media: Audio | Video, meta: dict + ) -> File: + file = File( + title=media.title, + name=media.filename, + duration=media.duration, + meta=meta, + task_id=task.id, ) + # TODO: Rework this. + if isinstance(media, Video): + file.width = media.width + file.height = media.height + file.thumb_name = media.thumb_name + + db.add(file) + async with async_lock: + await db.flush([file]) + return file + + @staticmethod + async def save_as_done(db: AsyncSession, task: Task) -> None: task.status = TaskStatus.DONE await db.commit() diff --git a/yt_shared/yt_shared/schemas/error.py b/yt_shared/yt_shared/schemas/error.py index 2ad8bed..3d396a5 100644 --- a/yt_shared/yt_shared/schemas/error.py +++ b/yt_shared/yt_shared/schemas/error.py @@ -5,7 +5,7 @@ from pydantic import StrictInt, StrictStr from yt_shared.enums import RabbitPayloadType, TelegramChatType from yt_shared.schemas.base import BaseRabbitPayloadModel -from yt_shared.schemas.video import VideoPayload +from yt_shared.schemas.media import IncomingMediaPayload class ErrorGeneralPayload(BaseRabbitPayloadModel): @@ -19,7 +19,7 @@ class ErrorGeneralPayload(BaseRabbitPayloadModel): message_id: StrictInt | None message: StrictStr url: StrictStr - context: VideoPayload + context: IncomingMediaPayload exception_msg: StrictStr exception_type: StrictStr yt_dlp_version: StrictStr | None diff --git a/yt_shared/yt_shared/schemas/media.py b/yt_shared/yt_shared/schemas/media.py new file mode 100644 index 0000000..186010a --- /dev/null +++ b/yt_shared/yt_shared/schemas/media.py @@ -0,0 +1,77 @@ +import uuid +from datetime import datetime, timezone +from typing import Literal + +from pydantic import ( + Field, + StrictBool, + StrictFloat, + StrictInt, + StrictStr, + root_validator, +) + +from yt_shared.enums import DownMediaType, MediaFileType, TaskSource, TelegramChatType +from yt_shared.schemas.base import RealBaseModel + + +class IncomingMediaPayload(RealBaseModel): + id: uuid.UUID | None + from_chat_id: StrictInt | None + from_chat_type: TelegramChatType | None + from_user_id: StrictInt | None + message_id: StrictInt | None + url: StrictStr + source: TaskSource + save_to_storage: StrictBool + download_media_type: DownMediaType + added_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + +class Audio(RealBaseModel): + file_type: Literal[MediaFileType.AUDIO] = MediaFileType.AUDIO + title: StrictStr + filename: StrictStr + filepath: StrictStr + duration: StrictFloat | None = None + orm_file_id: uuid.UUID | None = None + + +class Video(RealBaseModel): + file_type: Literal[MediaFileType.VIDEO] = MediaFileType.VIDEO + title: StrictStr + filename: StrictStr + filepath: StrictStr + duration: StrictFloat | None = None + + thumb_name: StrictStr | None = None + width: int | None = None + height: int | None = None + thumb_path: StrictStr | None = None + orm_file_id: uuid.UUID | None = None + + @root_validator(pre=False) + def _set_fields(cls, values: dict) -> dict: + if not values['thumb_name']: + values['thumb_name'] = f'{values["name"]}-thumb.jpg' + return values + + +class DownMedia(RealBaseModel): + """Downloaded media (audio, video with muxed audio or both) object context.""" + + audio: Audio | None + video: Video | None + + media_type: DownMediaType + root_path: StrictStr + meta: dict + + @root_validator(pre=True) + def _validate(cls, values: dict) -> dict: + if values['audio'] is None and values['video'] is None: + raise ValueError('Provide audio, video or both.') + return values + + def get_media_objects(self) -> tuple[Audio, Video]: + return tuple(filter(None, (self.audio, self.video))) diff --git a/yt_shared/yt_shared/schemas/success.py b/yt_shared/yt_shared/schemas/success.py index 7ef85e5..4e81dcc 100644 --- a/yt_shared/yt_shared/schemas/success.py +++ b/yt_shared/yt_shared/schemas/success.py @@ -2,11 +2,10 @@ import uuid from typing import ClassVar from pydantic import StrictInt, StrictStr -from pydantic.types import StrictFloat from yt_shared.enums import RabbitPayloadType, TelegramChatType from yt_shared.schemas.base import BaseRabbitPayloadModel -from yt_shared.schemas.video import VideoPayload +from yt_shared.schemas.media import DownMedia, IncomingMediaPayload class SuccessPayload(BaseRabbitPayloadModel): @@ -18,14 +17,6 @@ class SuccessPayload(BaseRabbitPayloadModel): from_chat_type: TelegramChatType | None from_user_id: StrictInt | None message_id: StrictInt | None - title: StrictStr - filename: StrictStr - thumb_name: StrictStr - filepath: StrictStr - thumb_path: StrictStr | None = None - root_path: StrictStr - duration: StrictFloat | None - width: StrictInt | None - height: StrictInt | None - context: VideoPayload + media: DownMedia + context: IncomingMediaPayload yt_dlp_version: StrictStr | None diff --git a/yt_shared/yt_shared/schemas/url.py b/yt_shared/yt_shared/schemas/url.py index 9e029e5..07f8412 100644 --- a/yt_shared/yt_shared/schemas/url.py +++ b/yt_shared/yt_shared/schemas/url.py @@ -1,6 +1,6 @@ -from pydantic import StrictInt, StrictStr +from pydantic import StrictBool, StrictInt, StrictStr -from yt_shared.enums import TelegramChatType +from yt_shared.enums import DownMediaType, TelegramChatType from yt_shared.schemas.base import RealBaseModel @@ -10,3 +10,5 @@ class URL(RealBaseModel): from_chat_type: TelegramChatType from_user_id: StrictInt message_id: StrictInt + save_to_storage: StrictBool + download_media_type: DownMediaType diff --git a/yt_shared/yt_shared/schemas/video.py b/yt_shared/yt_shared/schemas/video.py deleted file mode 100644 index e0917b5..0000000 --- a/yt_shared/yt_shared/schemas/video.py +++ /dev/null @@ -1,39 +0,0 @@ -import uuid -from datetime import datetime, timezone - -from pydantic import Field, StrictFloat, StrictInt, StrictStr, root_validator - -from yt_shared.enums import TaskSource, TelegramChatType -from yt_shared.schemas.base import RealBaseModel - - -class VideoPayload(RealBaseModel): - id: uuid.UUID | None - from_chat_id: StrictInt | None - from_chat_type: TelegramChatType | None - from_user_id: StrictInt | None - message_id: StrictInt | None - url: StrictStr - source: TaskSource - added_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - - -class DownVideo(RealBaseModel): - """Downloaded video object context.""" - - title: StrictStr - name: StrictStr - thumb_name: StrictStr | None = None - duration: StrictFloat | None = None - width: int | None = None - height: int | None = None - meta: dict - filepath: StrictStr - thumb_path: StrictStr | None = None - root_path: StrictStr - - @root_validator(pre=False) - def _set_fields(cls, values: dict) -> dict: - if not values['thumb_name']: - values['thumb_name'] = f'{values["name"]}-thumb.jpg' - return values diff --git a/yt_shared/yt_shared/utils/common.py b/yt_shared/yt_shared/utils/common.py index adb6a7f..1da11c5 100644 --- a/yt_shared/yt_shared/utils/common.py +++ b/yt_shared/yt_shared/utils/common.py @@ -39,3 +39,6 @@ def wrap(func): def random_string(number: int) -> str: return ''.join(random.choice(ascii_lowercase) for _ in range(number)) + + +async_lock = asyncio.Lock()