diff --git a/src/tasks/cleanupFailedJobs.ts b/src/tasks/cleanupFailedJobs.ts index 0444681a..80ddfff6 100644 --- a/src/tasks/cleanupFailedJobs.ts +++ b/src/tasks/cleanupFailedJobs.ts @@ -19,7 +19,7 @@ export const CleanupFailedJobs: TaskConfig<"cleanupFailedJobs"> = { retries: 1, schedule: [ { - cron: "0 0 * * *", + cron: "0 * * * *", queue: getCleanupQueue(), }, ], diff --git a/src/tasks/utils.ts b/src/tasks/utils.ts index 945b41ff..9d012891 100644 --- a/src/tasks/utils.ts +++ b/src/tasks/utils.ts @@ -96,11 +96,6 @@ const createTaskLogger = ( ...context, ...extra, }); - Sentry.captureMessage(message, { - level: level as Sentry.SeverityLevel, - ...context, - ...extra, - }); }; return { diff --git a/src/workflows/airtableWorkflow.ts b/src/workflows/airtableWorkflow.ts index f623c9a2..952147fa 100644 --- a/src/workflows/airtableWorkflow.ts +++ b/src/workflows/airtableWorkflow.ts @@ -1,6 +1,6 @@ import { WorkflowConfig } from "payload"; import { randomUUID } from "node:crypto"; -import { defineWorkflow } from "./utils"; +import { defineWorkflow, runTask } from "./utils"; export const airtableWorkflow = defineWorkflow({ slug: "airtableWorkflow", @@ -12,26 +12,33 @@ export const airtableWorkflow = defineWorkflow({ }, ], handler: async ({ tasks }) => { - await tasks.createTenantFromAirtable(randomUUID(), { - input: {}, - }); - await tasks.createPoliticalEntity(randomUUID(), { - input: {}, - }); - await tasks.fetchAirtableDocuments(randomUUID(), { - input: {}, - }); - await tasks.downloadDocuments(randomUUID(), { - input: {}, - }); - await tasks.extractDocuments(randomUUID(), { - input: {}, - }); - await tasks.extractPromises(randomUUID(), { - input: {}, - }); - await tasks.uploadToMeedan(randomUUID(), { - input: {}, - }); + await runTask( + () => tasks.createTenantFromAirtable(randomUUID(), { input: {} }), + "createTenantFromAirtable", + ); + await runTask( + () => tasks.createPoliticalEntity(randomUUID(), { input: {} }), + "createPoliticalEntity", + ); + await runTask( + () => tasks.fetchAirtableDocuments(randomUUID(), { input: {} }), + "fetchAirtableDocuments", + ); + await runTask( + () => tasks.downloadDocuments(randomUUID(), { input: {} }), + "downloadDocuments", + ); + await runTask( + () => tasks.extractDocuments(randomUUID(), { input: {} }), + "extractDocuments", + ); + await runTask( + () => tasks.extractPromises(randomUUID(), { input: {} }), + "extractPromises", + ); + await runTask( + () => tasks.uploadToMeedan(randomUUID(), { input: {} }), + "uploadToMeedan", + ); }, } satisfies WorkflowConfig); diff --git a/src/workflows/meedanWorkflow.ts b/src/workflows/meedanWorkflow.ts index 04bc85f3..0d6f4e54 100644 --- a/src/workflows/meedanWorkflow.ts +++ b/src/workflows/meedanWorkflow.ts @@ -1,6 +1,6 @@ import { WorkflowConfig } from "payload"; import { randomUUID } from "node:crypto"; -import { defineWorkflow } from "./utils"; +import { defineWorkflow, runTask } from "./utils"; export const meedanWorkflow = defineWorkflow({ slug: "meedanWorkflow", @@ -12,14 +12,17 @@ export const meedanWorkflow = defineWorkflow({ }, ], handler: async ({ tasks }) => { - await tasks.fetchPromiseStatuses(randomUUID(), { - input: {}, - }); - await tasks.updatePromiseStatus(randomUUID(), { - input: {}, - }); - await tasks.syncMeedanPromises(randomUUID(), { - input: {}, - }); + await runTask( + () => tasks.fetchPromiseStatuses(randomUUID(), { input: {} }), + "fetchPromiseStatuses", + ); + await runTask( + () => tasks.updatePromiseStatus(randomUUID(), { input: {} }), + "updatePromiseStatus", + ); + await runTask( + () => tasks.syncMeedanPromises(randomUUID(), { input: {} }), + "syncMeedanPromises", + ); }, } satisfies WorkflowConfig); diff --git a/src/workflows/utils.ts b/src/workflows/utils.ts index 039fafab..c0242b52 100644 --- a/src/workflows/utils.ts +++ b/src/workflows/utils.ts @@ -2,13 +2,30 @@ import * as Sentry from "@sentry/nextjs"; import type { WorkflowConfig } from "payload"; import { randomUUID } from "node:crypto"; +export const runTask = async ( + fn: () => Promise, + name: string, +): Promise => { + try { + await fn(); + return true; + } catch (error) { + Sentry.captureException(error, { + tags: { task: name }, + }); + return false; + } +}; + type WorkflowHandlerFn = NonNullable; type WorkflowHandlerArgs = unknown; const isObject = (value: unknown): value is Record => typeof value === "object" && value !== null; -const buildSentryExtra = (args: WorkflowHandlerArgs): Record => { +const buildSentryExtra = ( + args: WorkflowHandlerArgs, +): Record => { if (!isObject(args)) { return {}; } @@ -54,7 +71,7 @@ const getRunId = (args: WorkflowHandlerArgs): string => { }; const getTasks = ( - args: WorkflowHandlerArgs + args: WorkflowHandlerArgs, ): Record | undefined => { if (!isObject(args)) { return undefined; @@ -66,7 +83,7 @@ const getTasks = ( const getWorkflowLogContext = ( slug: string, runId: string, - args: WorkflowHandlerArgs + args: WorkflowHandlerArgs, ) => ({ log_source: "payload.workflow", workflow: slug, @@ -76,7 +93,7 @@ const getWorkflowLogContext = ( const withWorkflowContext = ( slug: string, - handler: WorkflowHandlerFn + handler: WorkflowHandlerFn, ): WorkflowHandlerFn => { return async (args) => { if (typeof handler !== "function") { @@ -104,14 +121,14 @@ const withWorkflowContext = ( workflowRunId: runId, }; return ( - original as (id: string, options?: { input?: unknown }) => unknown - )( - id, - { - ...options, - input: { ...input, runContext }, - } - ); + original as ( + id: string, + options?: { input?: unknown }, + ) => unknown + )(id, { + ...options, + input: { ...input, runContext }, + }); }; }, }) @@ -137,14 +154,18 @@ const withWorkflowContext = ( Sentry.logger.info("workflow.complete", logContext); return result; - } + }, ); }; }; +// Note: task-level errors are already caught and reported by runTask, so this +// wrapper will only fire for errors thrown by the handler setup code itself +// (e.g. a bug outside of any runTask call). It is kept as a safety net for +// those rare cases. const withWorkflowErrorCapture = ( slug: string, - handler: WorkflowHandlerFn + handler: WorkflowHandlerFn, ): WorkflowHandlerFn => { if (typeof handler !== "function") { return handler; @@ -157,10 +178,6 @@ const withWorkflowErrorCapture = ( const runId = getRunId(args); const logContext = getWorkflowLogContext(slug, runId, args); - Sentry.logger.error("workflow.error", { - ...logContext, - error: error instanceof Error ? error.message : String(error ?? ""), - }); Sentry.captureException(error, { tags: { workflow: slug, @@ -180,7 +197,7 @@ export const defineWorkflow = (config: WorkflowConfig): WorkflowConfig => { const wrappedHandler = withWorkflowErrorCapture( config.slug, - withWorkflowContext(config.slug, config.handler as WorkflowHandlerFn) + withWorkflowContext(config.slug, config.handler as WorkflowHandlerFn), ); return {