feat(go/ai/exp): add DefineAgent and DefineCustomAgent#4462

Open
apascal07 wants to merge 74 commits into ap/go-bidi from ap/go-session-flow

@apascal07 apascal07 commented Feb 6, 2026

Adds an experimental Agent API (ai/exp) for multi-turn conversations with automatic snapshot management and optional background execution via Detach, built on top of the bidirectional streaming primitives added upstream.

Docs PR: genkit-ai/docsite#238

Examples

Inline-prompt Agent

DefineAgent is the default path for prompt-backed agents. The third argument is an aix.AgentSource that selects how the prompt is configured: aix.FromInline defines it inline alongside the agent (the prompt is registered under the agent's name), while aix.FromPrompt references a prompt already in the registry under the same name. Agent-level options (store, snapshot policy, state transform) follow as a typed variadic.

chatAgent := genkit.DefineAgent(g, "chat",
    aix.FromInline(
        ai.WithModelName("googleai/gemini-flash-latest"),
        ai.WithSystem("You are a sarcastic pirate. Keep responses concise."),
    ),
    aix.WithSessionStore(aix.NewInMemorySessionStore[any]()),
)

conn, _ := chatAgent.StreamBidi(ctx)
conn.SendText("What is Go?")
for chunk, err := range conn.Receive() {
    if err != nil { log.Fatal(err) } // stop on a stream error rather than use the chunk
    if chunk.ModelChunk != nil { fmt.Print(chunk.ModelChunk.Text()) }
    if chunk.TurnEnd != nil { break }
}
conn.Close()

The State type parameter is inferred from the typed agent options (aix.WithSessionStore, aix.WithSnapshotOn, etc.), so the explicit DefineAgent[State] is only needed when no typed option is supplied. A typed aix.AgentOption[OtherState] accidentally passed alongside an AgentOption[State] is a compile-time error.
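The inference rule above is ordinary Go generics behaviour and can be reproduced without Genkit. The following is a minimal sketch, assuming simplified stand-in types (Store, Option, Define, WithStore and WithSnapshotOn here are hypothetical and are not the aix API): an option whose argument mentions the State type lets the compiler infer it, while an option with no such argument needs the explicit type parameter.

```go
package main

import "fmt"

// Store is a hypothetical stand-in for a typed session store.
type Store[S any] struct{}

// agentConfig collects the options applied at definition time.
type agentConfig[S any] struct {
	store      *Store[S]
	snapshotOn []string
}

// Option is a typed option; its State parameter ties it to one agent type.
type Option[S any] func(*agentConfig[S])

// WithStore mentions S in its argument, so S can be inferred from the call.
func WithStore[S any](s *Store[S]) Option[S] {
	return func(c *agentConfig[S]) { c.store = s }
}

// WithSnapshotOn has no argument that mentions S, so callers spell it out.
func WithSnapshotOn[S any](events ...string) Option[S] {
	return func(c *agentConfig[S]) { c.snapshotOn = events }
}

// Agent carries the State type chosen at definition time.
type Agent[S any] struct {
	name string
	cfg  agentConfig[S]
}

// Define infers S from the typed variadic. Passing an Option[A] next to an
// Option[B] fails to compile because S cannot be both.
func Define[S any](name string, opts ...Option[S]) *Agent[S] {
	a := &Agent[S]{name: name}
	for _, opt := range opts {
		opt(&a.cfg)
	}
	return a
}

type MyState struct{ Topic string }

func main() {
	store := &Store[MyState]{}

	// S is inferred as MyState from WithStore(store).
	a := Define("chat", WithStore(store), WithSnapshotOn[MyState]("turnEnd"))
	fmt.Printf("%T\n", a)

	// With no typed option there is nothing to infer from, so S is explicit.
	b := Define[MyState]("plain")
	fmt.Println(b.cfg.store == nil)
}
```

Adding an option such as WithStore(&Store[int]{}) to the first call is rejected by the compiler, which is the same guarantee the description claims for AgentOption[State].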


Existing-prompt Agent

aix.FromPrompt attaches the agent to a prompt already registered with the registry (via DefinePrompt or a .prompt file). The agent and its backing prompt always share a name.

# prompts/chat.prompt
---
model: googleai/gemini-flash-latest
input:
  schema:
    personality: string
  default:
    personality: a helpful assistant
---
{{ role "system" }}
You are {{ personality }}. Keep responses concise.

type ChatInput struct {
    Personality string `json:"personality"`
}

chatAgent := genkit.DefineAgent(g, "chat",
    aix.FromPrompt(ChatInput{Personality: "a sarcastic pirate"}),
    aix.WithSessionStore(aix.NewInMemorySessionStore[any]()),
)

The argument to aix.FromPrompt (defaultInput) is rendered through the prompt on every turn. The prompt's Render is also invoked once at definition time as a smoke check, so a defaultInput that fails the prompt's input schema panics at definition time rather than failing on the first invocation. For prompts that take no input, call aix.FromPrompt() with no arguments.


Custom Agent

DefineCustomAgent hands you the turn loop. Use it when you need control before or after the generate call (set up expensive clients, route through different models per turn, clean up in-progress state before returning the final outcome).

chatAgent := genkit.DefineCustomAgent(g, "chat",
    func(ctx context.Context, resp aix.Responder[any], sess *aix.SessionRunner[struct{}]) (*aix.AgentResult, error) {
        if err := sess.Run(ctx, func(ctx context.Context, input *aix.AgentInput) error {
            for result, err := range genkit.GenerateStream(ctx, g,
                ai.WithModelName("googleai/gemini-flash-latest"),
                ai.WithMessages(sess.Messages()...),
            ) {
                if err != nil {
                    return err
                }
                if result.Done {
                    sess.AddMessages(result.Response.Message)
                    break
                }
                resp.SendModelChunk(result.Chunk) // stream tokens to client
            }
            return nil
        }); err != nil {
            return nil, err
        }
        return sess.Result(), nil
    },
)

sess.Result() returns an AgentResult with the last message from the conversation history and all artifacts. If you need to control what gets sent back to the client (e.g. returning only artifacts without a message, or omitting certain artifacts), you can construct the result directly:

return &aix.AgentResult{Artifacts: sess.Artifacts()}, nil

DefineCustomAgent takes the same ...AgentOption[State] variadic as DefineAgent, so the same compile-time State guarantee applies.


Invocation patterns

Agent exposes the same single-turn / multi-turn / streaming surface regardless of which Define*Agent produced it.

For single-turn usage, Run and RunText handle the connection lifecycle:

output, _ := chatAgent.RunText(ctx, "What is Go?")
fmt.Println(output.Message.Text())

For multi-turn conversations with streaming, the client drives the conversation by sending messages and iterating chunks until TurnEnd:

conn, _ := chatAgent.StreamBidi(ctx)

conn.SendText("What is Go?")

for chunk, err := range conn.Receive() {
    if err != nil {
        log.Fatal(err) // stop on a stream error rather than use the chunk
    }
    if chunk.ModelChunk != nil {
        fmt.Print(chunk.ModelChunk.Text())
    }
    if chunk.TurnEnd != nil {
        break // turn complete, ready for next input
    }
}

conn.SendText("Tell me more about its concurrency model")
// ... iterate conn.Receive() again ...

conn.Close()
output, _ := conn.Output() // AgentOutput with final state/snapshot

Snapshots & Resumption

Configure automatic snapshot persistence with a store and, optionally, a snapshot policy (WithSnapshotOn or WithSnapshotCallback):

store := aix.NewInMemorySessionStore[MyState]()

chatAgent := genkit.DefineAgent(g, "chat",
    aix.FromInline(ai.WithModelName("googleai/gemini-flash-latest")),
    aix.WithSessionStore(store),
    aix.WithSnapshotOn[MyState](aix.SnapshotEventTurnEnd),
)

State is inferred as MyState from WithSessionStore(store); the explicit [MyState] on WithSnapshotOn is still needed because its State does not appear in any argument.

Resume a conversation from a server-stored snapshot:

// Single-turn with snapshot resumption:
output, _ := chatAgent.RunText(ctx, "continue",
    aix.WithSnapshotID[MyState]("snapshot-abc-123"),
)

// Or multi-turn with streaming:
conn, _ := chatAgent.StreamBidi(ctx, aix.WithSnapshotID[MyState]("snapshot-abc-123"))

Or resume from client-kept state (no server store needed):

output, _ := chatAgent.RunText(ctx, "continue", aix.WithState(&aix.SessionState[MyState]{
    Messages: previousMessages,
    Custom:   MyState{Topic: "concurrency"},
}))

AgentInit enforces that the resumption mode matches the agent's state management: passing WithState to a server-managed agent (one with a SessionStore) or WithSnapshotID to a client-managed agent is rejected with FAILED_PRECONDITION. Passing both is rejected with INVALID_ARGUMENT. Passing neither starts a fresh invocation.
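The rules in the previous paragraph amount to a small decision table. Below is a minimal sketch of that table, assuming a simplified request type and sentinel errors (initRequest, validateInit and the error variables are hypothetical names, and the order in which the checks run is an assumption; only the outcomes come from the description above).

```go
package main

import (
	"errors"
	"fmt"
)

// Sentinel errors named after the status codes in the text (illustrative only).
var (
	ErrFailedPrecondition = errors.New("FAILED_PRECONDITION")
	ErrInvalidArgument    = errors.New("INVALID_ARGUMENT")
)

// initRequest is a simplified stand-in for AgentInit.
type initRequest struct {
	SnapshotID string // set when resuming from a server-stored snapshot
	HasState   bool   // set when the client supplies state directly
}

// validateInit checks the resumption mode against the agent's state management.
// hasStore is true for server-managed agents (a SessionStore is configured).
func validateInit(hasStore bool, in initRequest) error {
	switch {
	case in.SnapshotID != "" && in.HasState:
		return ErrInvalidArgument // both modes at once
	case hasStore && in.HasState:
		return ErrFailedPrecondition // client state sent to a server-managed agent
	case !hasStore && in.SnapshotID != "":
		return ErrFailedPrecondition // snapshot ID sent to a client-managed agent
	}
	return nil // matching mode, or neither set (fresh invocation)
}

func main() {
	fmt.Println(validateInit(true, initRequest{SnapshotID: "snapshot-abc-123"}))
	fmt.Println(validateInit(true, initRequest{HasState: true}))
	fmt.Println(validateInit(false, initRequest{SnapshotID: "snapshot-abc-123", HasState: true}))
}
```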


Background Agents

AgentConnection.Detach ends the input stream and asks the server to take ownership of the rest of the work. The connection closes promptly with a pending snapshot ID the client can use later to poll, fetch results, or abort. Any inputs queued behind the in-flight one continue draining through the runner on a context decoupled from the client's.

chatAgent := genkit.DefineAgent(g, "chat",
    aix.FromInline(ai.WithModelName("googleai/gemini-flash-latest")),
    aix.WithSessionStore(store),
)

conn, _ := chatAgent.StreamBidi(ctx)
conn.SendText("draft a long report on Go's runtime")
conn.SendText("...and email it to me when done")

// Client walks away. The server keeps working.
conn.Detach()

out, _ := conn.Output() // returns immediately with the pending snapshot ID
fmt.Println(out.SnapshotID)

For single-turn use, Run already takes an AgentInput whose Detach field is the same wire bit, so detached single-turn with a final payload is just:

out, _ := chatAgent.Run(ctx, &aix.AgentInput{
    Detach:   true,
    Messages: []*ai.Message{ai.NewUserTextMessage("...")},
})

Local Go callers poll, fetch results, and abort via the store reference already in hand from WithSessionStore. No indirection through the agent:

snap, _ := store.GetSnapshot(ctx, id)

switch snap.Status {
case aix.SnapshotStatusPending:    // still working; snap.State is empty until finalize
case aix.SnapshotStatusSucceeded:  // snap.State has the cumulative final state
case aix.SnapshotStatusFailed:     // snap.Error has the structured failure (*core.GenkitError)
case aix.SnapshotStatusAborted:    // someone (or something) aborted it
}

status, _ := store.AbortSnapshot(ctx, id) // atomic pending -> aborted; no-op on terminal

For Dev UI and non-Go clients, an agent configured with a SessionStore registers getSnapshot and abortSnapshot companion actions (abort is registered only when the store also implements SnapshotAborter, so the reflected surface matches actual capabilities). Client-managed agents (no store) register neither. See Companion Actions below.

Once a snapshot has finalized to succeeded, resume it like any other snapshot. Pending, aborted, and failed snapshots are rejected with FAILED_PRECONDITION:

out, _ := chatAgent.RunText(ctx, "next question",
    aix.WithSnapshotID[MyState](id),
)

To redact PII or strip secrets on the way out to a client, register a StateTransform. The transform runs on getSnapshot responses and on client-managed AgentOutput.State; the raw state is what gets persisted and what the user fn sees:

chatAgent := genkit.DefineAgent(g, "chat",
    aix.FromInline(ai.WithModelName("googleai/gemini-flash-latest")),
    aix.WithSessionStore(store),
    aix.WithStateTransform[MyState](func(ctx context.Context, s *aix.SessionState[MyState]) *aix.SessionState[MyState] {
        return redactPII(ctx, s) // ctx carries caller identity for RBAC-aware redaction
    }),
)

The transform receives a fresh deep copy and owns the returned pointer; mutate in place, return a new value, or return nil to omit state from the response entirely.

Lifecycle. Detach is observed by an eager intake reader the moment it arrives in the input channel, regardless of what the runner is processing. The server suspends future turn-end snapshots, writes a single pending snapshot (empty state placeholder, chained off the most recent prior snapshot), returns that ID over the wire, and closes the connection. When fn returns, the same snapshot row is rewritten in place with one of three statuses (succeeded, failed with Error set, or aborted) and the cumulative final session state. The snapshot ID never changes. Side effects on session state (sess.AddMessages, resp.SendArtifact, etc.) keep applying through the queued turns so user code does not have to branch on detach.

Abort. store.AbortSnapshot(id) (or the abortSnapshot action) atomically flips pending -> aborted and returns the resulting status. The runtime subscribes via SnapshotAborter.OnSnapshotStatusChange, so the abort is observed by push rather than polling: fn's context is cancelled as soon as the store publishes the status change. The finalizer rechecks status before writing terminal state, so a late abort wins over a succeeded that was about to land.

Pre-conditions. Detach requires a store that implements SnapshotAborter. Stores missing it are rejected at detach time with FAILED_PRECONDITION; the agent can still run synchronously against a minimal SessionStore.


Custom Session State

The State type parameter lets you maintain typed state across turns:

type ChatState struct {
    TopicsDiscussed []string `json:"topicsDiscussed"`
}

chatAgent := genkit.DefineCustomAgent(g, "chat",
    func(ctx context.Context, resp aix.Responder[any], sess *aix.SessionRunner[ChatState]) (*aix.AgentResult, error) {
        if err := sess.Run(ctx, func(ctx context.Context, input *aix.AgentInput) error {
            // ... generate response ...

            sess.UpdateCustom(func(s ChatState) ChatState {
                s.TopicsDiscussed = append(s.TopicsDiscussed, extractTopic(input))
                return s
            })
            return nil
        }); err != nil {
            return nil, err
        }
        return sess.Result(), nil
    },
    aix.WithSessionStore(aix.NewInMemorySessionStore[ChatState]()),
)

Custom state is included in snapshots and available when resuming.


API Reference

Agent API (ai/exp — experimental)

Define

Two entrypoints; DefineAgent covers prompt-backed agents, DefineCustomAgent is the escape hatch for custom turn loops.

// DefineAgent registers a prompt-backed agent. The source selects how
// the prompt is configured (FromInline for inline options, FromPrompt
// to reference an existing prompt under the agent's name). State is
// inferred from the typed variadic of AgentOption[State]; a mismatch
// fails at compile time.
func DefineAgent[State any](
    r Registry, name string,
    source AgentSource,
    opts ...AgentOption[State],
) *Agent[any, State]

// DefineCustomAgent registers an agent whose per-turn body you implement.
// Use this for full control of the turn loop.
func DefineCustomAgent[Stream, State any](
    r Registry, name string,
    fn AgentFunc[Stream, State],
    opts ...AgentOption[State],
) *Agent[Stream, State]

AgentSource

// AgentSource selects the prompt backing a prompt-based agent.
// Construct with FromInline or FromPrompt. An agent and its backing
// prompt always share a name; if you need them to differ, use
// DefineCustomAgent.
type AgentSource interface { /* sealed */ }

// FromInline defines the agent's prompt inline from the given options.
// The prompt is registered with the registry under the agent's name.
func FromInline(opts ...ai.PromptOption) AgentSource

// FromPrompt references an existing prompt registered with the registry
// under the same name as the agent (e.g. one defined via DefinePrompt or
// loaded from a .prompt file). defaultInput, if provided, is the input
// passed to the prompt's Render on every turn. Call FromPrompt() with
// no arguments when the prompt takes no input. Only the first argument
// is used; any additional arguments are ignored.
func FromPrompt(defaultInput ...any) AgentSource

AgentOption[State]

// AgentOption configures an agent at definition time. Both DefineAgent
// and DefineCustomAgent take a typed `...AgentOption[State]` variadic,
// so a State mismatch is a compile-time error.

// WithSessionStore sets the store for persisting snapshots. The store must
// implement SnapshotReader and SnapshotWriter at minimum. Detach support
// also requires SnapshotAborter; detach attempts on a store that lacks
// it are rejected at runtime.
WithSessionStore[State](store SessionStore[State])

// WithSnapshotCallback configures when snapshots are created.
// If not provided and a store is configured, snapshots are always created.
WithSnapshotCallback[State](cb SnapshotCallback[State])

// WithSnapshotOn configures snapshots to be created only for the specified events.
// Convenience wrapper around WithSnapshotCallback.
WithSnapshotOn[State](events ...SnapshotEvent)

// WithStateTransform rewrites session state on its way out to a client
// (getSnapshot response and client-managed AgentOutput.State). Not
// applied to state persisted in the store or to state passed to the
// user agent function. Typical use: PII redaction or secret stripping.
WithStateTransform[State](transform StateTransform[State])

AgentFunc

// AgentFunc is the function signature for DefineCustomAgent. It
// receives a responder for streaming output, a SessionRunner for
// state management, and returns an optional AgentResult with the
// final output.
type AgentFunc[Stream, State any] = func(
    ctx context.Context,
    resp Responder[Stream],
    sess *SessionRunner[State],
) (*AgentResult, error)

Agent[Stream, State]

// Agent is a bidirectional streaming agent with automatic snapshot management.

// StreamBidi starts a new agent invocation with bidirectional streaming.
// Use this for multi-turn interactions where you need to send multiple inputs
// and receive streaming chunks. For single-turn usage, see Run and RunText.
func (*Agent) StreamBidi(ctx context.Context, opts ...InvocationOption[State]) (*AgentConnection[...], error)

// Run starts a single-turn agent invocation with the given input.
// It sends the input, waits for the agent to complete, and returns the output.
// For multi-turn interactions or streaming, use StreamBidi instead.
func (*Agent) Run(ctx context.Context, input *AgentInput, opts ...InvocationOption[State]) (*AgentOutput[State], error)

// RunText is a convenience method that starts a single-turn agent invocation
// with a user text message. Equivalent to calling Run with an AgentInput
// containing a single user text message.
func (*Agent) RunText(ctx context.Context, text string, opts ...InvocationOption[State]) (*AgentOutput[State], error)

InvocationOption[State]

Configures an agent invocation (StreamBidi, Run, or RunText). The two options are mutually exclusive at option-application time, and the choice must match the agent's state management at invocation time (see AgentInit below).

// WithSnapshotID loads state from a persisted snapshot by ID.
// Use this for server-managed state (a SessionStore is configured).
WithSnapshotID[State](id string)

// WithState sets the initial state for the invocation.
// Use this for client-managed state (no SessionStore configured).
WithState[State](state *SessionState[State])

AgentConnection[Stream, State]

Unlike BidiConnection, breaking from Receive() does not cancel the connection — enabling multi-turn patterns.

// Send sends an AgentInput to the agent.
func (*AgentConnection) Send(input *AgentInput) error

// SendMessages sends messages to the agent.
func (*AgentConnection) SendMessages(messages ...*ai.Message) error

// SendText sends a single user text message to the agent.
func (*AgentConnection) SendText(text string) error

// SendResume sends a ToolResume payload to resume an interrupted generation.
// Construct using ai.ToolDef.RestartWith / ai.ToolDef.RespondWith parts.
func (*AgentConnection) SendResume(resume *ToolResume) error

// Close signals that no more inputs will be sent.
func (*AgentConnection) Close() error

// Detach asks the server to write a pending snapshot, close the
// connection, and continue processing any already-buffered inputs in
// the background. Output() then returns immediately with that snapshot
// ID. To ride a final input on the same wire message, send
// Send(&AgentInput{Detach: true, Messages: ...}) directly.
// Requires a store that implements SnapshotAborter.
func (*AgentConnection) Detach() error

// Receive returns an iterator for receiving stream chunks. Breaking out of this
// iterator does not cancel the connection, enabling multi-turn patterns where
// the caller breaks on TurnEnd, sends the next input, then calls Receive again.
func (*AgentConnection) Receive() iter.Seq2[*AgentStreamChunk[Stream], error]

// Output returns the final response after the agent completes.
func (*AgentConnection) Output() (*AgentOutput[State], error)

// Done returns a channel closed when the connection completes.
func (*AgentConnection) Done() <-chan struct{}

SessionRunner[State]

Extends Session[State] with turn management. Passed as the sess parameter to AgentFunc.

// SessionRunner extends Session with agent-runtime functionality:
// turn management, snapshot persistence, and input channel handling.
type SessionRunner[State any] struct {
    *Session[State]
    InputCh   <-chan *AgentInput  // channel delivering per-turn inputs from the client
    TurnIndex int                 // zero-based index of the current conversation turn
}

// Run loops over the input channel, calling fn for each turn. Each turn is
// wrapped in a trace span for observability. Input messages are automatically
// added to the session before fn is called. After fn returns successfully, a
// TurnEnd chunk is sent and a snapshot check is triggered.
func (*SessionRunner) Run(ctx context.Context, fn func(ctx context.Context, input *AgentInput) error) error

// Result returns an AgentResult populated from the current session state:
// the last message in the conversation history and all artifacts.
// Convenience for custom agents that don't need to construct the result manually.
func (*SessionRunner) Result() *AgentResult

Session[State]

Thread-safe conversation state. Available via SessionRunner embedding or SessionFromContext.

// Session holds conversation state and provides thread-safe read/write access
// to messages, custom state, and artifacts.

// State returns a deep copy of the current state.
func (*Session) State() *SessionState[State]

// Conversation history
func (*Session) Messages() []*ai.Message
func (*Session) AddMessages(messages ...*ai.Message)
func (*Session) SetMessages(messages []*ai.Message)
func (*Session) UpdateMessages(fn func([]*ai.Message) []*ai.Message)

// Custom state
func (*Session) Custom() State
func (*Session) UpdateCustom(fn func(State) State)

// Artifacts
func (*Session) Artifacts() []*Artifact
func (*Session) AddArtifacts(artifacts ...*Artifact)
func (*Session) UpdateArtifacts(fn func([]*Artifact) []*Artifact)

// Context helpers
func NewSessionContext[State any](ctx context.Context, s *Session[State]) context.Context
func SessionFromContext[State any](ctx context.Context) *Session[State]

Responder[Stream]

Output channel with convenience methods. Artifacts sent here are auto-added to the session.

// Responder is the output channel for an agent. Artifacts sent through it
// are automatically added to the session before being forwarded to the client.
type Responder[Stream any] chan<- *AgentStreamChunk[Stream]

// SendModelChunk sends a generation chunk (token-level streaming).
func (Responder) SendModelChunk(chunk *ai.ModelResponseChunk)

// SendStatus sends a user-defined status update.
func (Responder) SendStatus(status Stream)

// SendArtifact sends an artifact and adds it to the session.
// If an artifact with the same name already exists, it is replaced.
func (Responder) SendArtifact(artifact *Artifact)

Wire Types

// AgentInit is the input for starting an agent invocation. Exactly one of
// SnapshotID or State may be set, and the choice must match the agent's
// state management:
//   - Server-managed (SessionStore configured): callers must use SnapshotID;
//     sending State is rejected with FAILED_PRECONDITION.
//   - Client-managed (no SessionStore): callers must use State;
//     sending SnapshotID is rejected with FAILED_PRECONDITION.
// Sending both is rejected with INVALID_ARGUMENT. Sending neither starts
// a fresh invocation with empty state.
type AgentInit[State any] struct {
    SnapshotID string               `json:"snapshotId,omitempty"`  // load from persisted snapshot (server-managed)
    State      *SessionState[State] `json:"state,omitempty"`       // direct state (client-managed)
}

// AgentInput is the input sent to an agent during a conversation turn.
type AgentInput struct {
    Detach   bool          `json:"detach,omitempty"`   // request to detach; server takes over background processing
    Messages []*ai.Message `json:"messages,omitempty"` // user's input for this turn
    Resume   *ToolResume   `json:"resume,omitempty"`   // resume parts for an interrupted tool generation
}

// ToolResume holds the parts that resume an interrupted agent turn.
// Mirrors ai.GenerateActionResume but named for the tool-call callsite
// where it's set on an AgentInput.
type ToolResume struct {
    Respond []*ai.Part `json:"respond,omitempty"` // tool response parts to send when resuming
    Restart []*ai.Part `json:"restart,omitempty"` // tool request parts to restart when resuming
}

// AgentStreamChunk represents a single item in the agent's output stream.
type AgentStreamChunk[Stream any] struct {
    ModelChunk *ai.ModelResponseChunk `json:"modelChunk,omitempty"` // token-level streaming
    Status     Stream                 `json:"status,omitempty"`     // user-defined status update
    Artifact   *Artifact              `json:"artifact,omitempty"`   // newly produced artifact
    TurnEnd    *TurnEnd               `json:"turnEnd,omitempty"`    // non-nil signals turn complete
}

// TurnEnd groups the signals emitted when an agent turn finishes.
// Emitted exactly once per turn; structured as a struct so we can add
// fields (e.g. whether the turn was interrupted by the client, how many
// inputs were batched into a single generation) to support batching and
// interruptible agents without further wire breakage.
type TurnEnd struct {
    SnapshotID string `json:"snapshotId,omitempty"` // ID of snapshot persisted at turn end (empty if none, including when suspended after detach)
}

// AgentOutput is the output when an agent invocation completes.
type AgentOutput[State any] struct {
    Artifacts  []*Artifact          `json:"artifacts,omitempty"`  // artifacts produced during the session
    Message    *ai.Message          `json:"message,omitempty"`    // last model response message
    SnapshotID string               `json:"snapshotId,omitempty"` // final snapshot ID (empty if none)
    State      *SessionState[State] `json:"state,omitempty"`      // final state (client-managed only)
}

// AgentResult is the return value from an AgentFunc.
// Contains user-specified outputs of the agent invocation.
type AgentResult struct {
    Artifacts []*Artifact `json:"artifacts,omitempty"` // artifacts produced during the session
    Message   *ai.Message `json:"message,omitempty"`   // last model response message
}

// SessionState is the portable conversation state that flows between client
// and server. Contains only the data needed for conversation continuity.
type SessionState[State any] struct {
    Messages  []*ai.Message `json:"messages,omitempty"`  // conversation history (excludes prompt-rendered messages)
    Custom    State         `json:"custom,omitempty"`    // user-defined state
    Artifacts []*Artifact   `json:"artifacts,omitempty"` // named artifact collections
}

// Artifact represents a named collection of parts produced during a session.
// Examples: generated files, images, code snippets, diagrams.
type Artifact struct {
    Name     string         `json:"name,omitempty"`     // identifies the artifact
    Parts    []*ai.Part     `json:"parts"`              // artifact content (text, media, etc.)
    Metadata map[string]any `json:"metadata,omitempty"` // additional artifact-specific data
}

Snapshot System

Snapshots are automatically deduplicated within an invocation: if no state mutations occur between two snapshot checkpoints, the second snapshot is skipped. This commonly happens at the end of a single-turn Run/RunText call, where the turn-end snapshot already captured the final state and the invocation-end snapshot would be identical. When an invocation-end snapshot is skipped, the output's SnapshotID falls back to the last snapshot that was created (typically the turn-end snapshot). If the AgentFunc mutates state after sess.Run() returns, the invocation-end snapshot fires normally.
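A minimal sketch of the deduplication behaviour follows. How the runtime detects "no state mutations" is not stated above; this example assumes a simple mutation counter, and session, snapshotter and checkpoint are hypothetical names used only for illustration.

```go
package main

import "fmt"

// session counts mutations so a checkpoint can tell whether anything changed.
type session struct {
	version  int
	messages []string
}

func (s *session) addMessage(m string) {
	s.messages = append(s.messages, m)
	s.version++
}

// snapshotter remembers the session version captured by the last snapshot.
type snapshotter struct {
	lastVersion int
	lastID      string
	seq         int
}

// checkpoint creates a snapshot unless the state is unchanged since the last
// one, in which case it falls back to the last snapshot's ID.
func (sn *snapshotter) checkpoint(s *session) (id string, created bool) {
	if sn.lastID != "" && sn.lastVersion == s.version {
		return sn.lastID, false
	}
	sn.seq++
	sn.lastID = fmt.Sprintf("snap-%d", sn.seq)
	sn.lastVersion = s.version
	return sn.lastID, true
}

func main() {
	sess := &session{}
	sn := &snapshotter{}

	sess.addMessage("What is Go?")
	fmt.Println(sn.checkpoint(sess)) // turn end: state changed, snapshot created
	fmt.Println(sn.checkpoint(sess)) // invocation end: unchanged, snapshot skipped

	sess.addMessage("post-run mutation")
	fmt.Println(sn.checkpoint(sess)) // state changed again, new snapshot
}
```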

Store interfaces split by capability. The minimum to use WithSessionStore is reader + writer; detach support layers on SnapshotAborter:

// SnapshotReader retrieves snapshots. Required.
type SnapshotReader[State any] interface {
    GetSnapshot(ctx context.Context, snapshotID string) (*SessionSnapshot[State], error)
}

// SnapshotWriter persists snapshots. Required.
type SnapshotWriter[State any] interface {
    // SaveSnapshot atomically reads, applies fn, and persists. The store
    // owns identity (SnapshotID, generated if id is empty) and lifecycle
    // timestamps (CreatedAt on first write, UpdatedAt on every commit).
    // Status defaults to SnapshotStatusSucceeded if fn leaves it empty;
    // callers writing a pending row set it explicitly.
    //
    // fn receives the existing row (or nil) and returns the snapshot to
    // commit, or (nil, nil) to skip without changing the row. fn must
    // be a pure function of its input: stores using optimistic
    // concurrency or transaction retries may invoke fn multiple times.
    SaveSnapshot(
        ctx context.Context,
        id string,
        fn func(existing *SessionSnapshot[State]) (*SessionSnapshot[State], error),
    ) (*SessionSnapshot[State], error)
}

// SessionStore is the minimum interface required by WithSessionStore.
type SessionStore[State any] interface {
    SnapshotReader[State]
    SnapshotWriter[State]
}

// SnapshotAborter is the optional capability that lets detached
// invocations be aborted. It bundles the two methods that must be
// implemented together for the abort lifecycle to function: abort
// triggering and status-change subscription. Splitting them just
// exposed the "implemented one, not the other" footgun.
type SnapshotAborter interface {
    // AbortSnapshot atomically flips pending -> aborted and returns
    // the resulting status. No-op for any other status; returns an
    // empty status with a nil error if the snapshot is not found.
    // Must be transactional (or CAS) so a racing finalizer cannot
    // clobber the abort.
    AbortSnapshot(ctx context.Context, snapshotID string) (SnapshotStatus, error)

    // OnSnapshotStatusChange yields the snapshot's status on subscription
    // and on every change until ctx is cancelled. Lets the runtime
    // observe an abort without polling so fn's context can be cancelled
    // immediately when the store publishes the status change.
    OnSnapshotStatusChange(ctx context.Context, snapshotID string) <-chan SnapshotStatus
}

// NewInMemorySessionStore creates a thread-safe in-memory snapshot store.
// Implements SessionStore (reader and writer) plus the optional
// SnapshotAborter (abort and status-change subscription).
func NewInMemorySessionStore[State any]() *InMemorySessionStore[State]

The atomic SaveSnapshot shape composes cleanly with SQL (SELECT FOR UPDATE in a txn), Firestore (RunTransaction(fn)), DynamoDB (optimistic concurrency on a version attribute), or any K/V store with a CAS primitive. It also folds the finalize path's read-then-rewrite into one step so a late abort cannot be clobbered by the terminal write.
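As a rough illustration of that read-apply-persist shape, the sketch below implements a mutex-guarded in-memory variant. It assumes simplified types (memStore, Snapshot and finalize are hypothetical, the context parameter and timestamps are left out, and returning the unchanged row on a skip is an assumption since the return value for that case is not described).

```go
package main

import (
	"fmt"
	"sync"
)

type Status string

const (
	Pending   Status = "pending"
	Succeeded Status = "succeeded"
	Aborted   Status = "aborted"
)

// Snapshot is a simplified stand-in for SessionSnapshot.
type Snapshot struct {
	ID     string
	Status Status
	State  string
}

type memStore struct {
	mu   sync.Mutex
	rows map[string]Snapshot
	seq  int
}

// SaveSnapshot reads the row, applies fn, and persists, all under one lock.
func (s *memStore) SaveSnapshot(id string, fn func(existing *Snapshot) (*Snapshot, error)) (*Snapshot, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	var existing *Snapshot
	if row, ok := s.rows[id]; ok {
		cp := row
		existing = &cp
	}
	next, err := fn(existing)
	if err != nil {
		return nil, err
	}
	if next == nil {
		return existing, nil // skip: the row is left unchanged
	}
	out := *next
	if id == "" { // the store owns identity
		s.seq++
		id = fmt.Sprintf("snap-%d", s.seq)
	}
	out.ID = id
	if out.Status == "" {
		out.Status = Succeeded
	}
	s.rows[id] = out
	return &out, nil
}

// finalize writes the terminal state only if the row is still pending, so an
// abort that landed first is never overwritten.
func finalize(s *memStore, id, state string) (*Snapshot, error) {
	return s.SaveSnapshot(id, func(existing *Snapshot) (*Snapshot, error) {
		if existing == nil || existing.Status != Pending {
			return nil, nil
		}
		return &Snapshot{Status: Succeeded, State: state}, nil
	})
}

func main() {
	store := &memStore{rows: map[string]Snapshot{}}
	pending, _ := store.SaveSnapshot("", func(*Snapshot) (*Snapshot, error) {
		return &Snapshot{Status: Pending}, nil
	})
	// Simulate an abort landing before the finalizer runs.
	store.SaveSnapshot(pending.ID, func(*Snapshot) (*Snapshot, error) {
		return &Snapshot{Status: Aborted}, nil
	})
	final, _ := finalize(store, pending.ID, "final state")
	fmt.Println(final.ID, final.Status)
}
```

Because fn only inspects its argument and returns a value, the same closure could be retried by a store that uses optimistic concurrency, which is the purity requirement stated in the interface comment.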

// SnapshotCallback decides whether to create a snapshot.
// Return true to create, false to skip.
// If not provided and a store is configured, snapshots are always created.
type SnapshotCallback[State any] = func(ctx context.Context, sc *SnapshotContext[State]) bool

// SnapshotContext provides context for snapshot decision callbacks.
type SnapshotContext[State any] struct {
    State     *SessionState[State]  // current state
    PrevState *SessionState[State]  // state at last snapshot (nil if none)
    TurnIndex int                   // turn number in current invocation
    Event     SnapshotEvent         // what triggered this check
}

// Events
const (
    SnapshotEventTurnEnd       SnapshotEvent = "turnEnd"
    SnapshotEventInvocationEnd SnapshotEvent = "invocationEnd"
    SnapshotEventDetach        SnapshotEvent = "detach" // pending row written at detach time; rewritten with terminal status when fn returns
)

// SnapshotStatus is the lifecycle state of a snapshot.
type SnapshotStatus string
const (
    SnapshotStatusPending   SnapshotStatus = "pending"   // detached invocation still running
    SnapshotStatusSucceeded SnapshotStatus = "succeeded" // settled state (empty value also treated as succeeded)
    SnapshotStatusAborted   SnapshotStatus = "aborted"   // aborted via AbortSnapshot
    SnapshotStatusFailed    SnapshotStatus = "failed"    // invocation failed; Error is populated
)

// SessionSnapshot is a persisted point-in-time capture of session state.
// Shared schema: defined in genkit-tools/common/src/types/agent.ts and
// generated into Go (with Error mapped to *core.GenkitError) and Python.
type SessionSnapshot[State any] struct {
    SnapshotID string               `json:"snapshotId"`
    ParentID   string               `json:"parentId,omitempty"`
    CreatedAt  time.Time            `json:"createdAt"`
    UpdatedAt  time.Time            `json:"updatedAt,omitempty"` // equals CreatedAt for unrewritten rows
    Event      SnapshotEvent        `json:"event"`
    Status     SnapshotStatus       `json:"status,omitempty"`    // empty treated as Succeeded for back-compat
    Error      *core.GenkitError    `json:"error,omitempty"`     // populated when Status=Failed
    State      *SessionState[State] `json:"state,omitempty"`     // nil on pending rows; populated on terminal rewrite
}

// StateTransform rewrites session state on its way out to a client.
// Applied to getSnapshot responses and to client-managed
// AgentOutput.State. Not applied to state persisted in the store
// or to state passed to the user fn. ctx carries the caller's identity
// (for RBAC-aware redaction), trace, and cancellation signal.
//
// state is a fresh deep copy made for this call: the transform owns it
// and may mutate in place, return a new pointer, or return nil to omit
// state from the response.
type StateTransform[State any] = func(ctx context.Context, state *SessionState[State]) *SessionState[State]

Companion Actions

An agent configured with a SessionStore auto-registers companion actions so Dev UI and non-Go clients can observe and control detached invocations without reaching into the store directly. Client-managed agents (no store) register neither. Local Go callers use the store reference they already have from WithSessionStore.

// Registered under action type "agent-snapshot", action name = agent name.
// Registered when a SessionStore is configured.
type GetSnapshotRequest struct {
    SnapshotID string `json:"snapshotId"`
}
type GetSnapshotResponse[State any] struct {
    SnapshotID string               `json:"snapshotId"`
    CreatedAt  time.Time            `json:"createdAt,omitempty"`
    UpdatedAt  time.Time            `json:"updatedAt,omitempty"`
    Status     SnapshotStatus       `json:"status,omitempty"`
    Error      *core.GenkitError    `json:"error,omitempty"`
    State      *SessionState[State] `json:"state,omitempty"` // omitted on pending/failed; WithStateTransform applied
}

// Registered under action type "agent-abort", action name = agent name.
// Registered only when the store also implements SnapshotAborter, so the
// reflected surface matches actual capabilities.
type AbortSnapshotRequest struct {
    SnapshotID string `json:"snapshotId"`
}
type AbortSnapshotResponse struct {
    SnapshotID string         `json:"snapshotId"`
    Status     SnapshotStatus `json:"status,omitempty"` // aborted on success; the existing terminal status otherwise
}

Each agent's own action descriptor (registered as ActionTypeAgent = "agent") also carries an AgentMetadata value under metadata["agent"] so reflective callers can shape the surface (e.g. hide the Abort button when the store cannot support it) without round-tripping through the reflection API:

type AgentMetadata struct {
    StateManagement AgentStateManagement `json:"stateManagement,omitempty"` // "server" or "client"
    Abortable       bool                 `json:"abortable,omitempty"`       // true iff store implements SnapshotAborter
}

type AgentStateManagement string
const (
    AgentStateManagementServer AgentStateManagement = "server" // a SessionStore is configured
    AgentStateManagementClient AgentStateManagement = "client" // no store; state round-trips through init/output
)

AgentMetadata and the request/response schemas are defined once in genkit-tools/common/src/types/agent.ts and shared across language clients; AgentMetadata is generated into go/ai/exp/gen.go. The companion-action request/response types take a Go type parameter and are hand-written in Go.
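
For readers wondering how `Abortable` can track the store's real capabilities, one plausible approach is a plain interface assertion at definition time. The sketch below is an assumption about the mechanism, not the PR's code: `metadataFor`, `plainStore`, and `abortableStore` are hypothetical, and the `AbortSnapshot` parameter list is guessed from the description (only its `(SnapshotStatus, error)` return is stated in this PR).

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

type SnapshotStatus string

type AgentStateManagement string

const (
	AgentStateManagementServer AgentStateManagement = "server"
	AgentStateManagementClient AgentStateManagement = "client"
)

type AgentMetadata struct {
	StateManagement AgentStateManagement `json:"stateManagement,omitempty"`
	Abortable       bool                 `json:"abortable,omitempty"`
}

// SnapshotAborter is sketched from the PR text; the parameter list is an
// assumption made for this example.
type SnapshotAborter interface {
	AbortSnapshot(ctx context.Context, snapshotID string) (SnapshotStatus, error)
}

// metadataFor (hypothetical) derives the metadata from whatever store,
// if any, was configured on the agent.
func metadataFor(store any) AgentMetadata {
	if store == nil {
		return AgentMetadata{StateManagement: AgentStateManagementClient}
	}
	_, abortable := store.(SnapshotAborter)
	return AgentMetadata{StateManagement: AgentStateManagementServer, Abortable: abortable}
}

// plainStore cannot abort; abortableStore can.
type plainStore struct{}
type abortableStore struct{}

func (abortableStore) AbortSnapshot(context.Context, string) (SnapshotStatus, error) {
	return "aborted", nil
}

func main() {
	for _, s := range []any{nil, plainStore{}, abortableStore{}} {
		b, _ := json.Marshal(metadataFor(s))
		fmt.Println(string(b))
	}
}
```

With `omitempty` on both fields, a non-abortable store simply omits `abortable` from the serialized metadata, so a reflective caller can treat a missing key as false.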


Supporting core additions

Two go/core changes the design relies on:

// WithFlowContext attaches flow-context metadata so core.Run and
// core.FlowNameFromContext resolve from inside a custom flow-like
// action wired via core.NewBidiAction / core.DefineBidiAction.
// Agents take that path so the agent metadata can be set on the
// action descriptor at construction time (actions are immutable once
// registered); the Define*Flow constructors call it internally.
func WithFlowContext(ctx context.Context, flowName string) context.Context

BidiConnection.Output() gains a fast path that returns the already-settled output even when the connection context was cancelled concurrently. This is what makes conn.Output() after Detach() return the pending snapshot ID instead of ctx.Err() on the client side when the server closes the wire first.


Known Issues

  • Empty trace on zero-turn connections: StreamBidi creates a trace span immediately when the connection is established. If the connection is closed without sending any messages (zero turns), an empty single-span trace is still emitted. This is cosmetic -- the trace contains no useful data (just a snapshot ID in the output). A future change could defer span creation until the first input arrives.
  • Missing Stream() and StreamText() convenience methods: Agent has Run() and RunText() for single-turn non-streaming usage, but lacks corresponding Stream() and StreamText() methods that would return an iterator of stream chunks for single-turn streaming. Currently, single-turn streaming requires using StreamBidi directly (open connection, send, close, iterate Receive). Adding these would improve DX by matching the pattern established by core.Flow.Stream().
  • Incomplete traces on client cancellation: When the client cancels the context mid-invocation (e.g., Ctrl+C during an active bidi connection), the trace span wrapping the action may not finalize cleanly before the dev server connection is torn down. The OTel exporter logs a transient "failed to send request" line, and the trace in the dev tools may be incomplete or missing. This is OTel exporter behavior surfaced by ctx cancellation, not specific to the agent API. A future change could install a custom OTel error handler that filters shutdown-time export failures.

@gemini-code-assist
Contributor

Summary of Changes

Hello @apascal07, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the Genkit Go SDK by introducing a new SessionFlow API, specifically designed for building and managing complex, stateful, multi-turn conversational AI applications. This new API is built upon a fundamental refactoring of the core Action type to support bidirectional streaming, enabling more dynamic and interactive AI experiences. The changes provide a structured approach to handling conversational state, including message history, custom data, and generated artifacts, with built-in mechanisms for persistence and lifecycle management.

Highlights

  • Introduction of SessionFlow API: A new SessionFlow API has been introduced in go/ai/x to manage stateful, multi-turn conversational AI interactions, including automatic snapshot management and artifact handling.
  • Core Action Refactoring for Bidirectional Streaming: The core ActionDef type has been refactored to Action to natively support bidirectional streaming, which is a foundational change enabling the SessionFlow API. This includes new BidiFunc and BidiConnection types.
  • Update of Existing AI Components: All existing AI components (embedder, evaluator, model, prompt, resource, retriever) have been updated to utilize the new core.Action type and its extended capabilities.
  • Snapshot Management Features: The SessionFlow now includes robust snapshot management, allowing state to be persisted, loaded, and controlled via callbacks, with an in-memory store implementation provided.

Changelog
  • go/ai/embedder.go
    • Updated embedder struct to use core.Action instead of core.ActionDef.
    • Modified NewEmbedder and LookupEmbedder to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/evaluator.go
    • Updated evaluator struct to use core.Action instead of core.ActionDef.
    • Modified NewEvaluator, NewBatchEvaluator, and LookupEvaluator to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/generate.go
    • Updated model and generateAction structs to use core.Action instead of core.ActionDef.
    • Modified LookupModel, model.Generate, and model.supportsConstrained to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/prompt.go
    • Updated prompt struct to use core.Action instead of core.ActionDef.
    • Modified DefinePrompt, LookupPrompt, and prompt.Desc to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/resource.go
    • Updated resource struct to use core.Action instead of core.ActionDef.
    • Modified DefineResource, NewResource, FindMatchingResource, and LookupResource to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/retriever.go
    • Updated retriever struct to use core.Action instead of core.ActionDef.
    • Modified NewRetriever and LookupRetriever to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/x/option.go
    • Added SessionFlowOption and StreamBidiOption interfaces for configuring session flows and bidirectional streams.
    • Introduced WithSnapshotStore, WithSnapshotCallback, WithState, and WithSnapshotID functions for flexible session flow initialization.
  • go/ai/x/session_flow.go
    • Introduced SessionFlowArtifact, SessionFlowInput, SessionFlowInit, SessionFlowOutput, and SessionFlowStreamChunk data structures.
    • Defined the Session type for managing conversational state (messages, custom state, artifacts) with methods for manipulation and snapshot handling.
    • Implemented Responder for sending various types of stream chunks (generation, status, artifacts).
    • Introduced the SessionFlow type, DefineSessionFlow for registration, and StreamBidi for initiating sessions.
    • Added logic for snapshot creation, loading, and integration with tracing, including SessionFlowConnection for buffered chunk reception.
  • go/ai/x/session_flow_test.go
    • Added comprehensive unit tests for SessionFlow covering multi-turn interactions, snapshot persistence, resuming from snapshots, client-managed state, artifact handling, snapshot callbacks, and error handling.
  • go/ai/x/snapshot.go
    • Defined SessionState for portable conversation state, SnapshotEvent for trigger types, and SessionSnapshot for persisted state.
    • Introduced SnapshotContext and SnapshotCallback for custom snapshot logic.
    • Defined SnapshotStore interface and provided an InMemorySnapshotStore implementation.
    • Added SnapshotOn utility function for selective snapshotting.
  • go/core/action.go
    • Refactored ActionDef to Action and added a new Init type parameter for bidirectional actions.
    • Introduced BidiFunc for bidirectional streaming function signatures and ActionOptions for configuration.
    • Implemented NewBidiAction and DefineBidiAction for creating and registering bidirectional actions.
    • Added StreamBidi method to Action for initiating bidirectional connections.
    • Introduced BidiConnection type for managing bidirectional streaming, including Send, Close, Receive, Output, and Done methods.
    • Updated ResolveActionFor and LookupActionFor to use the new Action type and Init parameter.
    • Added wrapBidiAsStreaming to adapt BidiFunc to StreamingFunc.
  • go/core/action_test.go
    • Updated existing tests to use DefineStreamingAction and the new Init type parameter in ResolveActionFor and LookupActionFor.
    • Added new tests for BidiAction functionality, covering echo, initialization, send after close, context cancellation, and Done channel.
  • go/core/api/action.go
    • Added new ActionType constants: ActionTypeSessionFlow and ActionTypeSnapshotStore.
    • Extended ActionDesc with StreamSchema and InitSchema fields for describing bidirectional actions.
  • go/core/background_action.go
    • Updated BackgroundActionDef to use core.Action instead of core.ActionDef.
    • Modified Register, NewBackgroundAction, and LookupBackgroundAction to align with the new core.Action type and updated ResolveActionFor signature.
  • go/core/flow.go
    • Refactored Flow to be a struct embedding *Action with the new Init type parameter.
    • Updated DefineFlow and DefineStreamingFlow to use the new Flow struct and Action type.
    • Introduced NewBidiFlow and DefineBidiFlow for creating and registering bidirectional flows.
    • Updated Run and Stream methods to use the embedded Action's Run method.
  • go/core/flow_test.go
    • Updated existing tests to reflect changes in Flow type and method calls.
    • Added new tests for BidiFlow functionality, including registration, echo, and integration with core.Run.
  • go/genkit/genkit.go
    • Updated DefineFlow and DefineStreamingFlow signatures to include the new Init type parameter.
    • Added DefineBidiFlow function to expose the new bidirectional flow definition.
  • go/genkit/session_flow.go
    • Introduced DefineSessionFlow as a top-level Genkit function to define and register session flows, wrapping aix.DefineSessionFlow.
  • go/samples/basic-session-flow/main.go
    • Added a sample CLI REPL application demonstrating the usage of SessionFlow for multi-turn conversations with token-level streaming and snapshot management.

Activity
  • The pull request author, apascal07, has indicated that the PR title adheres to conventional commits.
  • The author has confirmed that the changes have been manually and unit tested.
  • Documentation updates are noted as pending in the PR description.
@apascal07 apascal07 changed the base branch from main to ap/go-bidi February 6, 2026 05:48
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This is a great pull request that introduces the SessionFlow feature and refactors the core Action type to support bidirectional streaming. The new functionality is well-structured, comes with comprehensive tests, and includes a helpful sample application. I've identified a critical race condition in the new BidiConnection implementation and a minor issue in the sample code that would prevent it from running. My comments provide suggestions to address these points.

go/core/action.go (534-550)

high

This Send implementation has a race condition that can cause a panic. The mutex is unlocked on line 540 before the channel send on line 543. If another goroutine calls Close() in between, c.inputCh will be closed, and the send will panic.

A robust way to fix this is to use recover to handle the "send on closed channel" panic, which is a common pattern in Go for this scenario. This avoids holding a lock over a potentially blocking operation.

Here's a suggested safer implementation for Send that removes the racy mutex usage. The Close method's use of the mutex remains important to make it safe for concurrent calls.

func (c *BidiConnection[In, Out, Stream]) Send(input In) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// This recovers from a panic that occurs when sending on a closed channel.
			err = NewError(FAILED_PRECONDITION, "connection is closed")
		}
	}()

	select {
	case c.inputCh <- input:
		return nil
	case <-c.ctx.Done():
		return c.ctx.Err()
	case <-c.doneCh:
		// The recover will handle a panic if doneCh and inputCh close concurrently.
		return NewError(FAILED_PRECONDITION, "action has completed")
	}
}

go/samples/basic-session-flow/main.go (49-53)

medium

The model name googleai/gemini-3-flash-preview appears to be incorrect and will likely cause the sample to fail at runtime. Please use a valid model name, for example googleai/gemini-1.5-flash-latest.

ai.WithModel(googlegenai.ModelRef("googleai/gemini-1.5-flash-latest", &genai.GenerateContentConfig{
	ThinkingConfig: &genai.ThinkingConfig{
		ThinkingBudget: genai.Ptr[int32](0),
	},
})),

@apascal07 apascal07 changed the title from "feat(go): added SessionFlow and related" to "feat(go): added DefineSessionFlow" Feb 6, 2026
@apascal07 apascal07 mentioned this pull request Feb 6, 2026
@apascal07 apascal07 linked an issue Feb 6, 2026 that may be closed by this pull request
@apascal07 apascal07 changed the title from "feat(go): added DefineSessionFlow" to "feat(go): added DefineCustomAgent and DefinePromptAgent" Feb 18, 2026
@apascal07 apascal07 changed the title from "feat(go): add DefineSessionFlow and DefineSessionFlowFromPrompt" to "feat(go): add DefineAgent, DefinePromptAgent, and DefineCustomAgent" May 11, 2026
@apascal07 apascal07 changed the title from "feat(go): add DefineAgent, DefinePromptAgent, and DefineCustomAgent" to "feat(go/ai/exp): add DefineAgent, DefinePromptAgent, and DefineCustomAgent" May 11, 2026
apascal07 added 11 commits May 11, 2026 18:37
…me, generator type-params

* `core.GenkitError` gains `MarshalJSON`/`UnmarshalJSON` so it serializes
  as the canonical `{status, message, details}` wire shape, plus an
  `AsGenkitError` helper that wraps non-GenkitError values with status
  `INTERNAL`. Snapshot-related types now hold `*core.GenkitError`
  directly instead of a string.
* `SnapshotStatus` values renamed to `pending`, `succeeded`, `aborted`,
  `failed` (were `pending`, `complete`, `canceled`, `error`). All Go
  consts, docs, and test strings updated.
* `INTERNAL` `StatusName` value changed from `"INTERNAL_SERVER_ERROR"`
  to the canonical `"INTERNAL"` so Go matches the JSON schema and JS
  runtime. The googlegenai workaround is dropped.
* `AgentInput.toolRestarts` replaced with a structured `Resume` field.
  In Go this is `*exp.ToolResume` (`{Respond, Restart}`); the resume
  schema is inlined in the JS schema and renamed via schemas.config.
* `SessionSnapshot` is now hand-written in `go/ai/exp/session.go` and
  removed from the JS schema (it's runtime-internal). State on
  snapshots becomes a pointer so pending snapshots can hold nil.
* `AgentMetadataStateManagement` renamed to `AgentStateManagement`,
  `AgentSession` renamed to `SessionRunner`.
* `jsonschemagen` learns to auto-forward type parameters on field
  references: a ref to a type with `typeparams [State any]` now
  resolves to `*Foo[State]` automatically. The previously-omitted
  generic snapshot types are now generated through schemas.config.

Note: API surface changes here are intentional; the `exp` package is
explicitly experimental.

Auto-formats from `go fmt ./...` and `prettier --write` after the
previous commit; no behavior changes.

Picks up the recent JSON-schema renames (AgentStateManagement,
SnapshotStatus succeeded/aborted/failed), the inlined AgentInput.resume
shape, and the removal of the SessionSnapshot wire schema.

Both `AgentInput.resume` and `GenerateActionOptions.resume` are inline
object schemas that the Python generator collapses into a single
`Resume` class. The previous "first one wins" rule picked the
AgentInput shape (no `metadata`) and broke `_ai/_generate.py` and
`_ai/_prompt.py`, which rely on `Resume.metadata` for the
GenerateActionOptions case. Keep the schema with the larger property
set so the generated dataclass captures the superset.
…, slim AbortSnapshot

- AgentInit: reject state-with-server-managed-agent and snapshotId-with-client-managed-agent
  in addition to both-set; add coverage in TestLoadSession_AgentInitValidation.
- SessionSnapshot promoted to a shared schema (Zod + JSON schema + Python + Go gen);
  the hand-written Go struct in session.go is gone, and the error field validates the
  canonical {status, message, details} wire shape.
- AbortSnapshot now returns (SnapshotStatus, error). SnapshotMetadata and the
  snapshotMetadata projection helper are removed (Zod, JSON schema, Python, Go).
  Empty status + nil error continues to signal "not found"; the abort companion
  action maps that to core.NOT_FOUND, now exercised by TestAgent_AbortAction_NotFound.
- Companion actions (getSnapshot, abortSnapshot) are no longer registered when
  the agent has no SessionStore configured. Updated TestAgent_GetSnapshotAction_NoStore
  to verify neither action is registered in the client-managed case.
…ic Error

The inline SessionSnapshot.error shape (status, message, details) collides
with OperationSchema.error's inline object in Python codegen — both
synthesize to the same `Error` Pydantic class, and the codegen picks the
superset. Marking status required broke veo.py, which constructs Error
from a third-party API response that has no canonical status.

Go side still gets *core.GenkitError via the schemas.config override,
so the Genkit wire format is unaffected. The user-facing schema will be
named and tightened in a follow-up; relaxing status now matches the
real-world construction sites until then.

PromptIn provided no real type safety: ai.Prompt.Render takes `any`, so
the constrained defaultInput erased to any immediately and the prompt did
its own schema validation at runtime. Every call site wrote
`DefinePromptAgent[State, any]` and PromptIn was pure noise.

Replace it with a Define-time smoke render so a defaultInput that fails
the prompt's input schema panics here rather than failing on the first
invocation. Same safety guarantee at a meaningful boundary, simpler API.

Call sites lose one type argument:
- DefinePromptAgent[testState, any](...) → DefinePromptAgent[testState](...)
- DefinePromptAgent[any](...) → DefinePromptAgent(...) (State inferred)
…tion marker

Sample reorg:
- custom-agent → agent-custom; prompt-agent → agent-prompt.
- New agent-inline sample uses DefineAgent with an inline prompt (mixed
  variadic of ai prompt options + aix agent options).

Marker fix: the documented "DefineAgent accepts ai.PromptOption + aix.AgentOption"
mixed variadic never compiled. The marker method isAgentDefineOption() was
unexported and declared separately in both ai and ai/exp packages — Go's
package-scoping rule treated them as different methods, so ai option types
never actually satisfied exp.AgentDefineOption[State]. The bug went
unnoticed because no test ever called DefineAgent with an ai.WithX option
(the only call site passed aix.WithSessionStore).

Fix: introduce ai.AgentDefineOption (exported marker, package ai) embedded
in each prompt-definition option interface (ConfigOption, CommonGenOption,
InputOption, PromptOption, PromptingOption, OutputOption). exp.AgentDefineOption[State]
now embeds ai.AgentDefineOption; aix.AgentOption[State] inherits the marker
through the chain. Method renamed isAgentDefineOption → IsAgentDefineOption
on every implementing struct (ai prompt option structs and exp.agentOptions[State]).

DefineAgent's mixed variadic accepts AgentDefineOption[State] where State
is phantom on the marker, so a typed AgentOption[Wrong] passed to
DefineAgent[Right] satisfies the variadic at compile time and is only
caught at runtime via type-assertion-then-panic. Existing test covered
WithSessionStore; the same runtime path applies to WithSnapshotCallback,
WithSnapshotOn, and WithStateTransform. Table-drive the test across all
four so the runtime backstop is exercised wherever it matters.

DefineCustomAgent and DefinePromptAgent both declare opts ...AgentOption[State]
directly, so they reject State mismatches at compile time and need no
runtime coverage.

StateTransform now takes and returns *SessionState[State], matching
the rest of the API (Session.State, SnapshotContext, AgentOutput, and
SessionSnapshot all use the pointer). This enables nil-return for
"omit state from the response", drops a redundant struct copy in
applyTransform, and lets the doc describe ownership plainly.

Also drops a now-unnecessary explicit [State] arg on
WithSnapshotCallback inside WithSnapshotOn, since the callback
parameter pins down State on its own.

DefineAgent now takes a typed AgentSource as its third arg (either
FromInline for inline prompt options or FromPrompt for an existing
prompt looked up by the agent's name) followed by a typed variadic of
AgentOption[State]. This:

  * fixes the phantom-State hole: a State mismatch on any agent option
    now fails at compile time instead of panicking at registration;
  * subsumes DefinePromptAgent, which is deleted;
  * lets State be inferred from the typed agent options in the common
    case, so the explicit [State] type arg can be dropped from
    DefineAgent calls that pass a typed option;
  * lets the agent-vs-prompt namespace invariant (they share a name)
    become a structural property of the API rather than a convention.

The mixed-variadic AgentDefineOption marker on every ai.PromptOption
interface is now unused and removed (both the ai package marker and
its exp package generic wrapper).

Migrate the three samples and five DefinePromptAgent test call sites.
The State-mismatch panic test is obsolete (the mismatch is a compile
error now) and is deleted.
@apascal07 apascal07 changed the title from "feat(go/ai/exp): add DefineAgent, DefinePromptAgent, and DefineCustomAgent" to "feat(go/ai/exp): add DefineAgent and DefineCustomAgent" May 12, 2026
apascal07 added 9 commits May 12, 2026 17:07
AgentInput.messages was an array but per-turn input is conceptually
one message. Renamed to AgentInput.message (single, optional) across
the schema and the Go runtime; AgentConnection.SendMessages becomes
SendMessage. The DefineAgent loop now rejects inputs whose role is
not "user" or whose content carries tool request / response parts —
those belong on AgentInput.Resume, not on a turn message.
…utdown

Send methods on Responder previously did a bare channel send into
the chunk router. If the router was pinned on a downstream send to
a slow/gone consumer when workCtx cancelled, fn blocked on the send
until the runtime's terminal path called router.stopAndWait() to
flip the router into drain mode. It worked in practice — every
terminal path calls stopAndWait before awaiting fnDone — but tied
fn liveness to that invariant; a future terminal path that forgot
stopAndWait would deadlock with no test to catch it.

Plumb workCtx through Responder and select on it in every Send. A
Send issued after cancellation drops the chunk and returns
immediately regardless of router state. emitTurnEnd's internal
send moves to the same shape via a new router.sendChunk helper.

Companion change in handleFnDone: fn can now return with ctx.Err()
while the router is still pinned on r.out, so close(r.in) alone
wouldn't unblock it. Call router.stopAndWait() before close when
res.err != nil. The natural-completion path deliberately skips
this so a last in-flight chunk to a slow-but-alive consumer is
never trashed.

Public call sites are unchanged — resp.SendModelChunk(chunk) etc.
behave identically when ctx is alive.
…er cancels

Previously, breaking out of the BidiConnection.Receive iterator
called c.cancel(), terminating the entire bidi action. This was
modelled on Go's iter convention for "owning" iterators, but bidi
streams are not owned by their receive iterator: send and receive
are independent phases on a long-lived connection, and callers
routinely break receive to switch to sending. This is the same
shape as gRPC, WebSockets, NATS, and similar bidi protocols.

Drop the c.cancel() so break is purely an iterator exit. Lifecycle
is now controlled by ctx and Close, matching industry convention.

This eliminates the workaround AgentConnection needed: a drainer
goroutine that read the underlying iterator and buffered chunks
into a private channel so the user could break between turns
without killing the connection. With break-doesn't-cancel,
AgentConnection.Receive collapses to a direct delegation to
c.conn.Receive, and the chunks channel, chunkErr, initOnce, and
initReceiver all disappear. One fewer goroutine per agent
connection, less state, fewer moving parts.

Breaking change for non-agent BidiConnection callers that relied
on break-cancels-connection; they should switch to ctx
cancellation or Close. No such callers exist in the tree (all
existing bidi tests drain to completion).

Register the input schema via DefineSchemaFor and reference it by
name from chat.prompt instead of inlining the field list. Keeps
the prompt file in sync with the Go type.

SessionRunner.Result returned shallow-copied pointers: the slice
header for Artifacts was fresh but the *ai.Message and *Artifact
elements were shared with session state. A caller doing
result.Message.Content[0].Text = "..." would mutate the session's
actual message. handleFnDone then forwarded those same pointers
into AgentOutput, exposing the same vector to the framework's
caller.

Add jsonClone and cloneArtifacts helpers in session.go (alongside
the existing copySnapshot JSON-copy pattern) and use them in two
places:

  * SessionRunner.Result deep-copies the last message and all
    artifacts on its way out. fn-side mutations of the returned
    result no longer reach session state.

  * handleFnDone deep-copies again when assembling AgentOutput.
    Defense in depth: even if a custom fn constructs AgentResult
    manually with raw session pointers (bypassing Result), the
    framework's caller still gets an isolated copy.

Cost is small relative to the existing per-snapshot JSON copy:
just the last message and the artifact list, not the whole
history. Regression test asserts both invariants in one go.
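
A JSON round-trip deep copy of the kind this commit describes might look like the sketch below. The helper name `jsonClone` comes from the commit message, but its signature here is an assumption, and `message` and `part` are simplified stand-ins for the real message types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// part and message are simplified stand-ins for the real message types.
type part struct{ Text string }

type message struct {
	Role    string
	Content []*part
}

// jsonClone deep-copies v by marshalling it to JSON and back. Only
// JSON-visible (exported, serializable) fields survive the round trip.
func jsonClone[T any](v *T) (*T, error) {
	if v == nil {
		return nil, nil
	}
	b, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	out := new(T)
	if err := json.Unmarshal(b, out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	orig := &message{Role: "model", Content: []*part{{Text: "hello"}}}

	cp, err := jsonClone(orig)
	if err != nil {
		panic(err)
	}

	// Mutating the copy does not reach the original, because the nested
	// *part values were re-allocated during unmarshalling.
	cp.Content[0].Text = "mutated"
	fmt.Println(orig.Content[0].Text, cp.Content[0].Text) // prints "hello mutated"
}
```

A shallow copy of `message` would share the `*part` pointers, which is exactly the aliasing the commit removes.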
- New go/ai/exp/localstore package with FileSessionStore and
  InMemorySessionStore (moved out of exp). Positioned for
  single-process / single-instance use (CLI tools, desktop apps,
  local services); production deployments should reach for a real
  database-backed store. Tests live alongside.
- exp keeps a private testInMemStore fixture used only by the
  agent's internal tests so they don't introduce an
  exp -> localstore -> exp import cycle.
- agent-custom, agent-prompt, and agent-inline now own SIGINT
  handling (signal.NotifyContext + ctx-aware select REPL with a
  readLines helper). genkit.Init's existing signal catch stays in
  place for dev-mode reflection-server cleanup; samples just need
  to propagate the cancellable ctx. Previously Ctrl+C was trapped
  but no one observed the cancellation, so the prompt spun in a
  tight loop. agent-inline switches to FileSessionStore as a demo.

Output() is the single "I'm done" call: it implicitly closes the input
side and drains any chunks the caller did not consume via Receive, then
blocks until the agent finalizes. The drain is required because the
underlying stream buffer is shallow; without it a producing fn would
block on chunk sends and never reach completion. Close remains public
and idempotent for defer-style cleanup and fire-and-forget patterns.

Simplifies Agent.Run accordingly: drop the explicit Close and Receive
drain loop, lean on Output for both. Updates the three agent samples to
the new pattern: drop defer Close, print the final snapshot ID and a
friendly goodbye on any user-initiated exit, and treat context.Canceled
as a clean exit rather than a fatal error.
Development

Successfully merging this pull request may close these issues.

RFC: Session flows

4 participants