felicity-lims/felicity/almigrator.py
2024-11-18 01:15:00 +02:00

56 lines
2.1 KiB
Python

from alembic import command
from alembic.config import Config
from alembic.script import ScriptDirectory
from sqlalchemy import text
from sqlalchemy.engine import CursorResult
from sqlalchemy.ext.asyncio import create_async_engine
from felicity.core.config import ROOT_DIR
from felicity.core.config import settings
class FelicityMigrator:
def __init__(self, config_path: str = f"{ROOT_DIR}/alembic.ini") -> None:
self.alembic_cfg = Config(config_path)
self.database_url = settings.SQLALCHEMY_DATABASE_URI
def upgrade(self, revision: str = "head") -> None:
command.upgrade(self.alembic_cfg, revision)
def downgrade(self, revision: str) -> None:
command.downgrade(self.alembic_cfg, revision)
def create_revision(self, message: str) -> None:
command.revision(self.alembic_cfg, message=message)
def current(self) -> None:
command.current(self.alembic_cfg)
def history(self) -> None:
command.history(self.alembic_cfg)
async def get_current_db_revision(self) -> str | None:
engine = create_async_engine(self.database_url)
async with engine.connect() as connection:
result: CursorResult = await connection.execute(
text("SELECT version_num FROM alembic_version")
)
return result.scalar()
async def get_latest_revision(self) -> str | None:
script = ScriptDirectory.from_config(self.alembic_cfg)
return script.get_current_head()
async def check_for_updates(self) -> None:
# Get current and latest revisions
current_revision = await self.get_current_db_revision()
latest_revision = await self.get_latest_revision()
if current_revision != latest_revision:
print("Database is not up-to-date!")
print(f"Current revision: {current_revision}")
print(f"Latest revision: {latest_revision}")
print("You should run the Alembic upgrade.")
# Optionally, you can call `run_alembic_upgrade()` here to auto-upgrade.
else:
print("Database is up-to-date.")