diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 69ac3e7b1..7023a644a 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -855,12 +855,16 @@ def start(self): self._run_program_directory(ingestion_program_dir, kind='ingestion'), self.watch_detailed_results(), loop=loop, + return_exceptions=True, ) + task_results = [] # will store results/exceptions from gather signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(self.execution_time_limit) try: loop.run_until_complete(gathered_tasks) + # keep what gather returned so we can detect async errors later + task_results = list(gathered_tasks.result() or []) except ExecutionTimeLimitExceeded: error_message = f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds" logger.error(error_message) @@ -911,6 +915,18 @@ def start(self): logger.info("Program finished") signal.alarm(0) + # Failure "gate" BEFORE changing status + # An async task error? + had_async_exc = any(isinstance(r, BaseException) for r in task_results) + # Non-zero exit from either container counts as failure for this phase + program_rc = getattr(self, "program_exit_code", None) + ingestion_rc = getattr(self, "ingestion_program_exit_code", None) + failed_rc = any(rc not in (0, None) for rc in (program_rc, ingestion_rc)) + if had_async_exc or failed_rc: + self._update_status(STATUS_FAILED, extra_information=f"program_rc={program_rc}, ingestion_rc={ingestion_rc}") + # Raise so upstream marks failed immediately + raise SubmissionException("Child task failed or non-zero return code") + if self.is_scoring: self._update_status(STATUS_FINISHED) else: