yt-dlp-bot/start.py
2024-03-13 23:51:30 +02:00

111 lines
3 KiB
Python

"""Over-engineered Python 3.10+ version of bash script with netcat (nc) just for fun.
#!/bin/bash
check_reachability() {
while ! nc -z "$1" "${!2}"
do
echo "Waiting for $3 to be reachable on port ${!2}"
sleep 1
done
echo "Connection to $3 on port ${!2} verified"
return 0
}
wait_for_services_to_be_reachable() {
check_reachability rabbitmq RABBITMQ_PORT RabbitMQ
check_reachability postgres POSTGRES_PORT PostgreSQL
}
wait_for_services_to_be_reachable
exit 0
"""
import asyncio
import logging
import os
import sys
from dataclasses import dataclass, field
from typing import Generator, Type
SOCK_CONNECTED = 0
DEFAULT_PORT = 0
DEFAULT_SLEEP_TIME = 1
class ServiceRegistry(type):
REGISTRY: dict[str, type['BaseService']] = {}
def __new__(
mcs: Type['ServiceRegistry'],
name: str,
bases: tuple[type['BaseService']],
attrs: dict,
) -> type['BaseService']:
service_cls: type['BaseService'] = type.__new__(mcs, name, bases, attrs)
mcs.REGISTRY[service_cls.__name__] = service_cls
return service_cls
@classmethod
def get_registry(mcs) -> dict[str, type['BaseService']]:
return mcs.REGISTRY.copy()
@classmethod
def get_instances(mcs) -> Generator['BaseService', None, None]:
return (service_cls() for service_cls in mcs.REGISTRY.values())
@dataclass
class BaseService:
name: str = field(default='', init=False)
host: str = field(default='', init=False)
port: int = field(default=DEFAULT_PORT, init=False)
def __post_init__(self) -> None:
if self.__class__ is BaseService:
raise TypeError('Cannot instantiate abstract class.')
@dataclass
class RabbitMQService(BaseService, metaclass=ServiceRegistry):
name: str = field(default='RabbitMQ')
host: str = field(default=os.getenv('RABBITMQ_HOST'))
port: int = field(default=int(os.getenv('RABBITMQ_PORT', DEFAULT_PORT)))
@dataclass
class PostgreSQLService(BaseService, metaclass=ServiceRegistry):
name: str = field(default='PostgreSQL')
host: str = field(default=os.getenv('POSTGRES_HOST'))
port: int = field(default=int(os.getenv('POSTGRES_PORT', DEFAULT_PORT)))
async def is_port_open(host: str, port: int) -> bool:
try:
reader, writer = await asyncio.open_connection(host, port)
writer.close()
await writer.wait_closed()
return True
except Exception:
return False
async def check_reachability(service: BaseService) -> None:
while True:
print(f'[{service.name}] Waiting to be reachable on port {service.port}')
if await is_port_open(host=service.host, port=service.port):
break
await asyncio.sleep(DEFAULT_SLEEP_TIME)
print(f'[{service.name}] Connection on port {service.port} verified')
async def main() -> None:
logging.getLogger('asyncio').setLevel(logging.ERROR)
coros = [check_reachability(service) for service in ServiceRegistry.get_instances()]
await asyncio.gather(*coros)
if __name__ == '__main__':
sys.exit(asyncio.run(main()))