From 1b7014d245023ba4335a8276138c93b2b35ca7db Mon Sep 17 00:00:00 2001 From: Taras Terletskyi <888784+tropicoo@users.noreply.github.com> Date: Mon, 13 Jun 2022 23:46:54 +0300 Subject: [PATCH] Polish logic --- .gitignore | 1 + README.md | 2 +- alembic/versions/8021be777d1d_.py | 29 ++++++++++++++++++++++++++ api/api/api_v1/schemas/task.py | 1 + bot/core/bot/bot.py | 3 ++- bot/core/callbacks.py | 12 +++++++---- bot/core/service.py | 16 +++++++++----- bot/core/tasks/rabbit_tasks.py | 5 ++--- bot/core/tasks/upload.py | 17 ++++++++------- bot/core/tasks/ytdlp.py | 2 +- start.sh | 16 +++++++++++--- worker/core/callbacks.py | 4 ++-- worker/core/launcher.py | 2 +- worker/core/video_service.py | 8 ++++++- worker/ytdl_opts/default.py | 1 - yt_shared/yt_shared/config.py | 2 +- yt_shared/yt_shared/models/task.py | 1 + yt_shared/yt_shared/schemas/error.py | 1 + yt_shared/yt_shared/schemas/success.py | 1 + yt_shared/yt_shared/schemas/url.py | 9 ++++++++ yt_shared/yt_shared/schemas/video.py | 1 + 21 files changed, 102 insertions(+), 32 deletions(-) create mode 100644 alembic/versions/8021be777d1d_.py create mode 100644 yt_shared/yt_shared/schemas/url.py diff --git a/.gitignore b/.gitignore index 1cbf928..5125eb6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ # Byte-compiled / optimized / DLL files *.session +*.session-journal config.yml config.yaml diff --git a/README.md b/README.md index f596b5b..ba9e634 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ environment variable in `.env_common` file to `True`. Simple as `docker-compose up -d`. Your telegram bot should send you a startup message: -` bot started, paste video URL to start download` and that's it. +` started, paste video URL to start download` and that's it. After pasting video URL bot will send you appropriate message whether it was downloaded or something went wrong. diff --git a/alembic/versions/8021be777d1d_.py b/alembic/versions/8021be777d1d_.py new file mode 100644 index 0000000..68314da --- /dev/null +++ b/alembic/versions/8021be777d1d_.py @@ -0,0 +1,29 @@ +"""empty message + +Revision ID: 8021be777d1d +Revises: ff03785a1f0d +Create Date: 2022-06-13 20:16:10.845151 + +""" +from alembic import op +import sqlalchemy as sa +import sqlalchemy_utils + + +# revision identifiers, used by Alembic. +revision = '8021be777d1d' +down_revision = 'ff03785a1f0d' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('task', sa.Column('from_user_id', sa.Integer(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('task', 'from_user_id') + # ### end Alembic commands ### diff --git a/api/api/api_v1/schemas/task.py b/api/api/api_v1/schemas/task.py index f652446..75877b9 100644 --- a/api/api/api_v1/schemas/task.py +++ b/api/api/api_v1/schemas/task.py @@ -40,6 +40,7 @@ class TaskSimpleSchema(BaseOrmModel): status: TaskStatus url: str source: TaskSource + from_user_id: Optional[int] message_id: Optional[int] yt_dlp_version: Optional[str] error: Optional[str] diff --git a/bot/core/bot/bot.py b/bot/core/bot/bot.py index bd92fbe..ac83ee9 100644 --- a/bot/core/bot/bot.py +++ b/bot/core/bot/bot.py @@ -5,6 +5,7 @@ from pyrogram import Client from core.config.config import get_main_config from core.tasks.manager import RabbitWorkerManager from core.tasks.ytdlp import YtdlpNewVersionNotifyTask +from core.utils import bold from yt_shared.rabbit import get_rabbitmq from yt_shared.task_utils.tasks import create_task @@ -43,7 +44,7 @@ class VideoBot(Client): """Send welcome message after bot launch.""" self._log.info('Sending welcome message') await self.send_message_all( - f'{(await self.get_me()).first_name} bot started, paste video URL to ' + f'✨ {bold((await self.get_me()).first_name)} started, paste video URL to ' f'start download' ) diff --git a/bot/core/callbacks.py b/bot/core/callbacks.py index 907187b..d0d2f19 100644 --- a/bot/core/callbacks.py +++ b/bot/core/callbacks.py @@ -1,12 +1,13 @@ import logging -from pyrogram import Client from pyrogram.enums import ParseMode from pyrogram.types import Message +from core.bot import VideoBot from core.service import URLService from core.utils import bold from yt_shared.emoji import SUCCESS_EMOJI +from yt_shared.schemas.url import URL class TelegramCallback: @@ -17,11 +18,14 @@ class TelegramCallback: self._log = logging.getLogger(self.__class__.__name__) self._url_service = URLService() - async def on_message(self, client: Client, message: Message) -> None: + async def on_message(self, client: VideoBot, message: Message) -> None: """Receive video URL and send to the download worker.""" - is_sent = await self._url_service.process_url(message.text, message.id) + url = URL( + url=message.text, from_user_id=message.from_user.id, message_id=message.id + ) + is_sent = await self._url_service.process_url(url) await message.reply( - self._MSG_SEND_OK if is_sent else self._MSG_SEND_FAIL, + self._MSG_SEND_OK.format(url=url.url) if is_sent else self._MSG_SEND_FAIL, parse_mode=ParseMode.HTML, reply_to_message_id=message.id, ) diff --git a/bot/core/service.py b/bot/core/service.py index 5d5bbc1..51590a3 100644 --- a/bot/core/service.py +++ b/bot/core/service.py @@ -2,6 +2,7 @@ import logging from yt_shared.constants import TaskSource from yt_shared.rabbit.publisher import Publisher +from yt_shared.schemas.url import URL from yt_shared.schemas.video import VideoPayload @@ -10,12 +11,17 @@ class URLService: self._log = logging.getLogger(self.__class__.__name__) self._publisher = Publisher() - async def process_url(self, url: str, message_id: int) -> bool: - return await self._send_to_worker(url, message_id) + async def process_url(self, url: URL) -> bool: + return await self._send_to_worker(url) - async def _send_to_worker(self, url: str, message_id: int) -> bool: - payload = VideoPayload(url=url, message_id=message_id, source=TaskSource.BOT) + async def _send_to_worker(self, url: URL) -> bool: + payload = VideoPayload( + url=url.url, + message_id=url.message_id, + from_user_id=url.from_user_id, + source=TaskSource.BOT, + ) is_sent = await self._publisher.send_for_download(payload) if not is_sent: - self._log.error('Failed to publish URL %s to message broker', url) + self._log.error('Failed to publish URL %s to message broker', url.url) return is_sent diff --git a/bot/core/tasks/rabbit_tasks.py b/bot/core/tasks/rabbit_tasks.py index d23dcc1..47de6dd 100644 --- a/bot/core/tasks/rabbit_tasks.py +++ b/bot/core/tasks/rabbit_tasks.py @@ -87,13 +87,12 @@ class SuccessResultWorkerTask(AbstractResultWorkerTask): SCHEMA_CLS = SuccessPayload async def _process_body(self, body: SuccessPayload) -> None: - process_coros = [self._send_success_text(body)] + await self._send_success_text(body) filepath: str = os.path.join(TMP_DOWNLOAD_PATH, body.filename) if self._eligible_for_upload(filepath): - process_coros.append(self._create_upload_task(body)) + await self._create_upload_task(body) else: self._log.warning('File %s will not be uploaded to Telegram', body.filename) - await asyncio.gather(*process_coros, return_exceptions=True) self._cleanup(filepath) def _cleanup(self, filepath: str) -> None: diff --git a/bot/core/tasks/upload.py b/bot/core/tasks/upload.py index 5d89926..f60bff6 100644 --- a/bot/core/tasks/upload.py +++ b/bot/core/tasks/upload.py @@ -6,6 +6,7 @@ from pyrogram.types import Message from tenacity import retry, stop_after_attempt, wait_fixed from core.tasks.abstract import AbstractTask +from core.utils import bold from yt_shared.config import TMP_DOWNLOAD_PATH from yt_shared.db import get_db from yt_shared.repositories.task import TaskRepository @@ -25,30 +26,30 @@ class UploadTask(AbstractTask): self._bot = bot async def run(self) -> None: - tg_msg = f'Uploading {self.filename}' + tg_msg = f'⬆️ {bold("Uploading")} {self.filename}' self._log.info(tg_msg) try: - await self._bot.send_message(self._bot.user_ids[0], tg_msg) + await self._bot.send_message(self._body.from_user_id, tg_msg) message = await self._upload_video_file() if message: await self._save_cache(message) except Exception: - self._log.exception('Something went wrong during video file upload.') + self._log.exception('Something went wrong during video file upload') @retry(wait=wait_fixed(10), stop=stop_after_attempt(10), reraise=True) async def _upload_video_file(self) -> Optional[Message]: + user_id = self._body.from_user_id try: - await self._bot.send_chat_action( - self._bot.user_ids[0], action=ChatAction.UPLOAD_VIDEO - ) + self._log.info('Uploading for user id %s', user_id) + await self._bot.send_chat_action(user_id, action=ChatAction.UPLOAD_VIDEO) return await self._bot.send_video( - chat_id=self._bot.user_ids[0], + chat_id=user_id, file_name=self.filename, video=self.filepath, - reply_to_message_id=self._body.message_id, ) except Exception: self._log.exception('Failed to upload %s', self.filename) + raise async def _save_cache(self, message: Message) -> None: self._log.debug('Saving telegram file cache - %s', message) diff --git a/bot/core/tasks/ytdlp.py b/bot/core/tasks/ytdlp.py index 08adca3..136bbde 100644 --- a/bot/core/tasks/ytdlp.py +++ b/bot/core/tasks/ytdlp.py @@ -64,7 +64,7 @@ class YtdlpNewVersionNotifyTask(AbstractTask): """Send startup message that yt-dlp version is up to date.""" text = ( f'{INFORMATION_EMOJI} Your {code("yt-dlp")} version ' - f'{bold(ctx.current.version)} is up to date. Have fun.' + f'{bold(ctx.current.version)} is up to date, have fun' ) await self._send_to_chat(text) diff --git a/start.sh b/start.sh index ff0871c..3d3dc94 100644 --- a/start.sh +++ b/start.sh @@ -1,10 +1,20 @@ #!/bin/bash -while ! nc -z rabbitmq "${RABBITMQ_PORT}" +check_reachability() { +while ! nc -z "$1" "${!2}" do - echo "Waiting for RabbitMQ to be reachable on port ${RABBITMQ_PORT}" + echo "Waiting for $3 to be reachable on port ${!2}" sleep 1 done +echo "Connection to $3 on port ${!2} verified" +return 0 +} -echo "Connection to RabbitMQ on port ${RABBITMQ_PORT} verified" + +wait_for_services_to_be_reachable() { + check_reachability rabbitmq RABBITMQ_PORT RabbitMQ + check_reachability postgres POSTGRES_PORT PostgreSQL +} + +wait_for_services_to_be_reachable exit 0 diff --git a/worker/core/callbacks.py b/worker/core/callbacks.py index 532346d..156a07e 100644 --- a/worker/core/callbacks.py +++ b/worker/core/callbacks.py @@ -7,7 +7,7 @@ from yt_shared.db import get_db from yt_shared.schemas.video import VideoPayload -class _Callbacks: +class _RMQCallbacks: """RabbitMQ callbacks.""" def __init__(self) -> None: @@ -35,4 +35,4 @@ class _Callbacks: await message.reject(requeue=False) -callbacks = _Callbacks() +rmq_callbacks = _RMQCallbacks() diff --git a/worker/core/launcher.py b/worker/core/launcher.py index 3bebad6..97354eb 100644 --- a/worker/core/launcher.py +++ b/worker/core/launcher.py @@ -3,7 +3,7 @@ import logging from yt_dlp import version as ytdlp_version -from core.callbacks import callbacks as cb +from core.callbacks import rmq_callbacks as cb from yt_shared.config import MAX_SIMULTANEOUS_DOWNLOADS from yt_shared.db import get_db from yt_shared.rabbit import get_rabbitmq diff --git a/worker/core/video_service.py b/worker/core/video_service.py index 9f125d7..2be2881 100644 --- a/worker/core/video_service.py +++ b/worker/core/video_service.py @@ -75,7 +75,12 @@ class VideoService: ) async def _send_finished_task(self, task: Task) -> None: - success_payload = SuccessPayload(task_id=task.id, filename=task.file.name) + success_payload = SuccessPayload( + task_id=task.id, + filename=task.file.name, + message_id=task.message_id, + from_user_id=task.from_user_id, + ) await self._publisher.send_download_finished(success_payload) async def _send_failed_task( @@ -88,6 +93,7 @@ class VideoService: err_payload = ErrorPayload( task_id=task.id, message_id=task.message_id, + from_user_id=video_payload.from_user_id, message='Download error', url=task.url, original_body=video_payload.dict(), diff --git a/worker/ytdl_opts/default.py b/worker/ytdl_opts/default.py index 038c317..8c0b284 100644 --- a/worker/ytdl_opts/default.py +++ b/worker/ytdl_opts/default.py @@ -9,5 +9,4 @@ YTDL_OPTS = { 'noplaylist': True, 'max_downloads': 1, 'concurrent_fragment_downloads': 2, - 'restrictfilenames': True, } diff --git a/yt_shared/yt_shared/config.py b/yt_shared/yt_shared/config.py index a4f4f78..e6e3766 100644 --- a/yt_shared/yt_shared/config.py +++ b/yt_shared/yt_shared/config.py @@ -36,5 +36,5 @@ MAX_SIMULTANEOUS_DOWNLOADS = int(os.getenv('MAX_SIMULTANEOUS_DOWNLOADS', 10)) YTDLP_VERSION_CHECK_INTERVAL = int(os.getenv('YTDLP_VERSION_CHECK_INTERVAL', 86400)) SAVE_VIDEO_FILE = get_env_bool(os.getenv('SAVE_VIDEO_FILE', True)) -UPLOAD_VIDEO_FILE = get_env_bool(os.getenv('UPLOAD_VIDEO_FILE', True)) +UPLOAD_VIDEO_FILE = get_env_bool(os.getenv('UPLOAD_VIDEO_FILE', False)) MAX_UPLOAD_VIDEO_FILE_SIZE = int(os.getenv('MAX_UPLOAD_VIDEO_FILE_SIZE', 2147483648)) diff --git a/yt_shared/yt_shared/models/task.py b/yt_shared/yt_shared/models/task.py index bb1fa82..5f4991d 100644 --- a/yt_shared/yt_shared/models/task.py +++ b/yt_shared/yt_shared/models/task.py @@ -34,6 +34,7 @@ class Task(Base, Timestamp): cascade='all, delete-orphan', ) added_at = sa.Column(sa.DateTime, nullable=False) + from_user_id = sa.Column(sa.Integer, nullable=True) message_id = sa.Column(sa.Integer, nullable=True) error = sa.Column(sa.String, nullable=True) yt_dlp_version = sa.Column( diff --git a/yt_shared/yt_shared/schemas/error.py b/yt_shared/yt_shared/schemas/error.py index b7c0b4d..0088cd9 100644 --- a/yt_shared/yt_shared/schemas/error.py +++ b/yt_shared/yt_shared/schemas/error.py @@ -8,6 +8,7 @@ from yt_shared.schemas.base import RealBaseModel class ErrorPayload(RealBaseModel): task_id: uuid.UUID + from_user_id: Optional[int] message_id: Optional[int] message: StrictStr url: StrictStr diff --git a/yt_shared/yt_shared/schemas/success.py b/yt_shared/yt_shared/schemas/success.py index 9adb67a..eeb30a1 100644 --- a/yt_shared/yt_shared/schemas/success.py +++ b/yt_shared/yt_shared/schemas/success.py @@ -8,5 +8,6 @@ from yt_shared.schemas.base import RealBaseModel class SuccessPayload(RealBaseModel): task_id: uuid.UUID + from_user_id: Optional[int] message_id: Optional[int] filename: StrictStr diff --git a/yt_shared/yt_shared/schemas/url.py b/yt_shared/yt_shared/schemas/url.py new file mode 100644 index 0000000..97b88ac --- /dev/null +++ b/yt_shared/yt_shared/schemas/url.py @@ -0,0 +1,9 @@ +from pydantic import StrictStr + +from yt_shared.schemas.base import RealBaseModel + + +class URL(RealBaseModel): + url: StrictStr + from_user_id: int + message_id: int diff --git a/yt_shared/yt_shared/schemas/video.py b/yt_shared/yt_shared/schemas/video.py index 08c09ef..deea433 100644 --- a/yt_shared/yt_shared/schemas/video.py +++ b/yt_shared/yt_shared/schemas/video.py @@ -10,6 +10,7 @@ from yt_shared.schemas.base import RealBaseModel class VideoPayload(RealBaseModel): id: Optional[uuid.UUID] + from_user_id: Optional[int] message_id: Optional[int] url: StrictStr source: TaskSource