From b7d18545286c41132351f42b691f92c96b7b6948 Mon Sep 17 00:00:00 2001 From: Taras Terletskyi <888784+tropicoo@users.noreply.github.com> Date: Thu, 13 Oct 2022 22:55:18 +0300 Subject: [PATCH] Refinements --- .github/dependabot.yml | 6 ++ README.md | 30 ++++----- alembic/env.py | 12 ++-- api/Dockerfile | 24 ++++--- api/core/app.py | 5 +- api/core/config.py | 8 +++ api/core/settings.py | 8 --- api/requirements.txt | 7 +- bot/Dockerfile | 27 +++++--- bot/core/bot/bot.py | 25 ++------ bot/core/bot/launcher.py | 19 +++--- bot/core/config/__init__.py | 6 ++ bot/core/config/config.py | 11 ++++ bot/core/config/schema.py | 2 +- bot/core/tasks/rabbit_tasks.py | 33 +++++----- bot/core/tasks/upload.py | 10 +-- bot/core/tasks/ytdlp.py | 11 ++-- bot/requirements.txt | 5 +- docker-compose.yml | 4 +- envs/.env_bot | 2 + envs/.env_common | 4 -- envs/.env_worker | 2 + worker/Dockerfile | 25 +++++--- worker/core/config.py | 11 ++++ worker/core/launcher.py | 9 ++- worker/core/tasks/abstract.py | 12 +--- worker/core/video_service.py | 12 ++-- worker/requirements.txt | 4 +- worker/ytdl_opts/default.py | 6 +- yt_shared/requirements_shared.txt | 13 ++-- yt_shared/yt_shared/config.py | 64 ++++++++++--------- yt_shared/yt_shared/db.py | 15 ++--- yt_shared/yt_shared/rabbit/rabbit.py | 4 +- yt_shared/yt_shared/repositories/task.py | 6 +- yt_shared/yt_shared/schemas/base.py | 7 -- .../yt_shared/task_utils}/abstract.py | 1 + yt_shared/yt_shared/task_utils/tasks.py | 1 - 37 files changed, 238 insertions(+), 213 deletions(-) create mode 100644 .github/dependabot.yml create mode 100644 api/core/config.py delete mode 100644 api/core/settings.py create mode 100644 bot/core/config/__init__.py create mode 100644 worker/core/config.py rename {bot/core/tasks => yt_shared/yt_shared/task_utils}/abstract.py (88%) diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..b01e4cf --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,6 @@ +version: 2 +updates: + - package-ecosystem: 'pip' + directory: '/' + schedule: + interval: 'daily' diff --git a/README.md b/README.md index c333ca1..39dfb24 100644 --- a/README.md +++ b/README.md @@ -8,27 +8,26 @@ Simple and reliable YouTube Download Telegram Bot. * Trigger video download by sending link to the Telegram bot or by API call * Upload downloaded videos to Telegram * Track download tasks in the database or API -* Everything is run in Docker containers ## ⚙ Quick Setup -1. Create Telegram bot using [BotFather](https://t.me/BotFather) and get your `token`. -2. [Get your own Telegram API key](https://my.telegram.org/apps) (`api_id` and `api_hash`) -3. Find your Telegram User ID [here](https://stackoverflow.com/questions/32683992/find-out-my-own-user-id-for-sending-a-message-with-telegram-api). -4. Copy `bot/config-example.yml` to `bot/config.yml`. -5. Write `token`, `api_id`, `api_hash` and your User ID to `bot/config.yml` by changing respective placeholders. -6. Check the default environment variables in `envs/.env_common` and change if needed. -7. Video storage path (`STORAGE_PATH` environment variable) is located in `envs/.env_common` file. +1. Create Telegram bot using [BotFather](https://t.me/BotFather) and get your `token` +2. [Get own Telegram API key](https://my.telegram.org/apps) (`api_id` and `api_hash`) +3. [Find your Telegram User ID](https://stackoverflow.com/questions/32683992/find-out-my-own-user-id-for-sending-a-message-with-telegram-api) +4. Copy `bot/config-example.yml` to `bot/config.yml` +5. Write `token`, `api_id`, `api_hash` and your User ID to `bot/config.yml` by changing respective placeholders +6. Check the default environment variables in `envs/.env_common` and change if needed +7. Video storage path (`STORAGE_PATH` environment variable) is located in the `envs/.env_worker` file By default, it's `/filestorage` path inside the container. What you want is to map the real path to this inside the `docker-compose.yml` file for `worker` service e.g. if you're on Windows, next strings mean container path `/filestorage` is mapped to real `D:/Videos` so your videos will be saved to your `Videos` folder. + ```yml + worker: + ... + volumes: + - "D:/Videos:/filestorage" + ``` 8. If you want your downloaded video to be uploaded back to Telegram, set `UPLOAD_VIDEO_FILE` -environment variable in `.env_common` file to `True`. -```yml - worker: - ... - volumes: - - "D:/Videos:/filestorage" -``` +environment variable in the `.env_bot` file to `True` ## 🏃 Run Simple as `docker-compose up -d`. @@ -39,7 +38,6 @@ After pasting video URL bot will send you appropriate message whether it was dow ## Advanced setup - 1. If you want to change `yt-dlp` download options, go to the `worker/ytdl_opts` directory, copy content from `default.py` to `user.py` and modify as you wish by checking [official documentation](https://github.com/timethrow/yt-dlp/blob/patch-1/README.md#embedding-yt-dlp). diff --git a/alembic/env.py b/alembic/env.py index 4c976ba..c99557c 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -7,12 +7,12 @@ from sqlalchemy.ext.asyncio import AsyncEngine # this is the Alembic Config object, which provides # access to the values within the .ini file in use. -from yt_shared.config import SQLALCHEMY_DATABASE_URI_ASYNC +from yt_shared.config import settings from yt_shared.db import Base from yt_shared.models import * # noqa config = context.config -config.set_main_option("sqlalchemy.url", SQLALCHEMY_DATABASE_URI_ASYNC) +config.set_main_option('sqlalchemy.url', settings.SQLALCHEMY_DATABASE_URI_ASYNC) # Interpret the config file for Python logging. # This line sets up loggers basically. @@ -27,7 +27,7 @@ target_metadata = Base.metadata # other values from the config, defined by the needs of env.py, # can be acquired: -# my_important_option = config.get_main_option("my_important_option") +# my_important_option = config.get_main_option('my_important_option') # ... etc. @@ -43,12 +43,12 @@ def run_migrations_offline(): script output. """ - url = config.get_main_option("sqlalchemy.url") + url = config.get_main_option('sqlalchemy.url') context.configure( url=url, target_metadata=target_metadata, literal_binds=True, - dialect_opts={"paramstyle": "named"}, + dialect_opts={'paramstyle': 'named'}, ) with context.begin_transaction(): @@ -72,7 +72,7 @@ async def run_migrations_online(): connectable = AsyncEngine( engine_from_config( config.get_section(config.config_ini_section), - prefix="sqlalchemy.", + prefix='sqlalchemy.', poolclass=pool.NullPool, future=True, ) diff --git a/api/Dockerfile b/api/Dockerfile index 0459294..424b46f 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -1,21 +1,25 @@ -FROM python:3.10-slim-buster +FROM python:3.10-alpine -RUN apt update \ - && apt install --yes --no-install-recommends \ - bash htop net-tools iputils-ping procps netcat \ - && rm -rf /var/lib/apt/lists/* +RUN apk add --no-cache \ + tzdata \ + htop \ + bash \ + netcat-openbsd COPY ./api/requirements.txt ./yt_shared/requirements_shared.txt /app/ WORKDIR /app -RUN apt update \ - && apt install --yes gcc g++ libffi-dev libjpeg-dev zlib1g-dev python3-dev build-essential libtool automake \ +RUN apk add --no-cache --virtual .build-deps \ + linux-headers \ + libffi-dev \ + zlib-dev \ + build-base \ && pip install --upgrade pip setuptools wheel \ + && MAKEFLAGS="-j$(nproc)" \ && pip install --no-cache-dir -r requirements.txt -r requirements_shared.txt \ - && apt-get autoremove --yes gcc g++ libffi-dev libjpeg-dev zlib1g-dev python3-dev build-essential libtool automake \ - && rm -rf /var/lib/apt/lists/* \ - && rm requirements_shared.txt + && rm requirements_shared.txt \ + && apk --purge del .build-deps COPY ./api /app diff --git a/api/core/app.py b/api/core/app.py index e059c5f..7c665a6 100644 --- a/api/core/app.py +++ b/api/core/app.py @@ -1,11 +1,12 @@ -import aioredis from fastapi import FastAPI from fastapi.middleware.gzip import GZipMiddleware from fastapi_cache import FastAPICache from fastapi_cache.backends.redis import RedisBackend +from redis import asyncio as aioredis from api.api_v1.urls import v1_router from api.root.endpoints.healthcheck import healthcheck_router +from core.config import settings from core.constants import GZIP_MIN_SIZE from yt_shared.rabbit import get_rabbitmq @@ -28,7 +29,7 @@ app = create_app() @app.on_event('startup') async def startup_event(): redis = aioredis.from_url( - 'redis://yt_redis', encoding='utf8', decode_responses=True + settings.REDIS_URL, encoding='utf8', decode_responses=True ) FastAPICache.init(RedisBackend(redis), prefix='fastapi-cache') await get_rabbitmq().register() diff --git a/api/core/config.py b/api/core/config.py new file mode 100644 index 0000000..e2cb618 --- /dev/null +++ b/api/core/config.py @@ -0,0 +1,8 @@ +from yt_shared.config import Settings + + +class ApiSettings(Settings): + pass + + +settings = ApiSettings() diff --git a/api/core/settings.py b/api/core/settings.py deleted file mode 100644 index 4e3a5eb..0000000 --- a/api/core/settings.py +++ /dev/null @@ -1,8 +0,0 @@ -from pydantic import BaseSettings - - -class Settings(BaseSettings): - title: str = 'test app title or what' - - -settings = Settings() diff --git a/api/requirements.txt b/api/requirements.txt index 7a8cba0..85e17c6 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -1,4 +1,3 @@ -fastapi-cache2[redis]==0.1.8 -fastapi==0.78.0 -uvicorn==0.17.6 -uvloop==0.16.0 +fastapi-cache2[redis]==0.1.9 +fastapi==0.85.0 +uvicorn==0.18.3 diff --git a/bot/Dockerfile b/bot/Dockerfile index 9ba1beb..6ac34f7 100644 --- a/bot/Dockerfile +++ b/bot/Dockerfile @@ -1,21 +1,28 @@ -FROM python:3.10-slim-buster +FROM python:3.10-alpine -RUN apt update \ - && apt install --yes --no-install-recommends \ - bash htop net-tools iputils-ping procps netcat \ - && rm -rf /var/lib/apt/lists/* +RUN apk add --no-cache \ + tzdata \ + htop \ + bash \ + netcat-openbsd COPY ./bot/requirements.txt ./yt_shared/requirements_shared.txt /app/ WORKDIR /app -RUN apt update \ - && apt install --yes gcc g++ libffi-dev libjpeg-dev zlib1g-dev python3-dev build-essential libtool automake \ +RUN apk add --no-cache --virtual .build-deps \ + linux-headers \ + libffi-dev \ + zlib-dev \ + build-base \ + musl-dev \ + openssl-dev \ + python3-dev \ && pip install --upgrade pip setuptools wheel \ + && MAKEFLAGS="-j$(nproc)" \ && pip install --no-cache-dir -r requirements.txt -r requirements_shared.txt \ - && apt-get autoremove --yes gcc g++ libffi-dev libjpeg-dev zlib1g-dev python3-dev build-essential libtool automake \ - && rm -rf /var/lib/apt/lists/* \ - && rm requirements_shared.txt + && rm requirements_shared.txt \ + && apk --purge del .build-deps COPY ./bot /app diff --git a/bot/core/bot/bot.py b/bot/core/bot/bot.py index 314ae6c..c70ef46 100644 --- a/bot/core/bot/bot.py +++ b/bot/core/bot/bot.py @@ -1,14 +1,11 @@ +import asyncio import logging from pyrogram import Client from pyrogram.enums import ParseMode from core.config.config import get_main_config -from core.tasks.manager import RabbitWorkerManager -from core.tasks.ytdlp import YtdlpNewVersionNotifyTask from core.utils import bold -from yt_shared.rabbit import get_rabbitmq -from yt_shared.task_utils.tasks import create_task class VideoBot(Client): @@ -26,20 +23,12 @@ class VideoBot(Client): self._log.info('Initializing bot client') self.user_ids: list[int] = conf.telegram.allowed_user_ids - self.rabbit_mq = get_rabbitmq() - self.rabbit_worker_manager = RabbitWorkerManager(bot=self) - async def start_tasks(self): - await self.rabbit_worker_manager.start_worker_tasks() - - task_name = YtdlpNewVersionNotifyTask.__class__.__name__ - create_task( - YtdlpNewVersionNotifyTask(bot=self).run(), - task_name=task_name, - logger=self._log, - exception_message='Task %s raised an exception', - exception_message_args=(task_name,), - ) + @staticmethod + async def run_forever() -> None: + """Firstly 'await bot.start()' should be called.""" + while True: + await asyncio.sleep(86400) async def send_startup_message(self) -> None: """Send welcome message after bot launch.""" @@ -58,5 +47,5 @@ class VideoBot(Client): await self.send_message(user_id, text, parse_mode=parse_mode) except Exception: self._log.exception( - 'Failed to send message "%s" to user ID ' '%s', text, user_id + 'Failed to send message "%s" to user ID %s', text, user_id ) diff --git a/bot/core/bot/launcher.py b/bot/core/bot/launcher.py index 063510c..fbaf7ea 100644 --- a/bot/core/bot/launcher.py +++ b/bot/core/bot/launcher.py @@ -1,4 +1,3 @@ -import asyncio import logging from pyrogram import filters @@ -7,6 +6,7 @@ from pyrogram.handlers import MessageHandler from core.bot import VideoBot from core.callbacks import TelegramCallback from core.config.config import get_main_config +from core.tasks.manager import RabbitWorkerManager from yt_shared.rabbit import get_rabbitmq from yt_shared.task_utils.tasks import create_task @@ -24,14 +24,15 @@ class BotLauncher: logging.getLogger().setLevel(get_main_config().log_level) self._bot = VideoBot() self._rabbit_mq = get_rabbitmq() + self._rabbit_worker_manager = RabbitWorkerManager(bot=self._bot) async def run(self) -> None: """Run bot.""" await self._setup_rabbit() - await self._setup_handlers() + self._setup_handlers() await self._start_bot() - async def _setup_handlers(self): + def _setup_handlers(self): cb = TelegramCallback() self._bot.add_handler( MessageHandler( @@ -60,16 +61,14 @@ class BotLauncher: exception_message_args=(task_name,), ) + async def _start_tasks(self): + await self._rabbit_worker_manager.start_worker_tasks() + async def _start_bot(self) -> None: """Start telegram bot and related processes.""" await self._bot.start() self._log.info('Starting %s bot', (await self._bot.get_me()).first_name) - await self._bot.start_tasks() + await self._start_tasks() await self._bot.send_startup_message() - await self._run_bot_forever() - - @staticmethod - async def _run_bot_forever() -> None: - while True: - await asyncio.sleep(86400) + await self._bot.run_forever() diff --git a/bot/core/config/__init__.py b/bot/core/config/__init__.py new file mode 100644 index 0000000..4c87c34 --- /dev/null +++ b/bot/core/config/__init__.py @@ -0,0 +1,6 @@ +from core.config.config import settings + + +__all__ = [ + 'settings', +] diff --git a/bot/core/config/config.py b/bot/core/config/config.py index d56adc0..9aed77c 100644 --- a/bot/core/config/config.py +++ b/bot/core/config/config.py @@ -10,6 +10,7 @@ from pydantic import ValidationError from core.config.schema import ConfigSchema from core.exceptions import ConfigError +from yt_shared.config import Settings class ConfigLoader: @@ -58,3 +59,13 @@ _CONF_MAIN = config_loader.load_config() def get_main_config() -> ConfigSchema: return _CONF_MAIN + + +class BotSettings(Settings): + YTDLP_VERSION_CHECK_INTERVAL: int + + UPLOAD_VIDEO_FILE: bool + MAX_UPLOAD_VIDEO_FILE_SIZE: int + + +settings = BotSettings() diff --git a/bot/core/config/schema.py b/bot/core/config/schema.py index 800c8ba..823597f 100644 --- a/bot/core/config/schema.py +++ b/bot/core/config/schema.py @@ -5,7 +5,7 @@ from yt_shared.schemas.base import RealBaseModel _LOG_LEVELS = {'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'} _LANG_CODE_LEN = 2 -_LANG_CODE_REGEX = fr'^[a-z]{{{_LANG_CODE_LEN}}}$' +_LANG_CODE_REGEX = rf'^[a-z]{{{_LANG_CODE_LEN}}}$' class TelegramSchema(RealBaseModel): diff --git a/bot/core/tasks/rabbit_tasks.py b/bot/core/tasks/rabbit_tasks.py index 1805cc8..ea87a1f 100644 --- a/bot/core/tasks/rabbit_tasks.py +++ b/bot/core/tasks/rabbit_tasks.py @@ -1,5 +1,4 @@ import abc -import asyncio import enum import os from typing import Optional, TYPE_CHECKING, Type @@ -8,20 +7,16 @@ from aio_pika import IncomingMessage from pydantic import BaseModel from pyrogram.enums import ParseMode +from core.config import settings from core.exceptions import InvalidBodyError -from core.tasks.abstract import AbstractTask from core.tasks.upload import UploadTask from core.utils import bold -from yt_shared.config import ( - MAX_UPLOAD_VIDEO_FILE_SIZE, - TMP_DOWNLOAD_PATH, - UPLOAD_VIDEO_FILE, -) from yt_shared.emoji import SUCCESS_EMOJI from yt_shared.rabbit import get_rabbitmq from yt_shared.rabbit.rabbit_config import ERROR_QUEUE, SUCCESS_QUEUE from yt_shared.schemas.error import ErrorPayload from yt_shared.schemas.success import SuccessPayload +from yt_shared.task_utils.abstract import AbstractTask from yt_shared.task_utils.tasks import create_task if TYPE_CHECKING: @@ -88,8 +83,8 @@ class SuccessResultWorkerTask(AbstractResultWorkerTask): async def _process_body(self, body: SuccessPayload) -> None: await self._send_success_text(body) - video_path: str = os.path.join(TMP_DOWNLOAD_PATH, body.filename) - thumb_path: str = os.path.join(TMP_DOWNLOAD_PATH, body.thumb_name) + video_path: str = os.path.join(settings.TMP_DOWNLOAD_PATH, body.filename) + thumb_path: str = os.path.join(settings.TMP_DOWNLOAD_PATH, body.thumb_name) if self._eligible_for_upload(video_path): await self._create_upload_task(body) else: @@ -106,8 +101,8 @@ class SuccessResultWorkerTask(AbstractResultWorkerTask): @staticmethod def _eligible_for_upload(video_path: str) -> bool: return ( - UPLOAD_VIDEO_FILE - and os.stat(video_path).st_size <= MAX_UPLOAD_VIDEO_FILE_SIZE + settings.UPLOAD_VIDEO_FILE + and os.stat(video_path).st_size <= settings.MAX_UPLOAD_VIDEO_FILE_SIZE ) async def _create_upload_task(self, body: SuccessPayload) -> None: @@ -123,12 +118,16 @@ class SuccessResultWorkerTask(AbstractResultWorkerTask): async def _send_success_text(self, body: SuccessPayload) -> None: text = f'{SUCCESS_EMOJI} Downloaded {bold(body.filename)}' - await self._bot.send_message( - chat_id=body.from_user_id, - reply_to_message_id=body.message_id, - text=text, - parse_mode=ParseMode.HTML, - ) + kwargs = { + 'text': text, + 'parse_mode': ParseMode.HTML, + } + if body.from_user_id: + kwargs['chat_id'] = body.from_user_id + if body.message_id: + kwargs['reply_to_message_id'] = body.message_id + + await self._bot.send_message(**kwargs) class ErrorResultWorkerTask(AbstractResultWorkerTask): diff --git a/bot/core/tasks/upload.py b/bot/core/tasks/upload.py index 290c906..440d330 100644 --- a/bot/core/tasks/upload.py +++ b/bot/core/tasks/upload.py @@ -5,9 +5,9 @@ from pyrogram.enums import ChatAction from pyrogram.types import Message from tenacity import retry, stop_after_attempt, wait_fixed -from core.tasks.abstract import AbstractTask +from core.config import settings +from yt_shared.task_utils.abstract import AbstractTask from core.utils import bold -from yt_shared.config import TMP_DOWNLOAD_PATH from yt_shared.db import get_db from yt_shared.repositories.task import TaskRepository from yt_shared.schemas.cache import CacheSchema @@ -22,8 +22,8 @@ class UploadTask(AbstractTask): super().__init__() self._body = body self.filename = body.filename - self.filepath = os.path.join(TMP_DOWNLOAD_PATH, self.filename) - self.thumb_path = os.path.join(TMP_DOWNLOAD_PATH, body.thumb_name) + self.filepath = os.path.join(settings.TMP_DOWNLOAD_PATH, self.filename) + self.thumb_path = os.path.join(settings.TMP_DOWNLOAD_PATH, body.thumb_name) self._bot = bot async def run(self) -> None: @@ -40,8 +40,8 @@ class UploadTask(AbstractTask): @retry(wait=wait_fixed(10), stop=stop_after_attempt(10), reraise=True) async def _upload_video_file(self) -> Optional[Message]: user_id = self._body.from_user_id + self._log.info('Uploading to user id %s', user_id) try: - self._log.info('Uploading for user id %s', user_id) await self._bot.send_chat_action(user_id, action=ChatAction.UPLOAD_VIDEO) return await self._bot.send_video( chat_id=user_id, diff --git a/bot/core/tasks/ytdlp.py b/bot/core/tasks/ytdlp.py index f4f9149..ff8f45b 100644 --- a/bot/core/tasks/ytdlp.py +++ b/bot/core/tasks/ytdlp.py @@ -2,14 +2,13 @@ import asyncio import datetime from typing import TYPE_CHECKING -from pyrogram.enums import ParseMode - -from core.tasks.abstract import AbstractTask from core.utils import bold, code -from yt_shared.config import YTDLP_VERSION_CHECK_INTERVAL + +from core.config import settings from yt_shared.db import get_db from yt_shared.emoji import INFORMATION_EMOJI from yt_shared.schemas.ytdlp import VersionContext +from yt_shared.task_utils.abstract import AbstractTask from yt_shared.ytdlp.version_checker import VersionChecker if TYPE_CHECKING: @@ -34,13 +33,13 @@ class YtdlpNewVersionNotifyTask(AbstractTask): 'Next yt-dlp version check planned at %s', self._get_next_check_datetime().isoformat(' '), ) - await asyncio.sleep(YTDLP_VERSION_CHECK_INTERVAL) + await asyncio.sleep(settings.YTDLP_VERSION_CHECK_INTERVAL) @staticmethod def _get_next_check_datetime() -> datetime.datetime: return ( datetime.datetime.now() - + datetime.timedelta(seconds=YTDLP_VERSION_CHECK_INTERVAL) + + datetime.timedelta(seconds=settings.YTDLP_VERSION_CHECK_INTERVAL) ).replace(microsecond=0) async def _notify_if_new_version(self) -> None: diff --git a/bot/requirements.txt b/bot/requirements.txt index e385a6f..68a449e 100644 --- a/bot/requirements.txt +++ b/bot/requirements.txt @@ -1,6 +1,5 @@ PyYAML==6.0 -Pyrogram==2.0.27 +Pyrogram==2.0.57 addict==2.4.0 -tenacity==8.0.1 +tenacity==8.1.0 tgcrypto==1.2.3 -uvloop==0.16.0 diff --git a/docker-compose.yml b/docker-compose.yml index 910aa70..d2c21ff 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,4 @@ -version: "3" +version: "3.8" services: api: @@ -56,7 +56,7 @@ services: - "shared-tmpfs:/tmp/download_tmpfs" postgres: container_name: yt_postgres - image: postgres:14 + image: "postgres:14" env_file: - envs/.env_common ports: diff --git a/envs/.env_bot b/envs/.env_bot index a528fac..0b1ca92 100644 --- a/envs/.env_bot +++ b/envs/.env_bot @@ -1,2 +1,4 @@ APPLICATION_NAME=yt_bot YTDLP_VERSION_CHECK_INTERVAL=300 +UPLOAD_VIDEO_FILE=True +MAX_UPLOAD_VIDEO_FILE_SIZE=2147483648 diff --git a/envs/.env_common b/envs/.env_common index 57c74d3..3d62667 100644 --- a/envs/.env_common +++ b/envs/.env_common @@ -19,7 +19,3 @@ RABBITMQ_PORT=5672 REDIS_HOST=yt_redis TMP_DOWNLOAD_PATH=/tmp/download_tmpfs -STORAGE_PATH=/filestorage -SAVE_VIDEO_FILE=True -UPLOAD_VIDEO_FILE=False -MAX_UPLOAD_VIDEO_FILE_SIZE=2147483648 diff --git a/envs/.env_worker b/envs/.env_worker index 6fd7b99..dd8beaa 100644 --- a/envs/.env_worker +++ b/envs/.env_worker @@ -1,2 +1,4 @@ APPLICATION_NAME=yt_worker +SAVE_VIDEO_FILE=False MAX_SIMULTANEOUS_DOWNLOADS=10 +STORAGE_PATH=/filestorage diff --git a/worker/Dockerfile b/worker/Dockerfile index 07c38ae..db7f7e4 100644 --- a/worker/Dockerfile +++ b/worker/Dockerfile @@ -1,21 +1,26 @@ -FROM python:3.10-slim-buster +FROM python:3.10-alpine -RUN apt update \ - && apt install --yes --no-install-recommends \ - bash htop net-tools iputils-ping procps netcat ffmpeg \ - && rm -rf /var/lib/apt/lists/* +RUN apk add --no-cache \ + tzdata \ + htop \ + bash \ + netcat-openbsd \ + ffmpeg COPY ./worker/requirements.txt ./yt_shared/requirements_shared.txt /app/ WORKDIR /app -RUN apt update \ - && apt install --yes gcc g++ libffi-dev libjpeg-dev zlib1g-dev python3-dev build-essential libtool automake \ +RUN apk add --no-cache --virtual .build-deps \ + linux-headers \ + libffi-dev \ + zlib-dev \ + build-base \ && pip install --upgrade pip setuptools wheel \ + && MAKEFLAGS="-j$(nproc)" \ && pip install --no-cache-dir -r requirements.txt -r requirements_shared.txt \ - && apt-get autoremove --yes gcc g++ libffi-dev libjpeg-dev zlib1g-dev python3-dev build-essential libtool automake \ - && rm -rf /var/lib/apt/lists/* \ - && rm requirements_shared.txt + && rm requirements_shared.txt \ + && apk --purge del .build-deps COPY ./worker /app COPY ./alembic /app/alembic diff --git a/worker/core/config.py b/worker/core/config.py new file mode 100644 index 0000000..242cca2 --- /dev/null +++ b/worker/core/config.py @@ -0,0 +1,11 @@ +from yt_shared.config import Settings + + +class WorkerSettings(Settings): + APPLICATION_NAME: str + MAX_SIMULTANEOUS_DOWNLOADS: int + SAVE_VIDEO_FILE: bool + STORAGE_PATH: str + + +settings = WorkerSettings() diff --git a/worker/core/launcher.py b/worker/core/launcher.py index 97354eb..113df6a 100644 --- a/worker/core/launcher.py +++ b/worker/core/launcher.py @@ -4,7 +4,7 @@ import logging from yt_dlp import version as ytdlp_version from core.callbacks import rmq_callbacks as cb -from yt_shared.config import MAX_SIMULTANEOUS_DOWNLOADS +from core.config import settings from yt_shared.db import get_db from yt_shared.rabbit import get_rabbitmq from yt_shared.rabbit.rabbit_config import INPUT_QUEUE @@ -26,12 +26,17 @@ class WorkerLauncher: loop.run_until_complete(self.stop()) async def _start(self) -> None: + await self._perform_setup() + + async def _perform_setup(self) -> None: await asyncio.gather(self._setup_rabbit(), self._set_yt_dlp_version()) async def _setup_rabbit(self) -> None: self._log.info('Setting up RabbitMQ connection') await self._rabbit_mq.register() - await self._rabbit_mq.channel.set_qos(prefetch_count=MAX_SIMULTANEOUS_DOWNLOADS) + await self._rabbit_mq.channel.set_qos( + prefetch_count=settings.MAX_SIMULTANEOUS_DOWNLOADS + ) await self._rabbit_mq.queues[INPUT_QUEUE].consume(cb.on_input_message) async def _set_yt_dlp_version(self): diff --git a/worker/core/tasks/abstract.py b/worker/core/tasks/abstract.py index 9dfb212..b2b39a4 100644 --- a/worker/core/tasks/abstract.py +++ b/worker/core/tasks/abstract.py @@ -1,19 +1,18 @@ -import abc import asyncio -import logging import os import signal from typing import Optional +from yt_shared.task_utils.abstract import AbstractTask from yt_shared.utils import wrap -class AbstractFfBinaryTask(metaclass=abc.ABCMeta): +class AbstractFfBinaryTask(AbstractTask): _CMD: Optional[str] = None _CMD_TIMEOUT = 10 def __init__(self, file_path: str) -> None: - self._log = logging.getLogger(self.__class__.__name__) + super().__init__() self._file_path = file_path self._killpg = wrap(os.killpg) @@ -37,8 +36,3 @@ class AbstractFfBinaryTask(metaclass=abc.ABCMeta): async def _get_stdout_stderr(proc: asyncio.subprocess.Process) -> tuple[str, str]: stdout, stderr = await proc.stdout.read(), await proc.stderr.read() return stdout.decode().strip(), stderr.decode().strip() - - @abc.abstractmethod - async def run(self) -> None: - """Main entry point.""" - pass diff --git a/worker/core/video_service.py b/worker/core/video_service.py index 7868b45..0452d41 100644 --- a/worker/core/video_service.py +++ b/worker/core/video_service.py @@ -6,10 +6,10 @@ from typing import Optional from sqlalchemy.ext.asyncio import AsyncSession +from core.config import settings from core.downloader import VideoDownloader from core.tasks.ffprobe_context import GetFfprobeContextTask from core.tasks.thumbnail import MakeThumbnailTask -from yt_shared.config import SAVE_VIDEO_FILE, STORAGE_PATH, TMP_DOWNLOAD_PATH from yt_shared.constants import TaskStatus from yt_shared.models import Task from yt_shared.rabbit.publisher import Publisher @@ -39,15 +39,15 @@ class VideoService: async def _post_process_file( self, video: DownVideo, task: Task, db: AsyncSession ) -> None: - file_path = os.path.join(TMP_DOWNLOAD_PATH, video.name) - thumb_path = os.path.join(TMP_DOWNLOAD_PATH, video.thumb_name) + file_path = os.path.join(settings.TMP_DOWNLOAD_PATH, video.name) + thumb_path = os.path.join(settings.TMP_DOWNLOAD_PATH, video.thumb_name) post_process_coros = [ self._set_probe_ctx(file_path, video), MakeThumbnailTask(thumb_path, file_path).run(), ] - if SAVE_VIDEO_FILE: + if settings.SAVE_VIDEO_FILE: post_process_coros.append(self._copy_file_to_storage(video)) await asyncio.gather(*post_process_coros) @@ -69,8 +69,8 @@ class VideoService: @staticmethod async def _copy_file_to_storage(video: DownVideo) -> None: - src = os.path.join(TMP_DOWNLOAD_PATH, video.name) - dst = os.path.join(STORAGE_PATH, video.name) + src = os.path.join(settings.TMP_DOWNLOAD_PATH, video.name) + dst = os.path.join(settings.STORAGE_PATH, video.name) await asyncio.to_thread(shutil.copy2, src, dst) async def _start_download( diff --git a/worker/requirements.txt b/worker/requirements.txt index ebd3b37..52cb066 100644 --- a/worker/requirements.txt +++ b/worker/requirements.txt @@ -1,4 +1,4 @@ PyYAML==6.0 -alembic==1.8.0 -python-dotenv==0.20.0 +alembic==1.8.1 +python-dotenv==0.21.0 yt-dlp diff --git a/worker/ytdl_opts/default.py b/worker/ytdl_opts/default.py index 8c0b284..27e24b6 100644 --- a/worker/ytdl_opts/default.py +++ b/worker/ytdl_opts/default.py @@ -1,12 +1,12 @@ import os -from yt_shared.config import TMP_DOWNLOAD_PATH +from core.config import settings # More here https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/options.py YTDL_OPTS = { - 'outtmpl': os.path.join(TMP_DOWNLOAD_PATH, '%(title)s.%(ext)s'), + 'outtmpl': os.path.join(settings.TMP_DOWNLOAD_PATH, '%(title)s.%(ext)s'), 'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4', 'noplaylist': True, 'max_downloads': 1, - 'concurrent_fragment_downloads': 2, + 'concurrent_fragment_downloads': 5, } diff --git a/yt_shared/requirements_shared.txt b/yt_shared/requirements_shared.txt index ca9ee49..77eb255 100644 --- a/yt_shared/requirements_shared.txt +++ b/yt_shared/requirements_shared.txt @@ -1,7 +1,6 @@ -SQLAlchemy-Utils==0.38.2 -SQLAlchemy==1.4.37 -aio-pika==8.0.3 -aiohttp==3.8.1 -asyncpg==0.25.0 -orjson==3.7.2 -pydantic==1.9.1 +SQLAlchemy-Utils==0.38.3 +SQLAlchemy[asyncio]==1.4.41 +aio-pika==8.2.2 +aiohttp==3.8.3 +asyncpg==0.26.0 +pydantic==1.10.2 diff --git a/yt_shared/yt_shared/config.py b/yt_shared/yt_shared/config.py index e6e3766..2808d8f 100644 --- a/yt_shared/yt_shared/config.py +++ b/yt_shared/yt_shared/config.py @@ -1,40 +1,42 @@ -import os +from pydantic import BaseSettings, Field -from yt_shared.utils import get_env_bool -APPLICATION_NAME = os.getenv('APPLICATION_NAME', 'APPLICATION_NAME_NOT_SET') +class Settings(BaseSettings): + APPLICATION_NAME: str -POSTGRES_USER = os.getenv('POSTGRES_USER', 'yt') -POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD', 'yt') -POSTGRES_HOST = os.getenv('POSTGRES_HOST', 'localhost') -POSTGRES_PORT = os.getenv('POSTGRES_PORT', 5432) -POSTGRES_DB = os.getenv('POSTGRES_DB', 'yt') -POSTGRES_TEST_DB = os.getenv('POSTGRES_TEST_DB', 'yt_test') + POSTGRES_USER: str + POSTGRES_PASSWORD: str + POSTGRES_HOST: str + POSTGRES_PORT: int + POSTGRES_DB: str + POSTGRES_TEST_DB: str = Field(default='yt_test') -SQLALCHEMY_DATABASE_URI_ASYNC = f'postgresql+asyncpg://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}' -SQLALCHEMY_ECHO = get_env_bool(os.getenv('SQLALCHEMY_ECHO', False)) -SQLALCHEMY_EXPIRE_ON_COMMIT = get_env_bool( - os.getenv('SQLALCHEMY_EXPIRE_ON_COMMIT', False) -) + @property + def SQLALCHEMY_DATABASE_URI_ASYNC(self) -> str: + return f'postgresql+asyncpg://{self.POSTGRES_USER}:{self.POSTGRES_PASSWORD}@{self.POSTGRES_HOST}:{self.POSTGRES_PORT}/{self.POSTGRES_DB}' -RABBITMQ_USER = os.getenv('RABBITMQ_USER', 'guest') -RABBITMQ_PASSWORD = os.getenv('RABBITMQ_PASSWORD', 'guest') -RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', 'localhost') -RABBITMQ_PORT = os.getenv('RABBITMQ_PORT', 5672) -RABBITMQ_URI = ( - f'amqp://{RABBITMQ_USER}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/' -) + SQLALCHEMY_ECHO: bool + SQLALCHEMY_EXPIRE_ON_COMMIT: bool -CONSUMER_NUMBER_OF_RETRY = os.getenv('CONSUMER_NUMBER_OF_RETRY', 2) -RESEND_DELAY_MS = os.getenv('RESEND_DELAY_MS', 60000) + RABBITMQ_USER: str + RABBITMQ_PASSWORD: str + RABBITMQ_HOST: str + RABBITMQ_PORT: int -REDIS_HOST = os.getenv('REDIS_HOST', 'yt_redis') + @property + def RABBITMQ_URI(self) -> str: + return f'amqp://{self.RABBITMQ_USER}:{self.RABBITMQ_PASSWORD}@{self.RABBITMQ_HOST}:{self.RABBITMQ_PORT}/' -TMP_DOWNLOAD_PATH = os.getenv('TMP_DOWNLOAD_PATH', '/tmp/download_tmpfs') -STORAGE_PATH = os.getenv('STORAGE_PATH', '/') -MAX_SIMULTANEOUS_DOWNLOADS = int(os.getenv('MAX_SIMULTANEOUS_DOWNLOADS', 10)) -YTDLP_VERSION_CHECK_INTERVAL = int(os.getenv('YTDLP_VERSION_CHECK_INTERVAL', 86400)) + CONSUMER_NUMBER_OF_RETRY: int = Field(default=2) + RESEND_DELAY_MS: int = Field(default=60000) -SAVE_VIDEO_FILE = get_env_bool(os.getenv('SAVE_VIDEO_FILE', True)) -UPLOAD_VIDEO_FILE = get_env_bool(os.getenv('UPLOAD_VIDEO_FILE', False)) -MAX_UPLOAD_VIDEO_FILE_SIZE = int(os.getenv('MAX_UPLOAD_VIDEO_FILE_SIZE', 2147483648)) + REDIS_HOST: str + + @property + def REDIS_URL(self) -> str: + return f'redis://{self.REDIS_HOST}' + + TMP_DOWNLOAD_PATH: str + + +settings = Settings() diff --git a/yt_shared/yt_shared/db.py b/yt_shared/yt_shared/db.py index 7801941..4316889 100644 --- a/yt_shared/yt_shared/db.py +++ b/yt_shared/yt_shared/db.py @@ -12,18 +12,13 @@ from sqlalchemy.orm import ( ) from sqlalchemy_utils import UUIDType -from yt_shared.config import ( - APPLICATION_NAME, - SQLALCHEMY_DATABASE_URI_ASYNC, - SQLALCHEMY_ECHO, - SQLALCHEMY_EXPIRE_ON_COMMIT, -) +from yt_shared.config import settings engine = create_async_engine( - SQLALCHEMY_DATABASE_URI_ASYNC, - echo=SQLALCHEMY_ECHO, + settings.SQLALCHEMY_DATABASE_URI_ASYNC, + echo=settings.SQLALCHEMY_ECHO, pool_pre_ping=True, - connect_args={'server_settings': {'application_name': APPLICATION_NAME}}, + connect_args={'server_settings': {'application_name': settings.APPLICATION_NAME}}, ) metadata = MetaData() @@ -31,7 +26,7 @@ metadata.bind = engine AsyncSessionLocal = sessionmaker( engine, class_=AsyncSession, - expire_on_commit=SQLALCHEMY_EXPIRE_ON_COMMIT, + expire_on_commit=settings.SQLALCHEMY_EXPIRE_ON_COMMIT, ) diff --git a/yt_shared/yt_shared/rabbit/rabbit.py b/yt_shared/yt_shared/rabbit/rabbit.py index 9032c9a..6b8f706 100644 --- a/yt_shared/yt_shared/rabbit/rabbit.py +++ b/yt_shared/yt_shared/rabbit/rabbit.py @@ -5,7 +5,7 @@ import aio_pika from aio_pika import RobustChannel, RobustConnection from aio_pika.abc import AbstractRobustExchange, AbstractRobustQueue -from yt_shared.config import RABBITMQ_URI +from yt_shared.config import settings from yt_shared.rabbit.rabbit_config import get_rabbit_config @@ -30,7 +30,7 @@ class RabbitMQ: async def _set_connection(self): self.connection = await aio_pika.connect_robust( - RABBITMQ_URI, + settings.RABBITMQ_URI, loop=get_running_loop(), reconnect_interval=self.RABBITMQ_RECONNECT_INTERVAL, ) diff --git a/yt_shared/yt_shared/repositories/task.py b/yt_shared/yt_shared/repositories/task.py index 4d84c5e..b6e6253 100644 --- a/yt_shared/yt_shared/repositories/task.py +++ b/yt_shared/yt_shared/repositories/task.py @@ -2,6 +2,7 @@ import logging from uuid import UUID from sqlalchemy import insert, select +from sqlalchemy.exc import NoResultFound from sqlalchemy.ext.asyncio import AsyncSession from yt_shared.constants import TaskStatus @@ -22,7 +23,10 @@ class TaskRepository: stmt = select(Task).filter_by(id=video_payload.id) task = await db.execute(stmt) - return task.scalar_one() + try: + return task.scalar_one() + except NoResultFound: + return await self._create_task(db, video_payload) @staticmethod async def _create_task(db: AsyncSession, video_payload: VideoPayload) -> Task: diff --git a/yt_shared/yt_shared/schemas/base.py b/yt_shared/yt_shared/schemas/base.py index 5046fc0..e05100e 100644 --- a/yt_shared/yt_shared/schemas/base.py +++ b/yt_shared/yt_shared/schemas/base.py @@ -1,13 +1,6 @@ -import orjson from pydantic import BaseModel, Extra -def orjson_dumps(v, *, default) -> str: - return orjson.dumps(v, default=default).decode() - - class RealBaseModel(BaseModel): class Config: extra = Extra.forbid - json_loads = orjson.loads - json_dumps = orjson_dumps diff --git a/bot/core/tasks/abstract.py b/yt_shared/yt_shared/task_utils/abstract.py similarity index 88% rename from bot/core/tasks/abstract.py rename to yt_shared/yt_shared/task_utils/abstract.py index c4cbc33..42655c5 100644 --- a/bot/core/tasks/abstract.py +++ b/yt_shared/yt_shared/task_utils/abstract.py @@ -8,4 +8,5 @@ class AbstractTask(metaclass=abc.ABCMeta): @abc.abstractmethod async def run(self) -> None: + """Main entry point.""" pass diff --git a/yt_shared/yt_shared/task_utils/tasks.py b/yt_shared/yt_shared/task_utils/tasks.py index 7b387bd..693aff3 100644 --- a/yt_shared/yt_shared/task_utils/tasks.py +++ b/yt_shared/yt_shared/task_utils/tasks.py @@ -1,7 +1,6 @@ import asyncio import functools import logging -from functools import partial, wraps from typing import Any, Awaitable, Tuple, TypeVar