From 78aa169c10dd9b24b584f5f621edeec99d78d41b Mon Sep 17 00:00:00 2001
From: Carlos Quintana <74399022+cquintana92@users.noreply.github.com>
Date: Thu, 27 Feb 2025 16:42:20 +0100
Subject: [PATCH] feat: add monitoring to job runner (#2404)

* feat: add monitoring to job runner

* feat: add proper error handling in job runner

* chore: unify newrelic metric
---
 job_runner.py                 | 38 ++++++++++++++++++++++++++++++-----
 monitoring.py                 | 37 ++++++++++++++++++++++++++++++++++
 tests/jobs/test_job_runner.py | 23 +++++++++++++--------
 3 files changed, 85 insertions(+), 13 deletions(-)

diff --git a/job_runner.py b/job_runner.py
index 5105be5e..23ed4087 100644
--- a/job_runner.py
+++ b/job_runner.py
@@ -7,6 +7,8 @@ import time
 from typing import List, Optional
 
 import arrow
+import newrelic.agent
+from sqlalchemy.orm import Query
 from sqlalchemy.orm.exc import ObjectDeletedError
 from sqlalchemy.sql.expression import or_, and_
 
@@ -319,12 +321,12 @@ def process_job(job: Job):
         LOG.e("Unknown job name %s", job.name)
 
 
-def get_jobs_to_run(taken_before_time: arrow.Arrow) -> List[Job]:
+def get_jobs_to_run_query(taken_before_time: arrow.Arrow) -> Query:
     # Get jobs that match all conditions:
     # - Job.state == ready OR (Job.state == taken AND Job.taken_at < now - 30 mins AND Job.attempts < 5)
     # - Job.run_at is Null OR Job.run_at < now + 10 mins
     run_at_earliest = arrow.now().shift(minutes=+10)
-    query = Job.filter(
+    return Job.filter(
         and_(
             or_(
                 Job.state == JobState.ready.value,
@@ -337,6 +339,10 @@ def get_jobs_to_run(taken_before_time: arrow.Arrow) -> List[Job]:
             or_(Job.run_at.is_(None), and_(Job.run_at <= run_at_earliest)),
         )
     )
+
+
+def get_jobs_to_run(taken_before_time: arrow.Arrow) -> List[Job]:
+    query = get_jobs_to_run_query(taken_before_time)
     return (
         query.order_by(Job.priority.desc())
         .order_by(Job.run_at.asc())
@@ -385,11 +391,33 @@ if __name__ == "__main__":
                 if not take_job(job, taken_before_time):
                     continue
                 LOG.d("Take job %s", job)
 
-                process_job(job)
-                job.state = JobState.done.value
+                try:
+                    newrelic.agent.record_custom_event("ProcessJob", {"job": job.name})
+                    process_job(job)
+                    job_result = "success"
+
+                    job.state = JobState.done.value
+                    jobs_done += 1
+                except Exception as e:
+                    LOG.warn(f"Error processing job (id={job.id} name={job.name}): {e}")
+
+                    # Increment manually, as the attempts increment is done by the take_job but not
+                    # updated in our instance
+                    job_attempts = job.attempts + 1
+                    if job_attempts >= config.JOB_MAX_ATTEMPTS:
+                        LOG.warn(
+                            f"Marking job (id={job.id} name={job.name} attempts={job_attempts}) as ERROR"
+                        )
+                        job.state = JobState.error.value
+                        job_result = "error"
+                    else:
+                        job_result = "retry"
+
+                newrelic.agent.record_custom_event(
+                    "JobProcessed", {"job": job.name, "result": job_result}
+                )
                 Session.commit()
-                jobs_done += 1
 
             if jobs_done == 0:
                 time.sleep(10)
diff --git a/monitoring.py b/monitoring.py
index 72f4967a..4052e478 100644
--- a/monitoring.py
+++ b/monitoring.py
@@ -7,8 +7,11 @@ from typing import List, Dict
 import arrow
 import newrelic.agent
 
+from app.models import JobState
+from app.config import JOB_MAX_ATTEMPTS, JOB_TAKEN_RETRY_WAIT_MINS
 from app.db import Session
 from app.log import LOG
+from job_runner import get_jobs_to_run_query
 from monitor.metric_exporter import MetricExporter
 
 # the number of consecutive fails
@@ -154,6 +157,38 @@ def log_failed_events():
     newrelic.agent.record_custom_metric("Custom/sync_events_failed", failed_events)
 
 
+@newrelic.agent.background_task()
+def log_jobs_to_run():
+    taken_before_time = arrow.now().shift(minutes=-JOB_TAKEN_RETRY_WAIT_MINS)
+    query = get_jobs_to_run_query(taken_before_time)
+    count = query.count()
+    LOG.d(f"Pending jobs to run: {count}")
+    newrelic.agent.record_custom_metric("Custom/jobs_to_run", count)
+
+
+@newrelic.agent.background_task()
+def log_failed_jobs():
+    r = Session.execute(
+        """
+        SELECT COUNT(*)
+        FROM job
+        WHERE (
+            state = :error_state
+            OR (state = :taken_state AND attempts >= :max_attempts)
+        )
+        """,
+        {
+            "error_state": JobState.error.value,
+            "taken_state": JobState.taken.value,
+            "max_attempts": JOB_MAX_ATTEMPTS,
+        },
+    )
+    failed_jobs = list(r)[0][0]
+
+    LOG.d(f"Failed jobs: {failed_jobs}")
+    newrelic.agent.record_custom_metric("Custom/failed_jobs", failed_jobs)
+
+
 if __name__ == "__main__":
     exporter = MetricExporter(get_newrelic_license())
     while True:
@@ -163,6 +198,8 @@ if __name__ == "__main__":
         log_events_pending_dead_letter()
         log_failed_events()
         log_nb_db_connection_by_app_name()
+        log_jobs_to_run()
+        log_failed_jobs()
         Session.close()
 
         exporter.run()
diff --git a/tests/jobs/test_job_runner.py b/tests/jobs/test_job_runner.py
index 1be0a77e..dcfb208e 100644
--- a/tests/jobs/test_job_runner.py
+++ b/tests/jobs/test_job_runner.py
@@ -56,15 +56,22 @@ def test_get_jobs_to_run(flask_client):
         run_at=now.shift(hours=3),
     )
     # Job out of attempts
-    (
-        Job.create(
-            name="",
-            payload="",
-            state=JobState.taken.value,
-            taken_at=now.shift(minutes=-(config.JOB_TAKEN_RETRY_WAIT_MINS + 10)),
-            attempts=config.JOB_MAX_ATTEMPTS + 1,
-        ),
+    Job.create(
+        name="",
+        payload="",
+        state=JobState.taken.value,
+        taken_at=now.shift(minutes=-(config.JOB_TAKEN_RETRY_WAIT_MINS + 10)),
+        attempts=config.JOB_MAX_ATTEMPTS + 1,
     )
+    # Job marked as error
+    Job.create(
+        name="",
+        payload="",
+        state=JobState.error.value,
+        taken_at=now.shift(minutes=-(config.JOB_TAKEN_RETRY_WAIT_MINS + 10)),
+        attempts=config.JOB_MAX_ATTEMPTS + 1,
+    )
+
     Session.commit()
     taken_before_time = arrow.now().shift(minutes=-config.JOB_TAKEN_RETRY_WAIT_MINS)
     jobs = get_jobs_to_run(taken_before_time)
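
Below is a minimal sketch (not part of the committed patch) of the retry/error decision the runner now makes when process_job raises. The helper name classify_failed_job and the JOB_MAX_ATTEMPTS value of 5 are assumptions for illustration; the real limit comes from app.config, and take_job has already incremented Job.attempts in the database, which is why the except branch adds 1 to its stale in-memory copy.

# Sketch only: mirrors the retry/error decision in the runner's except branch.
# JOB_MAX_ATTEMPTS = 5 is an assumed value; the real one lives in app.config.
JOB_MAX_ATTEMPTS = 5


def classify_failed_job(attempts_in_instance: int) -> str:
    # take_job() already bumped Job.attempts in the DB, so account for it locally.
    job_attempts = attempts_in_instance + 1
    return "error" if job_attempts >= JOB_MAX_ATTEMPTS else "retry"


assert classify_failed_job(0) == "retry"  # first failure: job stays taken, retried later
assert classify_failed_job(4) == "error"  # out of attempts: marked JobState.error

Note that the JobProcessed custom event is recorded after the try/except, so every taken job reports exactly one result (success, retry, or error) to New Relic regardless of outcome.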