From 02ab874dda40256828ce5fbd64c97568054970da Mon Sep 17 00:00:00 2001 From: NightWatcher Date: Mon, 18 May 2026 05:24:27 +0800 Subject: [PATCH 1/3] Add Codex steer for queued messages --- cli/src/codex/appServerTypes.ts | 11 ++++ cli/src/codex/codexAppServerClient.ts | 9 +++ cli/src/codex/codexRemoteLauncher.ts | 32 +++++++++++ cli/src/utils/MessageQueue2.test.ts | 26 ++++++++- cli/src/utils/MessageQueue2.ts | 13 +++++ hub/src/sync/rpcGateway.ts | 7 +++ hub/src/sync/syncEngine.ts | 57 +++++++++++++++++++ hub/src/web/routes/messages.ts | 17 ++++++ shared/src/schemas.ts | 11 ++++ web/src/api/client.ts | 10 +++- .../AssistantChat/QueuedMessagesBar.tsx | 36 ++++++++++++ .../hooks/mutations/useSteerQueuedMessage.ts | 52 +++++++++++++++++ 12 files changed, 279 insertions(+), 2 deletions(-) create mode 100644 web/src/hooks/mutations/useSteerQueuedMessage.ts diff --git a/cli/src/codex/appServerTypes.ts b/cli/src/codex/appServerTypes.ts index c5ceb1e3e..93113ee4c 100644 --- a/cli/src/codex/appServerTypes.ts +++ b/cli/src/codex/appServerTypes.ts @@ -176,6 +176,17 @@ export interface TurnStartResponse { [key: string]: unknown; } +export interface TurnSteerParams { + threadId: string; + input: UserInput[]; + expectedTurnId: string; +} + +export interface TurnSteerResponse { + turnId: string; + [key: string]: unknown; +} + export interface TurnInterruptParams { threadId: string; turnId: string; diff --git a/cli/src/codex/codexAppServerClient.ts b/cli/src/codex/codexAppServerClient.ts index 758b5605d..5a253ebb4 100644 --- a/cli/src/codex/codexAppServerClient.ts +++ b/cli/src/codex/codexAppServerClient.ts @@ -13,6 +13,8 @@ import type { ThreadResumeResponse, TurnStartParams, TurnStartResponse, + TurnSteerParams, + TurnSteerResponse, TurnInterruptParams, TurnInterruptResponse, ThreadCompactStartParams, @@ -194,6 +196,13 @@ export class CodexAppServerClient { return response as TurnStartResponse; } + async steerTurn(params: TurnSteerParams): Promise { + const response = await this.sendRequest('turn/steer', params, { + timeoutMs: 30_000 + }); + return response as TurnSteerResponse; + } + async interruptTurn(params: TurnInterruptParams): Promise { const response = await this.sendRequest('turn/interrupt', params, { timeoutMs: 30_000 diff --git a/cli/src/codex/codexRemoteLauncher.ts b/cli/src/codex/codexRemoteLauncher.ts index 581ace5f1..2987abe0a 100644 --- a/cli/src/codex/codexRemoteLauncher.ts +++ b/cli/src/codex/codexRemoteLauncher.ts @@ -2276,6 +2276,38 @@ class CodexRemoteLauncher extends RemoteLauncherBase { onSwitch: () => this.handleSwitchRequest() }); + session.client.rpcHandlerManager.registerHandler('steer-queued-message', async (payload: unknown) => { + const record = asRecord(payload); + const localId = asString(record?.localId); + if (!localId) { + return { status: 'failed', error: 'Invalid localId' }; + } + + if (!turnInFlight || !this.currentThreadId || !this.currentTurnId) { + return { status: 'not-active' }; + } + + const item = session.queue.takeByLocalId(localId); + if (!item) { + return { status: 'not-found' }; + } + + try { + await appServerClient.steerTurn({ + threadId: this.currentThreadId, + expectedTurnId: this.currentTurnId, + input: [{ type: 'text', text: item.message }] + }); + return { status: 'steered', localId }; + } catch (error) { + session.queue.unshift(item.message, item.mode, item.localId); + return { + status: 'failed', + error: error instanceof Error ? error.message : String(error) + }; + } + }); + function logActiveHandles(tag: string) { if (!process.env.DEBUG) return; const anyProc: any = process as any; diff --git a/cli/src/utils/MessageQueue2.test.ts b/cli/src/utils/MessageQueue2.test.ts index 8c03f7a67..aef618ac9 100644 --- a/cli/src/utils/MessageQueue2.test.ts +++ b/cli/src/utils/MessageQueue2.test.ts @@ -551,6 +551,30 @@ describe('MessageQueue2', () => { }); }); + describe('takeByLocalId', () => { + it('should remove and return the message with matching localId', () => { + const queue = new MessageQueue2(mode => mode); + queue.push('msg1', 'local', 'id-abc'); + queue.push('msg2', 'remote', 'id-def'); + + const taken = queue.takeByLocalId('id-def'); + expect(taken?.message).toBe('msg2'); + expect(taken?.mode).toBe('remote'); + expect(taken?.localId).toBe('id-def'); + expect(queue.size()).toBe(1); + expect(queue.queue[0].localId).toBe('id-abc'); + }); + + it('should return null when localId is not found', () => { + const queue = new MessageQueue2(mode => mode); + queue.push('msg1', 'local', 'id-abc'); + + const taken = queue.takeByLocalId('missing'); + expect(taken).toBeNull(); + expect(queue.size()).toBe(1); + }); + }); + it('should differentiate between pushImmediate and pushIsolateAndClear behavior', async () => { const queue = new MessageQueue2<{ type: string }>((mode) => mode.type); @@ -581,4 +605,4 @@ describe('MessageQueue2', () => { expect(batch3?.message).toBe('after-isolated'); expect(batch3?.mode.type).toBe('B'); }); -}); \ No newline at end of file +}); diff --git a/cli/src/utils/MessageQueue2.ts b/cli/src/utils/MessageQueue2.ts index fb59ec162..ff7dead53 100644 --- a/cli/src/utils/MessageQueue2.ts +++ b/cli/src/utils/MessageQueue2.ts @@ -196,6 +196,19 @@ export class MessageQueue2 { return true; } + /** + * Remove and return the first queued message that matches the given localId. + * Used when a caller needs to consume a queued item through a path other + * than the normal batch drain, such as steering it into an active turn. + */ + takeByLocalId(localId: string): QueueItem | null { + if (!localId) return null; + const idx = this.queue.findIndex(item => item.localId === localId); + if (idx === -1) return null; + const [item] = this.queue.splice(idx, 1); + return item ?? null; + } + /** * Reset the queue - clears all messages and resets to empty state */ diff --git a/hub/src/sync/rpcGateway.ts b/hub/src/sync/rpcGateway.ts index 896431382..3dd1a51db 100644 --- a/hub/src/sync/rpcGateway.ts +++ b/hub/src/sync/rpcGateway.ts @@ -122,6 +122,13 @@ export class RpcGateway { await this.sessionRpc(sessionId, 'abort', { reason: 'User aborted via Telegram Bot' }) } + async steerQueuedMessage( + sessionId: string, + localId: string + ): Promise { + return await this.sessionRpc(sessionId, 'steer-queued-message', { localId }) + } + async switchSession(sessionId: string, to: 'remote' | 'local'): Promise { await this.sessionRpc(sessionId, 'switch', { to }) } diff --git a/hub/src/sync/syncEngine.ts b/hub/src/sync/syncEngine.ts index d807b5d3c..c6b198064 100644 --- a/hub/src/sync/syncEngine.ts +++ b/hub/src/sync/syncEngine.ts @@ -8,6 +8,7 @@ */ import type { CodexCollaborationMode, DecryptedMessage, PermissionMode, Session, SyncEvent } from '@hapi/protocol/types' +import type { SteerQueuedMessageResponse } from '@hapi/protocol/schemas' import type { Server } from 'socket.io' import type { Store, CancelQueuedMessageResult } from '../store' import type { RpcRegistry } from '../socket/rpcRegistry' @@ -329,6 +330,62 @@ export class SyncEngine { this.messageService.sweepImmediateQueuedOnSessionEnd(sessionId, invokedAt) } + async steerQueuedMessage( + sessionId: string, + messageId: string + ): Promise { + const lookup = this.store.messages.lookupQueuedMessage(sessionId, messageId) + if (lookup.status === 'absent') { + return { status: 'absent' } + } + if (lookup.status === 'invoked') { + return lookup + } + if (!lookup.localId) { + return { status: 'failed', error: 'Queued message has no localId' } + } + + let result: unknown + try { + result = await this.rpcGateway.steerQueuedMessage(sessionId, lookup.localId) + } catch (error) { + return { + status: 'failed', + error: error instanceof Error ? error.message : String(error) + } + } + + if (!result || typeof result !== 'object') { + return { status: 'failed', error: 'Unexpected steer response' } + } + + const status = (result as { status?: unknown }).status + if (status === 'steered') { + const invokedAt = Date.now() + this.store.messages.markMessagesInvoked(sessionId, [lookup.localId], invokedAt) + this.recordSessionActivity(sessionId, invokedAt) + this.eventPublisher.emit({ + type: 'messages-consumed', + sessionId, + localIds: [lookup.localId], + invokedAt + }) + return { status: 'steered', localId: lookup.localId, invokedAt } + } + if (status === 'not-active' || status === 'not-found') { + return { status } + } + if (status === 'failed') { + const error = (result as { error?: unknown }).error + return { + status: 'failed', + error: typeof error === 'string' ? error : 'Steer failed' + } + } + + return { status: 'failed', error: 'Unexpected steer response' } + } + async approvePermission( sessionId: string, requestId: string, diff --git a/hub/src/web/routes/messages.ts b/hub/src/web/routes/messages.ts index cb1d83708..ee82c1023 100644 --- a/hub/src/web/routes/messages.ts +++ b/hub/src/web/routes/messages.ts @@ -90,6 +90,23 @@ export function createMessagesRoutes(getSyncEngine: () => SyncEngine | null): Ho return c.json(result) }) + app.post('/sessions/:id/messages/:messageId/steer', async (c) => { + const engine = requireSyncEngine(c, getSyncEngine) + if (engine instanceof Response) { + return engine + } + + const sessionResult = requireSessionFromParam(c, engine, { requireActive: true }) + if (sessionResult instanceof Response) { + return sessionResult + } + const sessionId = sessionResult.sessionId + const messageId = c.req.param('messageId') + + const result = await engine.steerQueuedMessage(sessionId, messageId) + return c.json(result) + }) + app.post('/sessions/:id/messages', async (c) => { const engine = requireSyncEngine(c, getSyncEngine) if (engine instanceof Response) { diff --git a/shared/src/schemas.ts b/shared/src/schemas.ts index aa634bd1f..040ede5ee 100644 --- a/shared/src/schemas.ts +++ b/shared/src/schemas.ts @@ -285,3 +285,14 @@ export const CancelMessageResponseSchema = z.discriminatedUnion('status', [ ]) export type CancelMessageResponse = z.infer + +export const SteerQueuedMessageResponseSchema = z.discriminatedUnion('status', [ + z.object({ status: z.literal('steered'), localId: z.string(), invokedAt: z.number() }), + z.object({ status: z.literal('invoked'), message: DecryptedMessageSchema }), + z.object({ status: z.literal('absent') }), + z.object({ status: z.literal('not-active') }), + z.object({ status: z.literal('not-found') }), + z.object({ status: z.literal('failed'), error: z.string() }), +]) + +export type SteerQueuedMessageResponse = z.infer diff --git a/web/src/api/client.ts b/web/src/api/client.ts index c4637adb6..2aa9f6ba3 100644 --- a/web/src/api/client.ts +++ b/web/src/api/client.ts @@ -25,7 +25,7 @@ import type { SessionResponse, SessionsResponse } from '@/types/api' -import type { CancelMessageResponse } from '@hapi/protocol/schemas' +import type { CancelMessageResponse, SteerQueuedMessageResponse } from '@hapi/protocol/schemas' type ApiClientOptions = { baseUrl?: string @@ -342,6 +342,14 @@ export class ApiClient { return response as CancelMessageResponse } + async steerQueuedMessage(sessionId: string, messageId: string): Promise { + const response = await this.request( + `/api/sessions/${encodeURIComponent(sessionId)}/messages/${encodeURIComponent(messageId)}/steer`, + { method: 'POST', body: JSON.stringify({}) } + ) + return response as SteerQueuedMessageResponse + } + async abortSession(sessionId: string): Promise { await this.request(`/api/sessions/${encodeURIComponent(sessionId)}/abort`, { method: 'POST', diff --git a/web/src/components/AssistantChat/QueuedMessagesBar.tsx b/web/src/components/AssistantChat/QueuedMessagesBar.tsx index 5866a224a..0c68e60cf 100644 --- a/web/src/components/AssistantChat/QueuedMessagesBar.tsx +++ b/web/src/components/AssistantChat/QueuedMessagesBar.tsx @@ -7,6 +7,7 @@ import { EMPTY_STATE } from '@/hooks/queries/useMessages' import { normalizeDecryptedMessage } from '@/chat/normalize' import type { DecryptedMessage } from '@/types/api' import { useCancelQueuedMessage } from '@/hooks/mutations/useCancelQueuedMessage' +import { useSteerQueuedMessage } from '@/hooks/mutations/useSteerQueuedMessage' import { useTranslation } from '@/lib/use-translation' import { useToast } from '@/lib/toast-context' import type { PendingSchedule } from '@/components/AssistantChat/ScheduleTimePicker' @@ -180,6 +181,7 @@ export function QueuedMessagesBar({ const queued = useQueuedMessages(sessionId) const assistantApi = useAssistantApi() const cancelMutation = useCancelQueuedMessage(api) + const steerMutation = useSteerQueuedMessage(api) const { t } = useTranslation() const { addToast } = useToast() @@ -206,7 +208,9 @@ export function QueuedMessagesBar({ const text = getTextFromMessage(msg) const localId = msg.localId ?? msg.id const isPending = cancelMutation.isPending && cancelMutation.variables?.localId === localId + const isSteering = steerMutation.isPending && steerMutation.variables?.localId === localId const canCancel = computeCanCancel({ id: msg.id, localId: msg.localId, isPending }) + const canSteer = computeCanCancel({ id: msg.id, localId: msg.localId, isPending: isPending || isSteering }) const handleCancel = () => { if (!canCancel) return @@ -255,6 +259,14 @@ export function QueuedMessagesBar({ ) } + const handleSteer = () => { + if (!canSteer) return + steerMutation.mutate({ + sessionId, + messageId: msg.id, + localId, + }) + } const canEdit = canCancel return ( @@ -276,6 +288,30 @@ export function QueuedMessagesBar({ )}
+
- + + {isSteering ? '引导中' : '引导'} + + ) : null} ) : null}