From aacd033bb090fcb2d1f4397b7d87640ffdea1503 Mon Sep 17 00:00:00 2001 From: Arjun Dinesh Jagdale <142811259+ArjunJagdale@users.noreply.github.com> Date: Wed, 2 Jul 2025 01:19:59 +0530 Subject: [PATCH] Update executor.py - fixes #2532 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #2532 When a worker sends a heartbeat immediately after job assignment, it may race with job persistence, resulting in: `JobDocument matching query does not exist`. This patch adds retry logic to the `WorkerExecutor.heartbeat()` method in `executor.py`, retrying the heartbeat a few times (with delay) if this specific error occurs. This avoids false negatives and unnecessary worker shutdowns on transient states. Attempts: 6 in total (i.e. up to 5 retries) Delay: 0.5s between attempts Only the specific "JobDocument not found" error (an exception whose message contains `JobDocument matching query does not exist`) is retried — any other exception is now re-raised immediately, whereas the previous code logged every heartbeat failure as a warning and swallowed it. --- services/worker/src/worker/executor.py | 27 ++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/services/worker/src/worker/executor.py b/services/worker/src/worker/executor.py index ae7982337e..5ab9cb6c28 100644 --- a/services/worker/src/worker/executor.py +++ b/services/worker/src/worker/executor.py @@ -146,12 +146,27 @@ def heartbeat(self) -> None: worker_state = self.get_state() if worker_state and worker_state["current_job_info"]: job_id = worker_state["current_job_info"]["job_id"] - try: - Queue().heartbeat(job_id=job_id) - except Exception as error: - logging.warning(f"Heartbeat failed for job {job_id} (AfterJobPlan might be running): {error}") - # Don't stop since the AfterJobPlan may be running - # self.stop() + max_retries = 6 + delay = 0.5 # seconds + + for attempt in range(max_retries): + try: + Queue().heartbeat(job_id=job_id) + return # success + except Exception as error: + if "JobDocument matching query does not exist" in str(error): + if attempt < max_retries - 1: + logging.warning( + f"Heartbeat retry {attempt + 1}/{max_retries} 
for job {job_id} - waiting {delay}s: {error}" + ) + time.sleep(delay) + else: + logging.error(f"Heartbeat failed after retries for job {job_id}: {error}") + else: + # Unrelated error, raise immediately + raise + + def kill_zombies(self) -> None: queue = Queue()