Polish logic

This commit is contained in:
Taras Terletskyi 2022-06-13 23:46:54 +03:00
parent a4702c4f2d
commit 1b7014d245
21 changed files with 102 additions and 32 deletions

1
.gitignore vendored
View file

@ -1,6 +1,7 @@
# Byte-compiled / optimized / DLL files
*.session
*.session-journal
config.yml
config.yaml

View file

@ -34,7 +34,7 @@ environment variable in `.env_common` file to `True`.
Simple as `docker-compose up -d`.
Your telegram bot should send you a startup message:
`<YOUR_BOT_NAME> bot started, paste video URL to start download` and that's it.
`<YOUR_BOT_NAME> started, paste video URL to start download` and that's it.
After pasting video URL bot will send you appropriate message whether it was downloaded or something went wrong.

View file

@ -0,0 +1,29 @@
"""empty message
Revision ID: 8021be777d1d
Revises: ff03785a1f0d
Create Date: 2022-06-13 20:16:10.845151
"""
from alembic import op
import sqlalchemy as sa
import sqlalchemy_utils
# revision identifiers, used by Alembic.
revision = '8021be777d1d'
down_revision = 'ff03785a1f0d'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('task', sa.Column('from_user_id', sa.Integer(), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('task', 'from_user_id')
# ### end Alembic commands ###

View file

@ -40,6 +40,7 @@ class TaskSimpleSchema(BaseOrmModel):
status: TaskStatus
url: str
source: TaskSource
from_user_id: Optional[int]
message_id: Optional[int]
yt_dlp_version: Optional[str]
error: Optional[str]

View file

@ -5,6 +5,7 @@ from pyrogram import Client
from core.config.config import get_main_config
from core.tasks.manager import RabbitWorkerManager
from core.tasks.ytdlp import YtdlpNewVersionNotifyTask
from core.utils import bold
from yt_shared.rabbit import get_rabbitmq
from yt_shared.task_utils.tasks import create_task
@ -43,7 +44,7 @@ class VideoBot(Client):
"""Send welcome message after bot launch."""
self._log.info('Sending welcome message')
await self.send_message_all(
f'{(await self.get_me()).first_name} bot started, paste video URL to '
f'{bold((await self.get_me()).first_name)} started, paste video URL to '
f'start download'
)

View file

@ -1,12 +1,13 @@
import logging
from pyrogram import Client
from pyrogram.enums import ParseMode
from pyrogram.types import Message
from core.bot import VideoBot
from core.service import URLService
from core.utils import bold
from yt_shared.emoji import SUCCESS_EMOJI
from yt_shared.schemas.url import URL
class TelegramCallback:
@ -17,11 +18,14 @@ class TelegramCallback:
self._log = logging.getLogger(self.__class__.__name__)
self._url_service = URLService()
async def on_message(self, client: Client, message: Message) -> None:
async def on_message(self, client: VideoBot, message: Message) -> None:
"""Receive video URL and send to the download worker."""
is_sent = await self._url_service.process_url(message.text, message.id)
url = URL(
url=message.text, from_user_id=message.from_user.id, message_id=message.id
)
is_sent = await self._url_service.process_url(url)
await message.reply(
self._MSG_SEND_OK if is_sent else self._MSG_SEND_FAIL,
self._MSG_SEND_OK.format(url=url.url) if is_sent else self._MSG_SEND_FAIL,
parse_mode=ParseMode.HTML,
reply_to_message_id=message.id,
)

View file

@ -2,6 +2,7 @@ import logging
from yt_shared.constants import TaskSource
from yt_shared.rabbit.publisher import Publisher
from yt_shared.schemas.url import URL
from yt_shared.schemas.video import VideoPayload
@ -10,12 +11,17 @@ class URLService:
self._log = logging.getLogger(self.__class__.__name__)
self._publisher = Publisher()
async def process_url(self, url: str, message_id: int) -> bool:
return await self._send_to_worker(url, message_id)
async def process_url(self, url: URL) -> bool:
return await self._send_to_worker(url)
async def _send_to_worker(self, url: str, message_id: int) -> bool:
payload = VideoPayload(url=url, message_id=message_id, source=TaskSource.BOT)
async def _send_to_worker(self, url: URL) -> bool:
payload = VideoPayload(
url=url.url,
message_id=url.message_id,
from_user_id=url.from_user_id,
source=TaskSource.BOT,
)
is_sent = await self._publisher.send_for_download(payload)
if not is_sent:
self._log.error('Failed to publish URL %s to message broker', url)
self._log.error('Failed to publish URL %s to message broker', url.url)
return is_sent

View file

@ -87,13 +87,12 @@ class SuccessResultWorkerTask(AbstractResultWorkerTask):
SCHEMA_CLS = SuccessPayload
async def _process_body(self, body: SuccessPayload) -> None:
process_coros = [self._send_success_text(body)]
await self._send_success_text(body)
filepath: str = os.path.join(TMP_DOWNLOAD_PATH, body.filename)
if self._eligible_for_upload(filepath):
process_coros.append(self._create_upload_task(body))
await self._create_upload_task(body)
else:
self._log.warning('File %s will not be uploaded to Telegram', body.filename)
await asyncio.gather(*process_coros, return_exceptions=True)
self._cleanup(filepath)
def _cleanup(self, filepath: str) -> None:

View file

@ -6,6 +6,7 @@ from pyrogram.types import Message
from tenacity import retry, stop_after_attempt, wait_fixed
from core.tasks.abstract import AbstractTask
from core.utils import bold
from yt_shared.config import TMP_DOWNLOAD_PATH
from yt_shared.db import get_db
from yt_shared.repositories.task import TaskRepository
@ -25,30 +26,30 @@ class UploadTask(AbstractTask):
self._bot = bot
async def run(self) -> None:
tg_msg = f'Uploading {self.filename}'
tg_msg = f'⬆️ {bold("Uploading")} {self.filename}'
self._log.info(tg_msg)
try:
await self._bot.send_message(self._bot.user_ids[0], tg_msg)
await self._bot.send_message(self._body.from_user_id, tg_msg)
message = await self._upload_video_file()
if message:
await self._save_cache(message)
except Exception:
self._log.exception('Something went wrong during video file upload.')
self._log.exception('Something went wrong during video file upload')
@retry(wait=wait_fixed(10), stop=stop_after_attempt(10), reraise=True)
async def _upload_video_file(self) -> Optional[Message]:
user_id = self._body.from_user_id
try:
await self._bot.send_chat_action(
self._bot.user_ids[0], action=ChatAction.UPLOAD_VIDEO
)
self._log.info('Uploading for user id %s', user_id)
await self._bot.send_chat_action(user_id, action=ChatAction.UPLOAD_VIDEO)
return await self._bot.send_video(
chat_id=self._bot.user_ids[0],
chat_id=user_id,
file_name=self.filename,
video=self.filepath,
reply_to_message_id=self._body.message_id,
)
except Exception:
self._log.exception('Failed to upload %s', self.filename)
raise
async def _save_cache(self, message: Message) -> None:
self._log.debug('Saving telegram file cache - %s', message)

View file

@ -64,7 +64,7 @@ class YtdlpNewVersionNotifyTask(AbstractTask):
"""Send startup message that yt-dlp version is up to date."""
text = (
f'{INFORMATION_EMOJI} Your {code("yt-dlp")} version '
f'{bold(ctx.current.version)} is up to date. Have fun.'
f'{bold(ctx.current.version)} is up to date, have fun'
)
await self._send_to_chat(text)

View file

@ -1,10 +1,20 @@
#!/bin/bash
while ! nc -z rabbitmq "${RABBITMQ_PORT}"
check_reachability() {
while ! nc -z "$1" "${!2}"
do
echo "Waiting for RabbitMQ to be reachable on port ${RABBITMQ_PORT}"
echo "Waiting for $3 to be reachable on port ${!2}"
sleep 1
done
echo "Connection to $3 on port ${!2} verified"
return 0
}
echo "Connection to RabbitMQ on port ${RABBITMQ_PORT} verified"
wait_for_services_to_be_reachable() {
check_reachability rabbitmq RABBITMQ_PORT RabbitMQ
check_reachability postgres POSTGRES_PORT PostgreSQL
}
wait_for_services_to_be_reachable
exit 0

View file

@ -7,7 +7,7 @@ from yt_shared.db import get_db
from yt_shared.schemas.video import VideoPayload
class _Callbacks:
class _RMQCallbacks:
"""RabbitMQ callbacks."""
def __init__(self) -> None:
@ -35,4 +35,4 @@ class _Callbacks:
await message.reject(requeue=False)
callbacks = _Callbacks()
rmq_callbacks = _RMQCallbacks()

View file

@ -3,7 +3,7 @@ import logging
from yt_dlp import version as ytdlp_version
from core.callbacks import callbacks as cb
from core.callbacks import rmq_callbacks as cb
from yt_shared.config import MAX_SIMULTANEOUS_DOWNLOADS
from yt_shared.db import get_db
from yt_shared.rabbit import get_rabbitmq

View file

@ -75,7 +75,12 @@ class VideoService:
)
async def _send_finished_task(self, task: Task) -> None:
success_payload = SuccessPayload(task_id=task.id, filename=task.file.name)
success_payload = SuccessPayload(
task_id=task.id,
filename=task.file.name,
message_id=task.message_id,
from_user_id=task.from_user_id,
)
await self._publisher.send_download_finished(success_payload)
async def _send_failed_task(
@ -88,6 +93,7 @@ class VideoService:
err_payload = ErrorPayload(
task_id=task.id,
message_id=task.message_id,
from_user_id=video_payload.from_user_id,
message='Download error',
url=task.url,
original_body=video_payload.dict(),

View file

@ -9,5 +9,4 @@ YTDL_OPTS = {
'noplaylist': True,
'max_downloads': 1,
'concurrent_fragment_downloads': 2,
'restrictfilenames': True,
}

View file

@ -36,5 +36,5 @@ MAX_SIMULTANEOUS_DOWNLOADS = int(os.getenv('MAX_SIMULTANEOUS_DOWNLOADS', 10))
YTDLP_VERSION_CHECK_INTERVAL = int(os.getenv('YTDLP_VERSION_CHECK_INTERVAL', 86400))
SAVE_VIDEO_FILE = get_env_bool(os.getenv('SAVE_VIDEO_FILE', True))
UPLOAD_VIDEO_FILE = get_env_bool(os.getenv('UPLOAD_VIDEO_FILE', True))
UPLOAD_VIDEO_FILE = get_env_bool(os.getenv('UPLOAD_VIDEO_FILE', False))
MAX_UPLOAD_VIDEO_FILE_SIZE = int(os.getenv('MAX_UPLOAD_VIDEO_FILE_SIZE', 2147483648))

View file

@ -34,6 +34,7 @@ class Task(Base, Timestamp):
cascade='all, delete-orphan',
)
added_at = sa.Column(sa.DateTime, nullable=False)
from_user_id = sa.Column(sa.Integer, nullable=True)
message_id = sa.Column(sa.Integer, nullable=True)
error = sa.Column(sa.String, nullable=True)
yt_dlp_version = sa.Column(

View file

@ -8,6 +8,7 @@ from yt_shared.schemas.base import RealBaseModel
class ErrorPayload(RealBaseModel):
task_id: uuid.UUID
from_user_id: Optional[int]
message_id: Optional[int]
message: StrictStr
url: StrictStr

View file

@ -8,5 +8,6 @@ from yt_shared.schemas.base import RealBaseModel
class SuccessPayload(RealBaseModel):
task_id: uuid.UUID
from_user_id: Optional[int]
message_id: Optional[int]
filename: StrictStr

View file

@ -0,0 +1,9 @@
from pydantic import StrictStr
from yt_shared.schemas.base import RealBaseModel
class URL(RealBaseModel):
url: StrictStr
from_user_id: int
message_id: int

View file

@ -10,6 +10,7 @@ from yt_shared.schemas.base import RealBaseModel
class VideoPayload(RealBaseModel):
id: Optional[uuid.UUID]
from_user_id: Optional[int]
message_id: Optional[int]
url: StrictStr
source: TaskSource