feat(go/ai/exp): add DefineAgent and DefineCustomAgent #4462
Summary of Changes
This pull request significantly enhances the Genkit Go SDK by introducing a new …
Code Review
This is a great pull request that introduces the SessionFlow feature and refactors the core Action type to support bidirectional streaming. The new functionality is well-structured, comes with comprehensive tests, and includes a helpful sample application. I've identified a critical race condition in the new BidiConnection implementation and a minor issue in the sample code that would prevent it from running. My comments provide suggestions to address these points.
I am having trouble creating individual review comments, so my feedback is collected below.
go/core/action.go (534-550)
This Send implementation has a race condition that can cause a panic. The mutex is unlocked on line 540 before the channel send on line 543. If another goroutine calls Close() in between, c.inputCh will be closed, and the send will panic.
A robust way to fix this is to use recover to handle the "send on closed channel" panic, which is a common pattern in Go for this scenario. This avoids holding a lock over a potentially blocking operation.
Here's a suggested safer implementation for Send that removes the racy mutex usage. The Close method's use of the mutex remains important to make it safe for concurrent calls.
func (c *BidiConnection[In, Out, Stream]) Send(input In) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// This recovers from a panic that occurs when sending on a closed channel.
			err = NewError(FAILED_PRECONDITION, "connection is closed")
		}
	}()
	select {
	case c.inputCh <- input:
		return nil
	case <-c.ctx.Done():
		return c.ctx.Err()
	case <-c.doneCh:
		// The recover will handle a panic if doneCh and inputCh close concurrently.
		return NewError(FAILED_PRECONDITION, "action has completed")
	}
}
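To make the suggested pattern concrete, here is a minimal, self-contained sketch of a recover-based send. `demoConn`, `errClosed`, and the buffered channel are hypothetical stand-ins for illustration, not the actual `BidiConnection` code; note that a blanket `recover` also swallows unrelated panics, which is a trade-off of this approach.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosed is a hypothetical sentinel standing in for the
// FAILED_PRECONDITION error used in the review suggestion.
var errClosed = errors.New("connection is closed")

// demoConn is a hypothetical stand-in for BidiConnection, reduced to an
// input channel and an idempotent Close.
type demoConn struct {
	inputCh   chan string
	closeOnce sync.Once
}

// Send converts a "send on closed channel" panic into an ordinary error,
// so no lock has to be held across the (potentially blocking) send.
func (c *demoConn) Send(input string) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = errClosed
		}
	}()
	c.inputCh <- input
	return nil
}

// Close is safe to call more than once and from multiple goroutines.
func (c *demoConn) Close() {
	c.closeOnce.Do(func() { close(c.inputCh) })
}

func main() {
	c := &demoConn{inputCh: make(chan string, 1)}
	fmt.Println(c.Send("hello")) // buffered send succeeds
	c.Close()
	c.Close()                    // second Close is a no-op
	fmt.Println(c.Send("again")) // the recovered panic surfaces as an error
}
```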
go/samples/basic-session-flow/main.go (49-53)
The model name googleai/gemini-3-flash-preview appears to be incorrect and will likely cause the sample to fail at runtime. Please use a valid model name, for example googleai/gemini-1.5-flash-latest.
ai.WithModel(googlegenai.ModelRef("googleai/gemini-1.5-flash-latest", &genai.GenerateContentConfig{
	ThinkingConfig: &genai.ThinkingConfig{
		ThinkingBudget: genai.Ptr[int32](0),
	},
})),
Title changed: SessionFlow and related → DefineSessionFlow
Title changed: DefineSessionFlow → DefineCustomAgent and DefinePromptAgent
# Conflicts: # go/internal/base/json.go
Title changed: DefineSessionFlow and DefineSessionFlowFromPrompt → DefineAgent, DefinePromptAgent, and DefineCustomAgent
…me, generator type-params
* `core.GenkitError` gains `MarshalJSON`/`UnmarshalJSON` so it serializes
as the canonical `{status, message, details}` wire shape, plus an
`AsGenkitError` helper that wraps non-GenkitError values with status
`INTERNAL`. Snapshot-related types now hold `*core.GenkitError`
directly instead of a string.
* `SnapshotStatus` values renamed to `pending`, `succeeded`, `aborted`, `failed`
(was `pending`, `complete`, `canceled`, `error`). All Go consts,
doc, and test strings updated.
* `INTERNAL` `StatusName` value changed from `"INTERNAL_SERVER_ERROR"`
to the canonical `"INTERNAL"` so Go matches the JSON schema and JS
runtime. The googlegenai workaround is dropped.
* `AgentInput.toolRestarts` replaced with a structured `Resume` field.
In Go this is `*exp.ToolResume` (`{Respond, Restart}`); the resume
schema is inlined in the JS schema and renamed via schemas.config.
* `SessionSnapshot` is now hand-written in `go/ai/exp/session.go` and
removed from the JS schema (it's runtime-internal). State on
snapshots becomes a pointer so pending snapshots can hold nil.
* `AgentMetadataStateManagement` renamed to `AgentStateManagement`,
`AgentSession` renamed to `SessionRunner`.
* `jsonschemagen` learns to auto-forward type parameters on field
references: a ref to a type with `typeparams [State any]` now
resolves to `*Foo[State]` automatically. The previously-omitted
generic snapshot types are now generated through schemas.config.
Note: API surface changes here are intentional; the `exp` package is
explicitly experimental.
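As an illustration of the first item above, the following sketch shows an error type that serializes to the `{status, message, details}` shape and a helper that wraps foreign errors with status `INTERNAL`. `wireError`, `asWireError`, and `encode` are hypothetical names; passing existing values through via `errors.As` is an assumption about how `AsGenkitError` behaves, not something the commit message states.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// wireError is a hypothetical stand-in for core.GenkitError.
type wireError struct {
	Status  string
	Message string
	Details map[string]any
}

func (e *wireError) Error() string { return e.Status + ": " + e.Message }

// MarshalJSON emits the {status, message, details} shape; details is
// omitted when empty (an assumption made for this sketch).
func (e *wireError) MarshalJSON() ([]byte, error) {
	return json.Marshal(struct {
		Status  string         `json:"status"`
		Message string         `json:"message"`
		Details map[string]any `json:"details,omitempty"`
	}{e.Status, e.Message, e.Details})
}

// asWireError returns an existing *wireError found in err's chain, and
// wraps any other error with status INTERNAL.
func asWireError(err error) *wireError {
	if err == nil {
		return nil
	}
	var we *wireError
	if errors.As(err, &we) {
		return we
	}
	return &wireError{Status: "INTERNAL", Message: err.Error()}
}

// encode returns the wire form of err as a string ("" on marshal failure).
func encode(err error) string {
	b, mErr := json.Marshal(asWireError(err))
	if mErr != nil {
		return ""
	}
	return string(b)
}

// errBoom is a plain error used to demonstrate the INTERNAL wrapping.
var errBoom = errors.New("boom")

func main() {
	fmt.Println(encode(errBoom))
	fmt.Println(encode(&wireError{Status: "NOT_FOUND", Message: "no such snapshot"}))
}
```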
Auto-formats from `go fmt ./...` and `prettier --write` after the previous commit; no behavior changes.
Picks up the recent JSON-schema renames (AgentStateManagement, SnapshotStatus succeeded/aborted/failed), the inlined AgentInput.resume shape, and the removal of the SessionSnapshot wire schema.
Both `AgentInput.resume` and `GenerateActionOptions.resume` are inline object schemas that the Python generator collapses into a single `Resume` class. The previous "first one wins" rule picked the AgentInput shape (no `metadata`) and broke `_ai/_generate.py` and `_ai/_prompt.py`, which rely on `Resume.metadata` for the GenerateActionOptions case. Keep the schema with the larger property set so the generated dataclass captures the superset.
…, slim AbortSnapshot
- AgentInit: reject state-with-server-managed-agent and snapshotId-with-client-managed-agent
in addition to both-set; add coverage in TestLoadSession_AgentInitValidation.
- SessionSnapshot promoted to a shared schema (Zod + JSON schema + Python + Go gen);
the hand-written Go struct in session.go is gone, and the error field validates the
canonical {status, message, details} wire shape.
- AbortSnapshot now returns (SnapshotStatus, error). SnapshotMetadata and the
snapshotMetadata projection helper are removed (Zod, JSON schema, Python, Go).
Empty status + nil error continues to signal "not found"; the abort companion
action maps that to core.NOT_FOUND, now exercised by TestAgent_AbortAction_NotFound.
- Companion actions (getSnapshot, abortSnapshot) are no longer registered when
the agent has no SessionStore configured. Updated TestAgent_GetSnapshotAction_NoStore
to verify neither action is registered in the client-managed case.
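The AgentInit rules in the first item above can be modelled as a small decision function. This is a sketch under stated assumptions: `initRequest` and `validateInit` are hypothetical names, and the status names follow the PR description (`INVALID_ARGUMENT` when both are set, `FAILED_PRECONDITION` for a mode mismatch).

```go
package main

import "fmt"

// initRequest is a hypothetical stand-in for AgentInit: at most one of
// SnapshotID (server-managed resume) or state (client-managed resume)
// should be supplied.
type initRequest struct {
	SnapshotID string
	HasState   bool
}

// validateInit returns a status name describing why the request is
// rejected, or "" when it is acceptable. serverManaged is true when the
// agent has a session store configured.
func validateInit(req initRequest, serverManaged bool) string {
	switch {
	case req.SnapshotID != "" && req.HasState:
		return "INVALID_ARGUMENT" // both resumption modes supplied
	case req.HasState && serverManaged:
		return "FAILED_PRECONDITION" // client state sent to a server-managed agent
	case req.SnapshotID != "" && !serverManaged:
		return "FAILED_PRECONDITION" // snapshot ID sent to a client-managed agent
	}
	return "" // neither set (fresh invocation) or a matching mode
}

func main() {
	fmt.Printf("%q\n", validateInit(initRequest{}, true))
	fmt.Printf("%q\n", validateInit(initRequest{SnapshotID: "s1", HasState: true}, true))
	fmt.Printf("%q\n", validateInit(initRequest{HasState: true}, true))
	fmt.Printf("%q\n", validateInit(initRequest{SnapshotID: "s1"}, false))
}
```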
…ic Error
The inline SessionSnapshot.error shape (status, message, details) collides with OperationSchema.error's inline object in Python codegen: both synthesize to the same `Error` Pydantic class, and the codegen picks the superset. Marking status required broke veo.py, which constructs Error from a third-party API response that has no canonical status. Go side still gets *core.GenkitError via the schemas.config override, so the Genkit wire format is unaffected. The user-facing schema will be named and tightened in a follow-up; relaxing status now matches the real-world construction sites until then.
PromptIn provided no real type safety: ai.Prompt.Render takes `any`, so the constrained defaultInput erased to any immediately and the prompt did its own schema validation at runtime. Every call site wrote `DefinePromptAgent[State, any]` and PromptIn was pure noise.
Replace it with a Define-time smoke render so a defaultInput that fails the prompt's input schema panics here rather than failing on the first invocation. Same safety guarantee at a meaningful boundary, simpler API.
Call sites lose one type argument:
- DefinePromptAgent[testState, any](...) → DefinePromptAgent[testState](...)
- DefinePromptAgent[any](...) → DefinePromptAgent(...) (State inferred)
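The define-time smoke render can be sketched as follows. Everything here is hypothetical: `prompt`, `render`, and `definePromptAgent` only model the idea of rendering the default input once at definition time so that a bad input panics early; they are not the Genkit API.

```go
package main

import (
	"fmt"
	"strings"
)

// prompt is a hypothetical stand-in for a registered prompt: a template
// plus the input keys it requires.
type prompt struct {
	template string
	required []string
}

// render substitutes each required key and fails if one is missing.
func (p prompt) render(input map[string]any) (string, error) {
	out := p.template
	for _, k := range p.required {
		v, ok := input[k]
		if !ok {
			return "", fmt.Errorf("missing required input %q", k)
		}
		out = strings.ReplaceAll(out, "{{"+k+"}}", fmt.Sprint(v))
	}
	return out, nil
}

// definePromptAgent renders defaultInput once at definition time and
// panics on failure, instead of deferring the error to the first turn.
func definePromptAgent(name string, p prompt, defaultInput map[string]any) func() (string, error) {
	if _, err := p.render(defaultInput); err != nil {
		panic(fmt.Sprintf("agent %q: default input rejected by prompt: %v", name, err))
	}
	return func() (string, error) { return p.render(defaultInput) }
}

// definePanics reports whether defining the agent with this input panics.
func definePanics(p prompt, in map[string]any) (panicked bool) {
	defer func() { panicked = recover() != nil }()
	definePromptAgent("demo", p, in)
	return false
}

func main() {
	p := prompt{template: "Hello {{name}}", required: []string{"name"}}
	fmt.Println(definePanics(p, map[string]any{}))              // input fails the schema
	fmt.Println(definePanics(p, map[string]any{"name": "Ada"})) // input satisfies the schema
}
```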
…tion marker
Sample reorg:
- custom-agent → agent-custom; prompt-agent → agent-prompt.
- New agent-inline sample uses DefineAgent with an inline prompt (mixed variadic of ai prompt options + aix agent options).
Marker fix: the documented "DefineAgent accepts ai.PromptOption + aix.AgentOption" mixed variadic never compiled. The marker method isAgentDefineOption() was unexported and declared separately in both ai and ai/exp packages; Go's package-scoping rule treated them as different methods, so ai option types never actually satisfied exp.AgentDefineOption[State]. The bug went unnoticed because no test ever called DefineAgent with an ai.WithX option (the only call site passed aix.WithSessionStore).
Fix: introduce ai.AgentDefineOption (exported marker, package ai) embedded in each prompt-definition option interface (ConfigOption, CommonGenOption, InputOption, PromptOption, PromptingOption, OutputOption). exp.AgentDefineOption[State] now embeds ai.AgentDefineOption; aix.AgentOption[State] inherits the marker through the chain. Method renamed isAgentDefineOption → IsAgentDefineOption on every implementing struct (ai prompt option structs and exp.agentOptions[State]).
DefineAgent's mixed variadic accepts AgentDefineOption[State] where State is phantom on the marker, so a typed AgentOption[Wrong] passed to DefineAgent[Right] satisfies the variadic at compile time and is only caught at runtime via type-assertion-then-panic. Existing test covered WithSessionStore; the same runtime path applies to WithSnapshotCallback, WithSnapshotOn, and WithStateTransform. Table-drive the test across all four so the runtime backstop is exercised wherever it matters. DefineCustomAgent and DefinePromptAgent both declare opts ...AgentOption[State] directly, so they reject State mismatches at compile time and need no runtime coverage.
StateTransform now takes and returns *SessionState[State], matching the rest of the API (Session.State, SnapshotContext, AgentOutput, and SessionSnapshot all use the pointer). This enables nil-return for "omit state from the response", drops a redundant struct copy in applyTransform, and lets the doc describe ownership plainly. Also drops a now-unnecessary explicit [State] arg on WithSnapshotCallback inside WithSnapshotOn, since the callback parameter pins down State on its own.
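A pointer-in, pointer-out transform with nil meaning "omit" might look like the sketch below. `sessionState`, `stateTransform`, and `applyTransform` are hypothetical models of the shapes described above; the JSON round trip used for the deep copy is an assumption chosen for brevity.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sessionState is a hypothetical stand-in for SessionState[State].
type sessionState[State any] struct {
	Custom State `json:"custom"`
}

// stateTransform takes and returns a pointer; returning nil means
// "omit state from the response".
type stateTransform[State any] func(*sessionState[State]) *sessionState[State]

// applyTransform hands the transform a deep copy (via a JSON round trip)
// so the stored state is never mutated, then returns whatever the
// transform returns, including nil.
func applyTransform[State any](raw *sessionState[State], t stateTransform[State]) (*sessionState[State], error) {
	if raw == nil || t == nil {
		return raw, nil
	}
	b, err := json.Marshal(raw)
	if err != nil {
		return nil, err
	}
	cp := new(sessionState[State])
	if err := json.Unmarshal(b, cp); err != nil {
		return nil, err
	}
	return t(cp), nil
}

type profile struct {
	Name  string `json:"name"`
	Email string `json:"email"`
}

// redactEmail mutates the copy in place and returns it.
func redactEmail(s *sessionState[profile]) *sessionState[profile] {
	s.Custom.Email = ""
	return s
}

// omitState drops state from the response entirely.
func omitState(*sessionState[profile]) *sessionState[profile] { return nil }

func main() {
	raw := &sessionState[profile]{Custom: profile{Name: "Ada", Email: "ada@example.com"}}
	out, _ := applyTransform[profile](raw, redactEmail)
	fmt.Printf("response=%+v stored=%+v\n", out.Custom, raw.Custom)
	omitted, _ := applyTransform[profile](raw, omitState)
	fmt.Println(omitted == nil)
}
```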
DefineAgent now takes a typed AgentSource as its third arg (either
FromInline for inline prompt options or FromPrompt for an existing
prompt looked up by the agent's name) followed by a typed variadic of
AgentOption[State]. This:
* fixes the phantom-State hole: a State mismatch on any agent option
now fails at compile time instead of panicking at registration;
* subsumes DefinePromptAgent, which is deleted;
* lets State be inferred from the typed agent options in the common
case, so the explicit [State] type arg can be dropped from
DefineAgent calls that pass a typed option;
* lets the agent-vs-prompt namespace invariant (they share a name)
become a structural property of the API rather than a convention.
The mixed-variadic AgentDefineOption marker on every ai.PromptOption
interface is now unused and removed (both the ai package marker and
its exp package generic wrapper).
Migrate the three samples and five DefinePromptAgent test call sites.
The State-mismatch panic test is obsolete (the mismatch is a compile
error now) and is deleted.
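The compile-time guarantee and the State inference described above follow from ordinary Go generics. The sketch below uses hypothetical names (`agentConfig`, `agentOption`, `withInitialState`, `withLabel`, `defineAgent`) rather than the real `aix` API; it only demonstrates how a typed variadic pins down `State`.

```go
package main

import "fmt"

// agentConfig is a hypothetical per-agent configuration.
type agentConfig[State any] struct {
	initial *State
	label   string
}

// agentOption is typed on State, so options built for different State
// types cannot be mixed in one call.
type agentOption[State any] func(*agentConfig[State])

// withInitialState mentions State in its argument, so State can be
// inferred from it.
func withInitialState[State any](s State) agentOption[State] {
	return func(c *agentConfig[State]) { c.initial = &s }
}

// withLabel does not mention State in its arguments, so callers must
// spell the type argument out.
func withLabel[State any](label string) agentOption[State] {
	return func(c *agentConfig[State]) { c.label = label }
}

// defineAgent takes a typed variadic; State is inferred from the options.
func defineAgent[State any](name string, opts ...agentOption[State]) *agentConfig[State] {
	cfg := &agentConfig[State]{}
	for _, o := range opts {
		o(cfg)
	}
	return cfg
}

type chatState struct{ Turns int }

func main() {
	// State is inferred as chatState; no explicit defineAgent[chatState].
	cfg := defineAgent("chat", withInitialState(chatState{Turns: 1}), withLabel[chatState]("demo"))
	fmt.Println(cfg.initial.Turns, cfg.label)

	// A mismatched option is rejected by the compiler rather than at runtime:
	// defineAgent("chat", withInitialState(chatState{}), withLabel[int]("x"))
}
```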
Title changed: DefineAgent, DefinePromptAgent, and DefineCustomAgent → DefineAgent and DefineCustomAgent
AgentInput.messages was an array but per-turn input is conceptually one message. Renamed to AgentInput.message (single, optional) across the schema and the Go runtime; AgentConnection.SendMessages becomes SendMessage. The DefineAgent loop now rejects inputs whose role is not "user" or whose content carries tool request / response parts — those belong on AgentInput.Resume, not on a turn message.
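The per-turn input check can be sketched as a small validator. `message`, `part`, and `validateTurnMessage` are hypothetical, simplified types; the real `ai.Message` and its parts are richer than the booleans used here.

```go
package main

import "fmt"

// part is a hypothetical, simplified message part.
type part struct {
	Text         string
	ToolRequest  bool
	ToolResponse bool
}

// message is a hypothetical, simplified turn message.
type message struct {
	Role    string
	Content []part
}

// validateTurnMessage accepts a missing message (the field is optional)
// and rejects non-user roles and tool request/response parts, which
// belong on the resume payload rather than on a turn message.
func validateTurnMessage(m *message) error {
	if m == nil {
		return nil
	}
	if m.Role != "user" {
		return fmt.Errorf("turn message role must be \"user\", got %q", m.Role)
	}
	for i, p := range m.Content {
		if p.ToolRequest || p.ToolResponse {
			return fmt.Errorf("turn message part %d carries a tool request/response", i)
		}
	}
	return nil
}

func main() {
	fmt.Println(validateTurnMessage(&message{Role: "user", Content: []part{{Text: "hi"}}}))
	fmt.Println(validateTurnMessage(&message{Role: "model"}))
	fmt.Println(validateTurnMessage(&message{Role: "user", Content: []part{{ToolResponse: true}}}))
}
```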
…utdown
Send methods on Responder previously did a bare channel send into the chunk router. If the router was pinned on a downstream send to a slow/gone consumer when workCtx cancelled, fn blocked on the send until the runtime's terminal path called router.stopAndWait() to flip the router into drain mode. It worked in practice (every terminal path calls stopAndWait before awaiting fnDone) but tied fn liveness to that invariant; a future terminal path that forgot stopAndWait would deadlock with no test to catch it.
Plumb workCtx through Responder and select on it in every Send. A Send issued after cancellation drops the chunk and returns immediately regardless of router state. emitTurnEnd's internal send moves to the same shape via a new router.sendChunk helper.
Companion change in handleFnDone: fn can now return with ctx.Err() while the router is still pinned on r.out, so close(r.in) alone wouldn't unblock it. Call router.stopAndWait() before close when res.err != nil. The natural-completion path deliberately skips this so a last in-flight chunk to a slow-but-alive consumer is never trashed.
Public call sites are unchanged: resp.SendModelChunk(chunk) etc. behave identically when ctx is alive.
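A context-aware send of the kind described above can be sketched in a few lines. `sendChunk` here is a hypothetical free function, not the `router.sendChunk` helper from the commit; the up-front `ctx.Err()` check is an assumption added so that a send after cancellation is dropped deterministically even when the channel has room.

```go
package main

import (
	"context"
	"fmt"
)

// sendChunk delivers chunk to out unless ctx is, or becomes, cancelled.
// It reports whether the chunk was delivered.
func sendChunk[T any](ctx context.Context, out chan<- T, chunk T) bool {
	if ctx.Err() != nil {
		return false // already cancelled: drop without touching the channel
	}
	select {
	case out <- chunk:
		return true
	case <-ctx.Done():
		return false // cancelled while blocked on a slow or gone consumer
	}
}

// demoSend sends once while ctx is alive and once after cancellation.
func demoSend() (beforeCancel, afterCancel bool) {
	ctx, cancel := context.WithCancel(context.Background())
	out := make(chan string, 1)
	beforeCancel = sendChunk[string](ctx, out, "chunk-1") // buffer has room
	cancel()
	afterCancel = sendChunk[string](ctx, out, "chunk-2") // dropped, returns immediately
	return beforeCancel, afterCancel
}

func main() {
	fmt.Println(demoSend())
}
```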
…er cancels
Previously, breaking out of the BidiConnection.Receive iterator called c.cancel(), terminating the entire bidi action. This was modelled on Go's iter convention for "owning" iterators, but bidi streams are not owned by their receive iterator: send and receive are independent phases on a long-lived connection, and callers routinely break receive to switch to sending. This is the same shape as gRPC, WebSockets, NATS, and similar bidi protocols.
Drop the c.cancel() so break is purely an iterator exit. Lifecycle is now controlled by ctx and Close, matching industry convention.
This eliminates the workaround AgentConnection needed: a drainer goroutine that read the underlying iterator and buffered chunks into a private channel so the user could break between turns without killing the connection. With break-doesn't-cancel, AgentConnection.Receive collapses to a direct delegation to c.conn.Receive, and the chunks channel, chunkErr, initOnce, and initReceiver all disappear. One fewer goroutine per agent connection, less state, fewer moving parts.
Breaking change for non-agent BidiConnection callers that relied on break-cancels-connection; they should switch to ctx cancellation or Close. No such callers exist in the tree (all existing bidi tests drain to completion).
Register the input schema via DefineSchemaFor and reference it by name from chat.prompt instead of inlining the field list. Keeps the prompt file in sync with the Go type.
SessionRunner.Result returned shallow-copied pointers: the slice
header for Artifacts was fresh but the *ai.Message and *Artifact
elements were shared with session state. A caller doing
result.Message.Content[0].Text = "..." would mutate the session's
actual message. handleFnDone then forwarded those same pointers
into AgentOutput, exposing the same vector to the framework's
caller.
Add jsonClone and cloneArtifacts helpers in session.go (alongside
the existing copySnapshot JSON-copy pattern) and use them in two
places:
* SessionRunner.Result deep-copies the last message and all
artifacts on its way out. fn-side mutations of the returned
result no longer reach session state.
* handleFnDone deep-copies again when assembling AgentOutput.
Defense in depth: even if a custom fn constructs AgentResult
manually with raw session pointers (bypassing Result), the
framework's caller still gets an isolated copy.
Cost is small relative to the existing per-snapshot JSON copy:
just the last message and the artifact list, not the whole
history. Regression test asserts both invariants in one go.
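A JSON round-trip clone of the kind described above could look like this. The generic signature of `jsonClone` and the `demoMessage`/`demoPart` types are assumptions for illustration; the actual helper in session.go may differ, and a JSON copy only preserves fields that survive serialization.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonClone deep-copies v by marshalling and unmarshalling it.
// Only JSON-serializable, exported fields survive the round trip.
func jsonClone[T any](v *T) (*T, error) {
	if v == nil {
		return nil, nil
	}
	b, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	out := new(T)
	if err := json.Unmarshal(b, out); err != nil {
		return nil, err
	}
	return out, nil
}

// demoPart and demoMessage are hypothetical, simplified message types.
type demoPart struct {
	Text string `json:"text"`
}

type demoMessage struct {
	Role    string      `json:"role"`
	Content []*demoPart `json:"content"`
}

func main() {
	stored := &demoMessage{Role: "model", Content: []*demoPart{{Text: "original"}}}

	cp, err := jsonClone(stored)
	if err != nil {
		panic(err)
	}
	cp.Content[0].Text = "edited by caller" // mutate the copy only

	// The session's message is untouched because no pointers are shared.
	fmt.Println(stored.Content[0].Text, "/", cp.Content[0].Text)
}
```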
- New go/ai/exp/localstore package with FileSessionStore and InMemorySessionStore (moved out of exp). Positioned for single-process / single-instance use (CLI tools, desktop apps, local services); production deployments should reach for a real database-backed store. Tests live alongside.
- exp keeps a private testInMemStore fixture used only by the agent's internal tests so they don't introduce an exp -> localstore -> exp import cycle.
- agent-custom, agent-prompt, and agent-inline now own SIGINT handling (signal.NotifyContext + ctx-aware select REPL with a readLines helper). genkit.Init's existing signal catch stays in place for dev-mode reflection-server cleanup; samples just need to propagate the cancellable ctx. Previously Ctrl+C was trapped but no one observed the cancellation, so the prompt spun in a tight loop. agent-inline switches to FileSessionStore as a demo.
Output() is the single "I'm done" call: it implicitly closes the input side and drains any chunks the caller did not consume via Receive, then blocks until the agent finalizes. The drain is required because the underlying stream buffer is shallow; without it a producing fn would block on chunk sends and never reach completion. Close remains public and idempotent for defer-style cleanup and fire-and-forget patterns.
Simplifies Agent.Run accordingly: drop the explicit Close and Receive drain loop, lean on Output for both.
Updates the three agent samples to the new pattern: drop defer Close, print the final snapshot ID and a friendly goodbye on any user-initiated exit, and treat context.Canceled as a clean exit rather than a fatal error.
Adds an experimental Agent API (`ai/exp`) for multi-turn conversations with automatic snapshot management and optional background execution via `Detach`, built on top of the bidirectional streaming primitives added upstream.
Docs PR: genkit-ai/docsite#238
Examples
Inline-prompt Agent
`DefineAgent` is the default path for prompt-backed agents. The third argument is an `aix.AgentSource` that selects how the prompt is configured: `aix.FromInline` defines it inline alongside the agent (the prompt is registered under the agent's name), `aix.FromPrompt` references an existing prompt already in the registry under the same name. Agent-level options (store, snapshot policy, state transform) follow as a typed variadic.
The `State` type parameter is inferred from the typed agent options (`aix.WithSessionStore`, `aix.WithSnapshotOn`, etc.), so the explicit `DefineAgent[State]` is only needed when no typed option is supplied. A typed `aix.AgentOption[OtherState]` accidentally passed alongside an `AgentOption[State]` is a compile-time error.
Existing-prompt Agent
`aix.FromPrompt` attaches the agent to a prompt already registered with the registry (via `DefinePrompt` or a `.prompt` file). The agent and its backing prompt always share a name.
`defaultInput` is rendered through the prompt on every turn; the prompt's `Render` is invoked once at definition time as a smoke check, so a `defaultInput` that fails the prompt's input schema panics here rather than failing on the first invocation. For prompts that take no input, call `aix.FromPrompt()` with no arguments.
Custom Agent
`DefineCustomAgent` hands you the turn loop. Use it when you need control before or after the generate call (set up expensive clients, route through different models per turn, clean up in-progress state before returning the final outcome).
`sess.Result()` returns an `AgentResult` with the last message from the conversation history and all artifacts. If you need to control what gets sent back to the client (e.g. returning only artifacts without a message, or omitting certain artifacts), you can construct the result directly:
`DefineCustomAgent` takes the same `...AgentOption[State]` variadic as `DefineAgent`, so the same compile-time State guarantee applies.
Invocation patterns
`Agent` exposes the same single-turn / multi-turn / streaming surface regardless of which `Define*Agent` produced it.
For single-turn usage, `Run` and `RunText` handle the connection lifecycle:
For multi-turn conversations with streaming, the client drives the conversation by sending messages and iterating chunks until `TurnEnd`:
Snapshots & Resumption
Configure automatic snapshot persistence with a store and optional callback:
`State` is inferred as `MyState` from `WithSessionStore(store)`; the explicit `[MyState]` on `WithSnapshotOn` is still needed because its `State` does not appear in any argument.
Resume a conversation from a server-stored snapshot:
Or resume from client-kept state (no server store needed):
`AgentInit` enforces that the resumption mode matches the agent's state management: passing `WithState` to a server-managed agent (one with a `SessionStore`) or `WithSnapshotID` to a client-managed agent is rejected with `FAILED_PRECONDITION`. Passing both is rejected with `INVALID_ARGUMENT`. Passing neither starts a fresh invocation.
Background Agents
`AgentConnection.Detach` ends the input stream and asks the server to take ownership of the rest of the work. The connection closes promptly with a pending snapshot ID the client can use later to poll, fetch results, or abort. Any inputs queued behind the in-flight one continue draining through the runner on a context decoupled from the client's.
For single-turn use, `Run` already takes an `AgentInput` whose `Detach` field is the same wire bit, so detached single-turn with a final payload is just:
Local Go callers poll, fetch results, and abort via the store reference already in hand from `WithSessionStore`. No indirection through the agent:
For Dev UI and non-Go clients, an agent configured with a `SessionStore` registers `getSnapshot` and `abortSnapshot` companion actions (abort is registered only when the store also implements `SnapshotAborter`, so the reflected surface matches actual capabilities). Client-managed agents (no store) register neither. See Companion Actions below.
Once a snapshot has finalized to `succeeded`, resume it like any other snapshot. Pending, aborted, and failed snapshots are rejected with `FAILED_PRECONDITION`:
To redact PII or strip secrets on the way out to a client, register a `StateTransform`. The transform runs on getSnapshot responses and on client-managed `AgentOutput.State`; the raw state is what gets persisted and what the user fn sees:
The transform receives a fresh deep copy and owns the returned pointer; mutate in place, return a new value, or return `nil` to omit state from the response entirely.
Lifecycle. Detach is observed by an eager intake reader the moment it arrives in the input channel, regardless of what the runner is processing. The server suspends future turn-end snapshots, writes a single pending snapshot (empty state placeholder, chained off the most recent prior snapshot), returns that ID over the wire, and closes the connection. When `fn` returns, the same snapshot row is rewritten in place with one of three statuses (`succeeded`, `failed` with `Error` set, or `aborted`) and the cumulative final session state. The snapshot ID never changes. Side effects on session state (`sess.AddMessages`, `resp.SendArtifact`, etc.) keep applying through the queued turns so user code does not have to branch on detach.
Abort. `store.AbortSnapshot(id)` (or the `abortSnapshot` action) atomically flips pending -> aborted and returns the resulting status. The runtime subscribes via `SnapshotAborter.OnSnapshotStatusChange`, so the abort is observed by push rather than polling: `fn`'s context is cancelled as soon as the store publishes the status change. The finalizer rechecks status before writing terminal state, so a late abort wins over a `succeeded` that was about to land.
Pre-conditions. Detach requires a store that implements `SnapshotAborter`. Stores missing it are rejected at detach time with `FAILED_PRECONDITION`; the agent can still run synchronously against a minimal `SessionStore`.
Custom Session State
The `State` type parameter lets you maintain typed state across turns:
Custom state is included in snapshots and available when resuming.
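As a rough model of typed state carried across turns, the sketch below keeps a generic value behind a mutex and updates it once per simulated turn. `session`, `Get`, `Update`, and `counterState` are hypothetical names and do not reproduce the `Session[State]` API; the sketch only shows the typed, thread-safe update pattern.

```go
package main

import (
	"fmt"
	"sync"
)

// session is a hypothetical, minimal model of a typed, thread-safe
// state holder.
type session[S any] struct {
	mu    sync.RWMutex
	state S
}

// Get returns a copy of the current state.
func (s *session[S]) Get() S {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.state
}

// Update applies fn to the current state under the write lock.
func (s *session[S]) Update(fn func(S) S) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.state = fn(s.state)
}

// counterState is an example of user-defined state.
type counterState struct {
	Turns int
}

// runTurns simulates n concurrent turns that each bump the counter.
func runTurns(n int) int {
	s := &session[counterState]{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Update(func(c counterState) counterState {
				c.Turns++
				return c
			})
		}()
	}
	wg.Wait()
	return s.Get().Turns
}

func main() {
	fmt.Println(runTurns(10))
}
```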
API Reference
Agent API (`ai/exp`, experimental)
Define
Two entrypoints; `DefineAgent` covers prompt-backed agents, `DefineCustomAgent` is the escape hatch for custom turn loops.
AgentSource
AgentOption[State]
AgentFunc
Agent[Stream, State]
InvocationOption[State]
Configures an agent invocation (`StreamBidi`, `Run`, or `RunText`). The two options are mutually exclusive at option-application time, and the choice must match the agent's state management at invocation time (see `AgentInit` below).
AgentConnection[Stream, State]
Unlike `BidiConnection`, breaking from `Receive()` does not cancel the connection, which enables multi-turn patterns.
SessionRunner[State]
Extends `Session[State]` with turn management. Passed as the `sess` parameter to `AgentFunc`.
Session[State]
Thread-safe conversation state. Available via `SessionRunner` embedding or `SessionFromContext`.
Responder[Stream]
Output channel with convenience methods. Artifacts sent here are auto-added to the session.
Wire Types
Snapshot System
Snapshots are automatically deduplicated within an invocation: if no state mutations occur between two snapshot checkpoints, the second snapshot is skipped. This commonly happens at the end of a single-turn `Run`/`RunText` call, where the turn-end snapshot already captured the final state and the invocation-end snapshot would be identical. When an invocation-end snapshot is skipped, the output's `SnapshotID` falls back to the last snapshot that was created (typically the turn-end snapshot). If the `AgentFunc` mutates state after `sess.Run()` returns, the invocation-end snapshot fires normally.
Store interfaces split by capability. The minimum to use `WithSessionStore` is reader + writer; detach support layers on `SnapshotAborter`:
The atomic `SaveSnapshot` shape composes cleanly with SQL (`SELECT FOR UPDATE` in a txn), Firestore (`RunTransaction(fn)`), DynamoDB (optimistic concurrency on a `version` attribute), or any K/V store with a CAS primitive. It also folds the finalize path's read-then-rewrite into one step so a late abort cannot be clobbered by the terminal write.
Companion Actions
An agent configured with a `SessionStore` auto-registers companion actions so Dev UI and non-Go clients can observe and control detached invocations without reaching into the store directly. Client-managed agents (no store) register neither. Local Go callers use the store reference they already have from `WithSessionStore`.
Each agent's own action descriptor (registered as `ActionTypeAgent` = `"agent"`) also carries an `AgentMetadata` value under `metadata["agent"]` so reflective callers can shape the surface (e.g. hide the Abort button when the store cannot support it) without round-tripping through the reflection API:
`AgentMetadata` and the request/response schemas are defined once in `genkit-tools/common/src/types/agent.ts` and shared across language clients; `AgentMetadata` is generated into `go/ai/exp/gen.go`. The companion-action request/response types take a Go type parameter and are hand-written in Go.
Supporting core additions
Two `go/core` changes the design relies on:
`BidiConnection.Output()` gains a fast path that returns the already-settled output even when the connection context was cancelled concurrently. This is what makes `conn.Output()` after `Detach()` return the pending snapshot ID instead of `ctx.Err()` on the client side when the server closes the wire first.
Known Issues
- `StreamBidi` creates a trace span immediately when the connection is established. If the connection is closed without sending any messages (zero turns), an empty single-span trace is still emitted. This is cosmetic -- the trace contains no useful data (just a snapshot ID in the output). A future change could defer span creation until the first input arrives.
- `Stream()` and `StreamText()` convenience methods: `Agent` has `Run()` and `RunText()` for single-turn non-streaming usage, but lacks corresponding `Stream()` and `StreamText()` methods that would return an iterator of stream chunks for single-turn streaming. Currently, single-turn streaming requires using `StreamBidi` directly (open connection, send, close, iterate `Receive`). Adding these would improve DX by matching the pattern established by `core.Flow.Stream()`.
- … `failed to send request` line and the trace in the dev tools may be incomplete or missing. This is OTel exporter behavior surfaced by ctx cancellation, not specific to the agent API. A future change could install a custom OTel error handler that filters shutdown-time export failures.