Version 1.6

This commit is contained in:
Taras Terletsky 2024-04-25 23:09:50 +03:00
parent e947c4ead0
commit 8d0d6d2b70
40 changed files with 480 additions and 415 deletions

View file

@ -2,7 +2,7 @@
Simple and reliable self-hosted Video Download Telegram Bot.
Version: 1.5. [Release details](RELEASES.md).
Version: 1.6. [Release details](RELEASES.md).
![frames](.assets/download_success.png)
@ -173,7 +173,9 @@ documentations lives at `http://127.0.0.1:1984/docs`.
{
"url": "https://www.youtube.com/watch?v=PavYAOpVpJI",
"download_media_type": "AUDIO_VIDEO",
"save_to_storage": false
"save_to_storage": false,
"custom_filename": "cool.mp4",
"automatic_extension": false
}
```
Response

View file

@ -1,3 +1,31 @@
## Release 1.6
Release date: April 25, 2024
## New Features
- Ability to set custom video name in `POST` API request:
```json
{
"url": "<VIDEO_URL>",
"download_media_type": "VIDEO",
"save_to_storage": true,
"custom_filename": "your cool custom name and extension.mp4",
"automatic_extension": false
}
```
- Saving into storage with the same name won't overwrite the file but append timestamp to the filename.
## Important
N/A
## Misc
N/A
---
## Release 1.5
Release date: March 20, 2024

View file

@ -1,9 +1,10 @@
import uuid
from datetime import datetime
from pydantic import StrictBool, StrictFloat, StrictInt, StrictStr
from pydantic import Field, StrictFloat, StrictInt, StrictStr
from typing_extensions import Annotated
from yt_shared.enums import DownMediaType, TaskSource, TaskStatus
from yt_shared.schemas.base import BaseOrmModel, RealBaseModel
from yt_shared.schemas.base import BaseOrmModel, StrictRealBaseModel
class CacheSchema(BaseOrmModel):
@ -52,15 +53,17 @@ class TaskSchema(TaskSimpleSchema):
files: list[FileSchema]
class CreateTaskIn(RealBaseModel):
url: StrictStr
download_media_type: DownMediaType
save_to_storage: StrictBool
class CreateTaskIn(StrictRealBaseModel):
url: str = ...
download_media_type: Annotated[DownMediaType, Field(strict=False)] = ...
save_to_storage: bool = ...
custom_filename: str = ...
automatic_extension: bool = ...
class CreateTaskOut(RealBaseModel):
class CreateTaskOut(StrictRealBaseModel):
id: uuid.UUID
url: StrictStr
url: str
source: TaskSource
added_at: datetime

View file

@ -1,6 +1,7 @@
from pydantic import StrictStr
from yt_shared.schemas.base import RealBaseModel
from typing import Literal
from yt_shared.schemas.base import StrictRealBaseModel
class HealthcheckSchema(RealBaseModel):
status: StrictStr = 'OK'
class HealthcheckSchema(StrictRealBaseModel):
status: Literal['OK'] = 'OK'

View file

@ -51,7 +51,7 @@ class TaskService:
status: list[TaskStatus] | None = None,
limit: int = 100,
offset: int = 0,
) -> list[Task]:
) -> list[TaskSimpleSchema | TaskSchema]:
schema = self._get_schema(include_meta)
tasks = await self._repository.get_all_tasks(
include_meta, status, limit, offset
@ -79,6 +79,8 @@ class TaskService:
from_user_id=None,
message_id=None,
ack_message_id=None,
custom_filename=task.custom_filename,
automatic_extension=task.automatic_extension,
)
if not await publisher.send_for_download(payload):
raise TaskServiceError('Failed to create task')

View file

@ -1,5 +1,5 @@
import abc
import logging
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
from yt_shared.enums import TaskSource, TelegramChatType
@ -11,7 +11,7 @@ if TYPE_CHECKING:
from bot.bot import VideoBotClient
class AbstractDownloadHandler(abc.ABC):
class AbstractDownloadHandler(ABC):
def __init__(
self,
body: BaseRabbitDownloadPayload,
@ -22,7 +22,7 @@ class AbstractDownloadHandler(abc.ABC):
self._bot = bot
self._receiving_users = self._get_receiving_users()
@abc.abstractmethod
@abstractmethod
async def handle(self) -> None:
pass

View file

@ -1,5 +1,4 @@
import asyncio
import os
import traceback
from pyrogram.enums import ParseMode
@ -10,7 +9,7 @@ from yt_shared.rabbit.publisher import RmqPublisher
from yt_shared.schemas.error import ErrorDownloadGeneralPayload
from yt_shared.schemas.media import BaseMedia
from yt_shared.schemas.success import SuccessDownloadPayload
from yt_shared.utils.file import list_files, remove_dir
from yt_shared.utils.file import list_files_human, remove_dir
from yt_shared.utils.tasks.tasks import create_task
from bot.core.handlers.abstract import AbstractDownloadHandler
@ -101,10 +100,10 @@ class SuccessDownloadHandler(AbstractDownloadHandler):
else:
self._log.warning(
'File "%s" will not be uploaded due to upload configuration',
media_object.filepath,
media_object.current_filepath,
)
except Exception as err:
self._log.exception('Upload of "%s" failed', media_object.filepath)
self._log.exception('Upload of "%s" failed', media_object.current_filepath)
await self._publish_error_message(err)
def _cleanup(self) -> None:
@ -113,7 +112,7 @@ class SuccessDownloadHandler(AbstractDownloadHandler):
'Cleaning up task "%s": removing download content directory "%s" with files %s',
self._body.task_id,
root_path,
list_files(root_path),
list_files_human(root_path),
)
remove_dir(root_path)
@ -138,7 +137,7 @@ class SuccessDownloadHandler(AbstractDownloadHandler):
@staticmethod
def _create_success_text(media_object: BaseMedia) -> str:
text = f'{SUCCESS_EMOJI} {bold("Downloaded")} {media_object.filename}'
text = f'{SUCCESS_EMOJI} {bold("Downloaded")} {media_object.current_filename}'
if media_object.saved_to_storage:
text = f'{text}\n💾 {bold("Saved to media storage")}'
return f'{text}\n📏 {bold("Size")} {media_object.file_size_human()}'
@ -171,8 +170,10 @@ class SuccessDownloadHandler(AbstractDownloadHandler):
user = self._bot.allowed_users[self._get_sender_id()]
max_file_size = user.upload.upload_video_max_file_size
if not os.path.exists(media_obj.filepath):
raise ValueError(f'{media_obj.file_type} {media_obj.filepath} not found')
if not media_obj.current_filepath.exists():
raise ValueError(
f'{media_obj.file_type} {media_obj.current_filepath} not found'
)
_file_size = media_obj.current_file_size()
if _file_size > max_file_size:

View file

@ -1,58 +1,52 @@
import abc
from abc import ABC
from pydantic import (
StrictBool,
StrictInt,
StrictStr,
StringConstraints,
field_validator,
)
from pydantic import Field, PositiveInt, StringConstraints, field_validator
from typing_extensions import Annotated
from yt_shared.enums import DownMediaType
from yt_shared.schemas.base import RealBaseModel
from yt_shared.schemas.base import StrictBaseConfigModel
_LANG_CODE_LEN = 2
_LANG_CODE_REGEX = rf'^[a-z]{{{_LANG_CODE_LEN}}}$'
class _BaseUserSchema(RealBaseModel, abc.ABC):
id: StrictInt
class _BaseUserSchema(StrictBaseConfigModel, ABC):
id: int
class AnonymousUserSchema(_BaseUserSchema):
pass
class VideoCaptionSchema(RealBaseModel):
include_title: StrictBool
include_filename: StrictBool
include_link: StrictBool
include_size: StrictBool
class VideoCaptionSchema(StrictBaseConfigModel):
include_title: bool
include_filename: bool
include_link: bool
include_size: bool
class UploadSchema(RealBaseModel):
upload_video_file: StrictBool
upload_video_max_file_size: StrictInt
forward_to_group: StrictBool
forward_group_id: StrictInt | None
silent: StrictBool
class UploadSchema(StrictBaseConfigModel):
upload_video_file: bool
upload_video_max_file_size: PositiveInt
forward_to_group: bool
forward_group_id: int | None
silent: bool
video_caption: VideoCaptionSchema
class UserSchema(_BaseUserSchema):
is_admin: StrictBool
send_startup_message: StrictBool
download_media_type: DownMediaType
save_to_storage: StrictBool
use_url_regex_match: StrictBool
is_admin: bool
send_startup_message: bool
download_media_type: Annotated[DownMediaType, Field(strict=False)]
save_to_storage: bool
use_url_regex_match: bool
upload: UploadSchema
class ApiSchema(RealBaseModel):
upload_video_file: StrictBool
upload_video_max_file_size: StrictInt
class ApiSchema(StrictBaseConfigModel):
upload_video_file: bool
upload_video_max_file_size: PositiveInt
upload_to_chat_ids: list[AnonymousUserSchema]
silent: StrictBool
silent: bool
video_caption: VideoCaptionSchema
@field_validator('upload_to_chat_ids', mode='before')
@ -61,25 +55,25 @@ class ApiSchema(RealBaseModel):
return [AnonymousUserSchema(id=id_) for id_ in values]
class TelegramSchema(RealBaseModel):
api_id: StrictInt
api_hash: StrictStr
token: StrictStr
class TelegramSchema(StrictBaseConfigModel):
api_id: int
api_hash: str
token: str
lang_code: Annotated[
str, StringConstraints(pattern=_LANG_CODE_REGEX, to_lower=True)
]
max_upload_tasks: StrictInt
max_upload_tasks: PositiveInt
url_validation_regexes: list[str]
allowed_users: list[UserSchema]
api: ApiSchema
class YtdlpSchema(RealBaseModel):
version_check_enabled: StrictBool
version_check_interval: StrictInt
notify_users_on_new_version: StrictBool
class YtdlpSchema(StrictBaseConfigModel):
version_check_enabled: bool
version_check_interval: PositiveInt
notify_users_on_new_version: bool
class ConfigSchema(RealBaseModel):
class ConfigSchema(StrictBaseConfigModel):
telegram: TelegramSchema
ytdlp: YtdlpSchema

View file

@ -35,6 +35,8 @@ class UrlService:
source=TaskSource.BOT,
save_to_storage=url.save_to_storage,
download_media_type=url.download_media_type,
custom_filename=None,
automatic_extension=False,
)
is_sent = await self._rmq_publisher.send_for_download(payload)
if not is_sent:

View file

@ -1,9 +1,9 @@
import abc
import asyncio
from abc import ABC, abstractmethod
from itertools import chain
from typing import TYPE_CHECKING, Coroutine
from pydantic import StrictBool, StrictFloat, StrictInt, StrictStr
from pydantic import ConfigDict, FilePath
from pyrogram.enums import ChatAction, MessageMediaType, ParseMode
from pyrogram.types import Animation, Message
from pyrogram.types import Audio as _Audio
@ -27,25 +27,26 @@ if TYPE_CHECKING:
class BaseUploadContext(RealBaseModel):
caption: StrictStr
filename: StrictStr
filepath: StrictStr
duration: StrictFloat
model_config = ConfigDict(**RealBaseModel.model_config, strict=True)
caption: str
filename: str
filepath: FilePath
duration: float
type: MessageMediaType
is_cached: StrictBool = False
is_cached: bool = False
class VideoUploadContext(BaseUploadContext):
height: StrictInt | StrictFloat
width: StrictInt | StrictFloat
thumb: StrictStr | None = None
height: int | float
width: int | float
thumb: FilePath | None = None
class AudioUploadContext(BaseUploadContext):
pass
class AbstractUploadTask(AbstractTask, abc.ABC):
class AbstractUploadTask(AbstractTask, ABC):
_UPLOAD_ACTION: ChatAction
def __init__(
@ -60,12 +61,8 @@ class AbstractUploadTask(AbstractTask, abc.ABC):
self._config = get_main_config()
self._media_object = media_object
if media_object.is_converted:
self._filename = media_object.converted_filename
self._filepath = media_object.converted_filepath
else:
self._filename = media_object.filename
self._filepath = media_object.filepath
self._filename = media_object.current_filename
self._filepath = media_object.current_filepath
self._bot = bot
self._users = users
@ -89,7 +86,7 @@ class AbstractUploadTask(AbstractTask, abc.ABC):
self._log.exception('Exception in upload task for "%s"', self._filename)
raise
@abc.abstractmethod
@abstractmethod
def _generate_caption_items(self) -> list[str]:
pass
@ -127,11 +124,11 @@ class AbstractUploadTask(AbstractTask, abc.ABC):
self._log.debug('Uploading to "%d" with context: %s', chat_id, self._media_ctx)
return await self._generate_send_media_coroutine(chat_id)
@abc.abstractmethod
@abstractmethod
def _generate_send_media_coroutine(self, chat_id: int) -> Coroutine:
pass
@abc.abstractmethod
@abstractmethod
def _create_media_context(self) -> AudioUploadContext | VideoUploadContext:
pass
@ -170,7 +167,7 @@ class AbstractUploadTask(AbstractTask, abc.ABC):
exception_message_args=(db_cache_task_name,),
)
@abc.abstractmethod
@abstractmethod
def _cache_data(self, message: Message) -> None:
pass

View file

@ -1,12 +1,9 @@
"""Utils module."""
import asyncio
import random
import string
from datetime import datetime
from typing import Generator, Iterable
from urllib.parse import urlparse
from uuid import uuid4
from pyrogram.enums import ChatType
from pyrogram.types import Message
@ -19,17 +16,6 @@ async def shallow_sleep_async(sleep_time: float = 0.1) -> None:
await asyncio.sleep(sleep_time)
def gen_uuid() -> str:
return uuid4().hex
def gen_random_str(length=4) -> str:
return ''.join(
random.SystemRandom().choice(string.ascii_lowercase + string.digits)
for _ in range(length)
)
def format_ts(ts: float, time_format: str = '%a %b %d %H:%M:%S %Y') -> str:
return datetime.fromtimestamp(ts).strftime(time_format)

View file

@ -1,7 +1,6 @@
"""RabbitMQ Queue abstract worker module."""
import abc
import enum
from abc import abstractmethod
from typing import TYPE_CHECKING, Type
from aio_pika import IncomingMessage
@ -11,16 +10,12 @@ from yt_shared.utils.tasks.abstract import AbstractTask
from bot.core.config.config import get_main_config
from bot.core.exceptions import InvalidBodyError
from bot.core.workers.enums import RabbitWorkerType
if TYPE_CHECKING:
from bot.bot.client import VideoBotClient
class RabbitWorkerType(enum.Enum):
ERROR = 'ERROR'
SUCCESS = 'SUCCESS'
class AbstractDownloadResultWorker(AbstractTask):
TYPE: RabbitWorkerType | None = None
QUEUE_TYPE: str | None = None
@ -36,7 +31,7 @@ class AbstractDownloadResultWorker(AbstractTask):
async def run(self) -> None:
await self._watch_queue()
@abc.abstractmethod
@abstractmethod
async def _process_body(self, body: BaseModel) -> bool:
pass
@ -48,7 +43,8 @@ class AbstractDownloadResultWorker(AbstractTask):
await self._process_message(message)
except Exception:
self._log.exception('Failed to process message %s', message.body)
await message.nack(requeue=False)
if not message.processed:
await message.nack(requeue=False)
async def _process_message(self, message: IncomingMessage) -> None:
self._log.debug('[x] Received message %s', message.body)
@ -64,7 +60,7 @@ class AbstractDownloadResultWorker(AbstractTask):
# Skip unmatched schema class that failed to parse the body.
pass
else:
self._log.error('Failed to decode message body: %s', message.body)
self._log.error('Failed to decode message body')
await self._reject_invalid_body(message)
raise InvalidBodyError

View file

@ -0,0 +1,6 @@
from enum import StrEnum
class RabbitWorkerType(StrEnum):
ERROR = 'ERROR'
SUCCESS = 'SUCCESS'

View file

@ -1 +1 @@
__version__ = '1.5'
__version__ = '1.6'

View file

@ -1,10 +1,11 @@
from pydantic import DirectoryPath
from yt_shared.config import Settings
class WorkerSettings(Settings):
APPLICATION_NAME: str
MAX_SIMULTANEOUS_DOWNLOADS: int
STORAGE_PATH: str
STORAGE_PATH: DirectoryPath
THUMBNAIL_FRAME_SECOND: float
INSTAGRAM_ENCODE_TO_H264: bool

View file

@ -1,15 +1,15 @@
import glob
import logging
import os
import shutil
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Callable
import yt_dlp
from yt_shared.enums import DownMediaType
from yt_shared.schemas.media import Audio, DownMedia, Video
from yt_shared.utils.common import format_bytes, random_string
from yt_shared.utils.file import file_size, list_files, remove_dir
from yt_shared.schemas.media import Audio, DownMedia, InbMediaPayload, Video
from yt_shared.utils.common import format_bytes, gen_random_str
from yt_shared.utils.file import file_size, list_files_human, remove_dir
from worker.core.config import settings
from worker.core.exceptions import MediaDownloaderError
@ -33,29 +33,28 @@ class MediaDownloader:
def __init__(self) -> None:
self._log = logging.getLogger(self.__class__.__name__)
self._tmp_downloaded_dest_dir = os.path.join(
settings.TMP_DOWNLOAD_ROOT_PATH, settings.TMP_DOWNLOADED_DIR
self._tmp_downloaded_dest_dir = (
settings.TMP_DOWNLOAD_ROOT_PATH / settings.TMP_DOWNLOADED_DIR
)
def download(
self, host_conf: AbstractHostConfig, media_type: DownMediaType
self, host_conf: AbstractHostConfig, media_payload: InbMediaPayload
) -> DownMedia:
try:
return self._download(host_conf=host_conf, media_type=media_type)
return self._download(host_conf=host_conf, media_payload=media_payload)
except Exception:
self._log.error('Failed to download %s', host_conf.url)
raise
def _download(
self, host_conf: AbstractHostConfig, media_type: DownMediaType
self, host_conf: AbstractHostConfig, media_payload: InbMediaPayload
) -> DownMedia:
media_type = media_payload.download_media_type
url = host_conf.url
self._log.info('Downloading %s, media_type %s', url, media_type)
tmp_down_path = os.path.join(
settings.TMP_DOWNLOAD_ROOT_PATH, settings.TMP_DOWNLOAD_DIR
)
tmp_down_path = settings.TMP_DOWNLOAD_ROOT_PATH / settings.TMP_DOWNLOAD_DIR
with TemporaryDirectory(prefix='tmp_media_dir-', dir=tmp_down_path) as tmp_dir:
curr_tmp_dir = os.path.join(tmp_down_path, tmp_dir)
curr_tmp_dir = tmp_down_path / tmp_dir
ytdl_opts_model = host_conf.build_config(
media_type=media_type, curr_tmp_dir=curr_tmp_dir
@ -72,7 +71,7 @@ class MediaDownloader:
self._log.error('%s. Meta: %s', err_msg, meta)
raise MediaDownloaderError(err_msg)
current_files = os.listdir(curr_tmp_dir)
current_files = list(curr_tmp_dir.iterdir())
if not current_files:
err_msg = 'Nothing downloaded. Is URL valid?'
self._log.error(err_msg)
@ -83,25 +82,25 @@ class MediaDownloader:
self._log.info('Finished downloading %s', url)
self._log.debug('Downloaded "%s" meta: %s', url, meta_sanitized)
self._log.info(
'Content of "%s": %s', curr_tmp_dir, list_files(curr_tmp_dir)
'Content of "%s": %s', curr_tmp_dir, list_files_human(curr_tmp_dir)
)
destination_dir = os.path.join(
self._tmp_downloaded_dest_dir,
random_string(number=self._DESTINATION_TMP_DIR_NAME_LEN),
destination_dir = self._tmp_downloaded_dest_dir / gen_random_str(
length=self._DESTINATION_TMP_DIR_NAME_LEN
)
os.mkdir(destination_dir)
destination_dir.mkdir()
audio, video = self._create_media_dtos(
media_type=media_type,
meta=meta,
curr_tmp_dir=curr_tmp_dir,
destination_dir=destination_dir,
custom_video_filename=media_payload.custom_filename,
)
self._log.info(
'Removing temporary download directory "%s" with leftover files %s',
curr_tmp_dir,
os.listdir(curr_tmp_dir),
list_files_human(curr_tmp_dir),
)
return DownMedia(
@ -118,6 +117,7 @@ class MediaDownloader:
meta: dict,
curr_tmp_dir: str,
destination_dir: str,
custom_video_filename: str | None = None,
) -> tuple[Audio | None, Video | None]:
def get_audio() -> Audio:
return create_dto(self._create_audio_dto)
@ -126,14 +126,10 @@ class MediaDownloader:
return create_dto(self._create_video_dto)
def create_dto(
func: Callable[[dict, str, str], Audio | Video],
func: Callable[[dict, str, str, str | None], Audio | Video],
) -> Audio | Video:
try:
return func(
meta,
curr_tmp_dir,
destination_dir,
)
return func(meta, curr_tmp_dir, destination_dir, custom_video_filename)
except Exception:
remove_dir(destination_dir)
raise
@ -151,35 +147,41 @@ class MediaDownloader:
def _create_video_dto(
self,
meta: dict,
curr_tmp_dir: str,
destination_dir: str,
curr_tmp_dir: Path,
destination_dir: Path,
custom_video_filename: str | None = None,
) -> Video:
video_filename = self._get_video_filename(meta)
video_filepath = os.path.join(curr_tmp_dir, video_filename)
video_filepath = curr_tmp_dir / video_filename
self._log.info('Moving "%s" to "%s"', video_filepath, destination_dir)
shutil.move(video_filepath, destination_dir)
if custom_video_filename:
dest_path = destination_dir / custom_video_filename
else:
dest_path = destination_dir / video_filename
thumb_path: str | None = None
self._log.info('Moving "%s" to "%s"', video_filepath, dest_path)
shutil.move(video_filepath, dest_path)
thumb_path: Path | None = None
thumb_name = self._find_downloaded_file(
root_path=curr_tmp_dir,
extension=FINAL_THUMBNAIL_FORMAT,
)
if thumb_name:
_thumb_path = os.path.join(curr_tmp_dir, thumb_name)
_thumb_path = curr_tmp_dir / thumb_name
shutil.move(_thumb_path, destination_dir)
thumb_path = os.path.join(destination_dir, thumb_name)
thumb_path = destination_dir / thumb_name
duration, width, height = self._get_video_context(meta)
filepath = os.path.join(destination_dir, video_filename)
return Video(
title=meta['title'],
filename=video_filename,
original_filename=video_filename,
custom_filename=custom_video_filename,
duration=duration,
width=width,
height=height,
filepath=filepath,
file_size=file_size(filepath),
directory_path=destination_dir,
file_size=file_size(dest_path),
thumb_path=thumb_path,
thumb_name=thumb_name,
)
@ -187,26 +189,26 @@ class MediaDownloader:
def _create_audio_dto(
self,
meta: dict,
curr_tmp_dir: str,
destination_dir: str,
curr_tmp_dir: Path,
destination_dir: Path,
custom_video_filename: str | None = None, # TODO: Make for audio.
) -> Audio:
audio_filename = self._find_downloaded_file(
root_path=curr_tmp_dir,
extension=FINAL_AUDIO_FORMAT,
)
audio_filepath = os.path.join(curr_tmp_dir, audio_filename)
audio_filepath = curr_tmp_dir / audio_filename
self._log.info('Moving "%s" to "%s"', audio_filepath, destination_dir)
shutil.move(audio_filepath, destination_dir)
filepath = os.path.join(destination_dir, audio_filename)
return Audio(
title=meta['title'],
filename=audio_filename,
original_filename=audio_filename,
duration=None,
filepath=filepath,
file_size=file_size(filepath),
directory_path=destination_dir,
file_size=file_size(destination_dir / audio_filename),
)
def _find_downloaded_file(self, root_path: str, extension: str) -> str | None:
def _find_downloaded_file(self, root_path: Path, extension: str) -> str | None:
"""Try to find downloaded audio or thumbnail file."""
verbose_name = self._EXT_TO_NAME[extension]
for file_name in glob.glob(f'*.{extension}', root_dir=root_path):
@ -214,7 +216,7 @@ class MediaDownloader:
'Found downloaded %s: "%s" [%s]',
verbose_name,
file_name,
format_bytes(file_size(os.path.join(root_path, file_name))),
format_bytes(file_size(root_path / file_name)),
)
return file_name
self._log.info('Downloaded %s not found in "%s"', verbose_name, root_path)

View file

@ -1,6 +1,5 @@
import asyncio
import logging
import os
from yt_dlp import version as ytdlp_version
from yt_shared.db.session import get_db
@ -60,19 +59,17 @@ class WorkerLauncher:
async def _create_intermediate_directories(self) -> None:
"""Create temporary intermediate directories on start if they do not exist."""
tmp_download_path = os.path.join(
settings.TMP_DOWNLOAD_ROOT_PATH, settings.TMP_DOWNLOAD_DIR
)
tmp_downloaded_path = os.path.join(
settings.TMP_DOWNLOAD_ROOT_PATH, settings.TMP_DOWNLOADED_DIR
tmp_download_path = settings.TMP_DOWNLOAD_ROOT_PATH / settings.TMP_DOWNLOAD_DIR
tmp_downloaded_path = (
settings.TMP_DOWNLOAD_ROOT_PATH / settings.TMP_DOWNLOADED_DIR
)
self._log.info(
'Creating intermediate directories %s, %s if not exist',
tmp_download_path,
tmp_downloaded_path,
)
os.makedirs(tmp_download_path, exist_ok=True)
os.makedirs(tmp_downloaded_path, exist_ok=True)
tmp_download_path.mkdir(parents=True, exist_ok=True)
tmp_downloaded_path.mkdir(parents=True, exist_ok=True)
def _register_shutdown(self) -> None:
register_shutdown(self.stop)

View file

@ -1,10 +1,10 @@
import asyncio
import logging
import os
import shutil
import time
from pathlib import Path
from urllib.parse import urlsplit
from sqlalchemy.ext.asyncio import AsyncSession
from yt_shared.enums import DownMediaType, TaskStatus
from yt_shared.models import Task
from yt_shared.repositories.task import TaskRepository
@ -14,6 +14,7 @@ from yt_shared.schemas.media import (
InbMediaPayload,
Video,
)
from yt_shared.utils.common import gen_random_str
from yt_shared.utils.file import remove_dir
from yt_shared.utils.tasks.tasks import create_task
@ -28,37 +29,37 @@ from ytdl_opts.per_host._registry import HostConfRegistry
class MediaService:
def __init__(self) -> None:
def __init__(
self,
media_payload: InbMediaPayload,
downloader: MediaDownloader,
task_repository: TaskRepository,
) -> None:
self._log = logging.getLogger(self.__class__.__name__)
self._downloader = MediaDownloader()
self._repository = TaskRepository()
self._downloader = downloader
self._repository = task_repository
self._media_payload = media_payload
async def process(
self, media_payload: InbMediaPayload, db: AsyncSession
self,
) -> tuple[DownMedia | None, Task | None]:
task = await self._repository.get_or_create_task(db, media_payload)
task = await self._repository.get_or_create_task(self._media_payload)
if task.status != TaskStatus.PENDING.value:
return None, None
return (
await self._process(media_payload=media_payload, task=task, db=db),
await self._process(task=task),
task,
)
async def _process(
self, media_payload: InbMediaPayload, task: Task, db: AsyncSession
) -> DownMedia:
async def _process(self, task: Task) -> DownMedia:
host_conf = self._get_host_conf(url=task.url)
await self._repository.save_as_processing(db, task)
media = await self._start_download(
task=task, media_payload=media_payload, host_conf=host_conf, db=db
)
await self._repository.save_as_processing(task)
media = await self._start_download(task=task, host_conf=host_conf)
try:
await self._post_process_media(
media=media,
task=task,
media_payload=media_payload,
host_conf=host_conf,
db=db,
)
except Exception:
self._log.exception('Failed to post-process media %s', media)
@ -75,47 +76,41 @@ class MediaService:
async def _start_download(
self,
task: Task,
media_payload: InbMediaPayload,
host_conf: AbstractHostConfig,
db: AsyncSession,
) -> DownMedia:
try:
return await asyncio.get_running_loop().run_in_executor(
None,
lambda: self._downloader.download(
host_conf=host_conf,
media_type=media_payload.download_media_type,
media_payload=self._media_payload,
),
)
except Exception as err:
self._log.exception('Failed to download media. Context: %s', media_payload)
await self._handle_download_exception(err, task, db)
self._log.exception(
'Failed to download media. Context: %s', self._media_payload
)
await self._handle_download_exception(err, task)
raise DownloadVideoServiceError(message=str(err), task=task)
async def _post_process_media(
self,
media: DownMedia,
task: Task,
media_payload: InbMediaPayload,
host_conf: AbstractHostConfig,
db: AsyncSession,
) -> None:
def post_process_audio():
return self._post_process_audio(
media=media,
media_payload=media_payload,
task=task,
host_conf=host_conf,
db=db,
)
def post_process_video():
return self._post_process_video(
media=media,
media_payload=media_payload,
task=task,
host_conf=host_conf,
db=db,
)
match media.media_type:
@ -126,15 +121,13 @@ class MediaService:
case DownMediaType.AUDIO_VIDEO:
await asyncio.gather(*(post_process_audio(), post_process_video()))
await self._repository.save_as_done(db, task)
await self._repository.save_as_done(task)
async def _post_process_video(
self,
media: DownMedia,
media_payload: InbMediaPayload,
task: Task,
host_conf: AbstractHostConfig,
db: AsyncSession,
) -> None:
"""Post-process downloaded media files, e.g. make thumbnail and copy to storage."""
video = media.video
@ -149,17 +142,17 @@ class MediaService:
coro_tasks = []
if not video.thumb_path:
thumb_path = os.path.join(media.root_path, video.thumb_name)
thumb_path = Path(media.root_path) / Path(video.thumb_name)
coro_tasks.append(
self._create_thumb_task(
file_path=video.filepath,
file_path=video.current_filepath,
thumb_path=thumb_path,
duration=video.duration,
video_ctx=video,
)
)
if media_payload.save_to_storage:
if self._media_payload.save_to_storage:
coro_tasks.append(self._create_copy_file_task(video))
if host_conf.ENCODE_VIDEO:
@ -177,26 +170,24 @@ class MediaService:
await asyncio.gather(*coro_tasks)
file = await self._repository.save_file(db, task, media.video, media.meta)
file = await self._repository.save_file(task, media.video, media.meta)
video.orm_file_id = file.id
async def _post_process_audio(
self,
media: DownMedia,
media_payload: InbMediaPayload,
task: Task,
host_conf: AbstractHostConfig,
db: AsyncSession,
) -> None:
coro_tasks = [self._repository.save_file(db, task, media.audio, media.meta)]
if media_payload.save_to_storage:
coro_tasks = [self._repository.save_file(task, media.audio, media.meta)]
if self._media_payload.save_to_storage:
coro_tasks.append(self._create_copy_file_task(media.audio))
results = await asyncio.gather(*coro_tasks)
file = results[0]
media.audio.orm_file_id = file.id
async def _set_probe_ctx(self, video: Video) -> None:
probe_ctx = await GetFfprobeContextTask(video.filepath).run()
probe_ctx = await GetFfprobeContextTask(video.current_filepath).run()
if not probe_ctx:
return
@ -225,8 +216,8 @@ class MediaService:
def _create_thumb_task(
self,
file_path: str,
thumb_path: str,
file_path: Path,
thumb_path: Path,
duration: float,
video_ctx: Video,
) -> asyncio.Task:
@ -241,16 +232,17 @@ class MediaService:
)
async def _copy_file_to_storage(self, file: BaseMedia) -> None:
if file.is_converted:
filename = file.converted_filename
filepath = file.converted_filepath
else:
filename = file.filename
filepath = file.filepath
dst = settings.STORAGE_PATH / file.current_filename
if dst.is_file():
self._log.warning('Destination file in storage already exists: %s', dst)
dst = (
dst.parent
/ f'{dst.stem}-{int(time.time())}-{gen_random_str()}{dst.suffix}'
)
self._log.warning('Adding current timestamp to filename: %s', dst)
dst = os.path.join(settings.STORAGE_PATH, filename)
self._log.info('Copying "%s" to storage "%s"', filepath, dst)
await asyncio.to_thread(shutil.copy2, filepath, dst)
self._log.info('Copying "%s" to storage "%s"', file.current_filepath, dst)
await asyncio.to_thread(shutil.copy2, file.current_filepath, dst)
file.mark_as_saved_to_storage(storage_path=dst)
def _err_file_cleanup(self, video: DownMedia) -> None:
@ -258,7 +250,5 @@ class MediaService:
self._log.info('Performing error cleanup: removing %s', video.root_path)
remove_dir(video.root_path)
async def _handle_download_exception(
self, err: Exception, task: Task, db: AsyncSession
) -> None:
await self._repository.save_as_failed(db=db, task=task, error_message=str(err))
async def _handle_download_exception(self, err: Exception, task: Task) -> None:
await self._repository.save_as_failed(task=task, error_message=str(err))

View file

@ -5,10 +5,12 @@ from yt_dlp import version as ytdlp_version
from yt_shared.db.session import get_db
from yt_shared.models import Task
from yt_shared.rabbit.publisher import RmqPublisher
from yt_shared.repositories.task import TaskRepository
from yt_shared.schemas.error import ErrorDownloadGeneralPayload, ErrorDownloadPayload
from yt_shared.schemas.media import DownMedia, InbMediaPayload
from yt_shared.schemas.success import SuccessDownloadPayload
from worker.core.downloader import MediaDownloader
from worker.core.exceptions import DownloadVideoServiceError, GeneralVideoServiceError
from worker.core.media_service import MediaService
@ -16,7 +18,6 @@ from worker.core.media_service import MediaService
class PayloadHandler:
def __init__(self) -> None:
self._log = logging.getLogger(self.__class__.__name__)
self._media_service = MediaService()
self._rmq_publisher = RmqPublisher()
async def handle(self, media_payload: InbMediaPayload) -> None:
@ -27,8 +28,13 @@ class PayloadHandler:
async def _handle(self, media_payload: InbMediaPayload) -> None:
async for session in get_db():
media_service = MediaService(
media_payload=media_payload,
downloader=MediaDownloader(),
task_repository=TaskRepository(session),
)
try:
media, task = await self._media_service.process(media_payload, session)
media, task = await media_service.process()
except DownloadVideoServiceError as err:
await self._send_failed_video_download_task(err, media_payload)
return

View file

@ -1,6 +1,7 @@
import asyncio
import os
import signal
from pathlib import Path
from yt_shared.utils.common import wrap
from yt_shared.utils.tasks.abstract import AbstractTask
@ -10,7 +11,7 @@ class AbstractFfBinaryTask(AbstractTask):
_CMD: str | None = None
_CMD_TIMEOUT = 10
def __init__(self, file_path: str) -> None:
def __init__(self, file_path: Path) -> None:
super().__init__()
self._file_path = file_path
self._killpg = wrap(os.killpg)

View file

@ -1,5 +1,3 @@
import os
from yt_shared.schemas.media import DownMedia
from worker.core.tasks.abstract import AbstractFfBinaryTask
@ -14,7 +12,7 @@ class EncodeToH264Task(AbstractFfBinaryTask):
def __init__(
self, media: DownMedia, cmd_tpl: str, check_if_in_final_format: bool = True
) -> None:
super().__init__(file_path=media.video.filepath)
super().__init__(file_path=media.video.current_filepath)
self._media = media
self._video = media.video
self._CMD = cmd_tpl # lol
@ -43,8 +41,8 @@ class EncodeToH264Task(AbstractFfBinaryTask):
return False
def _get_output_path(self) -> str:
filename = f'{self._video.filename.rsplit(".", 1)[0]}-h264.{self._EXT}'
return os.path.join(self._media.root_path, filename)
filename = f'{self._video.current_filename.rsplit(".", 1)[0]}-h264.{self._EXT}'
return self._media.root_path / filename
async def _encode_video(self) -> None:
output = self._get_output_path()

View file

@ -1,3 +1,5 @@
from pathlib import Path
from yt_shared.schemas.media import Video
from worker.core.config import settings
@ -8,7 +10,7 @@ class MakeThumbnailTask(AbstractFfBinaryTask):
_CMD = 'ffmpeg -y -loglevel error -i "{filepath}" -ss {time_point} -vframes 1 -q:v 7 "{thumbpath}"'
def __init__(
self, thumbnail_path: str, *args, duration: float, video_ctx: Video, **kwargs
self, thumbnail_path: Path, *args, duration: float, video_ctx: Video, **kwargs
) -> None:
super().__init__(*args, **kwargs)
self._thumbnail_path = thumbnail_path

View file

@ -1,8 +1,8 @@
import os
from pathlib import Path
import yt_dlp
_COOKIES_FILEPATH = '/app/cookies/cookies.txt'
_COOKIES_FILEPATH = Path('/app/cookies/cookies.txt')
def cli_to_api(opts: list) -> dict:
@ -18,11 +18,13 @@ def cli_to_api(opts: list) -> dict:
return diff
def is_file_empty(filepath: str) -> bool:
def is_file_empty(filepath: Path) -> bool:
"""Check whether the file is empty."""
return os.path.isfile(filepath) and os.path.getsize(filepath) == 0
return filepath.is_file() and filepath.stat().st_size == 0
def get_cookies_opts_if_not_empty() -> list[str]:
"""Return yt-dlp cookies option with cookies filepath."""
return [] if is_file_empty(_COOKIES_FILEPATH) else ['--cookies', _COOKIES_FILEPATH]
if is_file_empty(_COOKIES_FILEPATH):
return []
return ['--cookies', str(_COOKIES_FILEPATH)]

View file

@ -1,10 +1,9 @@
import abc
import logging
import os
from abc import abstractmethod
from copy import deepcopy
from pathlib import Path
import pydantic
from pydantic import StrictBool, StrictStr
from pydantic import BaseModel, ConfigDict
from yt_shared.enums import DownMediaType
from worker.utils import cli_to_api
@ -31,16 +30,17 @@ except ImportError:
)
class BaseHostConfModel(pydantic.BaseModel):
class BaseHostConfModel(BaseModel):
# TODO: Add validators.
model_config = ConfigDict(strict=True, frozen=True)
hostnames: tuple[str, ...]
encode_audio: StrictBool
encode_video: StrictBool
encode_audio: bool
encode_video: bool
ffmpeg_audio_opts: StrictStr | None
ffmpeg_video_opts: StrictStr | None
ffmpeg_audio_opts: str | None
ffmpeg_video_opts: str | None
ytdl_opts: dict
@ -82,13 +82,13 @@ class AbstractHostConfig:
if not self.ALLOW_NULL_HOSTNAMES and not self.HOSTNAMES:
raise ValueError('Hostname(s) must be set before instantiation.')
@abc.abstractmethod
@abstractmethod
def build_config(
self, media_type: DownMediaType, curr_tmp_dir: str
) -> BaseHostConfModel:
pass
def _build_ytdl_opts(self, media_type: DownMediaType, curr_tmp_dir: str) -> dict:
def _build_ytdl_opts(self, media_type: DownMediaType, curr_tmp_dir: Path) -> dict:
def _add_video_opts(ytdl_opts_: list[str]) -> None:
ytdl_opts_.extend(self.DEFAULT_VIDEO_YTDL_OPTS)
ytdl_opts_.extend(self._build_custom_ytdl_video_opts())
@ -107,12 +107,11 @@ class AbstractHostConfig:
ytdl_opts.append(self.KEEP_VIDEO_OPTION)
ytdl_opts = cli_to_api(ytdl_opts)
ytdl_opts['outtmpl']['default'] = os.path.join(
curr_tmp_dir,
ytdl_opts['outtmpl']['default'],
ytdl_opts['outtmpl']['default'] = str(
curr_tmp_dir / ytdl_opts['outtmpl']['default']
)
return ytdl_opts
@abc.abstractmethod
@abstractmethod
def _build_custom_ytdl_video_opts(self) -> list[str]:
pass

View file

@ -1,3 +1,5 @@
from pathlib import Path
from yt_shared.enums import DownMediaType
from ytdl_opts.per_host._base import AbstractHostConfig, BaseHostConfModel
@ -15,7 +17,7 @@ class DefaultHost(AbstractHostConfig, metaclass=HostConfRegistry):
ENCODE_VIDEO = False
def build_config(
self, media_type: DownMediaType, curr_tmp_dir: str
self, media_type: DownMediaType, curr_tmp_dir: Path
) -> DefaultHostModel:
return DefaultHostModel(
hostnames=self.HOSTNAMES,

View file

@ -1,3 +1,5 @@
from pathlib import Path
from yt_shared.constants import INSTAGRAM_HOSTS
from yt_shared.enums import DownMediaType
@ -22,7 +24,7 @@ class InstagramHost(AbstractHostConfig, metaclass=HostConfRegistry):
FFMPEG_VIDEO_OPTS = 'ffmpeg -y -loglevel error -i "{filepath}" -c:v libx264 -pix_fmt yuv420p -preset veryfast -crf 22 -movflags +faststart -c:a copy "{output}"'
def build_config(
self, media_type: DownMediaType, curr_tmp_dir: str
self, media_type: DownMediaType, curr_tmp_dir: Path
) -> InstagramHostModel:
return InstagramHostModel(
hostnames=self.HOSTNAMES,

View file

@ -1,3 +1,5 @@
from pathlib import Path
from yt_shared.constants import TIKTOK_HOSTS
from yt_shared.enums import DownMediaType
@ -16,7 +18,7 @@ class TikTokHost(AbstractHostConfig, metaclass=HostConfRegistry):
ENCODE_VIDEO = False
def build_config(
self, media_type: DownMediaType, curr_tmp_dir: str
self, media_type: DownMediaType, curr_tmp_dir: Path
) -> TikTokHostModel:
return TikTokHostModel(
hostnames=self.HOSTNAMES,

View file

@ -1,3 +1,5 @@
from pathlib import Path
from yt_shared.constants import TWITTER_HOSTS
from yt_shared.enums import DownMediaType
@ -16,7 +18,7 @@ class TwitterHost(AbstractHostConfig, metaclass=HostConfRegistry):
ENCODE_VIDEO = False
def build_config(
self, media_type: DownMediaType, curr_tmp_dir: str
self, media_type: DownMediaType, curr_tmp_dir: Path
) -> TwitterHostModel:
return TwitterHostModel(
hostnames=self.HOSTNAMES,

View file

@ -3,7 +3,7 @@ line-length = 88
indent-width = 4
target-version = "py312"
src = ["app_api", "app_bot", "app_worker"]
required-version = ">=0.3.4"
required-version = ">=0.4.2"
[tool.ruff.lint]
select = ["F", "E", "W", "I001", "RET", "SLF001"]

View file

@ -1,17 +1,26 @@
import logging
from typing import KeysView
from pydantic import field_validator
from pydantic import (
ConfigDict,
DirectoryPath,
NewPath,
PositiveInt,
ValidationInfo,
field_validator,
)
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
model_config = ConfigDict(extra='forbid', validate_default=True)
APPLICATION_NAME: str
POSTGRES_USER: str
POSTGRES_PASSWORD: str
POSTGRES_HOST: str
POSTGRES_PORT: int
POSTGRES_PORT: PositiveInt
POSTGRES_DB: str
POSTGRES_TEST_DB: str = 'yt_test'
@ -25,14 +34,14 @@ class Settings(BaseSettings):
RABBITMQ_USER: str
RABBITMQ_PASSWORD: str
RABBITMQ_HOST: str
RABBITMQ_PORT: int
RABBITMQ_PORT: PositiveInt
@property
def RABBITMQ_URI(self) -> str:
return f'amqp://{self.RABBITMQ_USER}:{self.RABBITMQ_PASSWORD}@{self.RABBITMQ_HOST}:{self.RABBITMQ_PORT}/'
CONSUMER_NUMBER_OF_RETRY: int = 2
RESEND_DELAY_MS: int = 60000
CONSUMER_NUMBER_OF_RETRY: PositiveInt = 2
RESEND_DELAY_MS: PositiveInt = 60000
LOG_LEVEL: str
REDIS_HOST: str
@ -41,16 +50,16 @@ class Settings(BaseSettings):
def REDIS_URL(self) -> str:
return f'redis://{self.REDIS_HOST}'
TMP_DOWNLOAD_ROOT_PATH: str
TMP_DOWNLOAD_DIR: str
TMP_DOWNLOADED_DIR: str
TMP_DOWNLOAD_ROOT_PATH: DirectoryPath | NewPath
TMP_DOWNLOAD_DIR: DirectoryPath | NewPath
TMP_DOWNLOADED_DIR: DirectoryPath | NewPath
@field_validator('LOG_LEVEL')
@classmethod
def validate_log_level_value(cls, value: str) -> str:
valid_values: KeysView[str] = logging._nameToLevel.keys() # noqa
def validate_log_level_value(cls, value: str, info: ValidationInfo) -> str:
valid_values: KeysView[str] = logging._nameToLevel.keys() # noqa: SLF001
if value not in valid_values:
raise ValueError(f'"LOG_LEVEL" must be one of {valid_values}')
raise ValueError(f'"{info.field_name}" must be one of {valid_values}')
return value

View file

@ -12,24 +12,22 @@ from yt_shared.schemas.media import BaseMedia, InbMediaPayload, Video
class TaskRepository:
def __init__(self) -> None:
def __init__(self, session: AsyncSession) -> None:
self._log = logging.getLogger(self.__class__.__name__)
self._session = session
async def get_or_create_task(
self, db: AsyncSession, media_payload: InbMediaPayload
) -> Task:
async def get_or_create_task(self, media_payload: InbMediaPayload) -> Task:
if media_payload.id is None:
return await self._create_task(db, media_payload)
return await self._create_task(media_payload)
stmt = select(Task).filter_by(id=media_payload.id)
task = await db.execute(stmt)
task = await self._session.execute(stmt)
try:
return task.scalar_one()
except NoResultFound:
return await self._create_task(db, media_payload)
return await self._create_task(media_payload)
@staticmethod
async def _create_task(db: AsyncSession, media_payload: InbMediaPayload) -> Task:
async def _create_task(self, media_payload: InbMediaPayload) -> Task:
task = Task(
id=media_payload.id,
url=media_payload.url,
@ -38,14 +36,11 @@ class TaskRepository:
message_id=media_payload.message_id,
added_at=media_payload.added_at,
)
db.add(task)
await db.commit()
self._session.add(task)
await self._session.commit()
return task
@staticmethod
async def save_file_cache(
db: AsyncSession, file_id: str | UUID, cache: CacheSchema
) -> None:
async def save_file_cache(self, file_id: str | UUID, cache: CacheSchema) -> None:
stmt = insert(Cache).values(
cache_id=cache.cache_id,
cache_unique_id=cache.cache_unique_id,
@ -53,16 +48,13 @@ class TaskRepository:
date_timestamp=cache.date_timestamp,
file_id=file_id,
)
await db.execute(stmt)
await db.commit()
await self._session.execute(stmt)
await self._session.commit()
@staticmethod
async def save_file(
db: AsyncSession, task: Task, media: BaseMedia, meta: dict
) -> File:
async def save_file(self, task: Task, media: BaseMedia, meta: dict) -> File:
file = File(
title=media.title,
name=media.filename,
name=media.current_filename,
duration=media.duration,
meta=meta,
task_id=task.id,
@ -73,23 +65,20 @@ class TaskRepository:
file.height = media.height
file.thumb_name = media.thumb_name
db.add(file)
self._session.add(file)
async with ASYNC_LOCK:
await db.flush([file])
await self._session.flush([file])
return file
@staticmethod
async def save_as_done(db: AsyncSession, task: Task) -> None:
async def save_as_done(self, task: Task) -> None:
task.status = TaskStatus.DONE
await db.commit()
await self._session.commit()
@staticmethod
async def save_as_processing(db: AsyncSession, task: Task) -> None:
async def save_as_processing(self, task: Task) -> None:
task.status = TaskStatus.PROCESSING
await db.commit()
await self._session.commit()
@staticmethod
async def save_as_failed(db: AsyncSession, task: Task, error_message: str) -> None:
async def save_as_failed(self, task: Task, error_message: str) -> None:
task.status = TaskStatus.FAILED
task.error = error_message
await db.commit()
await self._session.commit()

View file

@ -1,21 +1,41 @@
import abc
from abc import ABC
from pydantic import BaseModel, ConfigDict
from yt_shared.enums import RabbitPayloadType
class RealBaseModel(BaseModel, abc.ABC):
"""Base Pydantic model. All models should inherit from this."""
class RealBaseModel(BaseModel, ABC):
"""Base Pydantic model. All non-strict models should inherit from it."""
model_config = ConfigDict(extra='forbid')
model_config = ConfigDict(extra='forbid', validate_default=True)
class BaseOrmModel(RealBaseModel, abc.ABC):
model_config = ConfigDict(from_attributes=True, **RealBaseModel.model_config)
class StrictRealBaseModel(RealBaseModel, ABC):
"""Base Pydantic model. All strict models should inherit from it."""
model_config = ConfigDict(**RealBaseModel.model_config, strict=True)
class BaseRabbitPayloadModel(RealBaseModel, abc.ABC):
class BaseOrmModel(RealBaseModel, ABC):
model_config = ConfigDict(**RealBaseModel.model_config, from_attributes=True)
class StrictBaseOrmModel(BaseOrmModel, ABC):
model_config = ConfigDict(**BaseOrmModel.model_config, strict=True)
class StrictBaseConfigModel(StrictRealBaseModel, ABC):
model_config = ConfigDict(**StrictRealBaseModel.model_config, frozen=True)
class BaseRabbitPayloadModel(RealBaseModel, ABC):
"""Base RabbitMQ payload model. All RabbitMQ models should inherit from this."""
type: RabbitPayloadType
class StrictBaseRabbitPayloadModel(BaseRabbitPayloadModel, ABC):
"""Base RabbitMQ payload model. All RabbitMQ models should inherit from this."""
model_config = ConfigDict(**BaseRabbitPayloadModel.model_config, strict=True)

View file

@ -1,15 +1,13 @@
import abc
from pydantic import StrictInt
from abc import ABC
from yt_shared.enums import TelegramChatType
from yt_shared.schemas.base import BaseRabbitPayloadModel
from yt_shared.schemas.base import StrictBaseRabbitPayloadModel
from yt_shared.schemas.media import InbMediaPayload
class BaseRabbitDownloadPayload(BaseRabbitPayloadModel, abc.ABC):
class BaseRabbitDownloadPayload(StrictBaseRabbitPayloadModel, ABC):
context: InbMediaPayload
from_chat_id: StrictInt | None
from_chat_id: int | None
from_chat_type: TelegramChatType | None
from_user_id: StrictInt | None
message_id: StrictInt | None
from_user_id: int | None
message_id: int | None

View file

@ -1,20 +1,18 @@
import uuid
from typing import Literal
from pydantic import StrictStr
from yt_shared.enums import RabbitPayloadType
from yt_shared.schemas.base_rabbit import BaseRabbitDownloadPayload
class ErrorDownloadGeneralPayload(BaseRabbitDownloadPayload):
type: Literal[RabbitPayloadType.GENERAL_ERROR] = RabbitPayloadType.GENERAL_ERROR
task_id: uuid.UUID | StrictStr | None
message: StrictStr
url: StrictStr
exception_msg: StrictStr
exception_type: StrictStr
yt_dlp_version: StrictStr | None
task_id: uuid.UUID | str | None
message: str
url: str
exception_msg: str
exception_type: str
yt_dlp_version: str | None
class ErrorDownloadPayload(ErrorDownloadGeneralPayload):

View file

@ -1,58 +1,64 @@
import abc
import uuid
from abc import ABC
from datetime import datetime, timezone
from pathlib import Path
from typing import Literal
from pydantic import (
ConfigDict,
DirectoryPath,
Field,
StrictBool,
StrictFloat,
StrictInt,
StrictStr,
FilePath,
model_validator,
)
from typing_extensions import Annotated, Self
from yt_shared.enums import DownMediaType, MediaFileType, TaskSource, TelegramChatType
from yt_shared.schemas.base import RealBaseModel
from yt_shared.schemas.base import StrictRealBaseModel
from yt_shared.utils.common import format_bytes
from yt_shared.utils.file import file_size
class InbMediaPayload(RealBaseModel):
class InbMediaPayload(StrictRealBaseModel):
"""RabbitMQ inbound media payload from Telegram Bot or API service."""
model_config = ConfigDict(**StrictRealBaseModel.model_config, frozen=True)
id: uuid.UUID | None = None
from_chat_id: StrictInt | None
from_chat_id: int | None
from_chat_type: TelegramChatType | None
from_user_id: StrictInt | None
message_id: StrictInt | None
ack_message_id: StrictInt | None
url: StrictStr
original_url: StrictStr
from_user_id: int | None
message_id: int | None
ack_message_id: int | None
url: str
original_url: str
source: TaskSource
save_to_storage: StrictBool
save_to_storage: bool
download_media_type: DownMediaType
custom_filename: str | None
automatic_extension: bool
added_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class BaseMedia(RealBaseModel, abc.ABC):
class BaseMedia(StrictRealBaseModel, ABC):
"""Model representing abstract downloaded media with common fields."""
file_type: MediaFileType
title: StrictStr
filename: StrictStr
filepath: StrictStr
file_size: StrictInt
duration: StrictFloat | None = None
title: str
original_filename: str
directory_path: Annotated[DirectoryPath, Field(strict=False)]
file_size: int
duration: float | None = None
orm_file_id: uuid.UUID | None = None
saved_to_storage: StrictBool = False
storage_path: StrictStr | None = None
saved_to_storage: bool = False
storage_path: Annotated[Path, Field(strict=False)] | None = None
is_converted: StrictBool = False
converted_filepath: StrictStr | None = None
converted_filename: StrictStr | None = None
converted_file_size: StrictInt | None = None
is_converted: bool = False
converted_filename: str | None = None
converted_file_size: int | None = None
custom_filename: str | None = None
def file_size_human(self) -> str:
return format_bytes(num=self.current_file_size())
@ -62,13 +68,30 @@ class BaseMedia(RealBaseModel, abc.ABC):
return self.converted_file_size
return self.file_size
def mark_as_saved_to_storage(self, storage_path: str) -> None:
@property
def current_filename(self) -> str:
if self.custom_filename:
return self.custom_filename
if self.is_converted:
return self.converted_filename
return self.original_filename
@property
def current_filepath(self) -> Path:
if self.custom_filename:
filename = self.custom_filename
elif self.is_converted:
filename = self.converted_filename
else:
filename = self.original_filename
return self.directory_path / filename
def mark_as_saved_to_storage(self, storage_path: Path) -> None:
self.storage_path = storage_path
self.saved_to_storage = True
def mark_as_converted(self, filepath: str) -> None:
self.converted_filepath = filepath
self.converted_filename = filepath.rsplit('/', 1)[-1]
def mark_as_converted(self, filepath: Path) -> None:
self.converted_filename = filepath.name
self.converted_file_size = file_size(filepath)
self.is_converted = True
@ -83,35 +106,33 @@ class Video(BaseMedia):
"""Model representing downloaded video file with separate thumbnail."""
file_type: Literal[MediaFileType.VIDEO] = MediaFileType.VIDEO
thumb_name: StrictStr | None = None
thumb_name: str | None = None
width: int | float | None = None
height: int | float | None = None
thumb_path: StrictStr | None = None
thumb_path: Annotated[FilePath, Field(strict=False)] | None = None
@model_validator(mode='before')
@classmethod
def _set_fields(cls, values: dict) -> dict:
if not values['thumb_name']:
values['thumb_name'] = f'{values["filename"]}-thumb.jpg'
return values
@model_validator(mode='after')
def set_thumb_name(self) -> Self:
if not self.thumb_name:
self.thumb_name = f'{self.current_filename}-thumb.jpg'
return self
class DownMedia(RealBaseModel):
class DownMedia(StrictRealBaseModel):
"""Downloaded media (audio, video with muxed audio or both) object context."""
audio: Audio | None
video: Video | None
media_type: DownMediaType
root_path: StrictStr
media_type: Annotated[DownMediaType, Field(strict=False)]
root_path: Annotated[DirectoryPath, Field(strict=False)]
meta: dict
@model_validator(mode='before')
@classmethod
def _validate(cls, values: dict) -> dict:
if values['audio'] is None and values['video'] is None:
@model_validator(mode='after')
def validate_media(self) -> Self:
if not (self.audio or self.video):
raise ValueError('Provide audio, video or both.')
return values
return self
def get_media_objects(self) -> tuple[Audio | Video, ...]:
return tuple(filter(None, (self.audio, self.video)))

View file

@ -1,16 +1,18 @@
from pydantic import StrictBool, StrictInt, StrictStr
from pydantic import ConfigDict
from yt_shared.enums import DownMediaType, TelegramChatType
from yt_shared.schemas.base import RealBaseModel
from yt_shared.schemas.base import StrictRealBaseModel
class URL(RealBaseModel):
url: StrictStr
original_url: StrictStr
from_chat_id: StrictInt
class URL(StrictRealBaseModel):
model_config = ConfigDict(**StrictRealBaseModel.model_config, frozen=True)
url: str
original_url: str
from_chat_id: int
from_chat_type: TelegramChatType
from_user_id: StrictInt | None
message_id: StrictInt
ack_message_id: StrictInt
save_to_storage: StrictBool
from_user_id: int | None
message_id: int
ack_message_id: int
save_to_storage: bool
download_media_type: DownMediaType

View file

@ -1,35 +1,32 @@
import datetime
from pydantic import Field, StrictStr, field_validator
from pydantic import Field, field_validator
from yt_shared.schemas.base import BaseOrmModel, RealBaseModel
from yt_shared.schemas.base import StrictBaseOrmModel, StrictRealBaseModel
from yt_shared.utils.common import remove_microseconds
def _remove_microseconds(dt_obj: datetime.datetime) -> datetime.datetime:
return dt_obj.replace(microsecond=0)
class LatestVersion(RealBaseModel):
version: StrictStr
class LatestVersion(StrictRealBaseModel):
version: str
retrieved_at: datetime.datetime
@field_validator('retrieved_at', mode='before')
@field_validator('retrieved_at', mode='after')
@classmethod
def remove_microseconds(cls, value: datetime.datetime) -> datetime.datetime:
return _remove_microseconds(value)
return remove_microseconds(value)
class CurrentVersion(BaseOrmModel):
version: StrictStr = Field(..., alias='current_version')
class CurrentVersion(StrictBaseOrmModel):
version: str = Field(..., alias='current_version')
updated_at: datetime.datetime
@field_validator('updated_at', mode='before')
@field_validator('updated_at', mode='after')
@classmethod
def remove_microseconds(cls, value: datetime.datetime) -> datetime.datetime:
return _remove_microseconds(value)
return remove_microseconds(value)
class VersionContext(RealBaseModel):
class VersionContext(StrictRealBaseModel):
latest: LatestVersion
current: CurrentVersion

View file

@ -1,9 +1,10 @@
import asyncio
import atexit
import datetime
import random
import signal
from functools import partial, wraps
from string import ascii_lowercase
from string import ascii_lowercase, digits
from typing import Any, Callable
_UNIT_SIZE_NAMES = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi')
@ -51,11 +52,20 @@ def wrap(func):
return run
def random_string(number: int) -> str:
return ''.join(random.choice(ascii_lowercase) for _ in range(number))
def gen_random_str(length: int = 4, use_digits: bool = False) -> str:
if use_digits:
choices = ascii_lowercase + digits
else:
choices = ascii_lowercase
return ''.join(random.SystemRandom().choice(choices) for _ in range(length))
def register_shutdown(callback: Callable) -> None:
atexit.register(callback)
signal.signal(signal.SIGTERM, callback)
signal.signal(signal.SIGINT, callback)
def remove_microseconds(dt_: datetime.datetime) -> datetime.datetime:
return dt_.replace(microsecond=0)

View file

@ -1,33 +1,30 @@
import logging
import os
import shutil
from pathlib import Path
from typing import Iterable
from yt_shared.utils.common import format_bytes
def file_cleanup(file_paths: Iterable[str], log: logging.Logger = None) -> None:
def file_cleanup(file_paths: Iterable[Path], log: logging.Logger = None) -> None:
log = log or logging.getLogger()
log.debug('Performing cleanup of %s', file_paths)
for file_path in file_paths:
if os.path.exists(file_path):
try:
os.remove(file_path)
except Exception as err:
log.warning('File "%s" not deleted: %s', file_path, err)
if file_path.is_file():
file_path.unlink()
def remove_dir(dir_path: str) -> None:
def remove_dir(dir_path: Path) -> None:
shutil.rmtree(dir_path)
def file_size(filepath: str) -> int:
def file_size(filepath: Path) -> int:
"""Return file size in bytes."""
return os.path.getsize(filepath)
return filepath.stat().st_size
def list_files(path: str) -> dict[str, str]:
def list_files_human(path: Path) -> dict[Path, str]:
return {
filename: format_bytes(file_size(os.path.join(path, filename)))
for filename in os.listdir(path)
filename: format_bytes(file_size(path / filename))
for filename in path.iterdir()
}

View file

@ -1,12 +1,12 @@
import abc
import logging
from abc import ABC, abstractmethod
class AbstractTask(abc.ABC):
class AbstractTask(ABC):
def __init__(self) -> None:
self._log = logging.getLogger(self.__class__.__name__)
@abc.abstractmethod
@abstractmethod
async def run(self) -> None:
"""Main entry point."""
pass