2022-02-04 06:21:27 +08:00
|
|
|
import logging
|
|
|
|
|
|
|
|
from aio_pika import IncomingMessage
|
|
|
|
|
|
|
|
from core.video_service import VideoService
|
|
|
|
from yt_shared.db import get_db
|
|
|
|
from yt_shared.schemas.video import VideoPayload
|
|
|
|
|
|
|
|
|
|
|
|
class _Callbacks:
    """Handlers invoked for messages consumed from RabbitMQ."""

    def __init__(self) -> None:
        """Set up a logger named after the class and the video service."""
        self._log = logging.getLogger(type(self).__name__)
        self._video_service = VideoService()

    async def on_input_message(self, message: IncomingMessage) -> None:
        """Handle one incoming message.

        The body is parsed into a ``VideoPayload``. If parsing raises, the
        failure is logged and the message is rejected without requeueing.
        Otherwise the payload is passed to the video service together with
        every session yielded by ``get_db()``, after which the message is
        acknowledged.
        """
        self._log.info('[x] Received message %s', message.body)

        try:
            payload = VideoPayload.parse_raw(message.body)
        except Exception:
            self._log.exception('Failed to decode message body')
            await self._reject_invalid_body(message)
        else:
            async for db_session in get_db():
                await self._video_service.process(payload, db_session)

            # Acknowledge only once processing finished without raising.
            await message.ack()
            self._log.info('Download done with %s', payload)

    async def _reject_invalid_body(self, message: IncomingMessage) -> None:
        """Log the offending body with its type and reject the message."""
        raw_body = message.body
        self._log.error(
            'Invalid message body: %s, type: %s', raw_body, type(raw_body)
        )
        # requeue=False: the broker is asked not to put the message back.
        await message.reject(requeue=False)
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level `_Callbacks` instance, created once when this module is imported.
callbacks = _Callbacks()
|