yt-dlp-bot/worker/core/video_service.py

142 lines
5.6 KiB
Python
Raw Normal View History

2022-02-04 06:21:27 +08:00
import asyncio
import logging
2022-06-11 04:35:48 +08:00
import os
import shutil
2022-02-04 06:21:27 +08:00
2022-10-14 03:55:18 +08:00
from core.config import settings
2022-06-11 04:35:48 +08:00
from core.downloader import VideoDownloader
2022-11-03 01:56:19 +08:00
from core.exceptions import DownloadVideoServiceError
2022-06-14 07:24:25 +08:00
from core.tasks.ffprobe_context import GetFfprobeContextTask
from core.tasks.thumbnail import MakeThumbnailTask
2022-11-03 01:56:19 +08:00
from sqlalchemy.ext.asyncio import AsyncSession
from yt_shared.enums import TaskStatus
2022-06-11 04:35:48 +08:00
from yt_shared.models import Task
from yt_shared.repositories.task import TaskRepository
from yt_shared.schemas.video import DownVideo, VideoPayload
2022-11-03 01:56:19 +08:00
from yt_shared.utils.file import file_cleanup
from yt_shared.utils.tasks.tasks import create_task
2022-02-04 06:21:27 +08:00
class VideoService:
    """Download a video for a task and post-process the resulting file.

    Coordinates the downloader, the task repository (status transitions) and
    the post-processing steps (ffprobe metadata, thumbnail, optional copy of
    the file to permanent storage).
    """

    def __init__(self) -> None:
        self._log = logging.getLogger(self.__class__.__name__)
        self._downloader = VideoDownloader()
        self._repository = TaskRepository()

    async def process(
        self, video_payload: VideoPayload, db: AsyncSession
    ) -> tuple[DownVideo | None, Task | None]:
        """Process ``video_payload`` if its task is still pending.

        Returns ``(downloaded_video, task)`` when the task was pending and got
        processed, or ``(None, None)`` when the task has any other status.
        Download/ffprobe failures surface as ``DownloadVideoServiceError``;
        other post-processing errors are re-raised after cleanup.
        """
        task = await self._repository.get_or_create_task(db, video_payload)
        if task.status != TaskStatus.PENDING.value:
            # Only pending tasks are handled; anything else is skipped.
            return None, None
        downloaded_video = await self._process(
            video_payload=video_payload, task=task, db=db
        )
        return downloaded_video, task

    async def _process(
        self, video_payload: VideoPayload, task: Task, db: AsyncSession
    ) -> DownVideo:
        """Mark ``task`` as processing, download the video and post-process it.

        If post-processing fails, the error is logged, temporary files are
        removed and the original exception is re-raised.
        """
        await self._repository.save_as_processing(db, task)
        downloaded_video = await self._start_download(task, video_payload, db)
        try:
            await self._post_process_file(downloaded_video, task, db)
        except Exception:
            self._log.exception('Failed to post-process file %s', downloaded_video)
            self._err_file_cleanup(downloaded_video)
            raise
        return downloaded_video

    async def _start_download(
        self, task: Task, video_payload: VideoPayload, db: AsyncSession
    ) -> DownVideo:
        """Run the downloader in the default executor for ``task.url``.

        On failure the task is saved as failed and the error is re-raised as
        ``DownloadVideoServiceError`` carrying ``task``.
        """
        loop = asyncio.get_running_loop()
        try:
            # Executed in the default executor so the download does not block
            # the event loop.
            return await loop.run_in_executor(
                None, self._downloader.download_video, task.url
            )
        except Exception as err:
            self._log.exception('Failed to download video. Context: %s', video_payload)
            await self._handle_download_exception(err, task, db)
            raise self._wrap_error(err, task) from err

    async def _post_process_file(
        self,
        video: DownVideo,
        task: Task,
        db: AsyncSession,
    ) -> None:
        """Complete metadata, create the thumbnail and mark ``task`` as done.

        Missing duration/width/height are filled from ffprobe. The thumbnail
        is then created, together with an optional copy of the file to
        storage (``settings.SAVE_VIDEO_FILE``), and the task is saved as done.

        :raises DownloadVideoServiceError: If probing the file fails.
        """
        file_path: str = os.path.join(settings.TMP_DOWNLOAD_PATH, video.name)
        thumb_path: str = os.path.join(settings.TMP_DOWNLOAD_PATH, video.thumb_name)

        # yt-dlp meta may not contain needed video metadata.
        if not all([video.duration, video.height, video.width]):
            # TODO: Move to higher level or create a dedicated exception type.
            try:
                await self._set_probe_ctx(file_path, video)
            except RuntimeError as err:
                raise self._wrap_error(err, task) from err

        tasks = [self._create_thumbnail_task(file_path, thumb_path, video.duration)]
        if settings.SAVE_VIDEO_FILE:
            tasks.append(self._create_copy_file_task(video))
        await asyncio.gather(*tasks)

        await self._repository.save_as_done(db, task, video)

    @classmethod
    async def _set_probe_ctx(cls, file_path: str, video: DownVideo) -> None:
        """Populate ``video`` duration/width/height from ffprobe output.

        Does nothing when the probe returns an empty context.

        :raises RuntimeError: If the probe output lacks a video stream or a
            usable duration.
        """
        probe_ctx = await GetFfprobeContextTask(file_path).run()
        if not probe_ctx:
            return
        cls._apply_probe_ctx(probe_ctx, video)

    @staticmethod
    def _apply_probe_ctx(probe_ctx: dict, video: DownVideo) -> None:
        """Copy duration and first video stream dimensions onto ``video``.

        Missing data raises ``RuntimeError`` rather than ``IndexError`` or
        ``KeyError``, so the caller can convert it into
        ``DownloadVideoServiceError``. ``video`` is only mutated on success.
        """
        video_streams = [
            stream
            for stream in probe_ctx.get('streams', [])
            if stream.get('codec_type') == 'video'
        ]
        if not video_streams:
            raise RuntimeError('No video streams found in ffprobe output')
        try:
            duration = float(probe_ctx['format']['duration'])
        except (KeyError, TypeError, ValueError) as err:
            raise RuntimeError('No valid duration found in ffprobe output') from err

        first_stream = video_streams[0]
        video.duration = duration
        video.width = first_stream['width']
        video.height = first_stream['height']

    def _create_copy_file_task(self, video: DownVideo) -> asyncio.Task:
        """Schedule copying the downloaded file to storage as a named task."""
        task_name = 'Copy file to storage task'
        return create_task(
            self._copy_file_to_storage(video),
            task_name=task_name,
            logger=self._log,
            exception_message='Task "%s" raised an exception',
            exception_message_args=(task_name,),
        )

    def _create_thumbnail_task(
        self, file_path: str, thumb_path: str, duration: float
    ) -> asyncio.Task:
        """Schedule thumbnail creation for ``file_path`` as a named task."""
        # ``__name__`` of the class itself; ``__class__.__name__`` would give
        # the metaclass name (e.g. 'type') instead.
        task_name = MakeThumbnailTask.__name__
        return create_task(
            MakeThumbnailTask(thumb_path, file_path, duration=duration).run(),
            task_name=task_name,
            logger=self._log,
            exception_message='Task "%s" raised an exception',
            exception_message_args=(task_name,),
        )

    @staticmethod
    async def _copy_file_to_storage(video: DownVideo) -> None:
        """Copy the file from the tmp download dir to ``settings.STORAGE_PATH``."""
        src: str = os.path.join(settings.TMP_DOWNLOAD_PATH, video.name)
        dst: str = os.path.join(settings.STORAGE_PATH, video.name)
        # copy2 is blocking file I/O; run it in a worker thread.
        await asyncio.to_thread(shutil.copy2, src, dst)

    def _err_file_cleanup(self, downloaded_video: DownVideo) -> None:
        """Cleanup any downloaded/created data if post-processing failed."""
        # The video and thumbnail are written under TMP_DOWNLOAD_PATH (see the
        # paths built in _post_process_file), so hand full paths to cleanup
        # instead of bare file names. Empty/None names are skipped.
        file_paths = tuple(
            os.path.join(settings.TMP_DOWNLOAD_PATH, name)
            for name in (downloaded_video.name, downloaded_video.thumb_name)
            if name
        )
        file_cleanup(file_paths=file_paths, log=self._log)

    async def _handle_download_exception(
        self, err: Exception, task: Task, db: AsyncSession
    ) -> None:
        """Persist ``task`` as failed with the error text as the reason."""
        await self._repository.save_as_failed(db, task, str(err))

    @staticmethod
    def _wrap_error(err: Exception, task: Task) -> DownloadVideoServiceError:
        """Build the service error callers expect, with the failed ``task`` attached."""
        exception = DownloadVideoServiceError(str(err))
        exception.task = task
        return exception