Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions backend/drizzle/0018_add-subworkflow-linkage.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
ALTER TABLE workflow_runs
ADD COLUMN parent_run_id text REFERENCES workflow_runs(run_id) ON DELETE SET NULL;

ALTER TABLE workflow_runs
ADD COLUMN parent_node_ref text;

CREATE INDEX IF NOT EXISTS workflow_runs_parent_run_idx ON workflow_runs(parent_run_id);

2 changes: 2 additions & 0 deletions backend/src/database/schema/workflow-runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ export const workflowRunsTable = pgTable('workflow_runs', {
workflowVersionId: uuid('workflow_version_id'),
workflowVersion: integer('workflow_version'),
temporalRunId: text('temporal_run_id'),
parentRunId: text('parent_run_id'),
parentNodeRef: text('parent_node_ref'),
totalActions: integer('total_actions').notNull().default(0),
inputs: jsonb('inputs').$type<Record<string, unknown>>().notNull().default({}),
triggerType: text('trigger_type').notNull().default('manual'),
Expand Down
2 changes: 2 additions & 0 deletions backend/src/workflows/dto/workflow-graph.dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ export const PrepareRunRequestSchema = BaseRunWorkflowRequestSchema.extend({
trigger: ExecutionTriggerMetadataSchema.optional(),
runId: z.string().optional(),
idempotencyKey: z.string().trim().min(1).max(128).optional(),
parentRunId: z.string().optional(),
parentNodeRef: z.string().optional(),
}).refine(validateVersionSelection, 'Provide either version or versionId, not both');

export class PrepareRunRequestDto extends createZodDto(PrepareRunRequestSchema) {}
Expand Down
2 changes: 2 additions & 0 deletions backend/src/workflows/internal-runs.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ export class InternalRunsController {
nodeOverrides: body.nodeOverrides ?? undefined,
runId: body.runId,
idempotencyKey: body.idempotencyKey,
parentRunId: body.parentRunId,
parentNodeRef: body.parentNodeRef,
},
);

Expand Down
33 changes: 32 additions & 1 deletion backend/src/workflows/repository/workflow-run.repository.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Inject, Injectable } from '@nestjs/common';
import { and, desc, eq, sql } from 'drizzle-orm';
import { and, desc, eq, sql, type SQL } from 'drizzle-orm';
import { NodePgDatabase } from 'drizzle-orm/node-postgres';

import { DRIZZLE_TOKEN } from '../../database/database.module';
Expand All @@ -17,6 +17,8 @@ interface CreateWorkflowRunInput {
workflowVersionId: string;
workflowVersion: number;
temporalRunId?: string | null;
parentRunId?: string | null;
parentNodeRef?: string | null;
totalActions: number;
inputs: Record<string, unknown>;
organizationId?: string | null;
Expand Down Expand Up @@ -48,6 +50,12 @@ export class WorkflowRunRepository {
updatedAt: new Date(),
organizationId: input.organizationId ?? null,
};
if (input.parentRunId !== undefined) {
values.parentRunId = input.parentRunId ?? null;
}
if (input.parentNodeRef !== undefined) {
values.parentNodeRef = input.parentNodeRef ?? null;
}

if (input.temporalRunId !== undefined) {
values.temporalRunId = input.temporalRunId;
Expand All @@ -66,6 +74,12 @@ export class WorkflowRunRepository {
updatedAt: new Date(),
organizationId: input.organizationId ?? null,
};
if (input.parentRunId !== undefined) {
updateValues.parentRunId = input.parentRunId ?? null;
}
if (input.parentNodeRef !== undefined) {
updateValues.parentNodeRef = input.parentNodeRef ?? null;
}

if (input.temporalRunId !== undefined) {
updateValues.temporalRunId = input.temporalRunId;
Expand Down Expand Up @@ -123,6 +137,23 @@ export class WorkflowRunRepository {
.limit(options.limit ?? 50);
}

async listChildren(
parentRunId: string,
options: { organizationId?: string | null; limit?: number } = {},
): Promise<WorkflowRunRecord[]> {
const conditions: SQL[] = [eq(workflowRunsTable.parentRunId, parentRunId)];
if (options.organizationId) {
conditions.push(eq(workflowRunsTable.organizationId, options.organizationId));
}

return this.db
.select()
.from(workflowRunsTable)
.where(and(...conditions))
.orderBy(desc(workflowRunsTable.createdAt))
.limit(options.limit ?? 200);
}

async hasPendingInputs(runId: string): Promise<boolean> {
const [result] = await this.db
.select({ count: sql<number>`count(*)` })
Expand Down
31 changes: 31 additions & 0 deletions backend/src/workflows/workflows.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,37 @@ export class WorkflowsController {
return this.workflowsService.getRun(runId, auth);
}

@Get('/runs/:runId/children')
@ApiOkResponse({
description: 'List direct child workflow runs spawned by a parent run',
schema: {
type: 'object',
properties: {
runs: {
type: 'array',
items: {
type: 'object',
properties: {
runId: { type: 'string' },
workflowId: { type: 'string' },
workflowName: { type: 'string' },
parentNodeRef: { type: 'string', nullable: true },
status: { type: 'string' },
startedAt: { type: 'string', format: 'date-time' },
completedAt: { type: 'string', format: 'date-time', nullable: true },
},
},
},
},
},
})
async listChildRuns(
@CurrentAuth() auth: AuthContext | null,
@Param('runId') runId: string,
) {
return this.workflowsService.listChildRuns(runId, auth);
}

@Get(':id')
@ApiOkResponse({ type: WorkflowResponseDto })
async findOne(
Expand Down
46 changes: 46 additions & 0 deletions backend/src/workflows/workflows.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ export interface WorkflowRunSummary {
triggerSource?: string | null;
triggerLabel?: string | null;
inputPreview: ExecutionInputPreview;
parentRunId?: string | null;
parentNodeRef?: string | null;
}

const SHIPSEC_WORKFLOW_TYPE = 'shipsecWorkflowRun';
Expand Down Expand Up @@ -494,6 +496,8 @@ export class WorkflowsService {
triggerSource,
triggerLabel,
inputPreview,
parentRunId: run.parentRunId ?? null,
parentNodeRef: run.parentNodeRef ?? null,
};
}

Expand All @@ -519,6 +523,44 @@ export class WorkflowsService {
return { runs: summaries };
}

async listChildRuns(
parentRunId: string,
auth?: AuthContext | null,
options: { limit?: number } = {},
): Promise<{
runs: Array<{
runId: string;
workflowId: string;
workflowName: string;
parentNodeRef: string | null;
status: ExecutionStatus;
startedAt: string;
completedAt?: string;
}>;
}> {
const { organizationId } = await this.requireRunAccess(parentRunId, auth);
const children = await this.runRepository.listChildren(parentRunId, {
organizationId,
limit: options.limit,
});

const summaries = await Promise.all(
children.map((run) => this.buildRunSummary(run, organizationId)),
);

const runs = summaries.map((summary, index) => ({
runId: summary.id,
workflowId: summary.workflowId,
workflowName: summary.workflowName,
parentNodeRef: children[index]?.parentNodeRef ?? null,
status: summary.status,
startedAt: new Date(summary.startTime).toISOString(),
completedAt: summary.endTime ? new Date(summary.endTime).toISOString() : undefined,
}));

return { runs };
}

async getRun(runId: string, auth?: AuthContext | null): Promise<WorkflowRunSummary> {
const organizationId = this.requireOrganizationId(auth);
const run = await this.runRepository.findByRunId(runId, { organizationId });
Expand Down Expand Up @@ -713,6 +755,8 @@ export class WorkflowsService {
nodeOverrides?: Record<string, Record<string, unknown>>;
runId?: string;
idempotencyKey?: string;
parentRunId?: string;
parentNodeRef?: string;
} = {},
): Promise<PreparedRunPayload> {
const organizationId = this.requireOrganizationId(auth);
Expand Down Expand Up @@ -763,6 +807,8 @@ export class WorkflowsService {
triggerSource: triggerMetadata.sourceId,
triggerLabel: triggerMetadata.label,
inputPreview,
parentRunId: options.parentRunId,
parentNodeRef: options.parentNodeRef,
});

this.analyticsService.trackWorkflowStarted({
Expand Down
Loading