Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ describe('Concurrent Operations Tests', () => {
expect(run1.flow_slug).toBe(flow1.slug);
expect(run2.flow_slug).toBe(flow2.slug);

// Give realtime subscriptions time to establish
await new Promise(resolve => setTimeout(resolve, 300));

// Get and complete tasks from both flows
console.log('=== Completing steps ===');

Expand Down Expand Up @@ -203,9 +200,6 @@ describe('Concurrent Operations Tests', () => {
const uniqueRunIds = [...new Set(runIds)];
expect(uniqueRunIds.length).toBe(3);

// Give subscriptions time to establish
await new Promise(resolve => setTimeout(resolve, 300));

// Poll for all tasks and complete them sequentially for reliability
const allTasks = await readAndStart(sql, sqlClient, testFlow.slug, 5, 5);
expect(allTasks.length).toBe(3); // One task per run
Expand Down Expand Up @@ -280,9 +274,6 @@ describe('Concurrent Operations Tests', () => {
const runA = await pgflowClient.startFlow(flowA.slug, { type: 'flow-a' });
const runB = await pgflowClient.startFlow(flowB.slug, { type: 'flow-b' });

// Give subscriptions time to establish
await new Promise(resolve => setTimeout(resolve, 300));

// Get tasks from both flows
const tasksA = await readAndStart(sql, sqlClient, flowA.slug, 2, 5);
const tasksB = await readAndStart(sql, sqlClient, flowB.slug, 2, 5);
Expand Down
3 changes: 0 additions & 3 deletions pkgs/client/__tests__/integration/full-stack-dsl.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@ describe('Full Stack DSL Integration', () => {
expect(run.flow_slug).toBe(SimpleFlow.slug);
expect(run.input).toEqual(input);

// Give realtime subscription time to establish
await new Promise((resolve) => setTimeout(resolve, 200));

// 7. Execute the complete flow lifecycle
console.log('=== Step 1: Completing fetch step ===');
let tasks = await readAndStart(sql, sqlClient, SimpleFlow.slug, 1, 5);
Expand Down
3 changes: 0 additions & 3 deletions pkgs/client/__tests__/integration/happy-path-e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ describe('Happy Path E2E Integration', () => {
expect(run.status).toBe(FlowRunStatus.Started);
expect(run.input).toEqual(input);

// Give realtime subscription time to establish
await new Promise(resolve => setTimeout(resolve, 200));

// Step 1: Complete fetch step
console.log('=== Step 1: Completing fetch step ===');
let tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
Expand Down
6 changes: 0 additions & 6 deletions pkgs/client/__tests__/integration/input-validation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,6 @@ describe('Input Validation', () => {
run.on('*', runTracker.callback);
run.step('producer').on('*', stepTracker.callback);

// Give realtime subscription time to establish
await new Promise(resolve => setTimeout(resolve, 100));

// Execute the producer step
const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
expect(tasks).toHaveLength(1);
Expand Down Expand Up @@ -294,9 +291,6 @@ describe('Input Validation', () => {

const run = await pgflowClient.startFlow(testFlow.slug, { data: 'test' });

// Give realtime subscription time to establish
await new Promise(resolve => setTimeout(resolve, 100));

// Execute the producer step
const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
expect(tasks).toHaveLength(1);
Expand Down
6 changes: 0 additions & 6 deletions pkgs/client/__tests__/integration/network-resilience.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ describe('Network Resilience Tests', () => {
}
});

// Give subscription time to establish
await new Promise((resolve) => setTimeout(resolve, 200));

// Complete first step before disconnection
let tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
expect(tasks).toHaveLength(1);
Expand Down Expand Up @@ -151,9 +148,6 @@ describe('Network Resilience Tests', () => {
});
}

// Give subscription time to establish
await new Promise((resolve) => setTimeout(resolve, 200));

// Complete step while monitoring connection
const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
expect(tasks).toHaveLength(1);
Expand Down
12 changes: 0 additions & 12 deletions pkgs/client/__tests__/integration/real-flow-execution.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ describe('Real Flow Execution', () => {
expect(run.run_id).toBeDefined();
expect(run.flow_slug).toBe(testFlow.slug);

// Give realtime subscription time to establish properly
await new Promise((resolve) => setTimeout(resolve, 2000));

// Poll for task
const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);

Expand Down Expand Up @@ -111,9 +108,6 @@ describe('Real Flow Execution', () => {
const step = run.step('event_step');
step.on('*', stepTracker.callback);

// Give realtime subscription time to establish
await new Promise((resolve) => setTimeout(resolve, 100));

// Poll and complete task
const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
await sqlClient.completeTask(tasks[0], { hello: 'world' });
Expand Down Expand Up @@ -217,9 +211,6 @@ describe('Real Flow Execution', () => {
const tracker = createEventTracker();
dependentStep.on('*', tracker.callback);

// Give realtime subscription time to establish
await new Promise((resolve) => setTimeout(resolve, 100));

// Complete root step - this will trigger dependent_step to start
const rootTasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
expect(rootTasks[0].step_slug).toBe('root_step');
Expand Down Expand Up @@ -401,9 +392,6 @@ describe('Real Flow Execution', () => {
expect(step.status).toBe(FlowStepStatus.Started);
expect(step.started_at).toBeDefined();

// Give realtime subscription time to establish
await new Promise((resolve) => setTimeout(resolve, 100));

// waitForStatus should resolve immediately since step is already Started
const waitPromise = step.waitForStatus(FlowStepStatus.Started, {
timeoutMs: 5000,
Expand Down
3 changes: 0 additions & 3 deletions pkgs/client/__tests__/integration/realtime-send.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ describe('Realtime Send Integration', () => {
await subscriptionPromise;
console.log('Channel fully subscribed and ready');

// Additional wait to ensure realtime connection is stable
await new Promise((resolve) => setTimeout(resolve, 200));

// 4. Send event via SQL realtime.send() function
const dbPayload = {
test_message: 'Hello from SQL realtime.send()!',
Expand Down
9 changes: 0 additions & 9 deletions pkgs/client/__tests__/integration/reconnection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ describe('Reconnection Integration Tests', () => {
reconnectionEvents.push(event.event_type);
});

// Give realtime subscription time to establish
await new Promise((resolve) => setTimeout(resolve, 200));

// Simulate network interruption by creating a new client
// This forces the underlying channel to be recreated
const newSupabaseClient = createTestSupabaseClient();
Expand Down Expand Up @@ -110,9 +107,6 @@ describe('Reconnection Integration Tests', () => {
expect(run.status).toBe(FlowRunStatus.Started);
expect(run.input).toEqual(input);

// Give initial subscription time to establish
await new Promise((resolve) => setTimeout(resolve, 200));

// Simulate disconnection by disposing the client and creating a new one
pgflowClient.dispose(run.run_id);

Expand Down Expand Up @@ -174,9 +168,6 @@ describe('Reconnection Integration Tests', () => {
const input = { data: 'rapid-test' };
const originalRun = await pgflowClient.startFlow(testFlow.slug, input);

// Give initial subscription time to establish
await new Promise((resolve) => setTimeout(resolve, 200));

// Create multiple run instances rapidly (simulates rapid reconnections)
const runs = await Promise.all([
pgflowClient.getRun(originalRun.run_id),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ describe('Step Failed Event Broadcasting', () => {
step.on('*', stepTracker.callback);
run.on('*', runTracker.callback);

// Give realtime subscription time to establish
await new Promise(resolve => setTimeout(resolve, 100));

// Poll and start the task (uses pgmq.read_with_poll and pgflow.start_tasks internally)
const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
expect(tasks).toHaveLength(1);
Expand Down
12 changes: 0 additions & 12 deletions pkgs/client/__tests__/integration/wait-for-status-failure.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ describe('waitForStatus - Failure Scenarios', () => {
const stepTracker = createEventTracker();
step.on('*', stepTracker.callback);

// Give realtime subscription time to establish
await new Promise(resolve => setTimeout(resolve, 100));

// Start waiting for Failed status (before the step actually fails)
const waitPromise = step.waitForStatus(FlowStepStatus.Failed, { timeoutMs: 10000 });

Expand Down Expand Up @@ -90,9 +87,6 @@ describe('waitForStatus - Failure Scenarios', () => {
const runTracker = createEventTracker();
run.on('*', runTracker.callback);

// Give realtime subscription time to establish
await new Promise(resolve => setTimeout(resolve, 100));

// Start waiting for Failed status (before the run actually fails)
const waitPromise = run.waitForStatus(FlowRunStatus.Failed, { timeoutMs: 10000 });

Expand Down Expand Up @@ -139,9 +133,6 @@ describe('waitForStatus - Failure Scenarios', () => {
const run = await pgflowClient.startFlow(testFlow.slug, { test: 'timeout' });
const step = run.step('normal_step');

// Give realtime subscription time to establish
await new Promise(resolve => setTimeout(resolve, 100));

// Wait for Failed status with a short timeout (step will complete normally, not fail)
const waitPromise = step.waitForStatus(FlowStepStatus.Failed, { timeoutMs: 2000 });

Expand Down Expand Up @@ -186,9 +177,6 @@ describe('waitForStatus - Failure Scenarios', () => {
run.on('*', runTracker.callback);
step.on('*', stepTracker.callback);

// Give realtime subscription time to establish
await new Promise(resolve => setTimeout(resolve, 100));

// Wait for both step and run to fail
const stepWaitPromise = step.waitForStatus(FlowStepStatus.Failed, { timeoutMs: 10000 });
const runWaitPromise = run.waitForStatus(FlowRunStatus.Failed, { timeoutMs: 10000 });
Expand Down
8 changes: 7 additions & 1 deletion pkgs/client/src/lib/SupabaseBroadcastAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,13 @@ export class SupabaseBroadcastAdapter implements IFlowRealtime {

// Wait for the 'SUBSCRIBED' acknowledgment to avoid race conditions
await subscriptionPromise;


// Stabilization delay - known Supabase Realtime limitation
// The SUBSCRIBED event is emitted before backend routing is fully established.
// This delay ensures the backend can receive messages sent immediately after subscription.
// See: https://github.com/supabase/supabase-js/issues/1599
await new Promise(resolve => setTimeout(resolve, 300));

this.#channels.set(run_id, channel);

const unsubscribe = () => this.unsubscribe(run_id);
Expand Down
Loading
Loading