yt-dlp-bot/worker/core/video_service.py

131 lines
4.7 KiB
Python
Raw Normal View History

2022-02-04 06:21:27 +08:00
import asyncio
import logging
2022-06-11 04:35:48 +08:00
import os
import shutil
2022-02-04 06:21:27 +08:00
from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession
2022-10-14 03:55:18 +08:00
from core.config import settings
2022-06-11 04:35:48 +08:00
from core.downloader import VideoDownloader
2022-06-14 07:24:25 +08:00
from core.tasks.ffprobe_context import GetFfprobeContextTask
from core.tasks.thumbnail import MakeThumbnailTask
2022-02-04 06:21:27 +08:00
from yt_shared.constants import TaskStatus
2022-06-11 04:35:48 +08:00
from yt_shared.models import Task
from yt_shared.rabbit.publisher import Publisher
from yt_shared.repositories.task import TaskRepository
2022-02-04 06:21:27 +08:00
from yt_shared.schemas.error import ErrorPayload
from yt_shared.schemas.success import SuccessPayload
2022-06-11 04:35:48 +08:00
from yt_shared.schemas.video import DownVideo, VideoPayload
2022-02-04 06:21:27 +08:00
class VideoService:
def __init__(self) -> None:
self._log = logging.getLogger(self.__class__.__name__)
self._downloader = VideoDownloader()
self._repository = TaskRepository()
2022-06-11 04:35:48 +08:00
self._publisher = Publisher()
2022-02-04 06:21:27 +08:00
2022-06-11 04:35:48 +08:00
async def process(self, video_payload: VideoPayload, db: AsyncSession) -> None:
2022-02-04 06:21:27 +08:00
task = await self._repository.get_or_create_task(db, video_payload)
if task.status != TaskStatus.PENDING:
return
2022-06-11 04:35:48 +08:00
await self._repository.save_as_processing(db, task)
2022-02-04 06:21:27 +08:00
downloaded_video = await self._start_download(db, task, video_payload)
if downloaded_video:
2022-06-11 04:35:48 +08:00
await self._post_process_file(downloaded_video, task, db)
async def _post_process_file(
self, video: DownVideo, task: Task, db: AsyncSession
) -> None:
2022-10-14 03:55:18 +08:00
file_path = os.path.join(settings.TMP_DOWNLOAD_PATH, video.name)
thumb_path = os.path.join(settings.TMP_DOWNLOAD_PATH, video.thumb_name)
2022-06-14 07:24:25 +08:00
post_process_coros = [
self._set_probe_ctx(file_path, video),
MakeThumbnailTask(thumb_path, file_path).run(),
]
2022-06-11 04:35:48 +08:00
2022-10-14 03:55:18 +08:00
if settings.SAVE_VIDEO_FILE:
2022-06-11 04:35:48 +08:00
post_process_coros.append(self._copy_file_to_storage(video))
await asyncio.gather(*post_process_coros)
2022-06-14 07:24:25 +08:00
await self._repository.save_as_done(db, task, video),
await self._send_finished_task(task, video)
@staticmethod
async def _set_probe_ctx(file_path: str, video: DownVideo) -> None:
probe_ctx = await GetFfprobeContextTask(file_path).run()
if not probe_ctx:
return
video_streams = [
stream for stream in probe_ctx['streams'] if stream['codec_type'] == 'video'
]
video.duration = int(float(probe_ctx['format']['duration']))
video.width = video_streams[0]['width']
video.height = video_streams[0]['height']
2022-02-04 06:21:27 +08:00
@staticmethod
2022-06-11 04:35:48 +08:00
async def _copy_file_to_storage(video: DownVideo) -> None:
2022-10-14 03:55:18 +08:00
src = os.path.join(settings.TMP_DOWNLOAD_PATH, video.name)
dst = os.path.join(settings.STORAGE_PATH, video.name)
2022-06-11 04:35:48 +08:00
await asyncio.to_thread(shutil.copy2, src, dst)
2022-02-04 06:21:27 +08:00
async def _start_download(
2022-06-11 04:35:48 +08:00
self, db: AsyncSession, task: Task, video_payload: VideoPayload
) -> Optional[DownVideo]:
2022-02-04 06:21:27 +08:00
try:
return await asyncio.get_running_loop().run_in_executor(
2022-06-11 04:35:48 +08:00
None, lambda: self._downloader.download_video(task.url)
)
2022-02-04 06:21:27 +08:00
except Exception as err:
await self._handle_download_exception(db, err, task, video_payload)
return None
2022-06-11 04:35:48 +08:00
async def _handle_download_exception(
self, db: AsyncSession, err: Exception, task: Task, video_payload: VideoPayload
) -> None:
2022-02-04 06:21:27 +08:00
exception_msg = str(err)
2022-06-11 04:35:48 +08:00
await self._repository.save_as_failed(db, task, exception_msg)
await self._send_failed_task(
task=task,
video_payload=video_payload,
exception_msg=exception_msg,
err=err,
)
2022-06-14 07:24:25 +08:00
async def _send_finished_task(self, task: Task, video: DownVideo) -> None:
2022-06-14 04:46:54 +08:00
success_payload = SuccessPayload(
task_id=task.id,
filename=task.file.name,
2022-06-14 07:24:25 +08:00
thumb_name=video.thumb_name,
duration=video.duration,
width=video.width,
height=video.height,
2022-06-14 04:46:54 +08:00
message_id=task.message_id,
from_user_id=task.from_user_id,
)
2022-06-11 04:35:48 +08:00
await self._publisher.send_download_finished(success_payload)
async def _send_failed_task(
self,
task: Task,
video_payload: VideoPayload,
exception_msg: str,
err: Exception,
) -> None:
2022-02-04 06:21:27 +08:00
err_payload = ErrorPayload(
task_id=task.id,
message_id=task.message_id,
2022-06-14 04:46:54 +08:00
from_user_id=video_payload.from_user_id,
2022-02-04 06:21:27 +08:00
message='Download error',
url=task.url,
original_body=video_payload.dict(),
2022-06-11 04:35:48 +08:00
yt_dlp_version=task.yt_dlp_version,
2022-02-04 06:21:27 +08:00
exception_msg=exception_msg,
exception_type=err.__class__.__name__,
)
2022-06-11 04:35:48 +08:00
await self._publisher.send_download_error(err_payload)