mirror of
https://github.com/tropicoo/yt-dlp-bot.git
synced 2024-09-20 06:46:08 +08:00
Version 1.4.3
This commit is contained in:
parent
caffaa3020
commit
a3f121b615
|
@ -2,7 +2,7 @@
|
|||
|
||||
Simple and reliable self-hosted YouTube Download Telegram Bot.
|
||||
|
||||
Version: 1.4.2. [Release details](RELEASES.md).
|
||||
Version: 1.4.3. [Release details](RELEASES.md).
|
||||
|
||||
![frames](.assets/download_success.png)
|
||||
|
||||
|
|
19
RELEASES.md
19
RELEASES.md
|
@ -1,3 +1,22 @@
|
|||
## Release 1.4.3
|
||||
|
||||
Release date: September 20, 2023
|
||||
|
||||
## New Features
|
||||
|
||||
- Encode Instagram VP9 videos to H264 (not played in Telegram on iOS)
|
||||
- New environment variable in `envs/.env_worker`: `INSTAGRAM_ENCODE_TO_H264=True`
|
||||
|
||||
## Important
|
||||
|
||||
N/A
|
||||
|
||||
## Misc
|
||||
|
||||
- Maintenance and refactor
|
||||
|
||||
---
|
||||
|
||||
## Release 1.4.2
|
||||
|
||||
Release date: July 06, 2023
|
||||
|
|
|
@ -3,8 +3,8 @@ import logging
|
|||
from typing import TYPE_CHECKING
|
||||
|
||||
from yt_shared.enums import TaskSource, TelegramChatType
|
||||
from yt_shared.schemas.error import ErrorDownloadPayload, ErrorGeneralPayload
|
||||
from yt_shared.schemas.success import SuccessPayload
|
||||
from yt_shared.schemas.error import ErrorDownloadGeneralPayload, ErrorDownloadPayload
|
||||
from yt_shared.schemas.success import SuccessDownloadPayload
|
||||
|
||||
from bot.core.config.schema import AnonymousUserSchema, UserSchema
|
||||
|
||||
|
@ -12,10 +12,12 @@ if TYPE_CHECKING:
|
|||
from bot.core.bot import VideoBot
|
||||
|
||||
|
||||
class AbstractHandler(metaclass=abc.ABCMeta):
|
||||
class AbstractDownloadHandler(metaclass=abc.ABCMeta):
|
||||
def __init__(
|
||||
self,
|
||||
body: SuccessPayload | ErrorDownloadPayload | ErrorGeneralPayload,
|
||||
body: SuccessDownloadPayload
|
||||
| ErrorDownloadPayload
|
||||
| ErrorDownloadGeneralPayload,
|
||||
bot: 'VideoBot',
|
||||
) -> None:
|
||||
self._log = logging.getLogger(self.__class__.__name__)
|
||||
|
@ -23,11 +25,8 @@ class AbstractHandler(metaclass=abc.ABCMeta):
|
|||
self._bot = bot
|
||||
self._receiving_users = self._get_receiving_users()
|
||||
|
||||
async def handle(self) -> None:
|
||||
await self._handle()
|
||||
|
||||
@abc.abstractmethod
|
||||
async def _handle(self, *args, **kwargs) -> None:
|
||||
async def handle(self) -> None:
|
||||
pass
|
||||
|
||||
def _get_sender_id(self) -> int | None:
|
||||
|
|
|
@ -3,16 +3,16 @@ import html
|
|||
|
||||
from pyrogram.enums import ParseMode
|
||||
from yt_shared.enums import RabbitPayloadType
|
||||
from yt_shared.schemas.error import ErrorDownloadPayload, ErrorGeneralPayload
|
||||
from yt_shared.schemas.error import ErrorDownloadGeneralPayload, ErrorDownloadPayload
|
||||
|
||||
from bot.core.config import settings
|
||||
from bot.core.handlers.abstract import AbstractHandler
|
||||
from bot.core.handlers.abstract import AbstractDownloadHandler
|
||||
from bot.core.utils import split_telegram_message
|
||||
from bot.version import __version__
|
||||
|
||||
|
||||
class ErrorHandler(AbstractHandler):
|
||||
_body: ErrorDownloadPayload | ErrorGeneralPayload
|
||||
class ErrorDownloadHandler(AbstractDownloadHandler):
|
||||
_body: ErrorDownloadPayload | ErrorDownloadGeneralPayload
|
||||
_ERR_MSG_TPL = (
|
||||
'🛑 <b>{header}</b>\n\n'
|
||||
'ℹ <b>Task ID:</b> <code>{task_id}</code>\n'
|
||||
|
@ -31,9 +31,6 @@ class ErrorHandler(AbstractHandler):
|
|||
}
|
||||
|
||||
async def handle(self) -> None:
|
||||
await self._handle()
|
||||
|
||||
async def _handle(self) -> None:
|
||||
self._send_error_text()
|
||||
|
||||
def _send_error_text(self) -> None:
|
||||
|
|
|
@ -6,19 +6,19 @@ from pyrogram.enums import ParseMode
|
|||
from yt_shared.emoji import SUCCESS_EMOJI
|
||||
from yt_shared.enums import MediaFileType, TaskSource
|
||||
from yt_shared.rabbit.publisher import RmqPublisher
|
||||
from yt_shared.schemas.error import ErrorGeneralPayload
|
||||
from yt_shared.schemas.error import ErrorDownloadGeneralPayload
|
||||
from yt_shared.schemas.media import BaseMedia
|
||||
from yt_shared.schemas.success import SuccessPayload
|
||||
from yt_shared.schemas.success import SuccessDownloadPayload
|
||||
from yt_shared.utils.file import remove_dir
|
||||
from yt_shared.utils.tasks.tasks import create_task
|
||||
|
||||
from bot.core.handlers.abstract import AbstractHandler
|
||||
from bot.core.handlers.abstract import AbstractDownloadHandler
|
||||
from bot.core.tasks.upload import AudioUploadTask, VideoUploadTask
|
||||
from bot.core.utils import bold
|
||||
|
||||
|
||||
class SuccessHandler(AbstractHandler):
|
||||
_body: SuccessPayload
|
||||
class SuccessDownloadHandler(AbstractDownloadHandler):
|
||||
_body: SuccessDownloadPayload
|
||||
_UPLOAD_TASK_MAP = {
|
||||
MediaFileType.AUDIO: AudioUploadTask,
|
||||
MediaFileType.VIDEO: VideoUploadTask,
|
||||
|
@ -29,16 +29,19 @@ class SuccessHandler(AbstractHandler):
|
|||
self._rmq_publisher = RmqPublisher()
|
||||
|
||||
async def handle(self) -> None:
|
||||
coro_tasks = []
|
||||
try:
|
||||
for media_object in self._body.media.get_media_objects():
|
||||
coro_tasks.append(self._handle(media_object))
|
||||
await asyncio.gather(*coro_tasks)
|
||||
await self._handle()
|
||||
finally:
|
||||
self._cleanup()
|
||||
|
||||
async def _handle(self) -> None:
|
||||
coro_tasks = []
|
||||
for media_object in self._body.media.get_media_objects():
|
||||
coro_tasks.append(self._handle_media_object(media_object))
|
||||
await asyncio.gather(*coro_tasks)
|
||||
|
||||
async def _publish_error_message(self, err: Exception) -> None:
|
||||
err_payload = ErrorGeneralPayload(
|
||||
err_payload = ErrorDownloadGeneralPayload(
|
||||
task_id=self._body.task_id,
|
||||
message_id=self._body.message_id,
|
||||
from_chat_id=self._body.from_chat_id,
|
||||
|
@ -53,7 +56,7 @@ class SuccessHandler(AbstractHandler):
|
|||
)
|
||||
await self._rmq_publisher.send_download_error(err_payload)
|
||||
|
||||
async def _handle(self, media_object: BaseMedia) -> None:
|
||||
async def _handle_media_object(self, media_object: BaseMedia) -> None:
|
||||
try:
|
||||
await self._send_success_text(media_object)
|
||||
if self._upload_is_enabled():
|
||||
|
@ -61,7 +64,7 @@ class SuccessHandler(AbstractHandler):
|
|||
await self._create_upload_task(media_object)
|
||||
else:
|
||||
self._log.warning(
|
||||
'File %s will not be uploaded due to upload configuration',
|
||||
'File "%s" will not be uploaded due to upload configuration',
|
||||
media_object.filepath,
|
||||
)
|
||||
except Exception as err:
|
||||
|
@ -139,7 +142,7 @@ class SuccessHandler(AbstractHandler):
|
|||
file_size = os.stat(media_obj.filepath).st_size
|
||||
if file_size > max_file_size:
|
||||
err_msg = (
|
||||
f'{media_obj.file_type} file size {file_size} bytes bigger than '
|
||||
f'{media_obj.file_type} file size of {file_size} bytes bigger than '
|
||||
f'allowed {max_file_size} bytes. Will not upload'
|
||||
)
|
||||
self._log.warning(err_msg)
|
||||
|
|
|
@ -14,12 +14,12 @@ from yt_shared.repositories.task import TaskRepository
|
|||
from yt_shared.schemas.base import RealBaseModel
|
||||
from yt_shared.schemas.cache import CacheSchema
|
||||
from yt_shared.schemas.media import BaseMedia, Video
|
||||
from yt_shared.schemas.success import SuccessPayload
|
||||
from yt_shared.schemas.success import SuccessDownloadPayload
|
||||
from yt_shared.utils.tasks.abstract import AbstractTask
|
||||
from yt_shared.utils.tasks.tasks import create_task
|
||||
|
||||
from bot.core.config.config import get_main_config
|
||||
from bot.core.config.schema import AnonymousUserSchema, UserSchema
|
||||
from bot.core.config.schema import AnonymousUserSchema, UserSchema, VideoCaptionSchema
|
||||
from bot.core.utils import bold
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
@ -36,8 +36,8 @@ class BaseUploadContext(RealBaseModel):
|
|||
|
||||
|
||||
class VideoUploadContext(BaseUploadContext):
|
||||
height: StrictInt
|
||||
width: StrictInt
|
||||
height: StrictInt | StrictFloat
|
||||
width: StrictInt | StrictFloat
|
||||
thumb: StrictStr | None = None
|
||||
|
||||
|
||||
|
@ -54,13 +54,19 @@ class AbstractUploadTask(AbstractTask, metaclass=abc.ABCMeta):
|
|||
users: list[AnonymousUserSchema | UserSchema],
|
||||
bot: 'VideoBot',
|
||||
semaphore: asyncio.Semaphore,
|
||||
context: SuccessPayload,
|
||||
context: SuccessDownloadPayload,
|
||||
) -> None:
|
||||
super().__init__()
|
||||
self._config = get_main_config()
|
||||
self._media_object = media_object
|
||||
|
||||
if media_object.is_converted:
|
||||
self._filename = media_object.converted_filename
|
||||
self._filepath = media_object.converted_filepath
|
||||
else:
|
||||
self._filename = media_object.filename
|
||||
self._filepath = media_object.filepath
|
||||
|
||||
self._bot = bot
|
||||
self._users = users
|
||||
self._semaphore = semaphore
|
||||
|
@ -244,23 +250,23 @@ class VideoUploadTask(AbstractUploadTask):
|
|||
type=MessageMediaType.VIDEO,
|
||||
)
|
||||
|
||||
def _get_caption_conf(self) -> VideoCaptionSchema:
|
||||
if self._users[0].is_anonymous_user:
|
||||
return self._bot.conf.telegram.api.video_caption
|
||||
return self._users[0].upload.video_caption
|
||||
|
||||
def _generate_file_caption(self) -> str:
|
||||
caption_items = []
|
||||
if self._users[0].is_anonymous_user:
|
||||
caption_conf = self._bot.conf.telegram.api.video_caption
|
||||
else:
|
||||
caption_conf = self._users[0].upload.video_caption
|
||||
caption_conf = self._get_caption_conf()
|
||||
|
||||
if caption_conf.include_title:
|
||||
caption_items.append(f'{bold("Title:")} {self._media_object.title}')
|
||||
caption_items.append(f'📝 {self._media_object.title}')
|
||||
if caption_conf.include_filename:
|
||||
caption_items.append(f'{bold("Filename:")} {self._filename}')
|
||||
caption_items.append(f'ℹ️ {self._filename}')
|
||||
if caption_conf.include_link:
|
||||
caption_items.append(f'{bold("URL:")} {self._ctx.context.url}')
|
||||
caption_items.append(f'👀 {self._ctx.context.url}')
|
||||
if caption_conf.include_size:
|
||||
caption_items.append(
|
||||
f'{bold("Size:")} {self._media_object.file_size_human()}'
|
||||
)
|
||||
caption_items.append(f'💾 {self._media_object.file_size_human()}')
|
||||
return '\n'.join(caption_items)
|
||||
|
||||
def _generate_send_media_coroutine(self, chat_id: int) -> Coroutine:
|
||||
|
@ -269,8 +275,8 @@ class VideoUploadTask(AbstractUploadTask):
|
|||
'caption': self._media_ctx.caption,
|
||||
'file_name': self._media_ctx.filename,
|
||||
'duration': int(self._media_ctx.duration),
|
||||
'height': self._media_ctx.height,
|
||||
'width': self._media_ctx.width,
|
||||
'height': int(self._media_ctx.height),
|
||||
'width': int(self._media_ctx.width),
|
||||
}
|
||||
|
||||
if self._media_ctx.thumb:
|
||||
|
|
|
@ -20,7 +20,7 @@ class RabbitWorkerType(enum.Enum):
|
|||
SUCCESS = 'SUCCESS'
|
||||
|
||||
|
||||
class AbstractResultWorker(AbstractTask):
|
||||
class AbstractDownloadResultWorker(AbstractTask):
|
||||
TYPE: RabbitWorkerType | None = None
|
||||
QUEUE_TYPE: str | None = None
|
||||
SCHEMA_CLS: tuple[Type[BaseModel]] = ()
|
||||
|
@ -58,8 +58,9 @@ class AbstractResultWorker(AbstractTask):
|
|||
async def _deserialize_message(self, message: IncomingMessage) -> BaseModel:
|
||||
for schema_cls in self.SCHEMA_CLS:
|
||||
try:
|
||||
return schema_cls.parse_raw(message.body)
|
||||
return schema_cls.model_validate_json(message.body)
|
||||
except Exception:
|
||||
# Skip unmatched schema class that failed to parse the body.
|
||||
pass
|
||||
else:
|
||||
self._log.error('Failed to decode message body: %s', message.body)
|
||||
|
|
|
@ -1,17 +1,17 @@
|
|||
from yt_shared.rabbit.rabbit_config import ERROR_QUEUE
|
||||
from yt_shared.schemas.error import ErrorDownloadPayload, ErrorGeneralPayload
|
||||
from yt_shared.schemas.error import ErrorDownloadGeneralPayload, ErrorDownloadPayload
|
||||
|
||||
from bot.core.handlers.error import ErrorHandler
|
||||
from bot.core.workers.abstract import AbstractResultWorker, RabbitWorkerType
|
||||
from bot.core.handlers.error import ErrorDownloadHandler
|
||||
from bot.core.workers.abstract import AbstractDownloadResultWorker, RabbitWorkerType
|
||||
|
||||
|
||||
class ErrorResultWorker(AbstractResultWorker):
|
||||
class ErrorDownloadResultWorker(AbstractDownloadResultWorker):
|
||||
TYPE = RabbitWorkerType.ERROR
|
||||
QUEUE_TYPE = ERROR_QUEUE
|
||||
SCHEMA_CLS = (ErrorDownloadPayload, ErrorGeneralPayload)
|
||||
HANDLER_CLS = ErrorHandler
|
||||
SCHEMA_CLS = (ErrorDownloadPayload, ErrorDownloadGeneralPayload)
|
||||
HANDLER_CLS = ErrorDownloadHandler
|
||||
|
||||
async def _process_body(
|
||||
self, body: ErrorDownloadPayload | ErrorGeneralPayload
|
||||
self, body: ErrorDownloadPayload | ErrorDownloadGeneralPayload
|
||||
) -> None:
|
||||
await self.HANDLER_CLS(body=body, bot=self._bot).handle()
|
||||
|
|
|
@ -5,15 +5,15 @@ from typing import TYPE_CHECKING
|
|||
from yt_shared.utils.tasks.tasks import create_task
|
||||
|
||||
from bot.core.workers.abstract import RabbitWorkerType
|
||||
from bot.core.workers.error import ErrorResultWorker
|
||||
from bot.core.workers.success import SuccessResultWorker
|
||||
from bot.core.workers.error import ErrorDownloadResultWorker
|
||||
from bot.core.workers.success import SuccessDownloadResultWorker
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bot.core.bot import VideoBot
|
||||
|
||||
|
||||
class RabbitWorkerManager:
|
||||
_TASK_TYPES = (ErrorResultWorker, SuccessResultWorker)
|
||||
_TASK_TYPES = (ErrorDownloadResultWorker, SuccessDownloadResultWorker)
|
||||
|
||||
def __init__(self, bot: 'VideoBot') -> None:
|
||||
self._log = logging.getLogger(self.__class__.__name__)
|
||||
|
|
|
@ -1,21 +1,21 @@
|
|||
from yt_shared.rabbit.rabbit_config import SUCCESS_QUEUE
|
||||
from yt_shared.schemas.success import SuccessPayload
|
||||
from yt_shared.schemas.success import SuccessDownloadPayload
|
||||
from yt_shared.utils.tasks.tasks import create_task
|
||||
|
||||
from bot.core.handlers.success import SuccessHandler
|
||||
from bot.core.workers.abstract import AbstractResultWorker, RabbitWorkerType
|
||||
from bot.core.handlers.success import SuccessDownloadHandler
|
||||
from bot.core.workers.abstract import AbstractDownloadResultWorker, RabbitWorkerType
|
||||
|
||||
|
||||
class SuccessResultWorker(AbstractResultWorker):
|
||||
class SuccessDownloadResultWorker(AbstractDownloadResultWorker):
|
||||
TYPE = RabbitWorkerType.SUCCESS
|
||||
QUEUE_TYPE = SUCCESS_QUEUE
|
||||
SCHEMA_CLS = (SuccessPayload,)
|
||||
HANDLER_CLS = SuccessHandler
|
||||
SCHEMA_CLS = (SuccessDownloadPayload,)
|
||||
HANDLER_CLS = SuccessDownloadHandler
|
||||
|
||||
async def _process_body(self, body: SuccessPayload) -> None:
|
||||
async def _process_body(self, body: SuccessDownloadPayload) -> None:
|
||||
self._spawn_handler_task(body)
|
||||
|
||||
def _spawn_handler_task(self, body: SuccessPayload) -> None:
|
||||
def _spawn_handler_task(self, body: SuccessDownloadPayload) -> None:
|
||||
task_name = self.HANDLER_CLS.__class__.__name__
|
||||
create_task(
|
||||
self.HANDLER_CLS(body=body, bot=self._bot).handle(),
|
||||
|
|
|
@ -1 +1 @@
|
|||
__version__ = '1.4.1'
|
||||
__version__ = '1.4.3'
|
||||
|
|
|
@ -6,7 +6,7 @@ from yt_shared.schemas.media import InbMediaPayload
|
|||
from worker.core.payload_handler import PayloadHandler
|
||||
|
||||
|
||||
class _RMQCallbacks:
|
||||
class RMQCallbacks:
|
||||
"""RabbitMQ's callbacks."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
|
@ -17,7 +17,7 @@ class _RMQCallbacks:
|
|||
try:
|
||||
await self._process_incoming_message(message)
|
||||
except Exception:
|
||||
self._log.exception('Critical exception in worker rabbit callback')
|
||||
self._log.exception('Critical exception in worker RabbitMQ callback')
|
||||
await message.reject(requeue=False)
|
||||
|
||||
async def _process_incoming_message(self, message: IncomingMessage) -> None:
|
||||
|
@ -33,7 +33,7 @@ class _RMQCallbacks:
|
|||
|
||||
def _deserialize_message(self, message: IncomingMessage) -> InbMediaPayload | None:
|
||||
try:
|
||||
return InbMediaPayload.parse_raw(message.body)
|
||||
return InbMediaPayload.model_validate_json(message.body)
|
||||
except Exception:
|
||||
self._log.exception('Failed to deserialize message body: %s', message.body)
|
||||
return None
|
||||
|
@ -44,4 +44,4 @@ class _RMQCallbacks:
|
|||
await message.reject(requeue=False)
|
||||
|
||||
|
||||
rmq_callbacks = _RMQCallbacks()
|
||||
rmq_callbacks = RMQCallbacks()
|
||||
|
|
|
@ -6,6 +6,7 @@ class WorkerSettings(Settings):
|
|||
MAX_SIMULTANEOUS_DOWNLOADS: int
|
||||
STORAGE_PATH: str
|
||||
THUMBNAIL_FRAME_SECOND: float
|
||||
INSTAGRAM_ENCODE_TO_H264: bool
|
||||
|
||||
|
||||
settings = WorkerSettings()
|
||||
|
|
|
@ -230,7 +230,7 @@ class MediaDownloader:
|
|||
|
||||
def _get_video_context(
|
||||
self, meta: dict
|
||||
) -> tuple[float | None, int | None, int | None]:
|
||||
) -> tuple[float | None, int | float | None, int | float | None]:
|
||||
if meta['_type'] == self._PLAYLIST_TYPE:
|
||||
if not len(meta['entries']):
|
||||
raise ValueError(
|
||||
|
|
|
@ -19,8 +19,10 @@ from yt_shared.utils.tasks.tasks import create_task
|
|||
from worker.core.config import settings
|
||||
from worker.core.downloader import MediaDownloader
|
||||
from worker.core.exceptions import DownloadVideoServiceError
|
||||
from worker.core.tasks.encode import EncodeToH264Task
|
||||
from worker.core.tasks.ffprobe_context import GetFfprobeContextTask
|
||||
from worker.core.tasks.thumbnail import MakeThumbnailTask
|
||||
from worker.utils import is_instagram
|
||||
|
||||
|
||||
class MediaService:
|
||||
|
@ -127,6 +129,20 @@ class MediaService:
|
|||
|
||||
if media_payload.save_to_storage:
|
||||
coro_tasks.append(self._create_copy_file_task(video))
|
||||
|
||||
# Instagram returns VP9+AAC in MP4 container for logged users and needs
|
||||
# to be encoded to H264 since Telegram doesn't play VP9 on iOS
|
||||
if settings.INSTAGRAM_ENCODE_TO_H264 and is_instagram(media_payload.url):
|
||||
coro_tasks.append(
|
||||
create_task(
|
||||
EncodeToH264Task(media=media).run(),
|
||||
task_name=EncodeToH264Task.__class__.__name__,
|
||||
logger=self._log,
|
||||
exception_message='Task "%s" raised an exception',
|
||||
exception_message_args=(EncodeToH264Task.__class__.__name__,),
|
||||
)
|
||||
)
|
||||
|
||||
await asyncio.gather(*coro_tasks)
|
||||
|
||||
file = await self._repository.save_file(db, task, media.video, media.meta)
|
||||
|
@ -192,9 +208,16 @@ class MediaService:
|
|||
)
|
||||
|
||||
async def _copy_file_to_storage(self, file: BaseMedia) -> None:
|
||||
dst = os.path.join(settings.STORAGE_PATH, file.filename)
|
||||
self._log.info('Copying "%s" to storage "%s"', file.filepath, dst)
|
||||
await asyncio.to_thread(shutil.copy2, file.filepath, dst)
|
||||
if file.is_converted:
|
||||
filename = file.converted_filename
|
||||
filepath = file.converted_filepath
|
||||
else:
|
||||
filename = file.filename
|
||||
filepath = file.filepath
|
||||
|
||||
dst = os.path.join(settings.STORAGE_PATH, filename)
|
||||
self._log.info('Copying "%s" to storage "%s"', filepath, dst)
|
||||
await asyncio.to_thread(shutil.copy2, filepath, dst)
|
||||
file.mark_as_saved_to_storage(storage_path=dst)
|
||||
|
||||
def _err_file_cleanup(self, video: DownMedia) -> None:
|
||||
|
|
|
@ -5,9 +5,9 @@ from yt_dlp import version as ytdlp_version
|
|||
from yt_shared.db.session import get_db
|
||||
from yt_shared.models import Task
|
||||
from yt_shared.rabbit.publisher import RmqPublisher
|
||||
from yt_shared.schemas.error import ErrorDownloadPayload, ErrorGeneralPayload
|
||||
from yt_shared.schemas.error import ErrorDownloadGeneralPayload, ErrorDownloadPayload
|
||||
from yt_shared.schemas.media import DownMedia, InbMediaPayload
|
||||
from yt_shared.schemas.success import SuccessPayload
|
||||
from yt_shared.schemas.success import SuccessDownloadPayload
|
||||
|
||||
from worker.core.exceptions import DownloadVideoServiceError, GeneralVideoServiceError
|
||||
from worker.core.media_service import MediaService
|
||||
|
@ -45,7 +45,7 @@ class PayloadHandler:
|
|||
async def _send_finished_task(
|
||||
self, task: Task, media: DownMedia, media_payload: InbMediaPayload
|
||||
) -> None:
|
||||
success_payload = SuccessPayload(
|
||||
success_payload = SuccessDownloadPayload(
|
||||
task_id=task.id,
|
||||
media=media,
|
||||
message_id=task.message_id,
|
||||
|
@ -84,7 +84,7 @@ class PayloadHandler:
|
|||
media_payload: InbMediaPayload,
|
||||
) -> None:
|
||||
task: Task | None = getattr(err, 'task', None)
|
||||
err_payload = ErrorGeneralPayload(
|
||||
err_payload = ErrorDownloadGeneralPayload(
|
||||
task_id=task.id if task else 'N/A',
|
||||
message_id=media_payload.message_id,
|
||||
from_chat_id=media_payload.from_chat_id,
|
||||
|
|
45
app_worker/worker/core/tasks/encode.py
Normal file
45
app_worker/worker/core/tasks/encode.py
Normal file
|
@ -0,0 +1,45 @@
|
|||
import os
|
||||
|
||||
from yt_shared.schemas.media import DownMedia
|
||||
|
||||
from worker.core.tasks.abstract import AbstractFfBinaryTask
|
||||
|
||||
|
||||
class EncodeToH264Task(AbstractFfBinaryTask):
|
||||
_CMD = 'ffmpeg -y -loglevel error -i "{filepath}" -c:v libx264 -pix_fmt yuv420p -preset veryfast -crf 25 -movflags +faststart -c:a copy "{output}"'
|
||||
_EXT = 'mp4'
|
||||
_CMD_TIMEOUT = 120
|
||||
|
||||
def __init__(self, media: DownMedia):
|
||||
super().__init__(file_path=media.video.filepath)
|
||||
self._media = media
|
||||
self._video = media.video
|
||||
|
||||
async def run(self) -> None:
|
||||
await self._encode_video()
|
||||
|
||||
def _get_output_path(self) -> str:
|
||||
filename = f'{self._video.filename.rsplit(".", 1)[0]}-h264.{self._EXT}'
|
||||
return os.path.join(self._media.root_path, filename)
|
||||
|
||||
async def _encode_video(self) -> None:
|
||||
output = self._get_output_path()
|
||||
cmd = self._CMD.format(filepath=self._file_path, output=output)
|
||||
self._log.info('Encoding: %s', cmd)
|
||||
|
||||
proc = await self._run_proc(cmd)
|
||||
if not proc:
|
||||
return None
|
||||
|
||||
stdout, stderr = await self._get_stdout_stderr(proc)
|
||||
self._log.info(
|
||||
'Process %s returncode: %d, stderr: %s', cmd, proc.returncode, stderr
|
||||
)
|
||||
if proc.returncode:
|
||||
err_msg = (
|
||||
f'Failed to make video context. Is file broken? {self._file_path}?'
|
||||
)
|
||||
self._log.error(err_msg)
|
||||
raise RuntimeError(err_msg)
|
||||
|
||||
self._video.mark_as_converted(filepath=output)
|
|
@ -1,8 +1,10 @@
|
|||
import os
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import yt_dlp
|
||||
|
||||
_COOKIES_FILEPATH = '/app/cookies/cookies.txt'
|
||||
_INSTAGRAM_HOST = 'instagram.com'
|
||||
|
||||
|
||||
def cli_to_api(opts: list) -> dict:
|
||||
|
@ -26,3 +28,11 @@ def is_file_empty(filepath: str) -> bool:
|
|||
def get_cookies_opts_if_not_empty() -> list[str]:
|
||||
"""Return yt-dlp cookies option with cookies filepath."""
|
||||
return [] if is_file_empty(_COOKIES_FILEPATH) else ['--cookies', _COOKIES_FILEPATH]
|
||||
|
||||
|
||||
def is_instagram(url: str) -> bool:
|
||||
return _INSTAGRAM_HOST in urlparse(url).netloc
|
||||
|
||||
|
||||
def get_media_format_options() -> str:
|
||||
pass
|
||||
|
|
|
@ -1,4 +1,7 @@
|
|||
APPLICATION_NAME=yt_worker
|
||||
|
||||
MAX_SIMULTANEOUS_DOWNLOADS=2
|
||||
STORAGE_PATH=/filestorage
|
||||
|
||||
THUMBNAIL_FRAME_SECOND=10.0
|
||||
INSTAGRAM_ENCODE_TO_H264=True
|
||||
|
|
|
@ -13,9 +13,9 @@ from yt_shared.rabbit.rabbit_config import (
|
|||
SUCCESS_EXCHANGE,
|
||||
SUCCESS_QUEUE,
|
||||
)
|
||||
from yt_shared.schemas.error import ErrorDownloadPayload, ErrorGeneralPayload
|
||||
from yt_shared.schemas.error import ErrorDownloadGeneralPayload, ErrorDownloadPayload
|
||||
from yt_shared.schemas.media import InbMediaPayload
|
||||
from yt_shared.schemas.success import SuccessPayload
|
||||
from yt_shared.schemas.success import SuccessDownloadPayload
|
||||
from yt_shared.utils.common import Singleton
|
||||
|
||||
|
||||
|
@ -37,7 +37,7 @@ class RmqPublisher(metaclass=Singleton):
|
|||
return self._is_sent(confirm)
|
||||
|
||||
async def send_download_error(
|
||||
self, error_payload: ErrorDownloadPayload | ErrorGeneralPayload
|
||||
self, error_payload: ErrorDownloadPayload | ErrorDownloadGeneralPayload
|
||||
) -> bool:
|
||||
err_exchange = self._rabbit_mq.exchanges[ERROR_EXCHANGE]
|
||||
err_message = aio_pika.Message(body=error_payload.model_dump_json().encode())
|
||||
|
@ -46,7 +46,9 @@ class RmqPublisher(metaclass=Singleton):
|
|||
)
|
||||
return self._is_sent(confirm)
|
||||
|
||||
async def send_download_finished(self, success_payload: SuccessPayload) -> bool:
|
||||
async def send_download_finished(
|
||||
self, success_payload: SuccessDownloadPayload
|
||||
) -> bool:
|
||||
message = aio_pika.Message(body=success_payload.model_dump_json().encode())
|
||||
exchange = self._rabbit_mq.exchanges[SUCCESS_EXCHANGE]
|
||||
confirm = await exchange.publish(
|
||||
|
|
|
@ -8,7 +8,7 @@ from yt_shared.schemas.base import BaseRabbitPayloadModel
|
|||
from yt_shared.schemas.media import InbMediaPayload
|
||||
|
||||
|
||||
class ErrorGeneralPayload(BaseRabbitPayloadModel):
|
||||
class ErrorDownloadGeneralPayload(BaseRabbitPayloadModel):
|
||||
type: Literal[RabbitPayloadType.GENERAL_ERROR] = RabbitPayloadType.GENERAL_ERROR
|
||||
task_id: uuid.UUID | StrictStr | None
|
||||
from_chat_id: StrictInt | None
|
||||
|
@ -23,6 +23,6 @@ class ErrorGeneralPayload(BaseRabbitPayloadModel):
|
|||
yt_dlp_version: StrictStr | None
|
||||
|
||||
|
||||
class ErrorDownloadPayload(ErrorGeneralPayload):
|
||||
class ErrorDownloadPayload(ErrorDownloadGeneralPayload):
|
||||
type: Literal[RabbitPayloadType.DOWNLOAD_ERROR] = RabbitPayloadType.DOWNLOAD_ERROR
|
||||
task_id: uuid.UUID
|
||||
|
|
|
@ -45,6 +45,10 @@ class BaseMedia(RealBaseModel):
|
|||
saved_to_storage: StrictBool = False
|
||||
storage_path: StrictStr | None = None
|
||||
|
||||
is_converted: StrictBool = False
|
||||
converted_filepath: StrictStr | None = None
|
||||
converted_filename: StrictStr | None = None
|
||||
|
||||
def file_size_human(self) -> str:
|
||||
return format_bytes(num=self.file_size)
|
||||
|
||||
|
@ -52,6 +56,11 @@ class BaseMedia(RealBaseModel):
|
|||
self.storage_path = storage_path
|
||||
self.saved_to_storage = True
|
||||
|
||||
def mark_as_converted(self, filepath: str) -> None:
|
||||
self.converted_filepath = filepath
|
||||
self.converted_filename = filepath.rsplit("/", 1)[-1]
|
||||
self.is_converted = True
|
||||
|
||||
|
||||
class Audio(BaseMedia):
|
||||
"""Model representing downloaded audio file."""
|
||||
|
@ -64,8 +73,8 @@ class Video(BaseMedia):
|
|||
|
||||
file_type: Literal[MediaFileType.VIDEO] = MediaFileType.VIDEO
|
||||
thumb_name: StrictStr | None = None
|
||||
width: int | None = None
|
||||
height: int | None = None
|
||||
width: int | float | None = None
|
||||
height: int | float | None = None
|
||||
thumb_path: StrictStr | None = None
|
||||
|
||||
@model_validator(mode='before')
|
||||
|
|
|
@ -8,7 +8,7 @@ from yt_shared.schemas.base import BaseRabbitPayloadModel
|
|||
from yt_shared.schemas.media import DownMedia, InbMediaPayload
|
||||
|
||||
|
||||
class SuccessPayload(BaseRabbitPayloadModel):
|
||||
class SuccessDownloadPayload(BaseRabbitPayloadModel):
|
||||
"""Payload with downloaded media context."""
|
||||
|
||||
type: Literal[RabbitPayloadType.SUCCESS] = RabbitPayloadType.SUCCESS
|
||||
|
|
Loading…
Reference in a new issue