Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cli/src/codex/appServerTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,17 @@ export interface TurnStartResponse {
[key: string]: unknown;
}

export interface TurnSteerParams {
threadId: string;
input: UserInput[];
expectedTurnId: string;
}

export interface TurnSteerResponse {
turnId: string;
[key: string]: unknown;
}

export interface TurnInterruptParams {
threadId: string;
turnId: string;
Expand Down
9 changes: 9 additions & 0 deletions cli/src/codex/codexAppServerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import type {
ThreadResumeResponse,
TurnStartParams,
TurnStartResponse,
TurnSteerParams,
TurnSteerResponse,
TurnInterruptParams,
TurnInterruptResponse,
ThreadCompactStartParams,
Expand Down Expand Up @@ -194,6 +196,13 @@ export class CodexAppServerClient {
return response as TurnStartResponse;
}

async steerTurn(params: TurnSteerParams): Promise<TurnSteerResponse> {
const response = await this.sendRequest('turn/steer', params, {
timeoutMs: 30_000
});
return response as TurnSteerResponse;
}

async interruptTurn(params: TurnInterruptParams): Promise<TurnInterruptResponse> {
const response = await this.sendRequest('turn/interrupt', params, {
timeoutMs: 30_000
Expand Down
32 changes: 32 additions & 0 deletions cli/src/codex/codexRemoteLauncher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2276,6 +2276,38 @@ class CodexRemoteLauncher extends RemoteLauncherBase {
onSwitch: () => this.handleSwitchRequest()
});

session.client.rpcHandlerManager.registerHandler('steer-queued-message', async (payload: unknown) => {
const record = asRecord(payload);
const localId = asString(record?.localId);
if (!localId) {
return { status: 'failed', error: 'Invalid localId' };
}

if (!turnInFlight || !this.currentThreadId || !this.currentTurnId) {
return { status: 'not-active' };
}

const item = session.queue.takeByLocalId(localId);
if (!item) {
return { status: 'not-found' };
}

try {
await appServerClient.steerTurn({
threadId: this.currentThreadId,
expectedTurnId: this.currentTurnId,
input: [{ type: 'text', text: item.message }]
});
return { status: 'steered', localId };
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MAJOR] After a successful turn/steer, this path returns an RPC response and relies on the hub to mark the queued row invoked. If the Socket.IO ack is lost or the hub-side 30s RPC timeout wins just as the nested turn/steer succeeds, the item has already been removed from the CLI queue but the DB row stays queued, so the UI can offer/send the same prompt again.

Suggested fix:

await appServerClient.steerTurn({
    threadId: this.currentThreadId,
    expectedTurnId: this.currentTurnId,
    input: [{ type: 'text', text: item.message }]
});
session.queue.onBatchConsumed?.([localId]);
return { status: 'steered', localId };

} catch (error) {
session.queue.unshift(item.message, item.mode, item.localId, item.isolate);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MAJOR] This restores a failed steer with unshift(), which changes the original queue order whenever the steered item was not already first. Example: queue [A, B, C], steer B, turn/steer fails, then this line restores B as [B, A, C]; the next normal drain sends prompts out of order.

Suggested fix:

const taken = session.queue.takeByLocalId(localId);
if (!taken) return { status: 'not-found' };

try {
    await appServerClient.steerTurn(...);
} catch (error) {
    session.queue.restoreAt(taken.index, taken.item);
    return { status: 'failed', error: error instanceof Error ? error.message : String(error) };
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MAJOR] This restores a failed steer with unshift(), which changes the original queue order whenever the steered item was not already first. Example: queue [A, B, C], steer B, turn/steer fails, then this line restores B as [B, A, C]; the next normal drain sends prompts out of order.

Suggested fix:

const taken = session.queue.takeByLocalId(localId);
if (!taken) return { status: 'not-found' };

try {
    await appServerClient.steerTurn(...);
    return { status: 'steered', localId };
} catch (error) {
    session.queue.restoreAt(taken.index, taken.item);
    return { status: 'failed', error: error instanceof Error ? error.message : String(error) };
}

return {
status: 'failed',
error: error instanceof Error ? error.message : String(error)
};
}
});

function logActiveHandles(tag: string) {
if (!process.env.DEBUG) return;
const anyProc: any = process as any;
Expand Down
40 changes: 39 additions & 1 deletion cli/src/utils/MessageQueue2.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,30 @@ describe('MessageQueue2', () => {
});
});

describe('takeByLocalId', () => {
it('should remove and return the message with matching localId', () => {
const queue = new MessageQueue2<string>(mode => mode);
queue.push('msg1', 'local', 'id-abc');
queue.push('msg2', 'remote', 'id-def');

const taken = queue.takeByLocalId('id-def');
expect(taken?.message).toBe('msg2');
expect(taken?.mode).toBe('remote');
expect(taken?.localId).toBe('id-def');
expect(queue.size()).toBe(1);
expect(queue.queue[0].localId).toBe('id-abc');
});

it('should return null when localId is not found', () => {
const queue = new MessageQueue2<string>(mode => mode);
queue.push('msg1', 'local', 'id-abc');

const taken = queue.takeByLocalId('missing');
expect(taken).toBeNull();
expect(queue.size()).toBe(1);
});
});

it('should differentiate between pushImmediate and pushIsolateAndClear behavior', async () => {
const queue = new MessageQueue2<{ type: string }>((mode) => mode.type);

Expand Down Expand Up @@ -581,4 +605,18 @@ describe('MessageQueue2', () => {
expect(batch3?.message).toBe('after-isolated');
expect(batch3?.mode.type).toBe('B');
});
});

it('should preserve isolation when unshifting a message with isolate=true', async () => {
const queue = new MessageQueue2<string>(mode => mode);

queue.unshift('isolated', 'local', 'id-isolated', true);
queue.push('after', 'local', 'id-after');

const batch1 = await queue.waitForMessagesAndGetAsString();
expect(batch1?.message).toBe('isolated');
expect(batch1?.isolate).toBe(true);

const batch2 = await queue.waitForMessagesAndGetAsString();
expect(batch2?.message).toBe('after');
});
});
17 changes: 15 additions & 2 deletions cli/src/utils/MessageQueue2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ export class MessageQueue2<T> {
/**
* Push a message to the beginning of the queue with a mode.
*/
unshift(message: string, mode: T, localId?: string): void {
unshift(message: string, mode: T, localId?: string, isolate = false): void {
if (this.closed) {
throw new Error('Cannot unshift to closed queue');
}
Expand All @@ -163,7 +163,7 @@ export class MessageQueue2<T> {
mode,
modeHash,
localId,
isolate: false
isolate
});

// Trigger message handler if set
Expand Down Expand Up @@ -196,6 +196,19 @@ export class MessageQueue2<T> {
return true;
}

/**
* Remove and return the first queued message that matches the given localId.
* Used when a caller needs to consume a queued item through a path other
* than the normal batch drain, such as steering it into an active turn.
*/
takeByLocalId(localId: string): QueueItem<T> | null {
if (!localId) return null;
const idx = this.queue.findIndex(item => item.localId === localId);
if (idx === -1) return null;
const [item] = this.queue.splice(idx, 1);
return item ?? null;
}

/**
* Reset the queue - clears all messages and resets to empty state
*/
Expand Down
7 changes: 7 additions & 0 deletions hub/src/sync/rpcGateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ export class RpcGateway {
await this.sessionRpc(sessionId, 'abort', { reason: 'User aborted via Telegram Bot' })
}

async steerQueuedMessage(
sessionId: string,
localId: string
): Promise<unknown> {
return await this.sessionRpc(sessionId, 'steer-queued-message', { localId })
}

async switchSession(sessionId: string, to: 'remote' | 'local'): Promise<void> {
await this.sessionRpc(sessionId, 'switch', { to })
}
Expand Down
57 changes: 57 additions & 0 deletions hub/src/sync/syncEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/

import type { CodexCollaborationMode, DecryptedMessage, PermissionMode, Session, SyncEvent } from '@hapi/protocol/types'
import type { SteerQueuedMessageResponse } from '@hapi/protocol/schemas'
import type { Server } from 'socket.io'
import type { Store, CancelQueuedMessageResult } from '../store'
import type { RpcRegistry } from '../socket/rpcRegistry'
Expand Down Expand Up @@ -329,6 +330,62 @@ export class SyncEngine {
this.messageService.sweepImmediateQueuedOnSessionEnd(sessionId, invokedAt)
}

async steerQueuedMessage(
sessionId: string,
messageId: string
): Promise<SteerQueuedMessageResponse> {
const lookup = this.store.messages.lookupQueuedMessage(sessionId, messageId)
if (lookup.status === 'absent') {
return { status: 'absent' }
}
if (lookup.status === 'invoked') {
return lookup
}
if (!lookup.localId) {
return { status: 'failed', error: 'Queued message has no localId' }
}

let result: unknown
try {
result = await this.rpcGateway.steerQueuedMessage(sessionId, lookup.localId)
} catch (error) {
return {
status: 'failed',
error: error instanceof Error ? error.message : String(error)
}
}

if (!result || typeof result !== 'object') {
return { status: 'failed', error: 'Unexpected steer response' }
}

const status = (result as { status?: unknown }).status
if (status === 'steered') {
const invokedAt = Date.now()
this.store.messages.markMessagesInvoked(sessionId, [lookup.localId], invokedAt)
this.recordSessionActivity(sessionId, invokedAt)
this.eventPublisher.emit({
type: 'messages-consumed',
sessionId,
localIds: [lookup.localId],
invokedAt
})
return { status: 'steered', localId: lookup.localId, invokedAt }
}
if (status === 'not-active' || status === 'not-found') {
return { status }
}
if (status === 'failed') {
const error = (result as { error?: unknown }).error
return {
status: 'failed',
error: typeof error === 'string' ? error : 'Steer failed'
}
}

return { status: 'failed', error: 'Unexpected steer response' }
}

async approvePermission(
sessionId: string,
requestId: string,
Expand Down
17 changes: 17 additions & 0 deletions hub/src/web/routes/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,23 @@ export function createMessagesRoutes(getSyncEngine: () => SyncEngine | null): Ho
return c.json(result)
})

app.post('/sessions/:id/messages/:messageId/steer', async (c) => {
const engine = requireSyncEngine(c, getSyncEngine)
if (engine instanceof Response) {
return engine
}

const sessionResult = requireSessionFromParam(c, engine, { requireActive: true })
if (sessionResult instanceof Response) {
return sessionResult
}
const sessionId = sessionResult.sessionId
const messageId = c.req.param('messageId')

const result = await engine.steerQueuedMessage(sessionId, messageId)
return c.json(result)
})

app.post('/sessions/:id/messages', async (c) => {
const engine = requireSyncEngine(c, getSyncEngine)
if (engine instanceof Response) {
Expand Down
11 changes: 11 additions & 0 deletions shared/src/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,14 @@ export const CancelMessageResponseSchema = z.discriminatedUnion('status', [
])

export type CancelMessageResponse = z.infer<typeof CancelMessageResponseSchema>

export const SteerQueuedMessageResponseSchema = z.discriminatedUnion('status', [
z.object({ status: z.literal('steered'), localId: z.string(), invokedAt: z.number() }),
z.object({ status: z.literal('invoked'), message: DecryptedMessageSchema }),
z.object({ status: z.literal('absent') }),
z.object({ status: z.literal('not-active') }),
z.object({ status: z.literal('not-found') }),
z.object({ status: z.literal('failed'), error: z.string() }),
])

export type SteerQueuedMessageResponse = z.infer<typeof SteerQueuedMessageResponseSchema>
10 changes: 9 additions & 1 deletion web/src/api/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import type {
SessionResponse,
SessionsResponse
} from '@/types/api'
import type { CancelMessageResponse } from '@hapi/protocol/schemas'
import type { CancelMessageResponse, SteerQueuedMessageResponse } from '@hapi/protocol/schemas'

type ApiClientOptions = {
baseUrl?: string
Expand Down Expand Up @@ -342,6 +342,14 @@ export class ApiClient {
return response as CancelMessageResponse
}

async steerQueuedMessage(sessionId: string, messageId: string): Promise<SteerQueuedMessageResponse> {
const response = await this.request(
`/api/sessions/${encodeURIComponent(sessionId)}/messages/${encodeURIComponent(messageId)}/steer`,
{ method: 'POST', body: JSON.stringify({}) }
)
return response as SteerQueuedMessageResponse
}

async abortSession(sessionId: string): Promise<void> {
await this.request(`/api/sessions/${encodeURIComponent(sessionId)}/abort`, {
method: 'POST',
Expand Down
Loading
Loading