From c9aa6e26812a429834eb06602cbf9d63c6bc086e Mon Sep 17 00:00:00 2001 From: kelvin <43873157+kelvinkipruto@users.noreply.github.com> Date: Fri, 6 Mar 2026 16:51:42 +0300 Subject: [PATCH 1/6] fix(workflows): make task failures non-blocking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wrap each workflow task in a shared runTask helper so that a single task failure no longer aborts the workflow. Previously, an unhandled task error would mark the workflow job as failed in payload-jobs, blocking all subsequent scheduled runs until the table was cleared manually. Now each task catches its own error, logs it to Sentry, and the workflow always completes — allowing the next cron cycle to start cleanly. Co-Authored-By: Claude Sonnet 4.6 --- src/workflows/airtableWorkflow.ts | 58 +++++++++++++++++++------------ src/workflows/meedanWorkflow.ts | 26 ++++++++------ src/workflows/utils.ts | 19 ++++++++++ 3 files changed, 71 insertions(+), 32 deletions(-) diff --git a/src/workflows/airtableWorkflow.ts b/src/workflows/airtableWorkflow.ts index f623c9a2..d24caa0d 100644 --- a/src/workflows/airtableWorkflow.ts +++ b/src/workflows/airtableWorkflow.ts @@ -1,6 +1,6 @@ import { WorkflowConfig } from "payload"; import { randomUUID } from "node:crypto"; -import { defineWorkflow } from "./utils"; +import { defineWorkflow, runTask } from "./utils"; export const airtableWorkflow = defineWorkflow({ slug: "airtableWorkflow", @@ -12,26 +12,40 @@ export const airtableWorkflow = defineWorkflow({ }, ], handler: async ({ tasks }) => { - await tasks.createTenantFromAirtable(randomUUID(), { - input: {}, - }); - await tasks.createPoliticalEntity(randomUUID(), { - input: {}, - }); - await tasks.fetchAirtableDocuments(randomUUID(), { - input: {}, - }); - await tasks.downloadDocuments(randomUUID(), { - input: {}, - }); - await tasks.extractDocuments(randomUUID(), { - input: {}, - }); - await tasks.extractPromises(randomUUID(), { - input: {}, - }); - await tasks.uploadToMeedan(randomUUID(), { - input: {}, - }); + await runTask( + () => tasks.createTenantFromAirtable(randomUUID(), { input: {} }), + "createTenantFromAirtable", + "airtableWorkflow", + ); + await runTask( + () => tasks.createPoliticalEntity(randomUUID(), { input: {} }), + "createPoliticalEntity", + "airtableWorkflow", + ); + await runTask( + () => tasks.fetchAirtableDocuments(randomUUID(), { input: {} }), + "fetchAirtableDocuments", + "airtableWorkflow", + ); + await runTask( + () => tasks.downloadDocuments(randomUUID(), { input: {} }), + "downloadDocuments", + "airtableWorkflow", + ); + await runTask( + () => tasks.extractDocuments(randomUUID(), { input: {} }), + "extractDocuments", + "airtableWorkflow", + ); + await runTask( + () => tasks.extractPromises(randomUUID(), { input: {} }), + "extractPromises", + "airtableWorkflow", + ); + await runTask( + () => tasks.uploadToMeedan(randomUUID(), { input: {} }), + "uploadToMeedan", + "airtableWorkflow", + ); }, } satisfies WorkflowConfig); diff --git a/src/workflows/meedanWorkflow.ts b/src/workflows/meedanWorkflow.ts index 04bc85f3..efc65d6d 100644 --- a/src/workflows/meedanWorkflow.ts +++ b/src/workflows/meedanWorkflow.ts @@ -1,6 +1,6 @@ import { WorkflowConfig } from "payload"; import { randomUUID } from "node:crypto"; -import { defineWorkflow } from "./utils"; +import { defineWorkflow, runTask } from "./utils"; export const meedanWorkflow = defineWorkflow({ slug: "meedanWorkflow", @@ -12,14 +12,20 @@ export const meedanWorkflow = defineWorkflow({ }, ], handler: async ({ tasks }) => { - await tasks.fetchPromiseStatuses(randomUUID(), { - input: {}, - }); - await tasks.updatePromiseStatus(randomUUID(), { - input: {}, - }); - await tasks.syncMeedanPromises(randomUUID(), { - input: {}, - }); + await runTask( + () => tasks.fetchPromiseStatuses(randomUUID(), { input: {} }), + "fetchPromiseStatuses", + "meedanWorkflow", + ); + await runTask( + () => tasks.updatePromiseStatus(randomUUID(), { input: {} }), + "updatePromiseStatus", + "meedanWorkflow", + ); + await runTask( + () => tasks.syncMeedanPromises(randomUUID(), { input: {} }), + "syncMeedanPromises", + "meedanWorkflow", + ); }, } satisfies WorkflowConfig); diff --git a/src/workflows/utils.ts b/src/workflows/utils.ts index 039fafab..f315dcfa 100644 --- a/src/workflows/utils.ts +++ b/src/workflows/utils.ts @@ -2,6 +2,25 @@ import * as Sentry from "@sentry/nextjs"; import type { WorkflowConfig } from "payload"; import { randomUUID } from "node:crypto"; +export const runTask = async ( + fn: () => Promise, + name: string, + workflow: string, +) => { + try { + await fn(); + } catch (error) { + Sentry.logger.error(`[${workflow}] Task "${name}" failed, continuing`, { + task: name, + workflow, + error: error instanceof Error ? error.message : String(error), + }); + Sentry.captureException(error, { + tags: { workflow, task: name }, + }); + } +}; + type WorkflowHandlerFn = NonNullable; type WorkflowHandlerArgs = unknown; From 8d4905592018cd29091baf1ca63ddb316e0bc659 Mon Sep 17 00:00:00 2001 From: kelvin <43873157+kelvinkipruto@users.noreply.github.com> Date: Fri, 6 Mar 2026 17:01:46 +0300 Subject: [PATCH 2/6] docs(workflows): clarify withWorkflowErrorCapture scope after runTask MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Task errors are now swallowed by runTask before they can bubble up, so withWorkflowErrorCapture no longer fires for task failures. Add a comment acknowledging this so it is not mistaken for dead code — it remains as a safety net for errors thrown in the handler setup code itself, outside any runTask call. Co-Authored-By: Claude Sonnet 4.6 --- src/workflows/utils.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/workflows/utils.ts b/src/workflows/utils.ts index f315dcfa..b8f6f72d 100644 --- a/src/workflows/utils.ts +++ b/src/workflows/utils.ts @@ -161,6 +161,10 @@ const withWorkflowContext = ( }; }; +// Note: task-level errors are already caught and reported by runTask, so this +// wrapper will only fire for errors thrown by the handler setup code itself +// (e.g. a bug outside of any runTask call). It is kept as a safety net for +// those rare cases. const withWorkflowErrorCapture = ( slug: string, handler: WorkflowHandlerFn From 4962944493a87aaf3bc785bc5dd15acf498cd4c8 Mon Sep 17 00:00:00 2001 From: kelvin <43873157+kelvinkipruto@users.noreply.github.com> Date: Fri, 6 Mar 2026 17:03:20 +0300 Subject: [PATCH 3/6] fix(workflows): avoid duplicate Sentry reporting in runTask Replaced Sentry.logger.error + captureException with captureException alone. Tasks using withTaskTracing already capture exceptions before re-throwing, so the previous two-call pattern risked triple-reporting the same error. captureException also preserves the full stack trace, which Sentry.logger.error discarded by only passing error.message. Co-Authored-By: Claude Sonnet 4.6 --- src/workflows/utils.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/workflows/utils.ts b/src/workflows/utils.ts index b8f6f72d..57906aae 100644 --- a/src/workflows/utils.ts +++ b/src/workflows/utils.ts @@ -10,11 +10,9 @@ export const runTask = async ( try { await fn(); } catch (error) { - Sentry.logger.error(`[${workflow}] Task "${name}" failed, continuing`, { - task: name, - workflow, - error: error instanceof Error ? error.message : String(error), - }); + // captureException is used here rather than Sentry.logger.error to avoid + // duplicate Sentry issues (logger may auto-capture depending on SDK config), + // and to preserve the full stack trace which logger.error would discard. Sentry.captureException(error, { tags: { workflow, task: name }, }); From 011a071f68f354433d754f4b8307419977ed9444 Mon Sep 17 00:00:00 2001 From: kelvin <43873157+kelvinkipruto@users.noreply.github.com> Date: Fri, 6 Mar 2026 17:05:05 +0300 Subject: [PATCH 4/6] style(workflows): improve code formatting and remove obsolete comment --- src/workflows/utils.ts | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/src/workflows/utils.ts b/src/workflows/utils.ts index 57906aae..37968b74 100644 --- a/src/workflows/utils.ts +++ b/src/workflows/utils.ts @@ -10,9 +10,6 @@ export const runTask = async ( try { await fn(); } catch (error) { - // captureException is used here rather than Sentry.logger.error to avoid - // duplicate Sentry issues (logger may auto-capture depending on SDK config), - // and to preserve the full stack trace which logger.error would discard. Sentry.captureException(error, { tags: { workflow, task: name }, }); @@ -25,7 +22,9 @@ type WorkflowHandlerArgs = unknown; const isObject = (value: unknown): value is Record => typeof value === "object" && value !== null; -const buildSentryExtra = (args: WorkflowHandlerArgs): Record => { +const buildSentryExtra = ( + args: WorkflowHandlerArgs, +): Record => { if (!isObject(args)) { return {}; } @@ -71,7 +70,7 @@ const getRunId = (args: WorkflowHandlerArgs): string => { }; const getTasks = ( - args: WorkflowHandlerArgs + args: WorkflowHandlerArgs, ): Record | undefined => { if (!isObject(args)) { return undefined; @@ -83,7 +82,7 @@ const getTasks = ( const getWorkflowLogContext = ( slug: string, runId: string, - args: WorkflowHandlerArgs + args: WorkflowHandlerArgs, ) => ({ log_source: "payload.workflow", workflow: slug, @@ -93,7 +92,7 @@ const getWorkflowLogContext = ( const withWorkflowContext = ( slug: string, - handler: WorkflowHandlerFn + handler: WorkflowHandlerFn, ): WorkflowHandlerFn => { return async (args) => { if (typeof handler !== "function") { @@ -121,14 +120,14 @@ const withWorkflowContext = ( workflowRunId: runId, }; return ( - original as (id: string, options?: { input?: unknown }) => unknown - )( - id, - { - ...options, - input: { ...input, runContext }, - } - ); + original as ( + id: string, + options?: { input?: unknown }, + ) => unknown + )(id, { + ...options, + input: { ...input, runContext }, + }); }; }, }) @@ -154,7 +153,7 @@ const withWorkflowContext = ( Sentry.logger.info("workflow.complete", logContext); return result; - } + }, ); }; }; @@ -165,7 +164,7 @@ const withWorkflowContext = ( // those rare cases. const withWorkflowErrorCapture = ( slug: string, - handler: WorkflowHandlerFn + handler: WorkflowHandlerFn, ): WorkflowHandlerFn => { if (typeof handler !== "function") { return handler; @@ -201,7 +200,7 @@ export const defineWorkflow = (config: WorkflowConfig): WorkflowConfig => { const wrappedHandler = withWorkflowErrorCapture( config.slug, - withWorkflowContext(config.slug, config.handler as WorkflowHandlerFn) + withWorkflowContext(config.slug, config.handler as WorkflowHandlerFn), ); return { From 855253e5f3ae09aff9825104b581cd627272bbbd Mon Sep 17 00:00:00 2001 From: kelvin <43873157+kelvinkipruto@users.noreply.github.com> Date: Fri, 6 Mar 2026 17:09:05 +0300 Subject: [PATCH 5/6] refactor(workflows): tighten runTask signature and return type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Drop the workflow parameter — the Sentry span context from withWorkflowContext already carries the workflow name, so passing it explicitly at every call site was redundant noise. - Add explicit Promise return type: true on success, false on failure, making the helper's intent clearer if callers ever need to act on the result. Co-Authored-By: Claude Sonnet 4.6 --- src/workflows/airtableWorkflow.ts | 7 ------- src/workflows/meedanWorkflow.ts | 3 --- src/workflows/utils.ts | 7 ++++--- 3 files changed, 4 insertions(+), 13 deletions(-) diff --git a/src/workflows/airtableWorkflow.ts b/src/workflows/airtableWorkflow.ts index d24caa0d..952147fa 100644 --- a/src/workflows/airtableWorkflow.ts +++ b/src/workflows/airtableWorkflow.ts @@ -15,37 +15,30 @@ export const airtableWorkflow = defineWorkflow({ await runTask( () => tasks.createTenantFromAirtable(randomUUID(), { input: {} }), "createTenantFromAirtable", - "airtableWorkflow", ); await runTask( () => tasks.createPoliticalEntity(randomUUID(), { input: {} }), "createPoliticalEntity", - "airtableWorkflow", ); await runTask( () => tasks.fetchAirtableDocuments(randomUUID(), { input: {} }), "fetchAirtableDocuments", - "airtableWorkflow", ); await runTask( () => tasks.downloadDocuments(randomUUID(), { input: {} }), "downloadDocuments", - "airtableWorkflow", ); await runTask( () => tasks.extractDocuments(randomUUID(), { input: {} }), "extractDocuments", - "airtableWorkflow", ); await runTask( () => tasks.extractPromises(randomUUID(), { input: {} }), "extractPromises", - "airtableWorkflow", ); await runTask( () => tasks.uploadToMeedan(randomUUID(), { input: {} }), "uploadToMeedan", - "airtableWorkflow", ); }, } satisfies WorkflowConfig); diff --git a/src/workflows/meedanWorkflow.ts b/src/workflows/meedanWorkflow.ts index efc65d6d..0d6f4e54 100644 --- a/src/workflows/meedanWorkflow.ts +++ b/src/workflows/meedanWorkflow.ts @@ -15,17 +15,14 @@ export const meedanWorkflow = defineWorkflow({ await runTask( () => tasks.fetchPromiseStatuses(randomUUID(), { input: {} }), "fetchPromiseStatuses", - "meedanWorkflow", ); await runTask( () => tasks.updatePromiseStatus(randomUUID(), { input: {} }), "updatePromiseStatus", - "meedanWorkflow", ); await runTask( () => tasks.syncMeedanPromises(randomUUID(), { input: {} }), "syncMeedanPromises", - "meedanWorkflow", ); }, } satisfies WorkflowConfig); diff --git a/src/workflows/utils.ts b/src/workflows/utils.ts index 37968b74..fb0a02d5 100644 --- a/src/workflows/utils.ts +++ b/src/workflows/utils.ts @@ -5,14 +5,15 @@ import { randomUUID } from "node:crypto"; export const runTask = async ( fn: () => Promise, name: string, - workflow: string, -) => { +): Promise => { try { await fn(); + return true; } catch (error) { Sentry.captureException(error, { - tags: { workflow, task: name }, + tags: { task: name }, }); + return false; } }; From 913c50920c1713b5e0191514bdef7e198ba31708 Mon Sep 17 00:00:00 2001 From: kelvin <43873157+kelvinkipruto@users.noreply.github.com> Date: Fri, 6 Mar 2026 17:17:51 +0300 Subject: [PATCH 6/6] fix: remove duplicate Sentry logging and adjust cleanup schedule Remove duplicate Sentry captureMessage call in task logger to avoid double reporting. Remove Sentry.logger.error call in workflow error capture as Sentry.captureException already handles error reporting. Change cleanup failed jobs schedule from daily to hourly to improve system maintenance. --- src/tasks/cleanupFailedJobs.ts | 2 +- src/tasks/utils.ts | 5 ----- src/workflows/utils.ts | 4 ---- 3 files changed, 1 insertion(+), 10 deletions(-) diff --git a/src/tasks/cleanupFailedJobs.ts b/src/tasks/cleanupFailedJobs.ts index 0444681a..80ddfff6 100644 --- a/src/tasks/cleanupFailedJobs.ts +++ b/src/tasks/cleanupFailedJobs.ts @@ -19,7 +19,7 @@ export const CleanupFailedJobs: TaskConfig<"cleanupFailedJobs"> = { retries: 1, schedule: [ { - cron: "0 0 * * *", + cron: "0 * * * *", queue: getCleanupQueue(), }, ], diff --git a/src/tasks/utils.ts b/src/tasks/utils.ts index 945b41ff..9d012891 100644 --- a/src/tasks/utils.ts +++ b/src/tasks/utils.ts @@ -96,11 +96,6 @@ const createTaskLogger = ( ...context, ...extra, }); - Sentry.captureMessage(message, { - level: level as Sentry.SeverityLevel, - ...context, - ...extra, - }); }; return { diff --git a/src/workflows/utils.ts b/src/workflows/utils.ts index fb0a02d5..c0242b52 100644 --- a/src/workflows/utils.ts +++ b/src/workflows/utils.ts @@ -178,10 +178,6 @@ const withWorkflowErrorCapture = ( const runId = getRunId(args); const logContext = getWorkflowLogContext(slug, runId, args); - Sentry.logger.error("workflow.error", { - ...logContext, - error: error instanceof Error ? error.message : String(error ?? ""), - }); Sentry.captureException(error, { tags: { workflow: slug,