Delete old email_log entries in batches to avoid table lock (#1902)

* Delete old email_log entries in batches to avoid table lock

* Avoid nested join

* Commiting after the batch delete

* Added statement count print

* Rename var
This commit is contained in:
Adrià Casajús 2023-10-02 10:50:02 +02:00 committed by GitHub
parent 7600038813
commit e6bcf81726
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

36
cron.py
View file

@ -9,7 +9,7 @@ from sqlalchemy import func, desc, or_, and_
from sqlalchemy.ext.compiler import compiles from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import joinedload from sqlalchemy.orm import joinedload
from sqlalchemy.orm.exc import ObjectDeletedError from sqlalchemy.orm.exc import ObjectDeletedError
from sqlalchemy.sql import Insert from sqlalchemy.sql import Insert, text
from app import s3, config from app import s3, config
from app.alias_utils import nb_email_log_for_mailbox from app.alias_utils import nb_email_log_for_mailbox
@ -85,23 +85,43 @@ def delete_logs():
delete_refused_emails() delete_refused_emails()
delete_old_monitoring() delete_old_monitoring()
for t in TransactionalEmail.filter( for t_email in TransactionalEmail.filter(
TransactionalEmail.created_at < arrow.now().shift(days=-7) TransactionalEmail.created_at < arrow.now().shift(days=-7)
): ):
TransactionalEmail.delete(t.id) TransactionalEmail.delete(t_email.id)
for b in Bounce.filter(Bounce.created_at < arrow.now().shift(days=-7)): for b in Bounce.filter(Bounce.created_at < arrow.now().shift(days=-7)):
Bounce.delete(b.id) Bounce.delete(b.id)
Session.commit() Session.commit()
LOG.d("Delete EmailLog older than 2 weeks") LOG.d("Deleting EmailLog older than 2 weeks")
max_dt = arrow.now().shift(weeks=-2) total_deleted = 0
nb_deleted = EmailLog.filter(EmailLog.created_at < max_dt).delete() batch_size = 500
Session.commit() Session.execute("set session statement_timeout=30000").rowcount
queries_done = 0
cutoff_time = arrow.now().shift(days=-14)
rows_to_delete = EmailLog.filter(EmailLog.created_at < cutoff_time).count()
expected_queries = int(rows_to_delete / batch_size)
sql = text(
f"DELETE FROM email_log WHERE id IN (SELECT id FROM email_log WHERE created_at < :cutoff_time order by created_at limit :batch_size)"
)
str_cutoff_time = cutoff_time.isoformat()
while total_deleted < rows_to_delete:
deleted_count = Session.execute(
sql, {"cutoff_time": str_cutoff_time, "batch_size": batch_size}
).rowcount
Session.commit()
total_deleted += deleted_count
queries_done += 1
LOG.i(
f"[{queries_done}/{expected_queries}] Deleted {total_deleted} EmailLog entries"
)
if deleted_count < batch_size:
break
LOG.i("Delete %s email logs", nb_deleted) LOG.i("Deleted %s email logs", total_deleted)
def delete_refused_emails(): def delete_refused_emails():