From e6bcf817264256eb8614af0ce97468e034a60650 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Casaj=C3=BAs?= Date: Mon, 2 Oct 2023 10:50:02 +0200 Subject: [PATCH] Delete old email_log entries in batches to avoid table lock (#1902) * Delete old email_log entries in batches to avoid table lock * Avoid nested join * Commiting after the batch delete * Added statement count print * Rename var --- cron.py | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/cron.py b/cron.py index 6c593432..2cbb8b9f 100644 --- a/cron.py +++ b/cron.py @@ -9,7 +9,7 @@ from sqlalchemy import func, desc, or_, and_ from sqlalchemy.ext.compiler import compiles from sqlalchemy.orm import joinedload from sqlalchemy.orm.exc import ObjectDeletedError -from sqlalchemy.sql import Insert +from sqlalchemy.sql import Insert, text from app import s3, config from app.alias_utils import nb_email_log_for_mailbox @@ -85,23 +85,43 @@ def delete_logs(): delete_refused_emails() delete_old_monitoring() - for t in TransactionalEmail.filter( + for t_email in TransactionalEmail.filter( TransactionalEmail.created_at < arrow.now().shift(days=-7) ): - TransactionalEmail.delete(t.id) + TransactionalEmail.delete(t_email.id) for b in Bounce.filter(Bounce.created_at < arrow.now().shift(days=-7)): Bounce.delete(b.id) Session.commit() - LOG.d("Delete EmailLog older than 2 weeks") + LOG.d("Deleting EmailLog older than 2 weeks") - max_dt = arrow.now().shift(weeks=-2) - nb_deleted = EmailLog.filter(EmailLog.created_at < max_dt).delete() - Session.commit() + total_deleted = 0 + batch_size = 500 + Session.execute("set session statement_timeout=30000").rowcount + queries_done = 0 + cutoff_time = arrow.now().shift(days=-14) + rows_to_delete = EmailLog.filter(EmailLog.created_at < cutoff_time).count() + expected_queries = int(rows_to_delete / batch_size) + sql = text( + f"DELETE FROM email_log WHERE id IN (SELECT id FROM email_log WHERE created_at < :cutoff_time order by created_at limit :batch_size)" + ) + str_cutoff_time = cutoff_time.isoformat() + while total_deleted < rows_to_delete: + deleted_count = Session.execute( + sql, {"cutoff_time": str_cutoff_time, "batch_size": batch_size} + ).rowcount + Session.commit() + total_deleted += deleted_count + queries_done += 1 + LOG.i( + f"[{queries_done}/{expected_queries}] Deleted {total_deleted} EmailLog entries" + ) + if deleted_count < batch_size: + break - LOG.i("Delete %s email logs", nb_deleted) + LOG.i("Deleted %s email logs", total_deleted) def delete_refused_emails():