Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/tasks/cleanupFailedJobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export const CleanupFailedJobs: TaskConfig<"cleanupFailedJobs"> = {
retries: 1,
schedule: [
{
cron: "0 0 * * *",
cron: "0 * * * *",
queue: getCleanupQueue(),
},
],
Expand Down
5 changes: 0 additions & 5 deletions src/tasks/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,6 @@ const createTaskLogger = (
...context,
...extra,
});
Sentry.captureMessage(message, {
level: level as Sentry.SeverityLevel,
...context,
...extra,
});
};

return {
Expand Down
51 changes: 29 additions & 22 deletions src/workflows/airtableWorkflow.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { WorkflowConfig } from "payload";
import { randomUUID } from "node:crypto";
import { defineWorkflow } from "./utils";
import { defineWorkflow, runTask } from "./utils";

export const airtableWorkflow = defineWorkflow({
slug: "airtableWorkflow",
Expand All @@ -12,26 +12,33 @@ export const airtableWorkflow = defineWorkflow({
},
],
handler: async ({ tasks }) => {
await tasks.createTenantFromAirtable(randomUUID(), {
input: {},
});
await tasks.createPoliticalEntity(randomUUID(), {
input: {},
});
await tasks.fetchAirtableDocuments(randomUUID(), {
input: {},
});
await tasks.downloadDocuments(randomUUID(), {
input: {},
});
await tasks.extractDocuments(randomUUID(), {
input: {},
});
await tasks.extractPromises(randomUUID(), {
input: {},
});
await tasks.uploadToMeedan(randomUUID(), {
input: {},
});
await runTask(
() => tasks.createTenantFromAirtable(randomUUID(), { input: {} }),
"createTenantFromAirtable",
);
await runTask(
() => tasks.createPoliticalEntity(randomUUID(), { input: {} }),
"createPoliticalEntity",
);
await runTask(
() => tasks.fetchAirtableDocuments(randomUUID(), { input: {} }),
"fetchAirtableDocuments",
);
await runTask(
() => tasks.downloadDocuments(randomUUID(), { input: {} }),
"downloadDocuments",
);
await runTask(
() => tasks.extractDocuments(randomUUID(), { input: {} }),
"extractDocuments",
);
await runTask(
() => tasks.extractPromises(randomUUID(), { input: {} }),
"extractPromises",
);
await runTask(
() => tasks.uploadToMeedan(randomUUID(), { input: {} }),
"uploadToMeedan",
);
},
} satisfies WorkflowConfig);
23 changes: 13 additions & 10 deletions src/workflows/meedanWorkflow.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { WorkflowConfig } from "payload";
import { randomUUID } from "node:crypto";
import { defineWorkflow } from "./utils";
import { defineWorkflow, runTask } from "./utils";

export const meedanWorkflow = defineWorkflow({
slug: "meedanWorkflow",
Expand All @@ -12,14 +12,17 @@ export const meedanWorkflow = defineWorkflow({
},
],
handler: async ({ tasks }) => {
await tasks.fetchPromiseStatuses(randomUUID(), {
input: {},
});
await tasks.updatePromiseStatus(randomUUID(), {
input: {},
});
await tasks.syncMeedanPromises(randomUUID(), {
input: {},
});
await runTask(
() => tasks.fetchPromiseStatuses(randomUUID(), { input: {} }),
"fetchPromiseStatuses",
);
await runTask(
() => tasks.updatePromiseStatus(randomUUID(), { input: {} }),
"updatePromiseStatus",
);
await runTask(
() => tasks.syncMeedanPromises(randomUUID(), { input: {} }),
"syncMeedanPromises",
);
},
} satisfies WorkflowConfig);
55 changes: 36 additions & 19 deletions src/workflows/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,30 @@ import * as Sentry from "@sentry/nextjs";
import type { WorkflowConfig } from "payload";
import { randomUUID } from "node:crypto";

export const runTask = async (
fn: () => Promise<unknown>,
name: string,
): Promise<boolean> => {
try {
await fn();
return true;
} catch (error) {
Sentry.captureException(error, {
tags: { task: name },
});
return false;
}
};

type WorkflowHandlerFn = NonNullable<WorkflowConfig["handler"]>;
type WorkflowHandlerArgs = unknown;

const isObject = (value: unknown): value is Record<string, unknown> =>
typeof value === "object" && value !== null;

const buildSentryExtra = (args: WorkflowHandlerArgs): Record<string, unknown> => {
const buildSentryExtra = (
args: WorkflowHandlerArgs,
): Record<string, unknown> => {
if (!isObject(args)) {
return {};
}
Expand Down Expand Up @@ -54,7 +71,7 @@ const getRunId = (args: WorkflowHandlerArgs): string => {
};

const getTasks = (
args: WorkflowHandlerArgs
args: WorkflowHandlerArgs,
): Record<string, unknown> | undefined => {
if (!isObject(args)) {
return undefined;
Expand All @@ -66,7 +83,7 @@ const getTasks = (
const getWorkflowLogContext = (
slug: string,
runId: string,
args: WorkflowHandlerArgs
args: WorkflowHandlerArgs,
) => ({
log_source: "payload.workflow",
workflow: slug,
Expand All @@ -76,7 +93,7 @@ const getWorkflowLogContext = (

const withWorkflowContext = (
slug: string,
handler: WorkflowHandlerFn
handler: WorkflowHandlerFn,
): WorkflowHandlerFn => {
return async (args) => {
if (typeof handler !== "function") {
Expand Down Expand Up @@ -104,14 +121,14 @@ const withWorkflowContext = (
workflowRunId: runId,
};
return (
original as (id: string, options?: { input?: unknown }) => unknown
)(
id,
{
...options,
input: { ...input, runContext },
}
);
original as (
id: string,
options?: { input?: unknown },
) => unknown
)(id, {
...options,
input: { ...input, runContext },
});
};
},
})
Expand All @@ -137,14 +154,18 @@ const withWorkflowContext = (
Sentry.logger.info("workflow.complete", logContext);

return result;
}
},
);
};
};

// Note: task-level errors are already caught and reported by runTask, so this
// wrapper will only fire for errors thrown by the handler setup code itself
// (e.g. a bug outside of any runTask call). It is kept as a safety net for
// those rare cases.
const withWorkflowErrorCapture = (
slug: string,
handler: WorkflowHandlerFn
handler: WorkflowHandlerFn,
): WorkflowHandlerFn => {
if (typeof handler !== "function") {
return handler;
Expand All @@ -157,10 +178,6 @@ const withWorkflowErrorCapture = (
const runId = getRunId(args);
const logContext = getWorkflowLogContext(slug, runId, args);

Sentry.logger.error("workflow.error", {
...logContext,
error: error instanceof Error ? error.message : String(error ?? ""),
});
Sentry.captureException(error, {
tags: {
workflow: slug,
Expand All @@ -180,7 +197,7 @@ export const defineWorkflow = (config: WorkflowConfig): WorkflowConfig => {

const wrappedHandler = withWorkflowErrorCapture(
config.slug,
withWorkflowContext(config.slug, config.handler as WorkflowHandlerFn)
withWorkflowContext(config.slug, config.handler as WorkflowHandlerFn),
);

return {
Expand Down