Allow several job runners to run in parallel (#2281)

* Allow several job runners to run in parallel

* Fix test

* fix: job_runner

---------

Co-authored-by: Carlos Quintana <carlos.quintana@proton.ch>
This commit is contained in:
Adrià Casajús 2025-02-25 15:02:51 +01:00 committed by GitHub
parent 7e77afa4fc
commit 9c3f346280
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 44 additions and 14 deletions

View file

@ -6,6 +6,7 @@ import time
from typing import List, Optional
import arrow
from sqlalchemy.orm.exc import ObjectDeletedError
from sqlalchemy.sql.expression import or_, and_
from app import config
@ -23,6 +24,7 @@ from app.log import LOG
from app.models import User, Job, BatchImport, Mailbox, CustomDomain, JobState
from app.monitor_utils import send_version_event
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
from events.event_sink import HttpEventSink
from server import create_light_app
@ -308,16 +310,15 @@ def process_job(job: Job):
elif job.name == config.JOB_SEND_EVENT_TO_WEBHOOK:
send_job = SendEventToWebhookJob.create_from_job(job)
if send_job:
send_job.run()
send_job.run(HttpEventSink())
else:
LOG.e("Unknown job name %s", job.name)
def get_jobs_to_run() -> List[Job]:
def get_jobs_to_run(taken_before_time: arrow.Arrow) -> List[Job]:
# Get jobs that match all conditions:
# - Job.state == ready OR (Job.state == taken AND Job.taken_at < now - 30 mins AND Job.attempts < 5)
# - Job.run_at is Null OR Job.run_at < now + 10 mins
taken_at_earliest = arrow.now().shift(minutes=-config.JOB_TAKEN_RETRY_WAIT_MINS)
run_at_earliest = arrow.now().shift(minutes=+10)
query = Job.filter(
and_(
@ -325,7 +326,7 @@ def get_jobs_to_run() -> List[Job]:
Job.state == JobState.ready.value,
and_(
Job.state == JobState.taken.value,
Job.taken_at < taken_at_earliest,
Job.taken_at < taken_before_time,
Job.attempts < config.JOB_MAX_ATTEMPTS,
),
),
@ -335,23 +336,51 @@ def get_jobs_to_run() -> List[Job]:
return query.all()
def take_job(job: Job, taken_before_time: arrow.Arrow) -> bool:
sql = """
UPDATE job
SET
taken_at = :taken_time,
attempts = attempts + 1,
state = :taken_state
WHERE id = :job_id
AND (state = :ready_state OR (state=:taken_state AND taken_at < :taken_before_time))
"""
args = {
"taken_time": arrow.now().datetime,
"job_id": job.id,
"ready_state": JobState.ready.value,
"taken_state": JobState.taken.value,
"taken_before_time": taken_before_time.datetime,
}
try:
res = Session.execute(sql, args)
Session.commit()
except ObjectDeletedError:
return False
return res.rowcount > 0
if __name__ == "__main__":
send_version_event("job_runner")
while True:
# wrap in an app context to benefit from app setup like database cleanup, sentry integration, etc
with create_light_app().app_context():
for job in get_jobs_to_run():
LOG.d("Take job %s", job)
taken_before_time = arrow.now().shift(
minutes=-config.JOB_TAKEN_RETRY_WAIT_MINS
)
# mark the job as taken, whether it will be executed successfully or not
job.taken = True
job.taken_at = arrow.now()
job.state = JobState.taken.value
job.attempts += 1
Session.commit()
jobs_done = 0
for job in get_jobs_to_run(taken_before_time):
if not take_job(job, taken_before_time):
continue
LOG.d("Take job %s", job)
process_job(job)
job.state = JobState.done.value
Session.commit()
jobs_done += 1
time.sleep(10)
if jobs_done == 0:
time.sleep(10)

View file

@ -66,7 +66,8 @@ def test_get_jobs_to_run(flask_client):
),
)
Session.commit()
jobs = get_jobs_to_run()
taken_before_time = arrow.now().shift(minutes=-config.JOB_TAKEN_RETRY_WAIT_MINS)
jobs = get_jobs_to_run(taken_before_time)
assert len(jobs) == len(expected_jobs_to_run)
job_ids = [job.id for job in jobs]
for job in expected_jobs_to_run: