Mirror of https://github.com/simple-login/app.git (synced 2025-09-04 13:44:23 +08:00)
feat: add monitoring to job runner (#2404)
* feat: add monitoring to job runner
* feat: add proper error handling in job runner
* chore: unify newrelic metric
parent 7ecae793d7
commit 78aa169c10
3 changed files with 85 additions and 13 deletions
@@ -7,6 +7,8 @@ import time
 from typing import List, Optional

 import arrow
+import newrelic.agent
+from sqlalchemy.orm import Query
 from sqlalchemy.orm.exc import ObjectDeletedError
 from sqlalchemy.sql.expression import or_, and_

@@ -319,12 +321,12 @@ def process_job(job: Job):
         LOG.e("Unknown job name %s", job.name)


-def get_jobs_to_run(taken_before_time: arrow.Arrow) -> List[Job]:
+def get_jobs_to_run_query(taken_before_time: arrow.Arrow) -> Query:
     # Get jobs that match all conditions:
     #  - Job.state == ready OR (Job.state == taken AND Job.taken_at < now - 30 mins AND Job.attempts < 5)
     #  - Job.run_at is Null OR Job.run_at < now + 10 mins
     run_at_earliest = arrow.now().shift(minutes=+10)
-    query = Job.filter(
+    return Job.filter(
         and_(
             or_(
                 Job.state == JobState.ready.value,

@@ -337,6 +339,10 @@ def get_jobs_to_run(taken_before_time: arrow.Arrow) -> List[Job]:
             or_(Job.run_at.is_(None), and_(Job.run_at <= run_at_earliest)),
         )
     )
+
+
+def get_jobs_to_run(taken_before_time: arrow.Arrow) -> List[Job]:
+    query = get_jobs_to_run_query(taken_before_time)
     return (
         query.order_by(Job.priority.desc())
         .order_by(Job.run_at.asc())

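Splitting the filter construction out into get_jobs_to_run_query is what lets the monitoring script further down reuse the same conditions for a cheap count. A minimal usage sketch of the two call sites, not part of the diff; it assumes config is importable as in job_runner (the hunk above references config.JOB_MAX_ATTEMPTS):

# Sketch only: both callers share one filter, only the terminal operation differs.
import arrow

from app import config
from job_runner import get_jobs_to_run, get_jobs_to_run_query

taken_before_time = arrow.now().shift(minutes=-config.JOB_TAKEN_RETRY_WAIT_MINS)

# Job runner side: materialize the rows, ordered by priority then run_at.
jobs = get_jobs_to_run(taken_before_time)

# Monitoring side: reuse the same filter but let the database count the rows.
pending = get_jobs_to_run_query(taken_before_time).count()
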
@@ -385,11 +391,33 @@ if __name__ == "__main__":
                 if not take_job(job, taken_before_time):
                     continue
                 LOG.d("Take job %s", job)
-                process_job(job)
-
-                job.state = JobState.done.value
+                try:
+                    newrelic.agent.record_custom_event("ProcessJob", {"job": job.name})
+                    process_job(job)
+                    job_result = "success"
+
+                    job.state = JobState.done.value
+                    jobs_done += 1
+                except Exception as e:
+                    LOG.warn(f"Error processing job (id={job.id} name={job.name}): {e}")
+
+                    # Increment manually, as the attempts increment is done by the take_job but not
+                    # updated in our instance
+                    job_attempts = job.attempts + 1
+                    if job_attempts >= config.JOB_MAX_ATTEMPTS:
+                        LOG.warn(
+                            f"Marking job (id={job.id} name={job.name} attempts={job_attempts}) as ERROR"
+                        )
+                        job.state = JobState.error.value
+                        job_result = "error"
+                    else:
+                        job_result = "retry"
+
+                newrelic.agent.record_custom_event(
+                    "JobProcessed", {"job": job.name, "result": job_result}
+                )
                 Session.commit()
-                jobs_done += 1

             if jobs_done == 0:
                 time.sleep(10)

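The retry-versus-error decision in the except branch reduces to a small rule: bump the in-memory attempt count (the diff comment notes take_job already incremented it in the database) and compare against the limit. A self-contained sketch of that rule, with a stand-in constant instead of config.JOB_MAX_ATTEMPTS:

# Stand-in value; the real limit comes from config.JOB_MAX_ATTEMPTS.
JOB_MAX_ATTEMPTS = 5


def classify_failed_attempt(attempts: int, max_attempts: int = JOB_MAX_ATTEMPTS) -> str:
    # Mirrors the except branch above: the attempts counter was already
    # incremented by take_job in the database, so add one to the stale copy
    # before comparing.
    job_attempts = attempts + 1
    if job_attempts >= max_attempts:
        return "error"  # job.state becomes JobState.error.value, no more retries
    return "retry"  # job stays taken and becomes eligible to run again later


assert classify_failed_attempt(3) == "retry"
assert classify_failed_attempt(4) == "error"

Either way, a JobProcessed custom event is recorded with the result label, so success, retry and error counts share one metric name.
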
@@ -7,8 +7,11 @@ from typing import List, Dict
 import arrow
 import newrelic.agent

+from app.models import JobState
+from app.config import JOB_MAX_ATTEMPTS, JOB_TAKEN_RETRY_WAIT_MINS
 from app.db import Session
 from app.log import LOG
+from job_runner import get_jobs_to_run_query
 from monitor.metric_exporter import MetricExporter

 # the number of consecutive fails

@@ -154,6 +157,38 @@ def log_failed_events():
     newrelic.agent.record_custom_metric("Custom/sync_events_failed", failed_events)


+@newrelic.agent.background_task()
+def log_jobs_to_run():
+    taken_before_time = arrow.now().shift(minutes=-JOB_TAKEN_RETRY_WAIT_MINS)
+    query = get_jobs_to_run_query(taken_before_time)
+    count = query.count()
+    LOG.d(f"Pending jobs to run: {count}")
+    newrelic.agent.record_custom_metric("Custom/jobs_to_run", count)
+
+
+@newrelic.agent.background_task()
+def log_failed_jobs():
+    r = Session.execute(
+        """
+        SELECT COUNT(*)
+        FROM job
+        WHERE (
+            state = :error_state
+            OR (state = :taken_state AND attempts >= :max_attempts)
+        )
+        """,
+        {
+            "error_state": JobState.error.value,
+            "taken_state": JobState.taken.value,
+            "max_attempts": JOB_MAX_ATTEMPTS,
+        },
+    )
+    failed_jobs = list(r)[0][0]
+
+    LOG.d(f"Failed jobs: {failed_jobs}")
+    newrelic.agent.record_custom_metric("Custom/failed_jobs", failed_jobs)
+
+
 if __name__ == "__main__":
     exporter = MetricExporter(get_newrelic_license())
     while True:

@@ -163,6 +198,8 @@ if __name__ == "__main__":
         log_events_pending_dead_letter()
         log_failed_events()
         log_nb_db_connection_by_app_name()
+        log_jobs_to_run()
+        log_failed_jobs()
         Session.close()

         exporter.run()

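log_failed_jobs issues raw SQL, which keeps the monitor from importing the Job model. For comparison, an equivalent ORM-style count is sketched below; importing Job from app.models is an assumption, while JobState, Session and JOB_MAX_ATTEMPTS are imported exactly as in the hunk above:

from sqlalchemy import and_, func, or_

from app.config import JOB_MAX_ATTEMPTS
from app.db import Session
from app.models import Job, JobState  # the Job import is an assumption

# Count jobs that are either in the error state, or stuck in taken with no attempts left.
failed_jobs = (
    Session.query(func.count(Job.id))
    .filter(
        or_(
            Job.state == JobState.error.value,
            and_(
                Job.state == JobState.taken.value,
                Job.attempts >= JOB_MAX_ATTEMPTS,
            ),
        )
    )
    .scalar()
)
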
@@ -56,15 +56,22 @@ def test_get_jobs_to_run(flask_client):
         run_at=now.shift(hours=3),
     )
     # Job out of attempts
-    (
-        Job.create(
-            name="",
-            payload="",
-            state=JobState.taken.value,
-            taken_at=now.shift(minutes=-(config.JOB_TAKEN_RETRY_WAIT_MINS + 10)),
-            attempts=config.JOB_MAX_ATTEMPTS + 1,
-        ),
+    Job.create(
+        name="",
+        payload="",
+        state=JobState.taken.value,
+        taken_at=now.shift(minutes=-(config.JOB_TAKEN_RETRY_WAIT_MINS + 10)),
+        attempts=config.JOB_MAX_ATTEMPTS + 1,
+    )
+    # Job marked as error
+    Job.create(
+        name="",
+        payload="",
+        state=JobState.error.value,
+        taken_at=now.shift(minutes=-(config.JOB_TAKEN_RETRY_WAIT_MINS + 10)),
+        attempts=config.JOB_MAX_ATTEMPTS + 1,
     )
+
     Session.commit()
     taken_before_time = arrow.now().shift(minutes=-config.JOB_TAKEN_RETRY_WAIT_MINS)
     jobs = get_jobs_to_run(taken_before_time)

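The hunk ends just before the assertions, so they are not shown. The kind of check the two new fixtures enable is sketched here as a continuation of the test above, relying on jobs, JobState and config already being in scope:

# Neither new fixture should be returned: the error-state job is excluded outright,
# and the taken job has exhausted its attempts.
returned_states = {job.state for job in jobs}
assert JobState.error.value not in returned_states
assert all(
    job.attempts < config.JOB_MAX_ATTEMPTS
    for job in jobs
    if job.state == JobState.taken.value
)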