Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 150 additions & 5 deletions cloudflare-gastown/src/dos/Town.do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import * as scheduling from './town/scheduling';
import * as events from './town/events';
import * as reconciler from './town/reconciler';
import { applyAction } from './town/actions';
import type { ApplyActionContext } from './town/actions';
import type { Action, ApplyActionContext } from './town/actions';
import { buildRefinerySystemPrompt } from '../prompts/refinery-system.prompt';
import { GitHubPRStatusSchema, GitLabMRStatusSchema } from '../util/platform-pr.util';

Expand All @@ -46,6 +46,7 @@ import { review_metadata } from '../db/tables/review-metadata.table';
import { escalation_metadata } from '../db/tables/escalation-metadata.table';
import { convoy_metadata } from '../db/tables/convoy-metadata.table';
import { bead_dependencies } from '../db/tables/bead-dependencies.table';
import { town_events, TownEventRecord } from '../db/tables/town-events.table';
import {
agent_nudges,
AgentNudgeRecord,
Expand Down Expand Up @@ -2891,10 +2892,16 @@ export class TownDO extends DurableObject<Env> {
townId,
row.bead_id
);
events.upsertContainerStatus(this.sql, row.bead_id, {
status: containerInfo.status,
exit_reason: containerInfo.exitReason,
});
// Skip inserting events for 'running' — it's the steady-state and
// a no-op in applyEvent, so recording it just bloats the event table
// (~720 events/hour/agent). Non-running statuses (stopped, error,
// unknown) still get inserted so the reconciler can detect and handle them.
if (containerInfo.status !== 'running') {
events.upsertContainerStatus(this.sql, row.bead_id, {
status: containerInfo.status,
exit_reason: containerInfo.exitReason,
});
}
} catch (err) {
console.warn(
`${TOWN_LOG} alarm: container status check failed for agent=${row.bead_id}`,
Expand Down Expand Up @@ -3684,6 +3691,144 @@ export class TownDO extends DurableObject<Env> {
};
}

// DEBUG: replay events from a time range, apply them to state, run the
// reconciler, and return computed actions. Uses a savepoint + rollback so
// no state is permanently modified.
//
// `from` / `to` are bound as SQL parameters and compared inclusively
// (>= / <=) against town_events.created_at; events are applied in ascending
// created_at order. There is no filter on processed_at, so events that were
// already processed are selected as well.
//
// Returns the number of events selected, the actions produced by
// reconciler.reconcile(), and a snapshot (agent_metadata rows plus beads
// that are neither 'closed'/'failed' nor of type 'agent') read before the
// rollback.
//
// Nothing in this method resets the tables to how they looked at `from`:
// the events are applied on top of whatever state exists when it is called.
//
// NOTE(review): Cloudflare's SQLite-backed Durable Object docs state that
// sql.exec() cannot run transaction statements such as SAVEPOINT and point
// to ctx.storage.transactionSync() instead — confirm whether this.sql is that
// SqlStorage handle; if so the SAVEPOINT/ROLLBACK calls below would throw.
async debugReplayEvents(
from: string,
to: string
): Promise<{
eventsReplayed: number;
actions: Action[];
stateSnapshot: {
agents: unknown[];
nonTerminalBeads: unknown[];
};
}> {
// Everything between here and the `finally` runs inside this savepoint.
this.sql.exec('SAVEPOINT debug_replay_events');
try {
// Query ALL events in the time range regardless of processed_at
// Rows are validated through TownEventRecord.array().parse before use.
const rangeEvents = TownEventRecord.array().parse([
...query(
this.sql,
/* sql */ `
SELECT ${town_events.event_id}, ${town_events.event_type},
${town_events.agent_id}, ${town_events.bead_id},
${town_events.payload}, ${town_events.created_at},
${town_events.processed_at}
FROM ${town_events}
WHERE ${town_events.created_at} >= ?
AND ${town_events.created_at} <= ?
ORDER BY ${town_events.created_at} ASC
`,
[from, to]
),
]);

// Apply each event to reconstruct state transitions
for (const event of rangeEvents) {
reconciler.applyEvent(this.sql, event);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WARNING: Replaying onto live state double-applies historical events

debugReplayEvents() starts from the current town tables, then runs each selected event through non-idempotent handlers like reviewQueue.agentDone() and completeReviewWithResult(). If the selected window has already been processed, this can target different beads than it originally did and return actions/snapshots that never would have existed at from. A real replay needs to start from a state snapshot taken before the requested range (or otherwise reset the affected state before applying the events).

}

// Run reconciler against the resulting state
const actions = reconciler.reconcile(this.sql);

// Capture a state snapshot before rollback
// Agents: every agent_metadata row, restricted to the columns listed.
const agentSnapshot = [
...query(
this.sql,
/* sql */ `
SELECT ${agent_metadata.bead_id},
${agent_metadata.role},
${agent_metadata.status},
${agent_metadata.current_hook_bead_id},
${agent_metadata.dispatch_attempts},
${agent_metadata.last_activity_at}
FROM ${agent_metadata}
`,
[]
),
];

// Beads: excludes status 'closed'/'failed' and type 'agent'; ordered by
// type, then status.
const beadSnapshot = [
...query(
this.sql,
/* sql */ `
SELECT ${beads.bead_id},
${beads.type},
${beads.status},
${beads.title},
${beads.assignee_agent_bead_id},
${beads.updated_at}
FROM ${beads}
WHERE ${beads.status} NOT IN ('closed', 'failed')
AND ${beads.type} != 'agent'
ORDER BY ${beads.type}, ${beads.status}
`,
[]
),
];

return {
eventsReplayed: rangeEvents.length,
actions,
stateSnapshot: {
agents: agentSnapshot,
nonTerminalBeads: beadSnapshot,
},
};
} finally {
// Runs on both the success and the error path: undo everything done since
// the savepoint, then drop the savepoint itself.
this.sql.exec('ROLLBACK TO SAVEPOINT debug_replay_events');
this.sql.exec('RELEASE SAVEPOINT debug_replay_events');
}
}

// DEBUG: dry-run the reconciler against current state, returning the actions
// it would emit without applying them. Pending events are drained and applied
// first (same as the real alarm loop), then every mutation is rolled back so
// the endpoint stays side-effect-free.
//
// The work runs inside ctx.storage.transactionSync() rather than a
// hand-written SAVEPOINT: Cloudflare's Durable Object SQLite API documents
// that sql.exec() cannot execute transaction statements (BEGIN, SAVEPOINT, …)
// and that transactionSync() rolls back when its callback throws. Throwing a
// private sentinel at the end of the callback is therefore how the rollback
// is requested.
//
// Returns the computed actions plus a subset of reconciler metrics:
// actionsEmitted, actionsByType, pendingEventCount (read inside the
// transaction, after draining) and eventsDrained.
//
// Throws whatever drainEvents / applyEvent / markProcessed / reconcile /
// pendingEventCount throw; the transaction is rolled back in that case too.
async debugDryRun(): Promise<{
  actions: Action[];
  metrics: Pick<
    reconciler.ReconcilerMetrics,
    'actionsEmitted' | 'actionsByType' | 'pendingEventCount' | 'eventsDrained'
  >;
}> {
  // Identity-compared sentinel: lets the catch below tell the intentional
  // rollback apart from a genuine failure inside the callback.
  const rollbackSignal = new Error('debugDryRun: intentional rollback');

  let actions: Action[] = [];
  let pendingEventCount = 0;
  let eventsDrained = 0;

  try {
    this.ctx.storage.transactionSync(() => {
      // Phase 0: Drain and apply pending events (same as real alarm loop)
      const pending = events.drainEvents(this.sql);
      for (const event of pending) {
        reconciler.applyEvent(this.sql, event);
        events.markProcessed(this.sql, event.event_id);
      }
      eventsDrained = pending.length;

      // Phase 1: Reconcile against now-current state
      actions = reconciler.reconcile(this.sql);
      pendingEventCount = events.pendingEventCount(this.sql);

      // Results are captured in the outer variables; abort the transaction
      // so none of the writes above are committed.
      throw rollbackSignal;
    });
  } catch (err) {
    if (err !== rollbackSignal) {
      throw err;
    }
  }

  const actionsByType: Record<string, number> = {};
  for (const action of actions) {
    actionsByType[action.type] = (actionsByType[action.type] ?? 0) + 1;
  }

  return {
    actions,
    metrics: {
      actionsEmitted: actions.length,
      actionsByType,
      pendingEventCount,
      eventsDrained,
    },
  };
}

// DEBUG: concise non-terminal bead summary — remove after debugging
async debugBeadSummary(): Promise<unknown[]> {
return [
Expand Down
28 changes: 28 additions & 0 deletions cloudflare-gastown/src/gastown.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,34 @@ app.get('/debug/towns/:townId/status', async c => {
return c.json({ alarmStatus, agentMeta, beadSummary });
});

// Debug route: asks the town's Durable Object for a reconciler dry run and
// relays whatever it returns as the JSON response body.
app.post('/debug/towns/:townId/reconcile-dry-run', async c => {
  const stub = getTownDOStub(c.env, c.req.param('townId'));
  // eslint-disable-next-line @typescript-eslint/await-thenable -- DO RPC returns promise at runtime
  const dryRun = await stub.debugDryRun();
  return c.json(dryRun);
});

// Debug route: replays a town's recorded events for a time window by calling
// debugReplayEvents on the town's Durable Object and relaying its result.
//
// Expects a JSON object body with string fields `from` and `to`, each
// parseable by `new Date(...)`, with from <= to. Responds 400 when the body is
// not valid JSON, when either field is missing / empty / not a string, when
// either fails to parse as a date, or when from is after to.
//
// The original `from` / `to` strings are forwarded unchanged (they are not
// re-serialized from the parsed dates).
app.post('/debug/towns/:townId/replay-events', async c => {
  const townId = c.req.param('townId');

  // c.req.json() rejects on a malformed body; surface that as a client error
  // instead of letting it escape the handler.
  let payload: unknown;
  try {
    payload = await c.req.json();
  } catch {
    return c.json({ error: 'Request body must be valid JSON' }, 400);
  }

  // Valid JSON can still be null, a primitive, or an array — treat anything
  // that is not an object as having no fields rather than dereferencing it.
  const fields: { from?: unknown; to?: unknown } =
    typeof payload === 'object' && payload !== null
      ? (payload as { from?: unknown; to?: unknown })
      : {};
  const { from, to } = fields;

  if (typeof from !== 'string' || typeof to !== 'string' || !from || !to) {
    return c.json({ error: 'Missing required fields: from, to (ISO timestamps)' }, 400);
  }

  const fromMs = new Date(from).getTime();
  const toMs = new Date(to).getTime();
  if (Number.isNaN(fromMs) || Number.isNaN(toMs)) {
    return c.json({ error: 'Invalid date format. Use ISO 8601 timestamps.' }, 400);
  }
  if (fromMs > toMs) {
    return c.json({ error: '"from" must be before or equal to "to"' }, 400);
  }

  const town = getTownDOStub(c.env, townId);
  // eslint-disable-next-line @typescript-eslint/await-thenable -- DO RPC returns promise at runtime
  const result = await town.debugReplayEvents(from, to);
  return c.json(result);
});

// ── Town ID + Auth ──────────────────────────────────────────────────────
// All rig routes live under /api/towns/:townId/rigs/:rigId so the townId
// is always available from the URL path.
Expand Down
5 changes: 4 additions & 1 deletion src/app/(app)/claw/components/CreateInstanceCard.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use client';

import { useEffect, useMemo, useRef, useState } from 'react';
import { usePostHog } from 'posthog-js/react';
import { useFeatureFlagVariantKey, usePostHog } from 'posthog-js/react';
import { useQuery } from '@tanstack/react-query';
import { toast } from 'sonner';
import type { useKiloClawMutations } from '@/hooks/useKiloClaw';
Expand All @@ -25,6 +25,9 @@ export function CreateInstanceCard({
mutations: ClawMutations;
onProvisionStart?: () => void;
}) {
// Evaluate the landing-page experiment flag so PostHog attaches
// $feature/button-vs-card to events fired in this component.
useFeatureFlagVariantKey('button-vs-card');
const posthog = usePostHog();
const trpc = useTRPC();
const { data: billingStatus } = useQuery(trpc.kiloclaw.getBillingStatus.queryOptions());
Expand Down
Loading