From 8d0d6d2b70d669dc17deb074523b7f08e84cbaef Mon Sep 17 00:00:00 2001 From: Taras Terletsky <888784+tropicoo@users.noreply.github.com> Date: Thu, 25 Apr 2024 23:09:50 +0300 Subject: [PATCH] Version 1.6 --- README.md | 6 +- RELEASES.md | 28 +++++ app_api/api/api/api_v1/schemas/task.py | 19 ++-- app_api/api/api/root/schemas/healthcheck.py | 9 +- app_api/api/core/services/task.py | 4 +- app_bot/bot/core/handlers/abstract.py | 6 +- app_bot/bot/core/handlers/success.py | 17 +-- app_bot/bot/core/schema.py | 76 ++++++------- app_bot/bot/core/service.py | 2 + app_bot/bot/core/tasks/upload.py | 39 ++++--- app_bot/bot/core/utils.py | 14 --- app_bot/bot/core/workers/abstract.py | 16 ++- app_bot/bot/core/workers/enums.py | 6 ++ app_bot/bot/version.py | 2 +- app_worker/worker/core/config.py | 3 +- app_worker/worker/core/downloader.py | 96 +++++++++-------- app_worker/worker/core/launcher.py | 13 +-- app_worker/worker/core/media_service.py | 100 ++++++++--------- app_worker/worker/core/payload_handler.py | 10 +- app_worker/worker/core/tasks/abstract.py | 3 +- app_worker/worker/core/tasks/encode.py | 8 +- app_worker/worker/core/tasks/thumbnail.py | 4 +- app_worker/worker/utils.py | 12 ++- app_worker/ytdl_opts/per_host/_base.py | 29 +++-- app_worker/ytdl_opts/per_host/_default.py | 4 +- app_worker/ytdl_opts/per_host/instagram.py | 4 +- app_worker/ytdl_opts/per_host/tiktok.py | 4 +- app_worker/ytdl_opts/per_host/twitter.py | 4 +- pyproject.toml | 2 +- yt_shared/yt_shared/config.py | 31 ++++-- yt_shared/yt_shared/repositories/task.py | 55 ++++------ yt_shared/yt_shared/schemas/base.py | 34 ++++-- yt_shared/yt_shared/schemas/base_rabbit.py | 14 ++- yt_shared/yt_shared/schemas/error.py | 14 ++- yt_shared/yt_shared/schemas/media.py | 113 ++++++++++++-------- yt_shared/yt_shared/schemas/url.py | 22 ++-- yt_shared/yt_shared/schemas/ytdlp.py | 27 +++-- yt_shared/yt_shared/utils/common.py | 16 ++- yt_shared/yt_shared/utils/file.py | 23 ++-- yt_shared/yt_shared/utils/tasks/abstract.py | 6 +- 40 files changed, 480 insertions(+), 415 deletions(-) create mode 100644 app_bot/bot/core/workers/enums.py diff --git a/README.md b/README.md index 0b8cc8a..22c7075 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Simple and reliable self-hosted Video Download Telegram Bot. -Version: 1.5. [Release details](RELEASES.md). +Version: 1.6. [Release details](RELEASES.md). ![frames](.assets/download_success.png) @@ -173,7 +173,9 @@ documentations lives at `http://127.0.0.1:1984/docs`. { "url": "https://www.youtube.com/watch?v=PavYAOpVpJI", "download_media_type": "AUDIO_VIDEO", - "save_to_storage": false + "save_to_storage": false, + "custom_filename": "cool.mp4", + "automatic_extension": false } ``` Response diff --git a/RELEASES.md b/RELEASES.md index 587b465..c5cb5c6 100644 --- a/RELEASES.md +++ b/RELEASES.md @@ -1,3 +1,31 @@ +## Release 1.6 + +Release date: April 25, 2024 + +## New Features + + - Ability to set custom video name in `POST` API request: + ```json + { + "url": "", + "download_media_type": "VIDEO", + "save_to_storage": true, + "custom_filename": "your cool custom name and extension.mp4", + "automatic_extension": false + } + ``` + - Saving into storage with the same name won't overwrite the file but append timestamp to the filename. + +## Important + +N/A + +## Misc + +N/A + +--- + ## Release 1.5 Release date: March 20, 2024 diff --git a/app_api/api/api/api_v1/schemas/task.py b/app_api/api/api/api_v1/schemas/task.py index d1a95ac..2ca6f1f 100644 --- a/app_api/api/api/api_v1/schemas/task.py +++ b/app_api/api/api/api_v1/schemas/task.py @@ -1,9 +1,10 @@ import uuid from datetime import datetime -from pydantic import StrictBool, StrictFloat, StrictInt, StrictStr +from pydantic import Field, StrictFloat, StrictInt, StrictStr +from typing_extensions import Annotated from yt_shared.enums import DownMediaType, TaskSource, TaskStatus -from yt_shared.schemas.base import BaseOrmModel, RealBaseModel +from yt_shared.schemas.base import BaseOrmModel, StrictRealBaseModel class CacheSchema(BaseOrmModel): @@ -52,15 +53,17 @@ class TaskSchema(TaskSimpleSchema): files: list[FileSchema] -class CreateTaskIn(RealBaseModel): - url: StrictStr - download_media_type: DownMediaType - save_to_storage: StrictBool +class CreateTaskIn(StrictRealBaseModel): + url: str = ... + download_media_type: Annotated[DownMediaType, Field(strict=False)] = ... + save_to_storage: bool = ... + custom_filename: str = ... + automatic_extension: bool = ... -class CreateTaskOut(RealBaseModel): +class CreateTaskOut(StrictRealBaseModel): id: uuid.UUID - url: StrictStr + url: str source: TaskSource added_at: datetime diff --git a/app_api/api/api/root/schemas/healthcheck.py b/app_api/api/api/root/schemas/healthcheck.py index 0dd4453..594c681 100644 --- a/app_api/api/api/root/schemas/healthcheck.py +++ b/app_api/api/api/root/schemas/healthcheck.py @@ -1,6 +1,7 @@ -from pydantic import StrictStr -from yt_shared.schemas.base import RealBaseModel +from typing import Literal + +from yt_shared.schemas.base import StrictRealBaseModel -class HealthcheckSchema(RealBaseModel): - status: StrictStr = 'OK' +class HealthcheckSchema(StrictRealBaseModel): + status: Literal['OK'] = 'OK' diff --git a/app_api/api/core/services/task.py b/app_api/api/core/services/task.py index aac5357..84a72cb 100644 --- a/app_api/api/core/services/task.py +++ b/app_api/api/core/services/task.py @@ -51,7 +51,7 @@ class TaskService: status: list[TaskStatus] | None = None, limit: int = 100, offset: int = 0, - ) -> list[Task]: + ) -> list[TaskSimpleSchema | TaskSchema]: schema = self._get_schema(include_meta) tasks = await self._repository.get_all_tasks( include_meta, status, limit, offset @@ -79,6 +79,8 @@ class TaskService: from_user_id=None, message_id=None, ack_message_id=None, + custom_filename=task.custom_filename, + automatic_extension=task.automatic_extension, ) if not await publisher.send_for_download(payload): raise TaskServiceError('Failed to create task') diff --git a/app_bot/bot/core/handlers/abstract.py b/app_bot/bot/core/handlers/abstract.py index 1b66665..4d2da1d 100644 --- a/app_bot/bot/core/handlers/abstract.py +++ b/app_bot/bot/core/handlers/abstract.py @@ -1,5 +1,5 @@ -import abc import logging +from abc import ABC, abstractmethod from typing import TYPE_CHECKING from yt_shared.enums import TaskSource, TelegramChatType @@ -11,7 +11,7 @@ if TYPE_CHECKING: from bot.bot import VideoBotClient -class AbstractDownloadHandler(abc.ABC): +class AbstractDownloadHandler(ABC): def __init__( self, body: BaseRabbitDownloadPayload, @@ -22,7 +22,7 @@ class AbstractDownloadHandler(abc.ABC): self._bot = bot self._receiving_users = self._get_receiving_users() - @abc.abstractmethod + @abstractmethod async def handle(self) -> None: pass diff --git a/app_bot/bot/core/handlers/success.py b/app_bot/bot/core/handlers/success.py index cffc27e..6ef0f7f 100644 --- a/app_bot/bot/core/handlers/success.py +++ b/app_bot/bot/core/handlers/success.py @@ -1,5 +1,4 @@ import asyncio -import os import traceback from pyrogram.enums import ParseMode @@ -10,7 +9,7 @@ from yt_shared.rabbit.publisher import RmqPublisher from yt_shared.schemas.error import ErrorDownloadGeneralPayload from yt_shared.schemas.media import BaseMedia from yt_shared.schemas.success import SuccessDownloadPayload -from yt_shared.utils.file import list_files, remove_dir +from yt_shared.utils.file import list_files_human, remove_dir from yt_shared.utils.tasks.tasks import create_task from bot.core.handlers.abstract import AbstractDownloadHandler @@ -101,10 +100,10 @@ class SuccessDownloadHandler(AbstractDownloadHandler): else: self._log.warning( 'File "%s" will not be uploaded due to upload configuration', - media_object.filepath, + media_object.current_filepath, ) except Exception as err: - self._log.exception('Upload of "%s" failed', media_object.filepath) + self._log.exception('Upload of "%s" failed', media_object.current_filepath) await self._publish_error_message(err) def _cleanup(self) -> None: @@ -113,7 +112,7 @@ class SuccessDownloadHandler(AbstractDownloadHandler): 'Cleaning up task "%s": removing download content directory "%s" with files %s', self._body.task_id, root_path, - list_files(root_path), + list_files_human(root_path), ) remove_dir(root_path) @@ -138,7 +137,7 @@ class SuccessDownloadHandler(AbstractDownloadHandler): @staticmethod def _create_success_text(media_object: BaseMedia) -> str: - text = f'{SUCCESS_EMOJI} {bold("Downloaded")} {media_object.filename}' + text = f'{SUCCESS_EMOJI} {bold("Downloaded")} {media_object.current_filename}' if media_object.saved_to_storage: text = f'{text}\nšŸ’¾ {bold("Saved to media storage")}' return f'{text}\nšŸ“ {bold("Size")} {media_object.file_size_human()}' @@ -171,8 +170,10 @@ class SuccessDownloadHandler(AbstractDownloadHandler): user = self._bot.allowed_users[self._get_sender_id()] max_file_size = user.upload.upload_video_max_file_size - if not os.path.exists(media_obj.filepath): - raise ValueError(f'{media_obj.file_type} {media_obj.filepath} not found') + if not media_obj.current_filepath.exists(): + raise ValueError( + f'{media_obj.file_type} {media_obj.current_filepath} not found' + ) _file_size = media_obj.current_file_size() if _file_size > max_file_size: diff --git a/app_bot/bot/core/schema.py b/app_bot/bot/core/schema.py index eb23e6f..a82c039 100644 --- a/app_bot/bot/core/schema.py +++ b/app_bot/bot/core/schema.py @@ -1,58 +1,52 @@ -import abc +from abc import ABC -from pydantic import ( - StrictBool, - StrictInt, - StrictStr, - StringConstraints, - field_validator, -) +from pydantic import Field, PositiveInt, StringConstraints, field_validator from typing_extensions import Annotated from yt_shared.enums import DownMediaType -from yt_shared.schemas.base import RealBaseModel +from yt_shared.schemas.base import StrictBaseConfigModel _LANG_CODE_LEN = 2 _LANG_CODE_REGEX = rf'^[a-z]{{{_LANG_CODE_LEN}}}$' -class _BaseUserSchema(RealBaseModel, abc.ABC): - id: StrictInt +class _BaseUserSchema(StrictBaseConfigModel, ABC): + id: int class AnonymousUserSchema(_BaseUserSchema): pass -class VideoCaptionSchema(RealBaseModel): - include_title: StrictBool - include_filename: StrictBool - include_link: StrictBool - include_size: StrictBool +class VideoCaptionSchema(StrictBaseConfigModel): + include_title: bool + include_filename: bool + include_link: bool + include_size: bool -class UploadSchema(RealBaseModel): - upload_video_file: StrictBool - upload_video_max_file_size: StrictInt - forward_to_group: StrictBool - forward_group_id: StrictInt | None - silent: StrictBool +class UploadSchema(StrictBaseConfigModel): + upload_video_file: bool + upload_video_max_file_size: PositiveInt + forward_to_group: bool + forward_group_id: int | None + silent: bool video_caption: VideoCaptionSchema class UserSchema(_BaseUserSchema): - is_admin: StrictBool - send_startup_message: StrictBool - download_media_type: DownMediaType - save_to_storage: StrictBool - use_url_regex_match: StrictBool + is_admin: bool + send_startup_message: bool + download_media_type: Annotated[DownMediaType, Field(strict=False)] + save_to_storage: bool + use_url_regex_match: bool upload: UploadSchema -class ApiSchema(RealBaseModel): - upload_video_file: StrictBool - upload_video_max_file_size: StrictInt +class ApiSchema(StrictBaseConfigModel): + upload_video_file: bool + upload_video_max_file_size: PositiveInt upload_to_chat_ids: list[AnonymousUserSchema] - silent: StrictBool + silent: bool video_caption: VideoCaptionSchema @field_validator('upload_to_chat_ids', mode='before') @@ -61,25 +55,25 @@ class ApiSchema(RealBaseModel): return [AnonymousUserSchema(id=id_) for id_ in values] -class TelegramSchema(RealBaseModel): - api_id: StrictInt - api_hash: StrictStr - token: StrictStr +class TelegramSchema(StrictBaseConfigModel): + api_id: int + api_hash: str + token: str lang_code: Annotated[ str, StringConstraints(pattern=_LANG_CODE_REGEX, to_lower=True) ] - max_upload_tasks: StrictInt + max_upload_tasks: PositiveInt url_validation_regexes: list[str] allowed_users: list[UserSchema] api: ApiSchema -class YtdlpSchema(RealBaseModel): - version_check_enabled: StrictBool - version_check_interval: StrictInt - notify_users_on_new_version: StrictBool +class YtdlpSchema(StrictBaseConfigModel): + version_check_enabled: bool + version_check_interval: PositiveInt + notify_users_on_new_version: bool -class ConfigSchema(RealBaseModel): +class ConfigSchema(StrictBaseConfigModel): telegram: TelegramSchema ytdlp: YtdlpSchema diff --git a/app_bot/bot/core/service.py b/app_bot/bot/core/service.py index 410f9b1..6a7576b 100644 --- a/app_bot/bot/core/service.py +++ b/app_bot/bot/core/service.py @@ -35,6 +35,8 @@ class UrlService: source=TaskSource.BOT, save_to_storage=url.save_to_storage, download_media_type=url.download_media_type, + custom_filename=None, + automatic_extension=False, ) is_sent = await self._rmq_publisher.send_for_download(payload) if not is_sent: diff --git a/app_bot/bot/core/tasks/upload.py b/app_bot/bot/core/tasks/upload.py index 201e096..617e95c 100644 --- a/app_bot/bot/core/tasks/upload.py +++ b/app_bot/bot/core/tasks/upload.py @@ -1,9 +1,9 @@ -import abc import asyncio +from abc import ABC, abstractmethod from itertools import chain from typing import TYPE_CHECKING, Coroutine -from pydantic import StrictBool, StrictFloat, StrictInt, StrictStr +from pydantic import ConfigDict, FilePath from pyrogram.enums import ChatAction, MessageMediaType, ParseMode from pyrogram.types import Animation, Message from pyrogram.types import Audio as _Audio @@ -27,25 +27,26 @@ if TYPE_CHECKING: class BaseUploadContext(RealBaseModel): - caption: StrictStr - filename: StrictStr - filepath: StrictStr - duration: StrictFloat + model_config = ConfigDict(**RealBaseModel.model_config, strict=True) + caption: str + filename: str + filepath: FilePath + duration: float type: MessageMediaType - is_cached: StrictBool = False + is_cached: bool = False class VideoUploadContext(BaseUploadContext): - height: StrictInt | StrictFloat - width: StrictInt | StrictFloat - thumb: StrictStr | None = None + height: int | float + width: int | float + thumb: FilePath | None = None class AudioUploadContext(BaseUploadContext): pass -class AbstractUploadTask(AbstractTask, abc.ABC): +class AbstractUploadTask(AbstractTask, ABC): _UPLOAD_ACTION: ChatAction def __init__( @@ -60,12 +61,8 @@ class AbstractUploadTask(AbstractTask, abc.ABC): self._config = get_main_config() self._media_object = media_object - if media_object.is_converted: - self._filename = media_object.converted_filename - self._filepath = media_object.converted_filepath - else: - self._filename = media_object.filename - self._filepath = media_object.filepath + self._filename = media_object.current_filename + self._filepath = media_object.current_filepath self._bot = bot self._users = users @@ -89,7 +86,7 @@ class AbstractUploadTask(AbstractTask, abc.ABC): self._log.exception('Exception in upload task for "%s"', self._filename) raise - @abc.abstractmethod + @abstractmethod def _generate_caption_items(self) -> list[str]: pass @@ -127,11 +124,11 @@ class AbstractUploadTask(AbstractTask, abc.ABC): self._log.debug('Uploading to "%d" with context: %s', chat_id, self._media_ctx) return await self._generate_send_media_coroutine(chat_id) - @abc.abstractmethod + @abstractmethod def _generate_send_media_coroutine(self, chat_id: int) -> Coroutine: pass - @abc.abstractmethod + @abstractmethod def _create_media_context(self) -> AudioUploadContext | VideoUploadContext: pass @@ -170,7 +167,7 @@ class AbstractUploadTask(AbstractTask, abc.ABC): exception_message_args=(db_cache_task_name,), ) - @abc.abstractmethod + @abstractmethod def _cache_data(self, message: Message) -> None: pass diff --git a/app_bot/bot/core/utils.py b/app_bot/bot/core/utils.py index 4c09397..f87ef6d 100644 --- a/app_bot/bot/core/utils.py +++ b/app_bot/bot/core/utils.py @@ -1,12 +1,9 @@ """Utils module.""" import asyncio -import random -import string from datetime import datetime from typing import Generator, Iterable from urllib.parse import urlparse -from uuid import uuid4 from pyrogram.enums import ChatType from pyrogram.types import Message @@ -19,17 +16,6 @@ async def shallow_sleep_async(sleep_time: float = 0.1) -> None: await asyncio.sleep(sleep_time) -def gen_uuid() -> str: - return uuid4().hex - - -def gen_random_str(length=4) -> str: - return ''.join( - random.SystemRandom().choice(string.ascii_lowercase + string.digits) - for _ in range(length) - ) - - def format_ts(ts: float, time_format: str = '%a %b %d %H:%M:%S %Y') -> str: return datetime.fromtimestamp(ts).strftime(time_format) diff --git a/app_bot/bot/core/workers/abstract.py b/app_bot/bot/core/workers/abstract.py index 8d24a8c..48d49e6 100644 --- a/app_bot/bot/core/workers/abstract.py +++ b/app_bot/bot/core/workers/abstract.py @@ -1,7 +1,6 @@ """RabbitMQ Queue abstract worker module.""" -import abc -import enum +from abc import abstractmethod from typing import TYPE_CHECKING, Type from aio_pika import IncomingMessage @@ -11,16 +10,12 @@ from yt_shared.utils.tasks.abstract import AbstractTask from bot.core.config.config import get_main_config from bot.core.exceptions import InvalidBodyError +from bot.core.workers.enums import RabbitWorkerType if TYPE_CHECKING: from bot.bot.client import VideoBotClient -class RabbitWorkerType(enum.Enum): - ERROR = 'ERROR' - SUCCESS = 'SUCCESS' - - class AbstractDownloadResultWorker(AbstractTask): TYPE: RabbitWorkerType | None = None QUEUE_TYPE: str | None = None @@ -36,7 +31,7 @@ class AbstractDownloadResultWorker(AbstractTask): async def run(self) -> None: await self._watch_queue() - @abc.abstractmethod + @abstractmethod async def _process_body(self, body: BaseModel) -> bool: pass @@ -48,7 +43,8 @@ class AbstractDownloadResultWorker(AbstractTask): await self._process_message(message) except Exception: self._log.exception('Failed to process message %s', message.body) - await message.nack(requeue=False) + if not message.processed: + await message.nack(requeue=False) async def _process_message(self, message: IncomingMessage) -> None: self._log.debug('[x] Received message %s', message.body) @@ -64,7 +60,7 @@ class AbstractDownloadResultWorker(AbstractTask): # Skip unmatched schema class that failed to parse the body. pass else: - self._log.error('Failed to decode message body: %s', message.body) + self._log.error('Failed to decode message body') await self._reject_invalid_body(message) raise InvalidBodyError diff --git a/app_bot/bot/core/workers/enums.py b/app_bot/bot/core/workers/enums.py new file mode 100644 index 0000000..2a012b3 --- /dev/null +++ b/app_bot/bot/core/workers/enums.py @@ -0,0 +1,6 @@ +from enum import StrEnum + + +class RabbitWorkerType(StrEnum): + ERROR = 'ERROR' + SUCCESS = 'SUCCESS' diff --git a/app_bot/bot/version.py b/app_bot/bot/version.py index fcb6b5d..6d5e09d 100644 --- a/app_bot/bot/version.py +++ b/app_bot/bot/version.py @@ -1 +1 @@ -__version__ = '1.5' +__version__ = '1.6' diff --git a/app_worker/worker/core/config.py b/app_worker/worker/core/config.py index 6fe0be9..c2cd1f2 100644 --- a/app_worker/worker/core/config.py +++ b/app_worker/worker/core/config.py @@ -1,10 +1,11 @@ +from pydantic import DirectoryPath from yt_shared.config import Settings class WorkerSettings(Settings): APPLICATION_NAME: str MAX_SIMULTANEOUS_DOWNLOADS: int - STORAGE_PATH: str + STORAGE_PATH: DirectoryPath THUMBNAIL_FRAME_SECOND: float INSTAGRAM_ENCODE_TO_H264: bool diff --git a/app_worker/worker/core/downloader.py b/app_worker/worker/core/downloader.py index d2a3161..3c447f3 100644 --- a/app_worker/worker/core/downloader.py +++ b/app_worker/worker/core/downloader.py @@ -1,15 +1,15 @@ import glob import logging -import os import shutil +from pathlib import Path from tempfile import TemporaryDirectory from typing import Callable import yt_dlp from yt_shared.enums import DownMediaType -from yt_shared.schemas.media import Audio, DownMedia, Video -from yt_shared.utils.common import format_bytes, random_string -from yt_shared.utils.file import file_size, list_files, remove_dir +from yt_shared.schemas.media import Audio, DownMedia, InbMediaPayload, Video +from yt_shared.utils.common import format_bytes, gen_random_str +from yt_shared.utils.file import file_size, list_files_human, remove_dir from worker.core.config import settings from worker.core.exceptions import MediaDownloaderError @@ -33,29 +33,28 @@ class MediaDownloader: def __init__(self) -> None: self._log = logging.getLogger(self.__class__.__name__) - self._tmp_downloaded_dest_dir = os.path.join( - settings.TMP_DOWNLOAD_ROOT_PATH, settings.TMP_DOWNLOADED_DIR + self._tmp_downloaded_dest_dir = ( + settings.TMP_DOWNLOAD_ROOT_PATH / settings.TMP_DOWNLOADED_DIR ) def download( - self, host_conf: AbstractHostConfig, media_type: DownMediaType + self, host_conf: AbstractHostConfig, media_payload: InbMediaPayload ) -> DownMedia: try: - return self._download(host_conf=host_conf, media_type=media_type) + return self._download(host_conf=host_conf, media_payload=media_payload) except Exception: self._log.error('Failed to download %s', host_conf.url) raise def _download( - self, host_conf: AbstractHostConfig, media_type: DownMediaType + self, host_conf: AbstractHostConfig, media_payload: InbMediaPayload ) -> DownMedia: + media_type = media_payload.download_media_type url = host_conf.url self._log.info('Downloading %s, media_type %s', url, media_type) - tmp_down_path = os.path.join( - settings.TMP_DOWNLOAD_ROOT_PATH, settings.TMP_DOWNLOAD_DIR - ) + tmp_down_path = settings.TMP_DOWNLOAD_ROOT_PATH / settings.TMP_DOWNLOAD_DIR with TemporaryDirectory(prefix='tmp_media_dir-', dir=tmp_down_path) as tmp_dir: - curr_tmp_dir = os.path.join(tmp_down_path, tmp_dir) + curr_tmp_dir = tmp_down_path / tmp_dir ytdl_opts_model = host_conf.build_config( media_type=media_type, curr_tmp_dir=curr_tmp_dir @@ -72,7 +71,7 @@ class MediaDownloader: self._log.error('%s. Meta: %s', err_msg, meta) raise MediaDownloaderError(err_msg) - current_files = os.listdir(curr_tmp_dir) + current_files = list(curr_tmp_dir.iterdir()) if not current_files: err_msg = 'Nothing downloaded. Is URL valid?' self._log.error(err_msg) @@ -83,25 +82,25 @@ class MediaDownloader: self._log.info('Finished downloading %s', url) self._log.debug('Downloaded "%s" meta: %s', url, meta_sanitized) self._log.info( - 'Content of "%s": %s', curr_tmp_dir, list_files(curr_tmp_dir) + 'Content of "%s": %s', curr_tmp_dir, list_files_human(curr_tmp_dir) ) - destination_dir = os.path.join( - self._tmp_downloaded_dest_dir, - random_string(number=self._DESTINATION_TMP_DIR_NAME_LEN), + destination_dir = self._tmp_downloaded_dest_dir / gen_random_str( + length=self._DESTINATION_TMP_DIR_NAME_LEN ) - os.mkdir(destination_dir) + destination_dir.mkdir() audio, video = self._create_media_dtos( media_type=media_type, meta=meta, curr_tmp_dir=curr_tmp_dir, destination_dir=destination_dir, + custom_video_filename=media_payload.custom_filename, ) self._log.info( 'Removing temporary download directory "%s" with leftover files %s', curr_tmp_dir, - os.listdir(curr_tmp_dir), + list_files_human(curr_tmp_dir), ) return DownMedia( @@ -118,6 +117,7 @@ class MediaDownloader: meta: dict, curr_tmp_dir: str, destination_dir: str, + custom_video_filename: str | None = None, ) -> tuple[Audio | None, Video | None]: def get_audio() -> Audio: return create_dto(self._create_audio_dto) @@ -126,14 +126,10 @@ class MediaDownloader: return create_dto(self._create_video_dto) def create_dto( - func: Callable[[dict, str, str], Audio | Video], + func: Callable[[dict, str, str, str | None], Audio | Video], ) -> Audio | Video: try: - return func( - meta, - curr_tmp_dir, - destination_dir, - ) + return func(meta, curr_tmp_dir, destination_dir, custom_video_filename) except Exception: remove_dir(destination_dir) raise @@ -151,35 +147,41 @@ class MediaDownloader: def _create_video_dto( self, meta: dict, - curr_tmp_dir: str, - destination_dir: str, + curr_tmp_dir: Path, + destination_dir: Path, + custom_video_filename: str | None = None, ) -> Video: video_filename = self._get_video_filename(meta) - video_filepath = os.path.join(curr_tmp_dir, video_filename) + video_filepath = curr_tmp_dir / video_filename - self._log.info('Moving "%s" to "%s"', video_filepath, destination_dir) - shutil.move(video_filepath, destination_dir) + if custom_video_filename: + dest_path = destination_dir / custom_video_filename + else: + dest_path = destination_dir / video_filename - thumb_path: str | None = None + self._log.info('Moving "%s" to "%s"', video_filepath, dest_path) + shutil.move(video_filepath, dest_path) + + thumb_path: Path | None = None thumb_name = self._find_downloaded_file( root_path=curr_tmp_dir, extension=FINAL_THUMBNAIL_FORMAT, ) if thumb_name: - _thumb_path = os.path.join(curr_tmp_dir, thumb_name) + _thumb_path = curr_tmp_dir / thumb_name shutil.move(_thumb_path, destination_dir) - thumb_path = os.path.join(destination_dir, thumb_name) + thumb_path = destination_dir / thumb_name duration, width, height = self._get_video_context(meta) - filepath = os.path.join(destination_dir, video_filename) return Video( title=meta['title'], - filename=video_filename, + original_filename=video_filename, + custom_filename=custom_video_filename, duration=duration, width=width, height=height, - filepath=filepath, - file_size=file_size(filepath), + directory_path=destination_dir, + file_size=file_size(dest_path), thumb_path=thumb_path, thumb_name=thumb_name, ) @@ -187,26 +189,26 @@ class MediaDownloader: def _create_audio_dto( self, meta: dict, - curr_tmp_dir: str, - destination_dir: str, + curr_tmp_dir: Path, + destination_dir: Path, + custom_video_filename: str | None = None, # TODO: Make for audio. ) -> Audio: audio_filename = self._find_downloaded_file( root_path=curr_tmp_dir, extension=FINAL_AUDIO_FORMAT, ) - audio_filepath = os.path.join(curr_tmp_dir, audio_filename) + audio_filepath = curr_tmp_dir / audio_filename self._log.info('Moving "%s" to "%s"', audio_filepath, destination_dir) shutil.move(audio_filepath, destination_dir) - filepath = os.path.join(destination_dir, audio_filename) return Audio( title=meta['title'], - filename=audio_filename, + original_filename=audio_filename, duration=None, - filepath=filepath, - file_size=file_size(filepath), + directory_path=destination_dir, + file_size=file_size(destination_dir / audio_filename), ) - def _find_downloaded_file(self, root_path: str, extension: str) -> str | None: + def _find_downloaded_file(self, root_path: Path, extension: str) -> str | None: """Try to find downloaded audio or thumbnail file.""" verbose_name = self._EXT_TO_NAME[extension] for file_name in glob.glob(f'*.{extension}', root_dir=root_path): @@ -214,7 +216,7 @@ class MediaDownloader: 'Found downloaded %s: "%s" [%s]', verbose_name, file_name, - format_bytes(file_size(os.path.join(root_path, file_name))), + format_bytes(file_size(root_path / file_name)), ) return file_name self._log.info('Downloaded %s not found in "%s"', verbose_name, root_path) diff --git a/app_worker/worker/core/launcher.py b/app_worker/worker/core/launcher.py index 8783341..a3c9f88 100644 --- a/app_worker/worker/core/launcher.py +++ b/app_worker/worker/core/launcher.py @@ -1,6 +1,5 @@ import asyncio import logging -import os from yt_dlp import version as ytdlp_version from yt_shared.db.session import get_db @@ -60,19 +59,17 @@ class WorkerLauncher: async def _create_intermediate_directories(self) -> None: """Create temporary intermediate directories on start if they do not exist.""" - tmp_download_path = os.path.join( - settings.TMP_DOWNLOAD_ROOT_PATH, settings.TMP_DOWNLOAD_DIR - ) - tmp_downloaded_path = os.path.join( - settings.TMP_DOWNLOAD_ROOT_PATH, settings.TMP_DOWNLOADED_DIR + tmp_download_path = settings.TMP_DOWNLOAD_ROOT_PATH / settings.TMP_DOWNLOAD_DIR + tmp_downloaded_path = ( + settings.TMP_DOWNLOAD_ROOT_PATH / settings.TMP_DOWNLOADED_DIR ) self._log.info( 'Creating intermediate directories %s, %s if not exist', tmp_download_path, tmp_downloaded_path, ) - os.makedirs(tmp_download_path, exist_ok=True) - os.makedirs(tmp_downloaded_path, exist_ok=True) + tmp_download_path.mkdir(parents=True, exist_ok=True) + tmp_downloaded_path.mkdir(parents=True, exist_ok=True) def _register_shutdown(self) -> None: register_shutdown(self.stop) diff --git a/app_worker/worker/core/media_service.py b/app_worker/worker/core/media_service.py index 25a84e1..5705d42 100644 --- a/app_worker/worker/core/media_service.py +++ b/app_worker/worker/core/media_service.py @@ -1,10 +1,10 @@ import asyncio import logging -import os import shutil +import time +from pathlib import Path from urllib.parse import urlsplit -from sqlalchemy.ext.asyncio import AsyncSession from yt_shared.enums import DownMediaType, TaskStatus from yt_shared.models import Task from yt_shared.repositories.task import TaskRepository @@ -14,6 +14,7 @@ from yt_shared.schemas.media import ( InbMediaPayload, Video, ) +from yt_shared.utils.common import gen_random_str from yt_shared.utils.file import remove_dir from yt_shared.utils.tasks.tasks import create_task @@ -28,37 +29,37 @@ from ytdl_opts.per_host._registry import HostConfRegistry class MediaService: - def __init__(self) -> None: + def __init__( + self, + media_payload: InbMediaPayload, + downloader: MediaDownloader, + task_repository: TaskRepository, + ) -> None: self._log = logging.getLogger(self.__class__.__name__) - self._downloader = MediaDownloader() - self._repository = TaskRepository() + self._downloader = downloader + self._repository = task_repository + self._media_payload = media_payload async def process( - self, media_payload: InbMediaPayload, db: AsyncSession + self, ) -> tuple[DownMedia | None, Task | None]: - task = await self._repository.get_or_create_task(db, media_payload) + task = await self._repository.get_or_create_task(self._media_payload) if task.status != TaskStatus.PENDING.value: return None, None return ( - await self._process(media_payload=media_payload, task=task, db=db), + await self._process(task=task), task, ) - async def _process( - self, media_payload: InbMediaPayload, task: Task, db: AsyncSession - ) -> DownMedia: + async def _process(self, task: Task) -> DownMedia: host_conf = self._get_host_conf(url=task.url) - await self._repository.save_as_processing(db, task) - media = await self._start_download( - task=task, media_payload=media_payload, host_conf=host_conf, db=db - ) + await self._repository.save_as_processing(task) + media = await self._start_download(task=task, host_conf=host_conf) try: await self._post_process_media( media=media, task=task, - media_payload=media_payload, host_conf=host_conf, - db=db, ) except Exception: self._log.exception('Failed to post-process media %s', media) @@ -75,47 +76,41 @@ class MediaService: async def _start_download( self, task: Task, - media_payload: InbMediaPayload, host_conf: AbstractHostConfig, - db: AsyncSession, ) -> DownMedia: try: return await asyncio.get_running_loop().run_in_executor( None, lambda: self._downloader.download( host_conf=host_conf, - media_type=media_payload.download_media_type, + media_payload=self._media_payload, ), ) except Exception as err: - self._log.exception('Failed to download media. Context: %s', media_payload) - await self._handle_download_exception(err, task, db) + self._log.exception( + 'Failed to download media. Context: %s', self._media_payload + ) + await self._handle_download_exception(err, task) raise DownloadVideoServiceError(message=str(err), task=task) async def _post_process_media( self, media: DownMedia, task: Task, - media_payload: InbMediaPayload, host_conf: AbstractHostConfig, - db: AsyncSession, ) -> None: def post_process_audio(): return self._post_process_audio( media=media, - media_payload=media_payload, task=task, host_conf=host_conf, - db=db, ) def post_process_video(): return self._post_process_video( media=media, - media_payload=media_payload, task=task, host_conf=host_conf, - db=db, ) match media.media_type: @@ -126,15 +121,13 @@ class MediaService: case DownMediaType.AUDIO_VIDEO: await asyncio.gather(*(post_process_audio(), post_process_video())) - await self._repository.save_as_done(db, task) + await self._repository.save_as_done(task) async def _post_process_video( self, media: DownMedia, - media_payload: InbMediaPayload, task: Task, host_conf: AbstractHostConfig, - db: AsyncSession, ) -> None: """Post-process downloaded media files, e.g. make thumbnail and copy to storage.""" video = media.video @@ -149,17 +142,17 @@ class MediaService: coro_tasks = [] if not video.thumb_path: - thumb_path = os.path.join(media.root_path, video.thumb_name) + thumb_path = Path(media.root_path) / Path(video.thumb_name) coro_tasks.append( self._create_thumb_task( - file_path=video.filepath, + file_path=video.current_filepath, thumb_path=thumb_path, duration=video.duration, video_ctx=video, ) ) - if media_payload.save_to_storage: + if self._media_payload.save_to_storage: coro_tasks.append(self._create_copy_file_task(video)) if host_conf.ENCODE_VIDEO: @@ -177,26 +170,24 @@ class MediaService: await asyncio.gather(*coro_tasks) - file = await self._repository.save_file(db, task, media.video, media.meta) + file = await self._repository.save_file(task, media.video, media.meta) video.orm_file_id = file.id async def _post_process_audio( self, media: DownMedia, - media_payload: InbMediaPayload, task: Task, host_conf: AbstractHostConfig, - db: AsyncSession, ) -> None: - coro_tasks = [self._repository.save_file(db, task, media.audio, media.meta)] - if media_payload.save_to_storage: + coro_tasks = [self._repository.save_file(task, media.audio, media.meta)] + if self._media_payload.save_to_storage: coro_tasks.append(self._create_copy_file_task(media.audio)) results = await asyncio.gather(*coro_tasks) file = results[0] media.audio.orm_file_id = file.id async def _set_probe_ctx(self, video: Video) -> None: - probe_ctx = await GetFfprobeContextTask(video.filepath).run() + probe_ctx = await GetFfprobeContextTask(video.current_filepath).run() if not probe_ctx: return @@ -225,8 +216,8 @@ class MediaService: def _create_thumb_task( self, - file_path: str, - thumb_path: str, + file_path: Path, + thumb_path: Path, duration: float, video_ctx: Video, ) -> asyncio.Task: @@ -241,16 +232,17 @@ class MediaService: ) async def _copy_file_to_storage(self, file: BaseMedia) -> None: - if file.is_converted: - filename = file.converted_filename - filepath = file.converted_filepath - else: - filename = file.filename - filepath = file.filepath + dst = settings.STORAGE_PATH / file.current_filename + if dst.is_file(): + self._log.warning('Destination file in storage already exists: %s', dst) + dst = ( + dst.parent + / f'{dst.stem}-{int(time.time())}-{gen_random_str()}{dst.suffix}' + ) + self._log.warning('Adding current timestamp to filename: %s', dst) - dst = os.path.join(settings.STORAGE_PATH, filename) - self._log.info('Copying "%s" to storage "%s"', filepath, dst) - await asyncio.to_thread(shutil.copy2, filepath, dst) + self._log.info('Copying "%s" to storage "%s"', file.current_filepath, dst) + await asyncio.to_thread(shutil.copy2, file.current_filepath, dst) file.mark_as_saved_to_storage(storage_path=dst) def _err_file_cleanup(self, video: DownMedia) -> None: @@ -258,7 +250,5 @@ class MediaService: self._log.info('Performing error cleanup: removing %s', video.root_path) remove_dir(video.root_path) - async def _handle_download_exception( - self, err: Exception, task: Task, db: AsyncSession - ) -> None: - await self._repository.save_as_failed(db=db, task=task, error_message=str(err)) + async def _handle_download_exception(self, err: Exception, task: Task) -> None: + await self._repository.save_as_failed(task=task, error_message=str(err)) diff --git a/app_worker/worker/core/payload_handler.py b/app_worker/worker/core/payload_handler.py index 6c5c13a..a66a587 100644 --- a/app_worker/worker/core/payload_handler.py +++ b/app_worker/worker/core/payload_handler.py @@ -5,10 +5,12 @@ from yt_dlp import version as ytdlp_version from yt_shared.db.session import get_db from yt_shared.models import Task from yt_shared.rabbit.publisher import RmqPublisher +from yt_shared.repositories.task import TaskRepository from yt_shared.schemas.error import ErrorDownloadGeneralPayload, ErrorDownloadPayload from yt_shared.schemas.media import DownMedia, InbMediaPayload from yt_shared.schemas.success import SuccessDownloadPayload +from worker.core.downloader import MediaDownloader from worker.core.exceptions import DownloadVideoServiceError, GeneralVideoServiceError from worker.core.media_service import MediaService @@ -16,7 +18,6 @@ from worker.core.media_service import MediaService class PayloadHandler: def __init__(self) -> None: self._log = logging.getLogger(self.__class__.__name__) - self._media_service = MediaService() self._rmq_publisher = RmqPublisher() async def handle(self, media_payload: InbMediaPayload) -> None: @@ -27,8 +28,13 @@ class PayloadHandler: async def _handle(self, media_payload: InbMediaPayload) -> None: async for session in get_db(): + media_service = MediaService( + media_payload=media_payload, + downloader=MediaDownloader(), + task_repository=TaskRepository(session), + ) try: - media, task = await self._media_service.process(media_payload, session) + media, task = await media_service.process() except DownloadVideoServiceError as err: await self._send_failed_video_download_task(err, media_payload) return diff --git a/app_worker/worker/core/tasks/abstract.py b/app_worker/worker/core/tasks/abstract.py index 103fbbf..e9f83be 100644 --- a/app_worker/worker/core/tasks/abstract.py +++ b/app_worker/worker/core/tasks/abstract.py @@ -1,6 +1,7 @@ import asyncio import os import signal +from pathlib import Path from yt_shared.utils.common import wrap from yt_shared.utils.tasks.abstract import AbstractTask @@ -10,7 +11,7 @@ class AbstractFfBinaryTask(AbstractTask): _CMD: str | None = None _CMD_TIMEOUT = 10 - def __init__(self, file_path: str) -> None: + def __init__(self, file_path: Path) -> None: super().__init__() self._file_path = file_path self._killpg = wrap(os.killpg) diff --git a/app_worker/worker/core/tasks/encode.py b/app_worker/worker/core/tasks/encode.py index 3802cbb..657c54c 100644 --- a/app_worker/worker/core/tasks/encode.py +++ b/app_worker/worker/core/tasks/encode.py @@ -1,5 +1,3 @@ -import os - from yt_shared.schemas.media import DownMedia from worker.core.tasks.abstract import AbstractFfBinaryTask @@ -14,7 +12,7 @@ class EncodeToH264Task(AbstractFfBinaryTask): def __init__( self, media: DownMedia, cmd_tpl: str, check_if_in_final_format: bool = True ) -> None: - super().__init__(file_path=media.video.filepath) + super().__init__(file_path=media.video.current_filepath) self._media = media self._video = media.video self._CMD = cmd_tpl # lol @@ -43,8 +41,8 @@ class EncodeToH264Task(AbstractFfBinaryTask): return False def _get_output_path(self) -> str: - filename = f'{self._video.filename.rsplit(".", 1)[0]}-h264.{self._EXT}' - return os.path.join(self._media.root_path, filename) + filename = f'{self._video.current_filename.rsplit(".", 1)[0]}-h264.{self._EXT}' + return self._media.root_path / filename async def _encode_video(self) -> None: output = self._get_output_path() diff --git a/app_worker/worker/core/tasks/thumbnail.py b/app_worker/worker/core/tasks/thumbnail.py index 6f2cdc8..2563eb3 100644 --- a/app_worker/worker/core/tasks/thumbnail.py +++ b/app_worker/worker/core/tasks/thumbnail.py @@ -1,3 +1,5 @@ +from pathlib import Path + from yt_shared.schemas.media import Video from worker.core.config import settings @@ -8,7 +10,7 @@ class MakeThumbnailTask(AbstractFfBinaryTask): _CMD = 'ffmpeg -y -loglevel error -i "{filepath}" -ss {time_point} -vframes 1 -q:v 7 "{thumbpath}"' def __init__( - self, thumbnail_path: str, *args, duration: float, video_ctx: Video, **kwargs + self, thumbnail_path: Path, *args, duration: float, video_ctx: Video, **kwargs ) -> None: super().__init__(*args, **kwargs) self._thumbnail_path = thumbnail_path diff --git a/app_worker/worker/utils.py b/app_worker/worker/utils.py index d86d5c8..b8019cd 100644 --- a/app_worker/worker/utils.py +++ b/app_worker/worker/utils.py @@ -1,8 +1,8 @@ -import os +from pathlib import Path import yt_dlp -_COOKIES_FILEPATH = '/app/cookies/cookies.txt' +_COOKIES_FILEPATH = Path('/app/cookies/cookies.txt') def cli_to_api(opts: list) -> dict: @@ -18,11 +18,13 @@ def cli_to_api(opts: list) -> dict: return diff -def is_file_empty(filepath: str) -> bool: +def is_file_empty(filepath: Path) -> bool: """Check whether the file is empty.""" - return os.path.isfile(filepath) and os.path.getsize(filepath) == 0 + return filepath.is_file() and filepath.stat().st_size == 0 def get_cookies_opts_if_not_empty() -> list[str]: """Return yt-dlp cookies option with cookies filepath.""" - return [] if is_file_empty(_COOKIES_FILEPATH) else ['--cookies', _COOKIES_FILEPATH] + if is_file_empty(_COOKIES_FILEPATH): + return [] + return ['--cookies', str(_COOKIES_FILEPATH)] diff --git a/app_worker/ytdl_opts/per_host/_base.py b/app_worker/ytdl_opts/per_host/_base.py index d307a76..f66e52c 100644 --- a/app_worker/ytdl_opts/per_host/_base.py +++ b/app_worker/ytdl_opts/per_host/_base.py @@ -1,10 +1,9 @@ -import abc import logging -import os +from abc import abstractmethod from copy import deepcopy +from pathlib import Path -import pydantic -from pydantic import StrictBool, StrictStr +from pydantic import BaseModel, ConfigDict from yt_shared.enums import DownMediaType from worker.utils import cli_to_api @@ -31,16 +30,17 @@ except ImportError: ) -class BaseHostConfModel(pydantic.BaseModel): +class BaseHostConfModel(BaseModel): # TODO: Add validators. + model_config = ConfigDict(strict=True, frozen=True) hostnames: tuple[str, ...] - encode_audio: StrictBool - encode_video: StrictBool + encode_audio: bool + encode_video: bool - ffmpeg_audio_opts: StrictStr | None - ffmpeg_video_opts: StrictStr | None + ffmpeg_audio_opts: str | None + ffmpeg_video_opts: str | None ytdl_opts: dict @@ -82,13 +82,13 @@ class AbstractHostConfig: if not self.ALLOW_NULL_HOSTNAMES and not self.HOSTNAMES: raise ValueError('Hostname(s) must be set before instantiation.') - @abc.abstractmethod + @abstractmethod def build_config( self, media_type: DownMediaType, curr_tmp_dir: str ) -> BaseHostConfModel: pass - def _build_ytdl_opts(self, media_type: DownMediaType, curr_tmp_dir: str) -> dict: + def _build_ytdl_opts(self, media_type: DownMediaType, curr_tmp_dir: Path) -> dict: def _add_video_opts(ytdl_opts_: list[str]) -> None: ytdl_opts_.extend(self.DEFAULT_VIDEO_YTDL_OPTS) ytdl_opts_.extend(self._build_custom_ytdl_video_opts()) @@ -107,12 +107,11 @@ class AbstractHostConfig: ytdl_opts.append(self.KEEP_VIDEO_OPTION) ytdl_opts = cli_to_api(ytdl_opts) - ytdl_opts['outtmpl']['default'] = os.path.join( - curr_tmp_dir, - ytdl_opts['outtmpl']['default'], + ytdl_opts['outtmpl']['default'] = str( + curr_tmp_dir / ytdl_opts['outtmpl']['default'] ) return ytdl_opts - @abc.abstractmethod + @abstractmethod def _build_custom_ytdl_video_opts(self) -> list[str]: pass diff --git a/app_worker/ytdl_opts/per_host/_default.py b/app_worker/ytdl_opts/per_host/_default.py index bd17720..e2f8e0a 100644 --- a/app_worker/ytdl_opts/per_host/_default.py +++ b/app_worker/ytdl_opts/per_host/_default.py @@ -1,3 +1,5 @@ +from pathlib import Path + from yt_shared.enums import DownMediaType from ytdl_opts.per_host._base import AbstractHostConfig, BaseHostConfModel @@ -15,7 +17,7 @@ class DefaultHost(AbstractHostConfig, metaclass=HostConfRegistry): ENCODE_VIDEO = False def build_config( - self, media_type: DownMediaType, curr_tmp_dir: str + self, media_type: DownMediaType, curr_tmp_dir: Path ) -> DefaultHostModel: return DefaultHostModel( hostnames=self.HOSTNAMES, diff --git a/app_worker/ytdl_opts/per_host/instagram.py b/app_worker/ytdl_opts/per_host/instagram.py index 94c7b63..100a69f 100644 --- a/app_worker/ytdl_opts/per_host/instagram.py +++ b/app_worker/ytdl_opts/per_host/instagram.py @@ -1,3 +1,5 @@ +from pathlib import Path + from yt_shared.constants import INSTAGRAM_HOSTS from yt_shared.enums import DownMediaType @@ -22,7 +24,7 @@ class InstagramHost(AbstractHostConfig, metaclass=HostConfRegistry): FFMPEG_VIDEO_OPTS = 'ffmpeg -y -loglevel error -i "{filepath}" -c:v libx264 -pix_fmt yuv420p -preset veryfast -crf 22 -movflags +faststart -c:a copy "{output}"' def build_config( - self, media_type: DownMediaType, curr_tmp_dir: str + self, media_type: DownMediaType, curr_tmp_dir: Path ) -> InstagramHostModel: return InstagramHostModel( hostnames=self.HOSTNAMES, diff --git a/app_worker/ytdl_opts/per_host/tiktok.py b/app_worker/ytdl_opts/per_host/tiktok.py index c7fbf09..3f5d551 100644 --- a/app_worker/ytdl_opts/per_host/tiktok.py +++ b/app_worker/ytdl_opts/per_host/tiktok.py @@ -1,3 +1,5 @@ +from pathlib import Path + from yt_shared.constants import TIKTOK_HOSTS from yt_shared.enums import DownMediaType @@ -16,7 +18,7 @@ class TikTokHost(AbstractHostConfig, metaclass=HostConfRegistry): ENCODE_VIDEO = False def build_config( - self, media_type: DownMediaType, curr_tmp_dir: str + self, media_type: DownMediaType, curr_tmp_dir: Path ) -> TikTokHostModel: return TikTokHostModel( hostnames=self.HOSTNAMES, diff --git a/app_worker/ytdl_opts/per_host/twitter.py b/app_worker/ytdl_opts/per_host/twitter.py index e374d23..86718d3 100644 --- a/app_worker/ytdl_opts/per_host/twitter.py +++ b/app_worker/ytdl_opts/per_host/twitter.py @@ -1,3 +1,5 @@ +from pathlib import Path + from yt_shared.constants import TWITTER_HOSTS from yt_shared.enums import DownMediaType @@ -16,7 +18,7 @@ class TwitterHost(AbstractHostConfig, metaclass=HostConfRegistry): ENCODE_VIDEO = False def build_config( - self, media_type: DownMediaType, curr_tmp_dir: str + self, media_type: DownMediaType, curr_tmp_dir: Path ) -> TwitterHostModel: return TwitterHostModel( hostnames=self.HOSTNAMES, diff --git a/pyproject.toml b/pyproject.toml index f5a5858..bb787db 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ line-length = 88 indent-width = 4 target-version = "py312" src = ["app_api", "app_bot", "app_worker"] -required-version = ">=0.3.4" +required-version = ">=0.4.2" [tool.ruff.lint] select = ["F", "E", "W", "I001", "RET", "SLF001"] diff --git a/yt_shared/yt_shared/config.py b/yt_shared/yt_shared/config.py index 4027098..01ba824 100644 --- a/yt_shared/yt_shared/config.py +++ b/yt_shared/yt_shared/config.py @@ -1,17 +1,26 @@ import logging from typing import KeysView -from pydantic import field_validator +from pydantic import ( + ConfigDict, + DirectoryPath, + NewPath, + PositiveInt, + ValidationInfo, + field_validator, +) from pydantic_settings import BaseSettings class Settings(BaseSettings): + model_config = ConfigDict(extra='forbid', validate_default=True) + APPLICATION_NAME: str POSTGRES_USER: str POSTGRES_PASSWORD: str POSTGRES_HOST: str - POSTGRES_PORT: int + POSTGRES_PORT: PositiveInt POSTGRES_DB: str POSTGRES_TEST_DB: str = 'yt_test' @@ -25,14 +34,14 @@ class Settings(BaseSettings): RABBITMQ_USER: str RABBITMQ_PASSWORD: str RABBITMQ_HOST: str - RABBITMQ_PORT: int + RABBITMQ_PORT: PositiveInt @property def RABBITMQ_URI(self) -> str: return f'amqp://{self.RABBITMQ_USER}:{self.RABBITMQ_PASSWORD}@{self.RABBITMQ_HOST}:{self.RABBITMQ_PORT}/' - CONSUMER_NUMBER_OF_RETRY: int = 2 - RESEND_DELAY_MS: int = 60000 + CONSUMER_NUMBER_OF_RETRY: PositiveInt = 2 + RESEND_DELAY_MS: PositiveInt = 60000 LOG_LEVEL: str REDIS_HOST: str @@ -41,16 +50,16 @@ class Settings(BaseSettings): def REDIS_URL(self) -> str: return f'redis://{self.REDIS_HOST}' - TMP_DOWNLOAD_ROOT_PATH: str - TMP_DOWNLOAD_DIR: str - TMP_DOWNLOADED_DIR: str + TMP_DOWNLOAD_ROOT_PATH: DirectoryPath | NewPath + TMP_DOWNLOAD_DIR: DirectoryPath | NewPath + TMP_DOWNLOADED_DIR: DirectoryPath | NewPath @field_validator('LOG_LEVEL') @classmethod - def validate_log_level_value(cls, value: str) -> str: - valid_values: KeysView[str] = logging._nameToLevel.keys() # noqa + def validate_log_level_value(cls, value: str, info: ValidationInfo) -> str: + valid_values: KeysView[str] = logging._nameToLevel.keys() # noqa: SLF001 if value not in valid_values: - raise ValueError(f'"LOG_LEVEL" must be one of {valid_values}') + raise ValueError(f'"{info.field_name}" must be one of {valid_values}') return value diff --git a/yt_shared/yt_shared/repositories/task.py b/yt_shared/yt_shared/repositories/task.py index 805c27a..e33d788 100644 --- a/yt_shared/yt_shared/repositories/task.py +++ b/yt_shared/yt_shared/repositories/task.py @@ -12,24 +12,22 @@ from yt_shared.schemas.media import BaseMedia, InbMediaPayload, Video class TaskRepository: - def __init__(self) -> None: + def __init__(self, session: AsyncSession) -> None: self._log = logging.getLogger(self.__class__.__name__) + self._session = session - async def get_or_create_task( - self, db: AsyncSession, media_payload: InbMediaPayload - ) -> Task: + async def get_or_create_task(self, media_payload: InbMediaPayload) -> Task: if media_payload.id is None: - return await self._create_task(db, media_payload) + return await self._create_task(media_payload) stmt = select(Task).filter_by(id=media_payload.id) - task = await db.execute(stmt) + task = await self._session.execute(stmt) try: return task.scalar_one() except NoResultFound: - return await self._create_task(db, media_payload) + return await self._create_task(media_payload) - @staticmethod - async def _create_task(db: AsyncSession, media_payload: InbMediaPayload) -> Task: + async def _create_task(self, media_payload: InbMediaPayload) -> Task: task = Task( id=media_payload.id, url=media_payload.url, @@ -38,14 +36,11 @@ class TaskRepository: message_id=media_payload.message_id, added_at=media_payload.added_at, ) - db.add(task) - await db.commit() + self._session.add(task) + await self._session.commit() return task - @staticmethod - async def save_file_cache( - db: AsyncSession, file_id: str | UUID, cache: CacheSchema - ) -> None: + async def save_file_cache(self, file_id: str | UUID, cache: CacheSchema) -> None: stmt = insert(Cache).values( cache_id=cache.cache_id, cache_unique_id=cache.cache_unique_id, @@ -53,16 +48,13 @@ class TaskRepository: date_timestamp=cache.date_timestamp, file_id=file_id, ) - await db.execute(stmt) - await db.commit() + await self._session.execute(stmt) + await self._session.commit() - @staticmethod - async def save_file( - db: AsyncSession, task: Task, media: BaseMedia, meta: dict - ) -> File: + async def save_file(self, task: Task, media: BaseMedia, meta: dict) -> File: file = File( title=media.title, - name=media.filename, + name=media.current_filename, duration=media.duration, meta=meta, task_id=task.id, @@ -73,23 +65,20 @@ class TaskRepository: file.height = media.height file.thumb_name = media.thumb_name - db.add(file) + self._session.add(file) async with ASYNC_LOCK: - await db.flush([file]) + await self._session.flush([file]) return file - @staticmethod - async def save_as_done(db: AsyncSession, task: Task) -> None: + async def save_as_done(self, task: Task) -> None: task.status = TaskStatus.DONE - await db.commit() + await self._session.commit() - @staticmethod - async def save_as_processing(db: AsyncSession, task: Task) -> None: + async def save_as_processing(self, task: Task) -> None: task.status = TaskStatus.PROCESSING - await db.commit() + await self._session.commit() - @staticmethod - async def save_as_failed(db: AsyncSession, task: Task, error_message: str) -> None: + async def save_as_failed(self, task: Task, error_message: str) -> None: task.status = TaskStatus.FAILED task.error = error_message - await db.commit() + await self._session.commit() diff --git a/yt_shared/yt_shared/schemas/base.py b/yt_shared/yt_shared/schemas/base.py index 3a9a55c..dba945b 100644 --- a/yt_shared/yt_shared/schemas/base.py +++ b/yt_shared/yt_shared/schemas/base.py @@ -1,21 +1,41 @@ -import abc +from abc import ABC from pydantic import BaseModel, ConfigDict from yt_shared.enums import RabbitPayloadType -class RealBaseModel(BaseModel, abc.ABC): - """Base Pydantic model. All models should inherit from this.""" +class RealBaseModel(BaseModel, ABC): + """Base Pydantic model. All non-strict models should inherit from it.""" - model_config = ConfigDict(extra='forbid') + model_config = ConfigDict(extra='forbid', validate_default=True) -class BaseOrmModel(RealBaseModel, abc.ABC): - model_config = ConfigDict(from_attributes=True, **RealBaseModel.model_config) +class StrictRealBaseModel(RealBaseModel, ABC): + """Base Pydantic model. All strict models should inherit from it.""" + + model_config = ConfigDict(**RealBaseModel.model_config, strict=True) -class BaseRabbitPayloadModel(RealBaseModel, abc.ABC): +class BaseOrmModel(RealBaseModel, ABC): + model_config = ConfigDict(**RealBaseModel.model_config, from_attributes=True) + + +class StrictBaseOrmModel(BaseOrmModel, ABC): + model_config = ConfigDict(**BaseOrmModel.model_config, strict=True) + + +class StrictBaseConfigModel(StrictRealBaseModel, ABC): + model_config = ConfigDict(**StrictRealBaseModel.model_config, frozen=True) + + +class BaseRabbitPayloadModel(RealBaseModel, ABC): """Base RabbitMQ payload model. All RabbitMQ models should inherit from this.""" type: RabbitPayloadType + + +class StrictBaseRabbitPayloadModel(BaseRabbitPayloadModel, ABC): + """Base RabbitMQ payload model. All RabbitMQ models should inherit from this.""" + + model_config = ConfigDict(**BaseRabbitPayloadModel.model_config, strict=True) diff --git a/yt_shared/yt_shared/schemas/base_rabbit.py b/yt_shared/yt_shared/schemas/base_rabbit.py index e5b41c9..5e3210a 100644 --- a/yt_shared/yt_shared/schemas/base_rabbit.py +++ b/yt_shared/yt_shared/schemas/base_rabbit.py @@ -1,15 +1,13 @@ -import abc - -from pydantic import StrictInt +from abc import ABC from yt_shared.enums import TelegramChatType -from yt_shared.schemas.base import BaseRabbitPayloadModel +from yt_shared.schemas.base import StrictBaseRabbitPayloadModel from yt_shared.schemas.media import InbMediaPayload -class BaseRabbitDownloadPayload(BaseRabbitPayloadModel, abc.ABC): +class BaseRabbitDownloadPayload(StrictBaseRabbitPayloadModel, ABC): context: InbMediaPayload - from_chat_id: StrictInt | None + from_chat_id: int | None from_chat_type: TelegramChatType | None - from_user_id: StrictInt | None - message_id: StrictInt | None + from_user_id: int | None + message_id: int | None diff --git a/yt_shared/yt_shared/schemas/error.py b/yt_shared/yt_shared/schemas/error.py index 7fc201e..e9b21ff 100644 --- a/yt_shared/yt_shared/schemas/error.py +++ b/yt_shared/yt_shared/schemas/error.py @@ -1,20 +1,18 @@ import uuid from typing import Literal -from pydantic import StrictStr - from yt_shared.enums import RabbitPayloadType from yt_shared.schemas.base_rabbit import BaseRabbitDownloadPayload class ErrorDownloadGeneralPayload(BaseRabbitDownloadPayload): type: Literal[RabbitPayloadType.GENERAL_ERROR] = RabbitPayloadType.GENERAL_ERROR - task_id: uuid.UUID | StrictStr | None - message: StrictStr - url: StrictStr - exception_msg: StrictStr - exception_type: StrictStr - yt_dlp_version: StrictStr | None + task_id: uuid.UUID | str | None + message: str + url: str + exception_msg: str + exception_type: str + yt_dlp_version: str | None class ErrorDownloadPayload(ErrorDownloadGeneralPayload): diff --git a/yt_shared/yt_shared/schemas/media.py b/yt_shared/yt_shared/schemas/media.py index b8090d2..02fccf7 100644 --- a/yt_shared/yt_shared/schemas/media.py +++ b/yt_shared/yt_shared/schemas/media.py @@ -1,58 +1,64 @@ -import abc import uuid +from abc import ABC from datetime import datetime, timezone +from pathlib import Path from typing import Literal from pydantic import ( + ConfigDict, + DirectoryPath, Field, - StrictBool, - StrictFloat, - StrictInt, - StrictStr, + FilePath, model_validator, ) +from typing_extensions import Annotated, Self from yt_shared.enums import DownMediaType, MediaFileType, TaskSource, TelegramChatType -from yt_shared.schemas.base import RealBaseModel +from yt_shared.schemas.base import StrictRealBaseModel from yt_shared.utils.common import format_bytes from yt_shared.utils.file import file_size -class InbMediaPayload(RealBaseModel): +class InbMediaPayload(StrictRealBaseModel): """RabbitMQ inbound media payload from Telegram Bot or API service.""" + model_config = ConfigDict(**StrictRealBaseModel.model_config, frozen=True) + id: uuid.UUID | None = None - from_chat_id: StrictInt | None + from_chat_id: int | None from_chat_type: TelegramChatType | None - from_user_id: StrictInt | None - message_id: StrictInt | None - ack_message_id: StrictInt | None - url: StrictStr - original_url: StrictStr + from_user_id: int | None + message_id: int | None + ack_message_id: int | None + url: str + original_url: str source: TaskSource - save_to_storage: StrictBool + save_to_storage: bool download_media_type: DownMediaType + custom_filename: str | None + automatic_extension: bool added_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) -class BaseMedia(RealBaseModel, abc.ABC): +class BaseMedia(StrictRealBaseModel, ABC): """Model representing abstract downloaded media with common fields.""" file_type: MediaFileType - title: StrictStr - filename: StrictStr - filepath: StrictStr - file_size: StrictInt - duration: StrictFloat | None = None + title: str + original_filename: str + directory_path: Annotated[DirectoryPath, Field(strict=False)] + file_size: int + duration: float | None = None orm_file_id: uuid.UUID | None = None - saved_to_storage: StrictBool = False - storage_path: StrictStr | None = None + saved_to_storage: bool = False + storage_path: Annotated[Path, Field(strict=False)] | None = None - is_converted: StrictBool = False - converted_filepath: StrictStr | None = None - converted_filename: StrictStr | None = None - converted_file_size: StrictInt | None = None + is_converted: bool = False + converted_filename: str | None = None + converted_file_size: int | None = None + + custom_filename: str | None = None def file_size_human(self) -> str: return format_bytes(num=self.current_file_size()) @@ -62,13 +68,30 @@ class BaseMedia(RealBaseModel, abc.ABC): return self.converted_file_size return self.file_size - def mark_as_saved_to_storage(self, storage_path: str) -> None: + @property + def current_filename(self) -> str: + if self.custom_filename: + return self.custom_filename + if self.is_converted: + return self.converted_filename + return self.original_filename + + @property + def current_filepath(self) -> Path: + if self.custom_filename: + filename = self.custom_filename + elif self.is_converted: + filename = self.converted_filename + else: + filename = self.original_filename + return self.directory_path / filename + + def mark_as_saved_to_storage(self, storage_path: Path) -> None: self.storage_path = storage_path self.saved_to_storage = True - def mark_as_converted(self, filepath: str) -> None: - self.converted_filepath = filepath - self.converted_filename = filepath.rsplit('/', 1)[-1] + def mark_as_converted(self, filepath: Path) -> None: + self.converted_filename = filepath.name self.converted_file_size = file_size(filepath) self.is_converted = True @@ -83,35 +106,33 @@ class Video(BaseMedia): """Model representing downloaded video file with separate thumbnail.""" file_type: Literal[MediaFileType.VIDEO] = MediaFileType.VIDEO - thumb_name: StrictStr | None = None + thumb_name: str | None = None width: int | float | None = None height: int | float | None = None - thumb_path: StrictStr | None = None + thumb_path: Annotated[FilePath, Field(strict=False)] | None = None - @model_validator(mode='before') - @classmethod - def _set_fields(cls, values: dict) -> dict: - if not values['thumb_name']: - values['thumb_name'] = f'{values["filename"]}-thumb.jpg' - return values + @model_validator(mode='after') + def set_thumb_name(self) -> Self: + if not self.thumb_name: + self.thumb_name = f'{self.current_filename}-thumb.jpg' + return self -class DownMedia(RealBaseModel): +class DownMedia(StrictRealBaseModel): """Downloaded media (audio, video with muxed audio or both) object context.""" audio: Audio | None video: Video | None - media_type: DownMediaType - root_path: StrictStr + media_type: Annotated[DownMediaType, Field(strict=False)] + root_path: Annotated[DirectoryPath, Field(strict=False)] meta: dict - @model_validator(mode='before') - @classmethod - def _validate(cls, values: dict) -> dict: - if values['audio'] is None and values['video'] is None: + @model_validator(mode='after') + def validate_media(self) -> Self: + if not (self.audio or self.video): raise ValueError('Provide audio, video or both.') - return values + return self def get_media_objects(self) -> tuple[Audio | Video, ...]: return tuple(filter(None, (self.audio, self.video))) diff --git a/yt_shared/yt_shared/schemas/url.py b/yt_shared/yt_shared/schemas/url.py index 52bd627..47c140b 100644 --- a/yt_shared/yt_shared/schemas/url.py +++ b/yt_shared/yt_shared/schemas/url.py @@ -1,16 +1,18 @@ -from pydantic import StrictBool, StrictInt, StrictStr +from pydantic import ConfigDict from yt_shared.enums import DownMediaType, TelegramChatType -from yt_shared.schemas.base import RealBaseModel +from yt_shared.schemas.base import StrictRealBaseModel -class URL(RealBaseModel): - url: StrictStr - original_url: StrictStr - from_chat_id: StrictInt +class URL(StrictRealBaseModel): + model_config = ConfigDict(**StrictRealBaseModel.model_config, frozen=True) + + url: str + original_url: str + from_chat_id: int from_chat_type: TelegramChatType - from_user_id: StrictInt | None - message_id: StrictInt - ack_message_id: StrictInt - save_to_storage: StrictBool + from_user_id: int | None + message_id: int + ack_message_id: int + save_to_storage: bool download_media_type: DownMediaType diff --git a/yt_shared/yt_shared/schemas/ytdlp.py b/yt_shared/yt_shared/schemas/ytdlp.py index 5177971..1672fb9 100644 --- a/yt_shared/yt_shared/schemas/ytdlp.py +++ b/yt_shared/yt_shared/schemas/ytdlp.py @@ -1,35 +1,32 @@ import datetime -from pydantic import Field, StrictStr, field_validator +from pydantic import Field, field_validator -from yt_shared.schemas.base import BaseOrmModel, RealBaseModel +from yt_shared.schemas.base import StrictBaseOrmModel, StrictRealBaseModel +from yt_shared.utils.common import remove_microseconds -def _remove_microseconds(dt_obj: datetime.datetime) -> datetime.datetime: - return dt_obj.replace(microsecond=0) - - -class LatestVersion(RealBaseModel): - version: StrictStr +class LatestVersion(StrictRealBaseModel): + version: str retrieved_at: datetime.datetime - @field_validator('retrieved_at', mode='before') + @field_validator('retrieved_at', mode='after') @classmethod def remove_microseconds(cls, value: datetime.datetime) -> datetime.datetime: - return _remove_microseconds(value) + return remove_microseconds(value) -class CurrentVersion(BaseOrmModel): - version: StrictStr = Field(..., alias='current_version') +class CurrentVersion(StrictBaseOrmModel): + version: str = Field(..., alias='current_version') updated_at: datetime.datetime - @field_validator('updated_at', mode='before') + @field_validator('updated_at', mode='after') @classmethod def remove_microseconds(cls, value: datetime.datetime) -> datetime.datetime: - return _remove_microseconds(value) + return remove_microseconds(value) -class VersionContext(RealBaseModel): +class VersionContext(StrictRealBaseModel): latest: LatestVersion current: CurrentVersion diff --git a/yt_shared/yt_shared/utils/common.py b/yt_shared/yt_shared/utils/common.py index 14f688f..18598f7 100644 --- a/yt_shared/yt_shared/utils/common.py +++ b/yt_shared/yt_shared/utils/common.py @@ -1,9 +1,10 @@ import asyncio import atexit +import datetime import random import signal from functools import partial, wraps -from string import ascii_lowercase +from string import ascii_lowercase, digits from typing import Any, Callable _UNIT_SIZE_NAMES = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi') @@ -51,11 +52,20 @@ def wrap(func): return run -def random_string(number: int) -> str: - return ''.join(random.choice(ascii_lowercase) for _ in range(number)) +def gen_random_str(length: int = 4, use_digits: bool = False) -> str: + if use_digits: + choices = ascii_lowercase + digits + else: + choices = ascii_lowercase + + return ''.join(random.SystemRandom().choice(choices) for _ in range(length)) def register_shutdown(callback: Callable) -> None: atexit.register(callback) signal.signal(signal.SIGTERM, callback) signal.signal(signal.SIGINT, callback) + + +def remove_microseconds(dt_: datetime.datetime) -> datetime.datetime: + return dt_.replace(microsecond=0) diff --git a/yt_shared/yt_shared/utils/file.py b/yt_shared/yt_shared/utils/file.py index 88aef9b..de7b866 100644 --- a/yt_shared/yt_shared/utils/file.py +++ b/yt_shared/yt_shared/utils/file.py @@ -1,33 +1,30 @@ import logging -import os import shutil +from pathlib import Path from typing import Iterable from yt_shared.utils.common import format_bytes -def file_cleanup(file_paths: Iterable[str], log: logging.Logger = None) -> None: +def file_cleanup(file_paths: Iterable[Path], log: logging.Logger = None) -> None: log = log or logging.getLogger() log.debug('Performing cleanup of %s', file_paths) for file_path in file_paths: - if os.path.exists(file_path): - try: - os.remove(file_path) - except Exception as err: - log.warning('File "%s" not deleted: %s', file_path, err) + if file_path.is_file(): + file_path.unlink() -def remove_dir(dir_path: str) -> None: +def remove_dir(dir_path: Path) -> None: shutil.rmtree(dir_path) -def file_size(filepath: str) -> int: +def file_size(filepath: Path) -> int: """Return file size in bytes.""" - return os.path.getsize(filepath) + return filepath.stat().st_size -def list_files(path: str) -> dict[str, str]: +def list_files_human(path: Path) -> dict[Path, str]: return { - filename: format_bytes(file_size(os.path.join(path, filename))) - for filename in os.listdir(path) + filename: format_bytes(file_size(path / filename)) + for filename in path.iterdir() } diff --git a/yt_shared/yt_shared/utils/tasks/abstract.py b/yt_shared/yt_shared/utils/tasks/abstract.py index 3ce825c..d154220 100644 --- a/yt_shared/yt_shared/utils/tasks/abstract.py +++ b/yt_shared/yt_shared/utils/tasks/abstract.py @@ -1,12 +1,12 @@ -import abc import logging +from abc import ABC, abstractmethod -class AbstractTask(abc.ABC): +class AbstractTask(ABC): def __init__(self) -> None: self._log = logging.getLogger(self.__class__.__name__) - @abc.abstractmethod + @abstractmethod async def run(self) -> None: """Main entry point.""" pass