mirror of
https://github.com/simple-login/app.git
synced 2024-09-20 06:55:59 +08:00
Feature: Use new job status to retry killed jobs (#1130)
* Feature: Use new job status to retry killed jobs * Set attermpts and time via config * Update timing condition Co-authored-by: Adrià Casajús <adria.casajus@proton.ch>
This commit is contained in:
parent
93968d00b6
commit
88dd07e48d
|
@ -486,3 +486,6 @@ DISABLE_CREATE_CONTACTS_FOR_FREE_USERS = False
|
|||
PARTNER_API_TOKEN_SECRET = os.environ.get("PARTNER_API_TOKEN_SECRET") or (
|
||||
FLASK_SECRET + "partnerapitoken"
|
||||
)
|
||||
|
||||
JOB_MAX_ATTEMPTS = 5
|
||||
JOB_TAKEN_RETRY_WAIT_MINS = 30
|
||||
|
|
|
@ -3,8 +3,10 @@ Run scheduled jobs.
|
|||
Not meant for running job at precise time (+- 1h)
|
||||
"""
|
||||
import time
|
||||
from typing import List
|
||||
|
||||
import arrow
|
||||
from sqlalchemy.sql.expression import or_, and_
|
||||
|
||||
from app import config
|
||||
from app.db import Session
|
||||
|
@ -220,19 +222,33 @@ SimpleLogin team.
|
|||
LOG.e("Unknown job name %s", job.name)
|
||||
|
||||
|
||||
def get_jobs_to_run() -> List[Job]:
|
||||
# Get jobs that match all conditions:
|
||||
# - Job.state == ready OR (Job.state == taken AND Job.taken_at < now - 30 mins AND Job.attempts < 5)
|
||||
# - Job.run_at is Null OR Job.run_at < now + 10 mins
|
||||
taken_at_earliest = arrow.now().shift(minutes=-config.JOB_TAKEN_RETRY_WAIT_MINS)
|
||||
run_at_earliest = arrow.now().shift(minutes=+10)
|
||||
query = Job.filter(
|
||||
and_(
|
||||
or_(
|
||||
Job.state == JobState.ready.value,
|
||||
and_(
|
||||
Job.state == JobState.taken.value,
|
||||
Job.taken_at < taken_at_earliest,
|
||||
Job.attempts < config.JOB_MAX_ATTEMPTS,
|
||||
),
|
||||
),
|
||||
or_(Job.run_at.is_(None), and_(Job.run_at <= run_at_earliest)),
|
||||
)
|
||||
)
|
||||
return query.all()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
while True:
|
||||
# wrap in an app context to benefit from app setup like database cleanup, sentry integration, etc
|
||||
with create_light_app().app_context():
|
||||
# run a job 1h earlier or later is not a big deal ...
|
||||
min_dt = arrow.now().shift(hours=-1)
|
||||
max_dt = arrow.now().shift(hours=1)
|
||||
|
||||
# TODO: Change This condition after deploying this MR
|
||||
# to Job.state == ready or (Job.state == taken and job.taken_at < arrow.now.shift(minutes=-10))
|
||||
for job in Job.filter(
|
||||
Job.taken.is_(False), Job.run_at > min_dt, Job.run_at <= max_dt
|
||||
).all():
|
||||
for job in get_jobs_to_run():
|
||||
LOG.d("Take job %s", job)
|
||||
|
||||
# mark the job as taken, whether it will be executed successfully or not
|
||||
|
|
71
tests/jobs/test_job_runner.py
Normal file
71
tests/jobs/test_job_runner.py
Normal file
|
@ -0,0 +1,71 @@
|
|||
from app import config
|
||||
from app.db import Session
|
||||
from job_runner import get_jobs_to_run
|
||||
from app.models import Job, JobState
|
||||
import arrow
|
||||
|
||||
|
||||
def test_get_jobs_to_run(flask_client):
|
||||
now = arrow.now()
|
||||
for job in Job.all():
|
||||
Job.delete(job.id)
|
||||
expected_jobs_to_run = [
|
||||
# Jobs in ready state
|
||||
Job.create(name="", payload=""),
|
||||
Job.create(name="", payload="", run_at=now),
|
||||
# Jobs in taken state
|
||||
Job.create(
|
||||
name="",
|
||||
payload="",
|
||||
state=JobState.taken.value,
|
||||
taken_at=now.shift(minutes=-(config.JOB_TAKEN_RETRY_WAIT_MINS + 10)),
|
||||
),
|
||||
Job.create(
|
||||
name="",
|
||||
payload="",
|
||||
state=JobState.taken.value,
|
||||
taken_at=now.shift(minutes=-(config.JOB_TAKEN_RETRY_WAIT_MINS + 10)),
|
||||
attempts=config.JOB_MAX_ATTEMPTS - 1,
|
||||
),
|
||||
Job.create(
|
||||
name="",
|
||||
payload="",
|
||||
state=JobState.taken.value,
|
||||
taken_at=now.shift(minutes=-(config.JOB_TAKEN_RETRY_WAIT_MINS + 10)),
|
||||
run_at=now,
|
||||
),
|
||||
]
|
||||
# Jobs not to run
|
||||
# Job to run in the future
|
||||
Job.create(name="", payload="", run_at=now.shift(hours=2))
|
||||
# Job in done state
|
||||
Job.create(name="", payload="", state=JobState.done.value)
|
||||
# Job taken but not enough time has passed
|
||||
Job.create(
|
||||
name="",
|
||||
payload="",
|
||||
state=JobState.taken.value,
|
||||
taken_at=now.shift(minutes=-(config.JOB_TAKEN_RETRY_WAIT_MINS - 10)),
|
||||
)
|
||||
# Job taken with enough time but out of run_at zone
|
||||
Job.create(
|
||||
name="",
|
||||
payload="",
|
||||
state=JobState.taken.value,
|
||||
taken_at=now.shift(minutes=-(config.JOB_TAKEN_RETRY_WAIT_MINS + 10)),
|
||||
run_at=now.shift(hours=3),
|
||||
)
|
||||
# Job out of attempts
|
||||
Job.create(
|
||||
name="",
|
||||
payload="",
|
||||
state=JobState.taken.value,
|
||||
taken_at=now.shift(minutes=-(config.JOB_TAKEN_RETRY_WAIT_MINS + 10)),
|
||||
attempts=config.JOB_MAX_ATTEMPTS + 1,
|
||||
),
|
||||
Session.commit()
|
||||
jobs = get_jobs_to_run()
|
||||
assert len(jobs) == len(expected_jobs_to_run)
|
||||
job_ids = [job.id for job in jobs]
|
||||
for job in expected_jobs_to_run:
|
||||
assert job.id in job_ids
|
Loading…
Reference in a new issue