diff --git a/docs/go-agent-design.md b/docs/go-agent-design.md new file mode 100644 index 0000000000..7a6a8fe509 --- /dev/null +++ b/docs/go-agent-design.md @@ -0,0 +1,398 @@ +# Genkit Go Agent Abstraction - Design Document + +## Overview + +This document describes the design for the `DefineAgent` API in Genkit Go. An Agent is a higher-level abstraction over SessionFlow that provides automatic passthrough to the Generate API, eliminating boilerplate conversation loop code. + +An Agent: +- Is created via `DefineAgent` +- Returns a `*SessionFlow` as the underlying primitive +- Automatically handles the generate-respond loop for each turn +- Supports configuration via options or by inheriting from a Prompt + +This design builds on SessionFlow (as described in [go-session-flow-design.md](go-session-flow-design.md)). + +## Package Location + +Agent lives alongside SessionFlow in `go/ai/x/` (experimental). Import as `aix "github.com/firebase/genkit/go/ai/x"`. + +--- + +## 1. API Surface + +### 1.1 DefineAgent + +```go +// DefineAgent creates an Agent and registers it as a SessionFlow. +// Type parameters: +// - Stream: Type for status updates (for subagent compatibility) +// - State: Type for user-defined state (accessible by tools via context) +func DefineAgent[Stream, State any](r api.Registry, name string, opts ...AgentOption) *SessionFlow[Stream, State] +``` + +### 1.2 AgentOption + +```go +// AgentOption configures an Agent. Non-generic for clean usage. +type AgentOption interface { + applyAgent(*agentOptions) error +} +``` + +--- + +## 2. 
Available Options + +### Generate API Options (non-generic) + +Standard Generate API options, applied to each turn: + +`WithModel`, `WithModelName`, `WithConfig`, `WithTools`, `WithToolChoice`, `WithMaxTurns`, `WithResources`, `WithDocs`, `WithTextDocs`, `WithOutputType`, `WithOutputSchema`, `WithOutputFormat`, `WithPrompt`, `WithMessages` + +### System Prompt Option (non-generic) + +`WithSystem(text string, args ...any)` - Sets the system prompt text. + +### Snapshot Options (shared with SessionFlow) + +These options are shared between `DefineAgent` and `DefineSessionFlow`. The concrete return type implements both `AgentOption` and `SessionFlowOption[State]`. + +```go +// Returns *snapshotStoreOption[State] which implements both interfaces +func WithSnapshotStore[State any](store SnapshotStore[State]) *snapshotStoreOption[State] +func WithSnapshotCallback[State any](cb SnapshotCallback[State]) *snapshotCallbackOption[State] +``` + +### Base Prompt Option (generic) + +```go +// PromptRenderer is satisfied by both Prompt (with In=any) and *DataPrompt[In, Out]. +// This enables type-safe input when using DataPrompt. +type PromptRenderer[In any] interface { + Render(ctx context.Context, input In) (*GenerateActionOptions, error) +} + +// Configure agent using a Prompt's settings as defaults. +// Stores a closure that DefineAgent calls once to extract all settings. +// +// The input parameter provides values for {{variable}} style templates. +// Pass nil when the prompt has no input variables. +// +// When p is a *DataPrompt[In, Out], the input type is enforced at compile time. +// When p is a Prompt, input is any. 
+func WithBasePrompt[In any](p PromptRenderer[In], input In) AgentOption + +// Implementation stores a render closure to handle generic type erasure: +// +// func WithBasePrompt[In any](p PromptRenderer[In], input In) AgentOption { +// return &basePromptOption{ +// render: func(ctx context.Context) (*GenerateActionOptions, error) { +// return p.Render(ctx, input) +// }, +// } +// } +``` + +### Options NOT Included + +| Option | Reason | +|--------|--------| +| `WithStreaming` | Handled internally | +| `WithReturnToolRequests`, `WithToolResponses/Restarts` | Agent handles tools automatically | + +--- + +## 3. Prompt Integration + +### What WithBasePrompt Extracts + +When `WithBasePrompt` is called, `prompt.Render(ctx, input)` is invoked once to extract: + +- Model, Config, Tools, ToolChoice, MaxTurns, OutputSchema, OutputFormat +- System message(s) → stored as `SystemText` +- Non-system messages (user/model) → stored as `InitialMessages` + +All settings are static after definition. Template variables (`{{variable}}`) are substituted at definition time using the provided input. + +```go +// DataPrompt with typed input - compile-time type safety +rolePrompt := genkit.DefineDataPrompt[RoleInput, string](g, "roleAgent", + ai.WithSystem("You are a {{role}} assistant."), +) + +// Input type is enforced by the compiler +agent := genkit.DefineAgent[Status, State](g, "roleAgent", + aix.WithBasePrompt(rolePrompt, RoleInput{Role: "coding"}), // ✓ compiles + // aix.WithBasePrompt(rolePrompt, WrongType{}), // ✗ compile error +) +``` + +### Option Precedence + +`WithBasePrompt` settings are always applied first as defaults. Other options override: + +```go +myAgent := genkit.DefineAgent[Status, State](g, "myAgent", + aix.WithBasePrompt(myPrompt, nil), // Applied first (defaults) + aix.WithModelName("googleai/gemini-2.5-pro"), // Overrides prompt's model +) +``` + +--- + +## 4. 
Internal Implementation + +### agentOptions Structure + +```go +type agentOptions struct { + // Generate API options (set directly or extracted from basePromptRender) + Model ai.ModelArg + Config any + Tools []ai.ToolRef + ToolChoice ai.ToolChoice + MaxTurns int + Resources []ai.Resource + Documents []*ai.Document + OutputSchema map[string]any + OutputFormat string + InitialMessages []*ai.Message // Non-system messages, added to session once at start + SystemText string // System prompt text, passed to Generate each turn + + // Base prompt render closure (from WithBasePrompt, called once by DefineAgent) + basePromptRender func(ctx context.Context) (*ai.GenerateActionOptions, error) + + // Snapshot options + snapshotStore any + snapshotCallback any +} +``` + +### DefineAgent Implementation + +```go +func DefineAgent[Stream, State any](r api.Registry, name string, opts ...AgentOption) *SessionFlow[Stream, State] { + agentOpts := &agentOptions{} + for _, opt := range opts { + // applyAgent returns an error; surface it instead of silently dropping it + if err := opt.applyAgent(agentOpts); err != nil { + panic(fmt.Errorf("failed to apply agent option: %w", err)) + } + } + + // If WithBasePrompt was used, call the render closure to extract settings + if agentOpts.basePromptRender != nil { + rendered, err := agentOpts.basePromptRender(context.Background()) + if err != nil { + panic(fmt.Errorf("failed to render base prompt: %w", err)) + } + + // Extract settings (only if not already set by other options) + if agentOpts.Model == nil && rendered.Model != "" { + agentOpts.Model = ai.ModelName(rendered.Model) + } + if agentOpts.Config == nil { + agentOpts.Config = rendered.Config + } + // ... extract tools, toolChoice, maxTurns, outputSchema, outputFormat ... 
+ + // Separate system from non-system messages + for _, msg := range rendered.Messages { + if msg.Role == ai.RoleSystem { + if agentOpts.SystemText == "" { + agentOpts.SystemText = msg.Text() + } + } else { + agentOpts.InitialMessages = append(agentOpts.InitialMessages, msg) + } + } + } + + // Create SessionFlow with automatic generate loop + fn := func(ctx context.Context, resp *Responder[Stream], params *SessionFlowParams[Stream, State]) error { + // Add initial messages to session once at start + if len(agentOpts.InitialMessages) > 0 { + params.Session.AddMessages(agentOpts.InitialMessages...) + } + + return params.Session.Run(ctx, func(ctx context.Context, input *SessionFlowInput) error { + genOpts := []ai.GenerateOption{ai.WithMessages(params.Session.Messages()...)} + + // Add configured options + if agentOpts.Model != nil { + genOpts = append(genOpts, ai.WithModel(agentOpts.Model)) + } + if agentOpts.Config != nil { + genOpts = append(genOpts, ai.WithConfig(agentOpts.Config)) + } + if len(agentOpts.Tools) > 0 { + genOpts = append(genOpts, ai.WithTools(agentOpts.Tools...)) + } + if len(agentOpts.OutputSchema) > 0 { + genOpts = append(genOpts, ai.WithOutputSchema(agentOpts.OutputSchema)) + } + if agentOpts.OutputFormat != "" { + genOpts = append(genOpts, ai.WithOutputFormat(agentOpts.OutputFormat)) + } + if agentOpts.SystemText != "" { + genOpts = append(genOpts, ai.WithSystem(agentOpts.SystemText)) + } + // ... other options ... + + // Stream generation + for result, err := range ai.GenerateStream(ctx, r, genOpts...) { + if err != nil { + return err + } + if result.Done { + params.Session.AddMessages(result.Response.Message) + } else { + resp.SendChunk(result.Chunk) + } + } + return nil + }) + } + + return DefineSessionFlow[Stream, State](r, name, fn, /* snapshot options */) +} +``` + +--- + +## 5. 
Example Usage + +### Basic Agent + +```go +chatAgent := genkit.DefineAgent[ChatStatus, ChatState](g, "chatAgent", + aix.WithModelName("googleai/gemini-3-flash-preview"), + aix.WithSystem("You are a helpful assistant."), + aix.WithTools(myTool), +) + +conn, _ := chatAgent.StreamBidi(ctx) +conn.SendText("Hello!") +for chunk, err := range conn.Receive() { + if err != nil { + return err + } + if chunk.Chunk != nil { + fmt.Print(chunk.Chunk.Text()) + } + if chunk.EndTurn { + break + } +} +conn.Close() +``` + +### Agent from Prompt + +```go +assistantPrompt := genkit.DefinePrompt(g, "assistant", + ai.WithModelName("googleai/gemini-3-flash-preview"), + ai.WithSystem("You are a helpful coding assistant."), + ai.WithTools(searchTool, calculatorTool), +) + +assistantAgent := genkit.DefineAgent[Status, State](g, "assistant", + aix.WithBasePrompt(assistantPrompt, nil), +) +``` + +### Agent with Initial Context + +```go +// Initial messages are added to session once at start, then user inputs append +tutorAgent := genkit.DefineAgent[Status, State](g, "tutorAgent", + aix.WithModelName("googleai/gemini-3-flash-preview"), + aix.WithSystem("You are a coding tutor."), + aix.WithMessages( + ai.NewUserTextMessage("I'm learning Go and want to understand concurrency."), + ai.NewModelTextMessage("Great! Go's concurrency model is one of its strengths. What would you like to start with?"), + ), +) +``` + +### Agent with Structured Output + +```go +type TaskResponse struct { + Task string `json:"task"` + Steps []string `json:"steps"` + Priority string `json:"priority"` +} + +taskAgent := genkit.DefineAgent[Status, State](g, "taskAgent", + aix.WithModelName("googleai/gemini-3-flash-preview"), + aix.WithSystem("You are a task planning assistant. 
Break down tasks into steps."), + aix.WithOutputType(TaskResponse{}), +) +``` + +### Agent with Snapshots + +```go +store := aix.NewInMemorySnapshotStore[ChatState]() + +chatAgent := genkit.DefineAgent[ChatStatus, ChatState](g, "chatAgent", + aix.WithModelName("googleai/gemini-3-flash-preview"), + aix.WithSystem("You are a helpful assistant."), + aix.WithSnapshotStore(store), +) +``` + +--- + +## 6. Design Decisions + +### Why Return SessionFlow? + +Returning `*SessionFlow` directly avoids API duplication and makes clear that Agent IS a SessionFlow with automatic behavior. + +### Why Non-Generic Options? + +Most options don't involve `State`, so generic options would add verbosity. Only snapshot options need type parameters (usually inferred from arguments). + +### How Snapshot Options Work With Both Agent and SessionFlow + +Snapshot options return a concrete generic type that implements both interfaces: + +```go +type snapshotStoreOption[State any] struct { + store SnapshotStore[State] +} + +func (o *snapshotStoreOption[State]) applySessionFlow(opts *sessionFlowOptions[State]) error { + opts.SnapshotStore = o.store + return nil +} + +func (o *snapshotStoreOption[State]) applyAgent(opts *agentOptions) error { + opts.snapshotStore = o.store // stored as any + return nil +} + +func WithSnapshotStore[State any](store SnapshotStore[State]) *snapshotStoreOption[State] { + return &snapshotStoreOption[State]{store: store} +} +``` + +Go's structural typing allows `*snapshotStoreOption[State]` to satisfy both `SessionFlowOption[State]` and `AgentOption`. + +### Why Does WithBasePrompt Store a Closure? + +`WithBasePrompt[In any]` is generic, but `agentOptions` is not. We can't store the prompt directly because Go doesn't allow type assertions like `prompt.(PromptRenderer[any])` when the actual type is `*DataPrompt[MyInput, Out]`. The closure captures the concrete generic types at option creation time, allowing `DefineAgent` to call it without knowing the types. 
+ +--- + +## 7. Files to Create/Modify + +| File | Description | +|------|-------------| +| `go/ai/x/agent.go` | DefineAgent function | +| `go/ai/x/agent_options.go` | AgentOption interface and agent-specific options | +| `go/ai/x/session_flow_options.go` | Update snapshot options to return concrete types and implement AgentOption | + +--- + +## 8. Future Considerations + +Out of scope for this design: +- Per-turn system prompt rendering with `{{@state.field}}` support +- Subagents, agent hooks, agent composition, agent routing diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md new file mode 100644 index 0000000000..c1551c4c13 --- /dev/null +++ b/docs/go-bidi-design.md @@ -0,0 +1,588 @@ +# Genkit Go Bidirectional Streaming Features - Design Document + +## Overview + +This document describes the design for bidirectional streaming features in Genkit Go. The implementation introduces three new primitives: + +1. **BidiAction** - Core primitive for bidirectional operations (`go/core/x`) +2. **BidiFlow** - BidiAction with observability, intended for user definition (`go/core/x`) +3. **BidiModel** - Specialized bidi action for real-time LLM APIs (`go/ai/x`) + +For stateful multi-turn agents with session persistence, see [go-agent-design.md](go-agent-design.md). + +## Package Location + +``` +go/core/x/ +├── bidi.go # BidiAction, BidiFunc, BidiConnection +├── bidi_flow.go # BidiFlow +├── bidi_options.go # Options +├── bidi_test.go # Tests + +go/ai/x/ +├── bidi_model.go # BidiModel, BidiModelFunc +├── bidi_model_test.go +``` + +Import as: +- `corex "github.com/firebase/genkit/go/core/x"` +- `aix "github.com/firebase/genkit/go/ai/x"` + +--- + +## 1. Core Type Definitions + +### 1.1 BidiAction + +```go +// BidiAction represents a bidirectional streaming action. 
+// Type parameters: +// - Init: Type of initialization data (use struct{} if not needed) +// - In: Type of each message sent to the action +// - Out: Type of the final output +// - Stream: Type of each streamed output chunk +type BidiAction[Init, In, Out, Stream any] struct { + name string + fn BidiFunc[Init, In, Out, Stream] + registry api.Registry + desc *api.ActionDesc +} + +// BidiFunc is the function signature for bidi actions. +type BidiFunc[Init, In, Out, Stream any] func( + ctx context.Context, + init Init, + inCh <-chan In, + outCh chan<- Stream, +) (Out, error) +``` + +### 1.2 BidiConnection + +```go +// BidiConnection represents an active bidirectional streaming session. +type BidiConnection[In, Out, Stream any] struct { + inputCh chan In // Internal, accessed via Send() + streamCh chan Stream // Internal output stream channel + doneCh chan struct{} // Closed when action completes + output Out // Final output (valid after done) + err error // Error if any (valid after done) + ctx context.Context + cancel context.CancelFunc + span tracing.Span // Trace span, ended on completion + mu sync.Mutex + closed bool +} + +// Send sends an input message to the bidi action. +func (c *BidiConnection[In, Out, Stream]) Send(input In) error + +// Close signals that no more inputs will be sent. +func (c *BidiConnection[In, Out, Stream]) Close() error + +// Receive returns an iterator for receiving streamed response chunks. +// The iterator completes when the action finishes or signals end of turn. +func (c *BidiConnection[In, Out, Stream]) Receive() iter.Seq2[Stream, error] + +// Output returns the final output after the action completes. +// Blocks until done or context cancelled. +func (c *BidiConnection[In, Out, Stream]) Output() (Out, error) + +// Done returns a channel closed when the connection completes. 
+func (c *BidiConnection[In, Out, Stream]) Done() <-chan struct{} +``` + +### 1.3 BidiFlow + +```go +type BidiFlow[Init, In, Out, Stream any] struct { + *BidiAction[Init, In, Out, Stream] +} +``` + +--- + +## 2. BidiModel + +### 2.1 Overview + +`BidiModel` is a specialized bidi action for real-time LLM APIs like Gemini Live and OpenAI Realtime. These APIs establish a persistent connection where configuration (temperature, system prompt, tools) must be provided upfront, and then the conversation streams bidirectionally. + +### 2.2 The Role of `init` + +For real-time sessions, the connection to the model API often requires configuration to be established *before* the first user message is received. The `init` payload fulfills this requirement: + +- **`init`**: `ModelRequest` (contains config, tools, system prompt) +- **`inputStream`**: Stream of `ModelRequest` (contains user messages/turns) +- **`stream`**: Stream of `ModelResponseChunk` + +### 2.3 Type Definitions + +```go +// In go/ai/x/bidi_model.go + +// BidiModel represents a bidirectional streaming model for real-time LLM APIs. +type BidiModel struct { + *corex.BidiAction[*ai.ModelRequest, *ai.ModelRequest, *ai.ModelResponse, *ai.ModelResponseChunk] +} + +// BidiModelFunc is the function signature for bidi model implementations. +type BidiModelFunc func( + ctx context.Context, + init *ai.ModelRequest, + inCh <-chan *ai.ModelRequest, + outCh chan<- *ai.ModelResponseChunk, +) (*ai.ModelResponse, error) +``` + +### 2.4 Defining a BidiModel + +```go +// DefineBidiModel creates and registers a BidiModel for real-time LLM interactions. +// The opts parameter follows the same pattern as DefineModel for consistency. 
+func DefineBidiModel(r api.Registry, name string, opts *ai.ModelOptions, fn BidiModelFunc) *BidiModel +``` + +**Example Plugin Implementation:** + +```go +func (g *GoogleAI) defineBidiModel(r api.Registry) *aix.BidiModel { + return aix.DefineBidiModel(r, "googleai/gemini-2.0-flash-live", + &ai.ModelOptions{ + Label: "Gemini 2.0 Flash Live", + Supports: &ai.ModelSupports{ + Multiturn: true, + Tools: true, + SystemRole: true, + Media: true, + }, + }, + func(ctx context.Context, init *ai.ModelRequest, inCh <-chan *ai.ModelRequest, outCh chan<- *ai.ModelResponseChunk) (*ai.ModelResponse, error) { + session, err := g.client.Live.Connect(ctx, "gemini-2.0-flash-live", &genai.LiveConnectConfig{ + SystemInstruction: toContent(init.Messages), + Tools: toTools(init.Tools), + Temperature: toFloat32Ptr(init.Config), + ResponseModalities: []genai.Modality{genai.ModalityText}, + }) + if err != nil { + return nil, err + } + defer session.Close() + + var totalUsage ai.GenerationUsage + + for request := range inCh { + err := session.SendClientContent(genai.LiveClientContentInput{ + Turns: toContents(request.Messages), + TurnComplete: true, + }) + if err != nil { + return nil, err + } + + for { + msg, err := session.Receive() + if err != nil { + return nil, err + } + + if msg.ToolCall != nil { + outCh <- &ai.ModelResponseChunk{ + Content: toToolCallParts(msg.ToolCall), + } + continue + } + + if msg.ServerContent != nil { + if msg.ServerContent.ModelTurn != nil { + outCh <- &ai.ModelResponseChunk{ + Content: fromParts(msg.ServerContent.ModelTurn.Parts), + } + } + if msg.ServerContent.TurnComplete { + break + } + } + + if msg.UsageMetadata != nil { + totalUsage.InputTokens += int(msg.UsageMetadata.PromptTokenCount) + totalUsage.OutputTokens += int(msg.UsageMetadata.CandidatesTokenCount) + } + } + } + + return &ai.ModelResponse{ + Usage: &totalUsage, + }, nil + }, + ) +} +``` + +### 2.5 Using BidiModel (`GenerateBidi`) + +`GenerateBidi` is the high-level API for interacting with bidi 
models. It provides a session-like interface for real-time conversations. + +```go +// In go/genkit/generate.go or go/ai/x/generate_bidi.go + +// ModelBidiConnection wraps BidiConnection with model-specific convenience methods. +type ModelBidiConnection struct { + conn *corex.BidiConnection[*ai.ModelRequest, *ai.ModelResponse, *ai.ModelResponseChunk] +} + +// Send sends one or more messages to the model. +func (s *ModelBidiConnection) Send(messages ...*ai.Message) error { + return s.conn.Send(&ai.ModelRequest{Messages: messages}) +} + +// SendText is a convenience method for sending a text message. +func (s *ModelBidiConnection) SendText(text string) error { + return s.Send(ai.NewUserTextMessage(text)) +} + +// Receive returns an iterator for receiving response chunks. +func (s *ModelBidiConnection) Receive() iter.Seq2[*ai.ModelResponseChunk, error] { + return s.conn.Receive() +} + +// Close signals that the conversation is complete. +func (s *ModelBidiConnection) Close() error { + return s.conn.Close() +} + +// Output returns the final response after the session completes. +func (s *ModelBidiConnection) Output() (*ai.ModelResponse, error) { + return s.conn.Output() +} +``` + +**Usage:** + +`GenerateBidi` uses the same shared option types as regular `Generate` calls. Options like `WithModel`, `WithConfig`, `WithSystem`, and `WithTools` work the same way: they configure the initial session setup. + +```go +// GenerateBidi starts a bidirectional streaming session with a model. +// Uses the existing shared option types from ai/option.go. 
+func GenerateBidi(ctx context.Context, g *Genkit, opts ...ai.GenerateBidiOption) (*ModelBidiConnection, error) +``` + +**Example:** + +```go +conn, err := genkit.GenerateBidi(ctx, g, + ai.WithModel(geminiLive), + ai.WithConfig(&genai.LiveConnectConfig{Temperature: genai.Ptr[float32](0.7)}), + ai.WithSystem("You are a helpful voice assistant"), + ai.WithTools(weatherTool), +) +if err != nil { + return err +} +defer conn.Close() + +conn.SendText("Hello!") + +for chunk, err := range conn.Receive() { + if err != nil { + return err + } + fmt.Print(chunk.Text()) +} + +conn.SendText("Tell me more about that.") +for chunk, err := range conn.Receive() { + // ... +} + +response, _ := conn.Output() +fmt.Printf("Total tokens: %d\n", response.Usage.TotalTokens) +``` + +### 2.6 Tool Calling in BidiModel + +Real-time models may support tool calling. The pattern follows the standard generate flow but within the streaming context: + +```go +conn, _ := genkit.GenerateBidi(ctx, g, + ai.WithModel(geminiLive), + ai.WithTools(weatherTool, calculatorTool), +) + +conn.SendText("What's the weather in NYC?") + +for chunk, err := range conn.Receive() { + if err != nil { + return err + } + + if toolCall := chunk.ToolCall(); toolCall != nil { + result, _ := toolCall.Tool.Execute(ctx, toolCall.Input) + conn.Send(ai.NewToolResponseMessage(toolCall.ID, result)) + } else { + fmt.Print(chunk.Text()) + } +} +``` + +--- + +## 3. API Surface + +### 3.1 Defining Bidi Actions + +```go +// In go/core/x/bidi.go + +// NewBidiAction creates a BidiAction without registering it. +func NewBidiAction[Init, In, Out, Stream any]( + name string, + fn BidiFunc[Init, In, Out, Stream], +) *BidiAction[Init, In, Out, Stream] + +// DefineBidiAction creates and registers a BidiAction. 
+func DefineBidiAction[Init, In, Out, Stream any]( + r api.Registry, + name string, + fn BidiFunc[Init, In, Out, Stream], +) *BidiAction[Init, In, Out, Stream] +``` + +Schemas for `In`, `Out`, `Init`, and `Stream` types are automatically inferred from the type parameters using the existing JSON schema inference in `go/internal/base/json.go`. + +### 3.2 Defining Bidi Flows + +```go +// In go/core/x/bidi_flow.go + +// DefineBidiFlow creates a BidiFlow with tracing and registers it. +// Use this for user-defined bidirectional streaming operations. +func DefineBidiFlow[Init, In, Out, Stream any]( + r api.Registry, + name string, + fn BidiFunc[Init, In, Out, Stream], +) *BidiFlow[Init, In, Out, Stream] +``` + +### 3.3 Starting Connections + +All bidi types (BidiAction, BidiFlow, BidiModel) use the same `StreamBidi` method to start connections: + +```go +func (ba *BidiAction[Init, In, Out, Stream]) StreamBidi(ctx context.Context, init Init) (*BidiConnection[In, Out, Stream], error) +``` + +### 3.4 High-Level Genkit API + +```go +// In go/genkit/bidi.go + +func DefineBidiFlow[Init, In, Out, Stream any]( + g *Genkit, + name string, + fn corex.BidiFunc[Init, In, Out, Stream], +) *corex.BidiFlow[Init, In, Out, Stream] + +// GenerateBidi uses shared options from ai/option.go +// Options like WithModel, WithConfig, WithSystem, WithTools configure the session init. +func GenerateBidi( + ctx context.Context, + g *Genkit, + opts ...ai.GenerateBidiOption, +) (*ModelBidiConnection, error) +``` + +--- + +## 4. Integration with Existing Infrastructure + +### 4.1 Tracing Integration + +BidiFlows create spans that remain open for the lifetime of the connection, enabling streaming trace visualization in the Dev UI. 
+ +**Key behaviors:** +- Span starts when `StreamBidi()` is called +- Span ends when the bidi function returns (via `defer` in the connection goroutine) +- Flow context is injected so `core.Run()` works inside the bidi function +- Nested spans for sub-operations (e.g., each LLM call) work normally + +**Important**: The span stays open while the connection is active, allowing: +- Streaming traces to the Dev UI in real-time +- Nested spans for sub-operations (e.g., each LLM call) +- Events recorded as they happen + +### 4.2 Action Registration + +Add new action types and schema fields: + +```go +// In go/core/api/action.go +const ( + ActionTypeBidiFlow ActionType = "bidi-flow" + ActionTypeBidiModel ActionType = "bidi-model" +) + +// ActionDesc gets two new optional fields +type ActionDesc struct { + // ... existing fields ... + StreamSchema map[string]any `json:"streamSchema,omitempty"` // NEW: schema for streamed chunks + InitSchema map[string]any `json:"initSchema,omitempty"` // NEW: schema for initialization data +} +``` + +--- + +## 5. 
Example Usage + +### 5.1 Basic Echo Bidi Flow + +```go +package main + +import ( + "context" + "fmt" + + "github.com/firebase/genkit/go/genkit" +) + +func main() { + ctx := context.Background() + g := genkit.Init(ctx) + + echoFlow := genkit.DefineBidiFlow(g, "echo", + func(ctx context.Context, init struct{}, inCh <-chan string, outCh chan<- string) (string, error) { + var count int + for input := range inCh { + count++ + outCh <- fmt.Sprintf("echo: %s", input) + } + return fmt.Sprintf("processed %d messages", count), nil + }, + ) + + // StreamBidi takes the init value; this flow needs none, so pass struct{}{} + conn, err := echoFlow.StreamBidi(ctx, struct{}{}) + if err != nil { + panic(err) + } + + // Channels are unbuffered (see Implementation Notes), so send from a separate + // goroutine while main drains Receive(); sending everything up front would deadlock. + go func() { + defer conn.Close() + conn.Send("hello") + conn.Send("world") + }() + + for chunk, err := range conn.Receive() { + if err != nil { + panic(err) + } + fmt.Println(chunk) + } + + output, _ := conn.Output() + fmt.Println(output) +} +``` + +### 5.2 Bidi Flow with Initialization Data + +```go +type ChatInit struct { + SystemPrompt string `json:"systemPrompt"` + Temperature float32 `json:"temperature"` // float32 to match genai.GenerateContentConfig.Temperature +} + +configuredChat := genkit.DefineBidiFlow(g, "configuredChat", + func(ctx context.Context, init ChatInit, inCh <-chan string, outCh chan<- string) (string, error) { + for input := range inCh { + resp, err := genkit.GenerateText(ctx, g, + ai.WithSystem(init.SystemPrompt), + ai.WithConfig(&genai.GenerateContentConfig{Temperature: &init.Temperature}), + ai.WithPrompt(input), + ) + if err != nil { + return "", err + } + outCh <- resp + } + return "done", nil + }, +) + +conn, _ := configuredChat.StreamBidi(ctx, ChatInit{ + SystemPrompt: "You are a helpful assistant.", + Temperature: 0.7, +}) +``` + +--- + +## 6. 
Files to Create/Modify + +### New Files + +| File | Description | +|------|-------------| +| `go/core/x/bidi.go` | BidiAction, BidiFunc, BidiConnection | +| `go/core/x/bidi_flow.go` | BidiFlow with tracing | +| `go/core/x/bidi_options.go` | BidiOption types | +| `go/core/x/bidi_test.go` | Tests | +| `go/ai/x/bidi_model.go` | BidiModel, BidiModelFunc, ModelBidiConnection | +| `go/ai/x/bidi_model_test.go` | Tests | +| `go/genkit/bidi.go` | High-level API wrappers | + +### Modified Files + +| File | Change | +|------|--------| +| `go/core/api/action.go` | Add `ActionTypeBidiFlow`, `ActionTypeBidiModel` constants and the `StreamSchema`, `InitSchema` fields on `ActionDesc` | + +--- + +## 7. Implementation Notes + +### Error Handling +- Errors from the bidi function propagate to both the `Receive()` iterator and `Output()` +- Context cancellation closes all channels and terminates the action +- Send after Close returns an error +- Errors are yielded as the second value in the `iter.Seq2[Stream, error]` iterator + +### Goroutine Management +- BidiConnection spawns a goroutine to run the action +- Proper cleanup on context cancellation using `defer` and `sync.Once` +- Channel closure follows Go idioms (sender closes) +- Trace span is ended in the goroutine's defer + +### Thread Safety +- BidiConnection uses a mutex to guard its state (closed flag) +- Send is safe to call from multiple goroutines + +### Channels and Backpressure +- Both input and output channels are **unbuffered** by default (size 0) +- This provides natural backpressure: `Send()` blocks until the action reads, and output blocks until the consumer reads +- If needed, `WithInputBufferSize` / `WithOutputBufferSize` options could be added later for specific use cases + +### Tracing +- Span is started when connection is created, ended when action completes +- Nested spans work normally within the bidi function +- Events can be recorded throughout the connection lifecycle +- Dev UI can show traces in real-time as they stream +- Implementation uses the existing tracer infrastructure (details 
left to implementation) + +### Shutdown Sequence +When `Close()` is called on a BidiConnection: +1. The input channel is closed, signaling no more inputs +2. The bidi function's `for range inCh` loop exits +3. The function returns its final output +4. The stream channel is closed +5. The `Done()` channel is closed +6. `Output()` unblocks and returns the result + +On context cancellation: +1. Context error propagates to the bidi function +2. All channels are closed +3. `Output()` returns the context error + +--- + +## 8. Integration with Reflection API + +These features align with **Reflection API V2**, which uses WebSockets to support bidirectional streaming between the Runtime and the CLI/Manager. + +- `runAction` now supports an `input` stream +- `streamChunk` notifications are bidirectional (Manager <-> Runtime) diff --git a/docs/go-session-design.md b/docs/go-session-design.md new file mode 100644 index 0000000000..b64404b03b --- /dev/null +++ b/docs/go-session-design.md @@ -0,0 +1,530 @@ +# Genkit Go Session Snapshots - Design Document + +## Overview + +This document describes the design for session snapshots in Genkit Go. This feature builds on the bidirectional streaming primitives described in [go-bidi-design.md](./go-bidi-design.md), extending the session management system with point-in-time state capture and restoration capabilities. + +Session snapshots enable: +- **Debugging**: Inspect session state at any point in a conversation +- **Restoration**: Resume conversations from previous states +- **Dev UI Integration**: Display state alongside traces for better observability + +--- + +# Part 1: API Definitions + +## 1. Core Types + +### 1.1 Snapshot + +```go +// Snapshot represents a point-in-time capture of session state. +// Snapshots are immutable once created. +type Snapshot[S any] struct { + // ID is the content-addressed identifier (SHA256 of JSON-serialized state). 
+ ID string `json:"id"` + + // ParentID is the ID of the previous snapshot in this session's timeline. + // Empty for the first snapshot in a session. + ParentID string `json:"parentId,omitempty"` + + // SessionID is the session this snapshot belongs to. + SessionID string `json:"sessionId"` + + // CreatedAt is when the snapshot was created. + CreatedAt time.Time `json:"createdAt"` + + // State is the complete session state at the time of the snapshot. + State S `json:"state"` + + // Index is a monotonically increasing sequence number for ordering snapshots + // within a session. This is independent of turn boundaries. + Index int `json:"index"` + + // TurnIndex is the turn number when this snapshot was created (0-indexed). + // Turn 0 is after the first user input and agent response. + TurnIndex int `json:"turnIndex"` + + // Event is the snapshot event that triggered this snapshot. + Event SnapshotEvent `json:"event"` + + // Orphaned indicates this snapshot is no longer on the main timeline. + // This occurs when a user restores from an earlier snapshot, causing + // all subsequent snapshots to be marked as orphaned. + Orphaned bool `json:"orphaned,omitempty"` +} +``` + +### 1.2 SnapshotEvent + +```go +// SnapshotEvent identifies when a snapshot opportunity occurs. +type SnapshotEvent int + +const ( + // SnapshotEventTurnEnd occurs after resp.EndTurn() is called, + // when control returns to the user. + SnapshotEventTurnEnd SnapshotEvent = iota + + // SnapshotEventToolIterationEnd occurs after all tool calls in a single + // model iteration complete, before the results are sent back to the model. + // This captures state after tools have mutated it but before the next + // model response. + SnapshotEventToolIterationEnd + + // SnapshotEventInvocationEnd occurs when the agent function returns, + // capturing the final state of the invocation. 
+ SnapshotEventInvocationEnd +) +``` + +### 1.3 SnapshotContext + +```go +// SnapshotContext provides context for snapshot decision callbacks. +type SnapshotContext[S any] struct { + // Event is the snapshot event that triggered this callback. + Event SnapshotEvent + + // State is the current session state that will be snapshotted if the callback returns true. + State S + + // PrevState is the state at the last snapshot, or nil if no previous snapshot exists. + // Useful for comparing states to decide whether a snapshot is needed. + PrevState *S + + // Index is the sequence number this snapshot would have if created. + Index int + + // TurnIndex is the current turn number. + TurnIndex int +} +``` + +### 1.4 SnapshotCallback + +```go +// SnapshotCallback decides whether to create a snapshot at a given event. +// It receives the context and snapshot context, returning true if a snapshot +// should be created. +// +// The callback is invoked at each snapshot opportunity. Users can filter +// by event type, inspect state, compare with previous state, or apply any +// custom logic to decide. +type SnapshotCallback[S any] = func(ctx context.Context, snap *SnapshotContext[S]) bool +``` + +--- + +## 2. Store Interface + +The existing `Store[S]` interface in `go/core/x/session` is extended with snapshot methods: + +```go +type Store[S any] interface { + // Existing session methods + Get(ctx context.Context, sessionID string) (*Data[S], error) + Save(ctx context.Context, sessionID string, data *Data[S]) error + + // GetSnapshot retrieves a snapshot by ID. Returns nil if not found. + GetSnapshot(ctx context.Context, snapshotID string) (*Snapshot[S], error) + + // SaveSnapshot persists a snapshot. If a snapshot with the same ID already + // exists (content-addressed deduplication), this is a no-op and returns nil. + SaveSnapshot(ctx context.Context, snapshot *Snapshot[S]) error + + // ListSnapshots returns snapshots for a session, ordered by Index ascending. 
+ // If includeOrphaned is false, only active (non-orphaned) snapshots are returned. + ListSnapshots(ctx context.Context, sessionID string, includeOrphaned bool) ([]*Snapshot[S], error) + + // InvalidateSnapshotsAfter marks all snapshots with Index > afterIndex as orphaned. + // Called when restoring from a snapshot to mark "future" snapshots as no longer active. + InvalidateSnapshotsAfter(ctx context.Context, sessionID string, afterIndex int) error +} +``` + +--- + +## 3. Agent Options + +### 3.1 WithSnapshotCallback + +```go +// WithSnapshotCallback configures when snapshots are created. +// The callback is invoked at each snapshot opportunity (turn end, tool iteration +// end, invocation end) and decides whether to create a snapshot. +// +// If no callback is provided, snapshots are never created automatically. +// Requires WithSessionStore to be configured; otherwise snapshots cannot be persisted. +func WithSnapshotCallback[S any](cb SnapshotCallback[S]) AgentOption[S] +``` + +### 3.2 Convenience Callbacks + +```go +// SnapshotAlways returns a callback that always creates snapshots at all events. +func SnapshotAlways[S any]() SnapshotCallback[S] { + return func(ctx context.Context, snap *SnapshotContext[S]) bool { + return true + } +} + +// SnapshotNever returns a callback that never creates snapshots. +// This is the default behavior when no callback is configured. +func SnapshotNever[S any]() SnapshotCallback[S] { + return func(ctx context.Context, snap *SnapshotContext[S]) bool { + return false + } +} + +// SnapshotOn returns a callback that creates snapshots only for the specified events. 
+func SnapshotOn[S any](events ...SnapshotEvent) SnapshotCallback[S] { + eventSet := make(map[SnapshotEvent]bool) + for _, e := range events { + eventSet[e] = true + } + return func(ctx context.Context, snap *SnapshotContext[S]) bool { + return eventSet[snap.Event] + } +} + +// SnapshotOnChange returns a callback that creates snapshots at the specified events, +// but only when state has changed since the last snapshot. +func SnapshotOnChange[S any](events ...SnapshotEvent) SnapshotCallback[S] { + eventSet := make(map[SnapshotEvent]bool) + for _, e := range events { + eventSet[e] = true + } + return func(ctx context.Context, snap *SnapshotContext[S]) bool { + if !eventSet[snap.Event] { + return false + } + // Always snapshot if this is the first one + if snap.PrevState == nil { + return true + } + // Compare by computing content-addressed IDs + return computeStateHash(snap.State) != computeStateHash(*snap.PrevState) + } +} +``` + +--- + +## 4. Invocation Options + +### 4.1 WithSnapshotID + +```go +// WithSnapshotID specifies a snapshot to restore from when starting the agent. +// This loads the session state from the snapshot and marks all subsequent +// snapshots in that session as orphaned. +// +// The session continues with the same session ID as the snapshot. +// +// Requires the agent to be configured with WithSessionStore; returns an error +// if no store is available to load the snapshot from. +func WithSnapshotID[Init any](id string) BidiOption[Init] +``` + +--- + +## 5. AgentOutput + +```go +// AgentOutput wraps the output with session info for persistence. +type AgentOutput[State, Out any] struct { + SessionID string `json:"sessionId"` + Output Out `json:"output"` + State State `json:"state"` + Artifacts []Artifact `json:"artifacts,omitempty"` + + // SnapshotIDs contains the IDs of all snapshots created during this agent invocation. + // Empty if no snapshots were created (callback returned false or not configured).
+ SnapshotIDs []string `json:"snapshotIds,omitempty"` +} +``` + +--- + +# Part 2: Behaviors + +## 6. Snapshot Creation + +Snapshots are created at three points, each corresponding to a `SnapshotEvent`: + +| Event | Trigger | +|-------|---------| +| `SnapshotEventTurnEnd` | When `resp.EndTurn()` is called, signaling control returns to the user | +| `SnapshotEventToolIterationEnd` | After all tool calls in a single model iteration complete | +| `SnapshotEventInvocationEnd` | When the agent function returns | + +At each point, the snapshot callback is invoked. If it returns true: +1. Compute the snapshot ID by hashing the state (SHA256) +2. Create the snapshot with the next sequence index +3. Set the parent snapshot ID to the previous snapshot (if any) +4. Persist to the store (no-op if ID already exists due to identical state) +5. Record the snapshot ID in the current trace span + +### 6.1 Snapshot ID Computation + +Snapshot IDs are content-addressed using SHA256 of the JSON-serialized state: +- **Deduplication**: Identical states produce identical IDs +- **Verification**: State integrity can be verified against the ID +- **Determinism**: No dependency on timestamps for uniqueness + +--- + +## 7. Snapshot Restoration + +When `WithSnapshotID` is provided to `StreamBidi`: + +1. Load the snapshot from the store +2. Call `InvalidateSnapshotsAfter(sessionID, snapshot.Index)` to orphan subsequent snapshots +3. Update the session with the snapshot's state +4. Continue from the snapshot's turn and index +5. Track the snapshot ID as the parent for new snapshots + +### 7.1 Option Validation + +| Combination | Result | +|-------------|--------| +| `WithSnapshotID` + `WithInit` | **Error**: Cannot specify initial state when restoring | +| `WithSnapshotID` + `WithSessionID` (mismatched) | **Error**: Session ID must match snapshot | +| `WithSnapshotID` + `WithSessionID` (matching) | Allowed but redundant | + +--- + +## 8. 
Tracing Integration + +When a snapshot is created, span metadata is recorded: +- `genkit:metadata:snapshotId` - The snapshot ID +- `genkit:metadata:agent` - The agent name (e.g., `chatAgent`) + +This enables the Dev UI to fetch snapshot data via the reflection API. + +--- + +# Part 3: Examples + +## 9. Usage Examples + +### 9.1 Defining an Agent with Snapshots + +```go +type ChatState struct { + Messages []*ai.Message `json:"messages"` +} + +chatAgent := genkit.DefineAgent(g, "chatAgent", + func(ctx context.Context, sess *session.Session[ChatState], inCh <-chan string, resp *corex.Responder[string]) (corex.AgentResult[string], error) { + state := sess.State() + + for input := range inCh { + state.Messages = append(state.Messages, ai.NewUserTextMessage(input)) + // Named modelResp so it does not shadow the Responder parameter resp. + modelResp := generateResponse(ctx, g, state.Messages) + state.Messages = append(state.Messages, modelResp.Message) + sess.UpdateState(ctx, state) + resp.EndTurn() // SnapshotEventTurnEnd fires here + } + + // SnapshotEventInvocationEnd fires after this function returns + return corex.AgentResult[string]{Output: "done"}, nil + }, + corex.WithSessionStore(store), + corex.WithSnapshotCallback(session.SnapshotOn[ChatState]( + session.SnapshotEventTurnEnd, + session.SnapshotEventInvocationEnd, + )), +) +``` + +### 9.2 Restoring from a Snapshot + +```go +snapshotID := previousOutput.SnapshotIDs[0] + +conn, _ := chatAgent.StreamBidi(ctx, + corex.WithSnapshotID[ChatState](snapshotID), +) + +conn.Send("Actually, tell me about channels instead") +// ... conversation continues from restored state ...
+``` + +### 9.3 Custom Snapshot Callback + +```go +// Snapshot every 5 messages at turn end, always at invocation end +corex.WithSnapshotCallback(func(ctx context.Context, snap *session.SnapshotContext[ChatState]) bool { + switch snap.Event { + case session.SnapshotEventTurnEnd: + return len(snap.State.Messages) % 5 == 0 + case session.SnapshotEventInvocationEnd: + return true + default: + return false + } +}) +``` + +### 9.4 Snapshot Only When State Changed + +```go +// Only snapshot if messages have been added since last snapshot +corex.WithSnapshotCallback(func(ctx context.Context, snap *session.SnapshotContext[ChatState]) bool { + if snap.Event != session.SnapshotEventTurnEnd { + return false + } + // Always snapshot if this is the first one + if snap.PrevState == nil { + return true + } + // Only snapshot if message count increased + return len(snap.State.Messages) > len(snap.PrevState.Messages) +}) +``` + +### 9.5 Snapshot Based on Index + +```go +// Snapshot every 3rd snapshot opportunity +corex.WithSnapshotCallback(func(ctx context.Context, snap *session.SnapshotContext[ChatState]) bool { + return snap.Index % 3 == 0 +}) +``` + +### 9.6 Listing Snapshots + +```go +activeSnapshots, _ := store.ListSnapshots(ctx, sessionID, false) +allSnapshots, _ := store.ListSnapshots(ctx, sessionID, true) // includes orphaned +``` + +--- + +# Part 4: Implementation Details + +## 10. Reflection API Integration + +Session stores are exposed via the reflection API for Dev UI access. 
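What the Dev UI ultimately reads through these actions is the store's view of a session's timeline. The following is a minimal in-memory sketch of that view, covering the `ListSnapshots` and `InvalidateSnapshotsAfter` semantics from the `Store[S]` interface. The `snapshot` and `memStore` types and their method names are hypothetical, simplified, non-generic stand-ins rather than Genkit APIs, and the sketch is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"sort"
)

// snapshot is a simplified stand-in for Snapshot[S] (assumption: only the
// fields needed to show ordering and orphaning are kept).
type snapshot struct {
	ID        string
	SessionID string
	Index     int
	Orphaned  bool
}

// memStore is a hypothetical in-memory store keyed by snapshot ID.
type memStore struct {
	snapshots map[string]*snapshot
}

func newMemStore() *memStore {
	return &memStore{snapshots: map[string]*snapshot{}}
}

// save is a no-op when the ID already exists, mirroring the
// content-addressed deduplication described for SaveSnapshot.
func (m *memStore) save(s *snapshot) {
	if _, exists := m.snapshots[s.ID]; !exists {
		m.snapshots[s.ID] = s
	}
}

// invalidateAfter marks every snapshot of the session with
// Index > afterIndex as orphaned.
func (m *memStore) invalidateAfter(sessionID string, afterIndex int) {
	for _, s := range m.snapshots {
		if s.SessionID == sessionID && s.Index > afterIndex {
			s.Orphaned = true
		}
	}
}

// list returns the session's snapshots ordered by Index ascending,
// skipping orphaned ones unless includeOrphaned is true.
func (m *memStore) list(sessionID string, includeOrphaned bool) []*snapshot {
	var out []*snapshot
	for _, s := range m.snapshots {
		if s.SessionID != sessionID || (s.Orphaned && !includeOrphaned) {
			continue
		}
		out = append(out, s)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Index < out[j].Index })
	return out
}

func main() {
	store := newMemStore()
	for i, id := range []string{"a", "b", "c"} {
		store.save(&snapshot{ID: id, SessionID: "s1", Index: i})
	}

	// Restoring from snapshot "a" (Index 0) orphans "b" and "c".
	store.invalidateAfter("s1", 0)

	active := store.list("s1", false)
	all := store.list("s1", true)
	fmt.Printf("active=%d all=%d\n", len(active), len(all)) // prints: active=1 all=3
}
```

Orphaned snapshots stay retrievable by ID and through `includeOrphaned`, which is what lets the Dev UI show abandoned branches next to the active timeline.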
+ +### 10.1 Action Registration + +When `DefineAgent` is called with `WithSessionStore`, actions are registered: + +| Action | Key | Returns | +|--------|-----|---------| +| getSnapshot | `/session-store/{agent}/getSnapshot` | `Snapshot[S]` | +| listSnapshots | `/session-store/{agent}/listSnapshots` | `[]*Snapshot[S]` | +| getSession | `/session-store/{agent}/getSession` | `*Data[S]` | + +### 10.2 Action Type + +```go +const ActionTypeSessionStore api.ActionType = "session-store" +``` + +### 10.3 Dev UI Flow + +1. Dev UI extracts `snapshotId` and `agent` from span metadata +2. Calls `POST /api/runAction` with key `/session-store/{agent}/getSnapshot` +3. Displays the returned state alongside the trace + +--- + +## 11. Session Snapshot Fields + +The `Session` struct is extended with fields to track snapshot state. These are persisted with the session so that loading a session restores the snapshot tracking state. + +```go +type Session[S any] struct { + // ... existing fields (id, state, store, mu) ... + + // LastSnapshot is the most recent snapshot for this session. + // Used to derive ParentID (LastSnapshot.ID), PrevState (LastSnapshot.State), + // and next index (LastSnapshot.Index + 1). + // Nil if no snapshots have been created. + LastSnapshot *Snapshot[S] `json:"lastSnapshot,omitempty"` + + // TurnIndex tracks the current turn number. + TurnIndex int `json:"turnIndex"` +} +``` + +The `snapshotIDs` list (for `AgentOutput.SnapshotIDs`) is tracked transiently during an invocation and does not need to be persisted. + +When building `SnapshotContext` for the callback: +- `PrevState` = `session.LastSnapshot.State` (or nil if `LastSnapshot` is nil) +- `Index` = `session.LastSnapshot.Index + 1` (or 0 if `LastSnapshot` is nil) +- `ParentID` for new snapshot = `session.LastSnapshot.ID` (or empty if nil) + +--- + +## 12. 
Tool Iteration Snapshot Mechanism + +The `SnapshotEventToolIterationEnd` event requires coordination between the agent layer (typed state) and Generate layer (untyped). + +This is accomplished via a context-based trigger: + +1. Agent layer creates a closure capturing the typed callback +2. Stores an untyped trigger function in context +3. Generate calls `TriggerSnapshot(ctx, SnapshotEventToolIterationEnd)` after tool iterations +4. Trigger retrieves session from context, gets state, invokes callback +5. If callback returns true, snapshot is created + +This keeps Generate decoupled from session types. + +--- + +# Part 5: Design Decisions + +## 13. Rationale + +### Why a Single Callback with Event Types? + +Rather than separate options for each trigger: +- **Extensibility**: New events can be added without new options +- **Flexibility**: Filter by event AND inspect state in one place +- **Composability**: Logic like "every N messages at turn end, always at invocation end" is natural + +### Why Content-Addressed IDs? + +- **Automatic deduplication**: Identical states share the same snapshot +- **Verification**: State integrity can be verified against the ID +- **Determinism**: No dependency on sequence numbers or timestamps + +### Why Orphaned Instead of Deleted? + +When restoring from an earlier snapshot, subsequent snapshots are marked orphaned: +- **Audit trail**: Complete history preserved for debugging +- **Recovery**: Accidentally orphaned snapshots can be recovered +- **Visualization**: Dev UI can show the full conversation tree + +### Why IDs Only in Traces? + +- **Lightweight traces**: Avoid bloating with large state objects +- **Single source of truth**: State lives in the session store +- **On-demand retrieval**: Dev UI fetches when needed + +### Why Both Index and TurnIndex? + +- **Index**: Monotonically increasing for ordering and invalidation +- **TurnIndex**: Human-comprehensible ("after turn 3") + +### Why Separate Session and Snapshot? 
+ +- **Session state**: Working copy that changes frequently +- **Snapshots**: Explicit, immutable captures (like git commits) + +This provides efficiency (not every change needs snapshot overhead) and user control via callbacks. + +--- + +## 14. Future Considerations + +Out of scope for this design: + +- **Snapshot expiration**: Automatic cleanup based on age or count +- **Snapshot compression**: Delta/patch-based storage +- **Snapshot annotations**: User-provided labels or descriptions diff --git a/docs/go-session-flow-design.md b/docs/go-session-flow-design.md new file mode 100644 index 0000000000..238e5436f0 --- /dev/null +++ b/docs/go-session-flow-design.md @@ -0,0 +1,913 @@ +# Genkit Go SessionFlow with Snapshots - Design Document + +## Overview + +This document describes the design for the `SessionFlow` primitive in Genkit Go with snapshot-based state management. A SessionFlow is a stateful, multi-turn conversational flow with automatic snapshot persistence and turn semantics. + +Snapshots provide: +- **State encapsulation**: Messages, user-defined state, and artifacts in a single serializable unit +- **Resumability**: Start new invocations from any previous snapshot +- **Flexibility**: Support for both client-managed and server-managed state patterns +- **Debugging**: Point-in-time state capture for inspection and replay + +This design builds on the bidirectional streaming primitives described in [go-bidi-design.md](go-bidi-design.md). 
+ +## Package Location + +SessionFlow is an AI concept and belongs in `go/ai/x/` (experimental): + +``` +go/ai/x/ +├── session_flow.go # SessionFlow, SessionFlowFunc, SessionFlowParams, Responder +├── session_flow_options.go # SessionFlowOption, StreamBidiOption +├── session_flow_state.go # SessionState, SessionSnapshot, SessionFlowArtifact, SessionFlowInit, SessionFlowResponse, SessionFlowStreamChunk +├── session_flow_store.go # SnapshotStore interface, InMemorySnapshotStore, SnapshotCallback, SnapshotContext +├── session_flow_test.go # Tests +``` + +Import as `aix "github.com/firebase/genkit/go/ai/x"`. + +--- + +## 1. Core Type Definitions + +### 1.1 State and Snapshot Types + +**SessionState** is the portable state that flows between client and server. It contains only the data needed for conversation continuity. + +**SessionSnapshot** is a persisted point-in-time capture with metadata. It wraps SessionState with additional fields for storage, debugging, and restoration. + +```go +// SessionState is the portable conversation state. +type SessionState[State any] struct { + // Messages is the conversation history. + Messages []*ai.Message `json:"messages,omitempty"` + // Custom is the user-defined state associated with this conversation. + Custom State `json:"custom,omitempty"` + // Artifacts are named collections of parts produced during the conversation. + Artifacts []*SessionFlowArtifact `json:"artifacts,omitempty"` +} + +// SessionSnapshot is a persisted point-in-time capture of session state. +type SessionSnapshot[State any] struct { + // SnapshotID is the unique identifier for this snapshot (UUID). + SnapshotID string `json:"snapshotId"` + // ParentID is the ID of the previous snapshot in this timeline. + ParentID string `json:"parentId,omitempty"` + // CreatedAt is when the snapshot was created. + CreatedAt time.Time `json:"createdAt"` + // TurnIndex is the turn number when this snapshot was created (0-indexed). 
+ TurnIndex int `json:"turnIndex"` + // State is the actual conversation state. + State SessionState[State] `json:"state"` +} + +// SessionFlowArtifact represents a named collection of parts produced during a session. +// Examples: generated files, images, code snippets, diagrams, etc. +type SessionFlowArtifact struct { + // Name identifies the artifact (e.g., "generated_code.go", "diagram.png"). + Name string `json:"name,omitempty"` + // Parts contains the artifact content (text, media, etc.). + Parts []*ai.Part `json:"parts"` + // Metadata contains additional artifact-specific data. + Metadata map[string]any `json:"metadata,omitempty"` +} +``` + +### 1.2 Input/Output Types + +```go +// SessionFlowInput is the input sent to a session flow during a conversation turn. +// This wrapper allows future extensibility beyond just messages. +type SessionFlowInput struct { + // Messages contains the user's input for this turn. + Messages []*ai.Message `json:"messages,omitempty"` +} + +// SessionFlowInit is the input for starting a session flow invocation. +// Provide either SnapshotID (to load from store) or State (direct state). +type SessionFlowInit[State any] struct { + // SnapshotID loads state from a persisted snapshot. + // Mutually exclusive with State. + SnapshotID string `json:"snapshotId,omitempty"` + // State provides direct state for the invocation. + // Mutually exclusive with SnapshotID. + State *SessionState[State] `json:"state,omitempty"` +} + +// SessionFlowResponse is the output when a session flow invocation completes. +type SessionFlowResponse[State any] struct { + // State contains the final conversation state. + State *SessionState[State] `json:"state"` + // SnapshotID is the ID of the snapshot created at the end of this invocation. + // Empty if no snapshot was created (callback returned false or no store configured). 
+ SnapshotID string `json:"snapshotId,omitempty"` +} +``` + +### 1.3 Stream Types + +```go +// SessionFlowStreamChunk represents a single item in the session flow's output stream. +// Multiple fields can be populated in a single chunk. +type SessionFlowStreamChunk[Stream any] struct { + // Chunk contains token-level generation data. + Chunk *ai.ModelResponseChunk `json:"chunk,omitempty"` + // Status contains user-defined structured status information. + // The Stream type parameter defines the shape of this data. + Status Stream `json:"status,omitempty"` + // Artifact contains a newly produced artifact. + Artifact *SessionFlowArtifact `json:"artifact,omitempty"` + // SnapshotCreated contains the ID of a snapshot that was just persisted. + SnapshotCreated string `json:"snapshotCreated,omitempty"` + // EndTurn signals that the session flow has finished processing the current input. + // When true, the client should stop iterating and may send the next input. + EndTurn bool `json:"endTurn,omitempty"` +} +``` + +### 1.4 Session + +The Session provides mutable working state during a session flow invocation. It is propagated via context so that nested operations (tools, sub-flows) can access consistent state. + +```go +// Session holds the working state during a session flow invocation. +// It is propagated through context and provides read/write access to state. +type Session[State any] struct { + mu sync.RWMutex + state SessionState[State] + store SnapshotStore[State] + + // Internal references set by the framework + onEndTurn func() // set by runWrapped; triggers snapshot + EndTurn chunk + inCh <-chan *SessionFlowInput + + // Snapshot tracking + lastSnapshot *SessionSnapshot[State] + turnIndex int +} + +// Run loops over the input channel, calling fn for each turn. Each turn is +// wrapped in an OTel span for tracing. Input messages are automatically added +// to the session before fn is called. 
After fn returns successfully, an EndTurn +// chunk is sent and a snapshot check is triggered. +func (s *Session[State]) Run( + ctx context.Context, + fn func(ctx context.Context, input *SessionFlowInput) error, +) error + +// State returns a copy of the current session flow state. +func (s *Session[State]) State() *SessionState[State] + +// Messages returns the current conversation history. +func (s *Session[State]) Messages() []*ai.Message + +// AddMessages appends messages to the conversation history. +func (s *Session[State]) AddMessages(messages ...*ai.Message) + +// SetMessages replaces the entire conversation history. +func (s *Session[State]) SetMessages(messages []*ai.Message) + +// Custom returns the current user-defined custom state. +func (s *Session[State]) Custom() State + +// SetCustom updates the user-defined custom state. +func (s *Session[State]) SetCustom(custom State) + +// PatchCustom atomically reads the current custom state, applies the given +// function, and writes the result back. Use this instead of Custom()/SetCustom() +// when concurrent access to state is possible. +func (s *Session[State]) PatchCustom(fn func(State) State) + +// Artifacts returns the current artifacts. +func (s *Session[State]) Artifacts() []*SessionFlowArtifact + +// AddArtifact adds an artifact to the session. If an artifact with the same +// name already exists, it is replaced. +func (s *Session[State]) AddArtifact(artifact *SessionFlowArtifact) + +// SetArtifacts replaces the entire artifact list. +func (s *Session[State]) SetArtifacts(artifacts ...*SessionFlowArtifact) + +// Context integration +func NewSessionContext[State any](ctx context.Context, s *Session[State]) context.Context +func SessionFromContext[State any](ctx context.Context) *Session[State] +``` + +### 1.5 Responder + +The Responder wraps the output stream with typed methods for sending different kinds of data. 
+ +```go +// Responder provides methods for sending data to the session flow's output stream. +type Responder[Stream any] struct { + ch chan<- *SessionFlowStreamChunk[Stream] + session *Session[any] +} + +// Send sends a complete stream chunk. Use this for full control over the chunk contents. +func (r *Responder[Stream]) Send(chunk *SessionFlowStreamChunk[Stream]) + +// SendChunk sends a generation chunk (token-level streaming). +func (r *Responder[Stream]) SendChunk(chunk *ai.ModelResponseChunk) + +// SendStatus sends a user-defined status update. +func (r *Responder[Stream]) SendStatus(status Stream) + +// SendArtifact sends an artifact to the stream and adds it to the session. +// If an artifact with the same name already exists in the session, it is replaced. +func (r *Responder[Stream]) SendArtifact(artifact *SessionFlowArtifact) +``` + +### 1.6 SessionFlow Function and Parameters + +```go +// SessionFlowParams contains the parameters passed to a session flow function. +// This struct may be extended with additional fields in the future. +type SessionFlowParams[Stream, State any] struct { + // Session provides access to the working state. + Session *Session[State] +} + +// SessionFlowFunc is the function signature for session flows. +// Type parameters: +// - Stream: Type for status updates sent via the responder +// - State: Type for user-defined state in snapshots +type SessionFlowFunc[Stream, State any] func( + ctx context.Context, + resp *Responder[Stream], + params *SessionFlowParams[Stream, State], +) error +``` + +### 1.7 SessionFlow + +```go +// SessionFlow is a bidirectional streaming action with automatic snapshot management. +type SessionFlow[Stream, State any] struct { + *corex.BidiAction[*SessionFlowInit[State], *SessionFlowInput, *SessionFlowResponse[State], *SessionFlowStreamChunk[Stream]] + store SnapshotStore[State] + snapshotCallback SnapshotCallback[State] +} +``` + +--- + +## 2. 
Snapshot Store + +### 2.1 Store Interface + +```go +// SnapshotStore persists and retrieves snapshots. +type SnapshotStore[State any] interface { + // GetSnapshot retrieves a snapshot by ID. Returns nil if not found. + GetSnapshot(ctx context.Context, snapshotID string) (*SessionSnapshot[State], error) + // SaveSnapshot persists a snapshot. + SaveSnapshot(ctx context.Context, snapshot *SessionSnapshot[State]) error +} +``` + +### 2.2 In-Memory Implementation + +```go +// InMemorySnapshotStore provides a thread-safe in-memory snapshot store. +type InMemorySnapshotStore[State any] struct { + snapshots map[string]*SessionSnapshot[State] + mu sync.RWMutex +} + +func NewInMemorySnapshotStore[State any]() *InMemorySnapshotStore[State] +``` + +--- + +## 3. Snapshot Callbacks + +```go +// SnapshotContext provides context for snapshot decision callbacks. +type SnapshotContext[State any] struct { + // State is the current state that will be snapshotted if the callback returns true. + State *SessionState[State] + // PrevState is the state at the last snapshot, or nil if none exists. + PrevState *SessionState[State] + // TurnIndex is the current turn number. + TurnIndex int +} + +// SnapshotCallback decides whether to create a snapshot. +// If not provided and a store is configured, snapshots are always created. +type SnapshotCallback[State any] = func(ctx context.Context, sc *SnapshotContext[State]) bool +``` + +--- + +## 4. API Surface + +### 4.1 Defining Session Flows + +```go +// DefineSessionFlow creates a SessionFlow with automatic snapshot management and registers it. +func DefineSessionFlow[Stream, State any]( + r api.Registry, + name string, + fn SessionFlowFunc[Stream, State], + opts ...SessionFlowOption[State], +) *SessionFlow[Stream, State] + +// SessionFlowOption configures a SessionFlow. +type SessionFlowOption[State any] interface { + applySessionFlow(*sessionFlowOptions[State]) error +} + +// WithSnapshotStore sets the store for persisting snapshots. 
+func WithSnapshotStore[State any](store SnapshotStore[State]) SessionFlowOption[State] + +// WithSnapshotCallback configures when snapshots are created. +// If not provided and a store is configured, snapshots are always created. +func WithSnapshotCallback[State any](cb SnapshotCallback[State]) SessionFlowOption[State] +``` + +### 4.2 Starting Connections + +```go +// StreamBidiOption configures a StreamBidi call. +type StreamBidiOption[State any] interface { + applyStreamBidi(*streamBidiOptions[State]) error +} + +// WithState sets the initial state for the invocation. +// Use this for client-managed state where the client sends state directly. +func WithState[State any](state *SessionState[State]) StreamBidiOption[State] + +// WithSnapshotID loads state from a persisted snapshot by ID. +// Use this for server-managed state where snapshots are stored. +func WithSnapshotID[State any](id string) StreamBidiOption[State] + +// StreamBidi starts a new session flow invocation. +func (sf *SessionFlow[Stream, State]) StreamBidi( + ctx context.Context, + opts ...StreamBidiOption[State], +) (*SessionFlowConnection[Stream, State], error) +``` + +### 4.3 SessionFlow Connection + +```go +// SessionFlowConnection wraps BidiConnection with session flow-specific functionality. +type SessionFlowConnection[Stream, State any] struct { + conn *corex.BidiConnection[*SessionFlowInput, *SessionFlowResponse[State], *SessionFlowStreamChunk[Stream]] +} + +// Send sends a SessionFlowInput to the session flow. +// Use this for full control over the input structure. +func (c *SessionFlowConnection[Stream, State]) Send(input *SessionFlowInput) error + +// SendMessages sends messages to the session flow. +// This is a convenience method that wraps messages in a SessionFlowInput. +func (c *SessionFlowConnection[Stream, State]) SendMessages(messages ...*ai.Message) error + +// SendText sends a single user text message to the session flow. 
+// This is a convenience method that creates a user message and wraps it in SessionFlowInput. +func (c *SessionFlowConnection[Stream, State]) SendText(text string) error + +// Close signals that no more inputs will be sent. +func (c *SessionFlowConnection[Stream, State]) Close() error + +// Receive returns an iterator for receiving stream chunks. +func (c *SessionFlowConnection[Stream, State]) Receive() iter.Seq2[*SessionFlowStreamChunk[Stream], error] + +// Output returns the final response after the session flow completes. +func (c *SessionFlowConnection[Stream, State]) Output() (*SessionFlowResponse[State], error) + +// Done returns a channel closed when the connection completes. +func (c *SessionFlowConnection[Stream, State]) Done() <-chan struct{} +``` + +### 4.4 High-Level Genkit API + +```go +// In go/genkit/session_flow.go + +func DefineSessionFlow[Stream, State any]( + g *Genkit, + name string, + fn aix.SessionFlowFunc[Stream, State], + opts ...aix.SessionFlowOption[State], +) *aix.SessionFlow[Stream, State] +``` + +--- + +## 5. Snapshot Lifecycle + +### 5.1 Snapshot Points + +Snapshots are created at two points: + +| Event | Trigger | Description | +|-------|---------|-------------| +| Turn end | `Session.Run` completes a turn | After the turn function returns successfully | +| Invocation end | Session flow function returns | Final state capture when invocation completes | + +At each point: +1. The snapshot callback is invoked (or defaults to "always" if a store is configured) +2. If callback returns true: + - Generate a UUID for the snapshot ID + - Persist to store + - Send `SnapshotCreated` on the stream +3. The snapshot ID is set in `message.Metadata["snapshotId"]` on the last message, so the client knows which snapshot corresponds to which message and can revert to any point + +### 5.2 Resuming from Snapshots + +When `WithSnapshotID` is provided to `StreamBidi`: + +1. Load the snapshot from the store +2. 
Extract the `SessionState` from the snapshot +3. Initialize the session with that state (messages, custom state, artifacts) +4. New snapshots will reference this as the parent +5. Conversation continues from the restored state + +When `WithState` is provided to `StreamBidi`: + +1. Use the provided `SessionState` directly +2. Initialize the session with that state +3. No parent snapshot reference (client-managed mode) + +--- + +## 6. Internal Flow + +### 6.1 SessionFlow Wrapping + +The user's `SessionFlowFunc` returns `error`. The framework wraps this to produce `SessionFlowResponse`: + +```go +func (sf *SessionFlow[Stream, State]) runWrapped( + ctx context.Context, + init *SessionFlowInit[State], + inCh <-chan *SessionFlowInput, + outCh chan<- *SessionFlowStreamChunk[Stream], +) (*SessionFlowResponse[State], error) { + session := newSessionFromInit(init, sf.store) + session.inCh = inCh + ctx = NewSessionContext(ctx, session) + + responder := &Responder[Stream]{ + ch: outCh, + session: session, + snapshotCallback: sf.snapshotCallback, + store: sf.store, + } + session.onEndTurn = responder.endTurn + + params := &SessionFlowParams[Stream, State]{ + Session: session, + } + + err := sf.fn(ctx, responder, params) + if err != nil { + return nil, err + } + + snapshotID := responder.triggerSnapshot() + + return &SessionFlowResponse[State]{ + State: session.toState(), + SnapshotID: snapshotID, + }, nil +} +``` + +### 6.2 Session.Run and Turn Lifecycle + +`Session.Run` owns the input loop. For each input received from the channel: + +1. Create an OTel span (`sessionFlow/turn/{turnIndex}`) with input messages as attributes +2. Add input messages to session +3. Call the user's turn function with the span context +4. On success: trigger snapshot, send `EndTurn` chunk, increment turn index +5. 
On error: record error on span and return + +```go +func (s *Session[State]) Run( + ctx context.Context, + fn func(ctx context.Context, input *SessionFlowInput) error, +) error { + for input := range s.inCh { + ctx, span := tracer.Start(ctx, fmt.Sprintf("sessionFlow/turn/%d", s.turnIndex)) + + s.AddMessages(input.Messages...) + + if err := fn(ctx, input); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + span.End() + return err + } + + s.onEndTurn() + s.turnIndex++ + span.End() + } + return nil +} +``` + +--- + +## 7. Example Usage + +### 7.1 Chat SessionFlow with Snapshots + +```go +package main + +import ( + "context" + "fmt" + + "github.com/firebase/genkit/go/ai" + aix "github.com/firebase/genkit/go/ai/x" + "github.com/firebase/genkit/go/genkit" + "github.com/firebase/genkit/go/plugins/googlegenai" +) + +type ChatState struct { + UserPreferences map[string]string `json:"userPreferences,omitempty"` + TopicHistory []string `json:"topicHistory,omitempty"` +} + +type ChatStatus struct { + Phase string `json:"phase"` + Details string `json:"details,omitempty"` +} + +func main() { + ctx := context.Background() + store := aix.NewInMemorySnapshotStore[ChatState]() + + g := genkit.Init(ctx, + genkit.WithPlugins(&googlegenai.GoogleAI{}), + genkit.WithDefaultModel("googleai/gemini-3-flash-preview"), + ) + + chatFlow := genkit.DefineSessionFlow(g, "chatFlow", + func(ctx context.Context, resp *aix.Responder[ChatStatus], params *aix.SessionFlowParams[ChatStatus, ChatState]) error { + return params.Session.Run(ctx, func(ctx context.Context, input *aix.SessionFlowInput) error { + sess := params.Session + + resp.SendStatus(ChatStatus{Phase: "generating"}) + + for result, err := range genkit.GenerateStream(ctx, g, + ai.WithMessages(sess.Messages()...), + ) { + if err != nil { + return err + } + if result.Done { + // The final value carries the full response rather than a chunk. + sess.AddMessages(result.Response.Message) + continue + } + resp.SendChunk(result.Chunk) + } + + sess.PatchCustom(func(s ChatState) ChatState { +
s.TopicHistory = append(s.TopicHistory, extractTopic(input.Messages)) + return s + }) + + resp.SendStatus(ChatStatus{Phase: "complete"}) + return nil + }) + }, + aix.WithSnapshotStore(store), + ) + + conn, _ := chatFlow.StreamBidi(ctx) + + conn.SendText("Hello! Tell me about Go programming.") + for chunk, err := range conn.Receive() { + if err != nil { + panic(err) + } + if chunk.Chunk != nil { + fmt.Print(chunk.Chunk.Text()) + } + if chunk.EndTurn { + break + } + } + + conn.SendText("What are channels used for?") + for chunk, err := range conn.Receive() { + if err != nil { + panic(err) + } + if chunk.Chunk != nil { + fmt.Print(chunk.Chunk.Text()) + } + if chunk.EndTurn { + break + } + } + + conn.Close() + + response, _ := conn.Output() + fmt.Printf("Messages in history: %d\n", len(response.State.Messages)) + if response.SnapshotID != "" { + fmt.Printf("Final snapshot: %s\n", response.SnapshotID) + } +} +``` + +### 7.2 Resuming from a Snapshot + +```go +snapshotID := "abc123..." + +conn, _ := chatFlow.StreamBidi(ctx, aix.WithSnapshotID[ChatState](snapshotID)) + +conn.SendText("Continue our discussion about channels") +for chunk, err := range conn.Receive() { + // ... handle response ... +} +``` + +### 7.3 Client-Managed State + +For clients that manage their own state (e.g., web apps with local storage): + +```go +clientState := &aix.SessionState[ChatState]{ + Messages: previousMessages, + Custom: ChatState{UserPreferences: prefs}, +} + +conn, _ := chatFlow.StreamBidi(ctx, aix.WithState(clientState)) + +// ... interact ... 
+ +response, _ := conn.Output() +// Client stores response.State locally for next invocation +``` + +### 7.4 SessionFlow with Artifacts + +```go +type CodeState struct { + Language string `json:"language"` +} + +type CodeStatus struct { + Phase string `json:"phase"` +} + +codeFlow := genkit.DefineSessionFlow(g, "codeFlow", + func(ctx context.Context, resp *aix.Responder[CodeStatus], params *aix.SessionFlowParams[CodeStatus, CodeState]) error { + return params.Session.Run(ctx, func(ctx context.Context, input *aix.SessionFlowInput) error { + sess := params.Session + + generatedCode := "func main() { fmt.Println(\"Hello\") }" + + resp.SendStatus(CodeStatus{Phase: "code_generated"}) + + resp.SendArtifact(&aix.SessionFlowArtifact{ + Name: "main.go", + Parts: []*ai.Part{ai.NewTextPart(generatedCode)}, + Metadata: map[string]any{"language": "go"}, + }) + + sess.AddMessages(ai.NewModelTextMessage("Here's the code you requested.")) + return nil + }) + }, + aix.WithSnapshotStore(store), +) +``` + +--- + +## 8. Tracing Integration + +Each turn executed by `Session.Run` creates an OTel span named `sessionFlow/turn/{turnIndex}`. This provides per-turn visibility into inputs, outputs, and timing. + +When a snapshot is created (at turn end or invocation end), metadata is recorded on the current span: + +- `genkit:metadata:snapshotId` - The snapshot ID +- `genkit:metadata:sessionFlow` - The session flow name (e.g., `chatFlow`) + +This enables the Dev UI to correlate traces with snapshots and fetch snapshot data via the reflection API. + +**Recording snapshot in span:** + +```go +func (r *Responder[Stream]) triggerSnapshot() string { + // ... create snapshot ... + + if snapshot != nil { + span := trace.SpanFromContext(r.ctx) + span.SetAttributes( + attribute.String("genkit:metadata:snapshotId", snapshot.SnapshotID), + attribute.String("genkit:metadata:sessionFlow", r.flowName), + ) + return snapshot.SnapshotID + } + return "" +} +``` + +--- + +## 9. 
Reflection API Integration + +Snapshot stores are exposed via the reflection API for Dev UI access. + +### 9.1 Action Registration + +When `DefineSessionFlow` is called with `WithSnapshotStore`, actions are registered: + +| Action | Key | Input | Returns | +|--------|-----|-------|---------| +| getSnapshot | `/snapshot-store/{flow}/getSnapshot` | `{snapshotId: string}` | `SessionSnapshot[State]` | + +### 9.2 Action Type + +```go +const ActionTypeSnapshotStore api.ActionType = "snapshot-store" +``` + +### 9.3 Dev UI Flow + +1. Dev UI receives a trace with `snapshotId` and `sessionFlow` in span metadata +2. Calls `POST /api/runAction` with key `/snapshot-store/{flow}/getSnapshot` +3. Displays the returned state alongside the trace + +This allows developers to inspect the exact session flow state at any traced point in the conversation. + +--- + +## 10. Files to Create/Modify + +### New Files + +| File | Description | +|------|-------------| +| `go/ai/x/session_flow.go` | SessionFlow, SessionFlowFunc, SessionFlowParams, Responder, Session | +| `go/ai/x/session_flow_state.go` | SessionState, SessionSnapshot, SessionFlowArtifact, SessionFlowInit, SessionFlowResponse, SessionFlowStreamChunk | +| `go/ai/x/session_flow_options.go` | SessionFlowOption, StreamBidiOption, SnapshotCallback, SnapshotContext | +| `go/ai/x/session_flow_store.go` | SnapshotStore interface, InMemorySnapshotStore | +| `go/ai/x/session_flow_test.go` | Tests | + +### Modified Files + +| File | Change | +|------|--------| +| `go/genkit/session_flow.go` | Add DefineSessionFlow wrapper | +| `go/core/api/action.go` | Add ActionTypeSessionFlow, ActionTypeSnapshotStore constants | + +--- + +## 11. Design Decisions + +### Why Separate SessionState from SessionSnapshot? 
+ +**SessionState** is the portable state that flows between client and server: +- Just the data: Messages, Custom, Artifacts +- No IDs, timestamps, or metadata +- Time is implicit: it's either input or output +- Clients manage it however they want + +**SessionSnapshot** is a persisted point-in-time capture: +- Has a UUID-based ID +- Has timestamps and metadata (ParentID, TurnIndex) +- Used for storage, debugging, branching/restoration +- Managed by the framework and store + +This separation provides: +- **Clarity**: Users know exactly what fields are relevant for their use case +- **Simplicity**: Client-managed state doesn't deal with server metadata +- **Flexibility**: Server can add snapshot metadata without affecting client API + +### Why Mandate Messages in State? + +Messages are fundamental to conversation continuity. By including them in the state schema: + +- Ensures consistent conversation history across invocations +- Prevents common bugs where messages are lost between turns +- Enables the framework to optimize message handling +- Provides a standard structure that tools and middleware can rely on + +### Why UUID Snapshot IDs? + +- **Simplicity**: No need to serialize and hash state +- **Uniqueness**: Every snapshot gets its own ID regardless of content +- **Performance**: No serialization overhead for ID generation + +### Why Callback-Based Snapshotting? + +Rather than always snapshotting or never snapshotting: + +- **Efficiency**: Only snapshot when needed +- **Flexibility**: Different strategies for different use cases +- **User control**: Application decides snapshot granularity +- **Sensible default**: Always snapshots when a store is configured, no callback needed for common case + +### Why Does SendArtifact Add to Session? + +`SendArtifact()` both streams the artifact to the client and adds it to the session state (replacing by name if a duplicate exists). 
This prevents a common class of bugs where the developer streams an artifact but forgets to add it to session state, resulting in artifacts being lost across snapshots. + +### SessionFlow Function Signature Options + +Three options were considered for how the session flow function receives and processes inputs: + +**Option A: Manual loop over channel** + +The function receives the input channel directly and manually loops over it, calling `RunTurn` for each input: + +```go +genkit.DefineSessionFlow(g, "chatFlow", + func(ctx context.Context, inCh <-chan *aix.SessionFlowInput, resp *aix.Responder[ChatStatus], params *aix.SessionFlowParams[ChatStatus, ChatState]) error { + // ... setup + for input := range inCh { + err := params.Session.RunTurn(ctx, input, func(ctx context.Context) error { + // ... turn logic + return nil + }) + if err != nil { + return err + } + } + // ... teardown + return nil + }, +) +``` + +*Pros:* Maximum flexibility for custom loop control. *Cons:* Boilerplate in every session flow; easy to forget `RunTurn` or mishandle errors. + +**Option B: Session.Run pulls from channel** + +The function receives the input channel but passes it to `Session.Run`, which handles the loop: + +```go +genkit.DefineSessionFlow(g, "chatFlow", + func(ctx context.Context, inCh <-chan *aix.SessionFlowInput, resp *aix.Responder[ChatStatus], params *aix.SessionFlowParams[ChatStatus, ChatState]) error { + // ... setup + err := params.Session.Run(ctx, inCh, func(ctx context.Context, input *aix.SessionFlowInput) error { + // ... turn logic + return nil + }) + // ... teardown + if err != nil { + return err + } + return nil + }, +) +``` + +*Pros:* Explicit channel visibility. *Cons:* Redundant parameter (the channel appears in the signature but is always passed to `Run`); awkward if the user wants to do anything with the channel other than pass it to `Run`. + +**Option C: Session owns the loop (chosen)** + +The function does not receive the input channel. 
The session holds the channel internally, and `Session.Run` handles the loop: + +```go +genkit.DefineSessionFlow(g, "chatFlow", + func(ctx context.Context, resp *aix.Responder[ChatStatus], params *aix.SessionFlowParams[ChatStatus, ChatState]) error { + // ... setup + err := params.Session.Run(ctx, func(ctx context.Context, input *aix.SessionFlowInput) error { + // ... turn logic + return nil + }) + // ... teardown + if err != nil { + return err + } + return nil + }, +) +``` + +*Pros:* Cleanest API; impossible to misuse the channel; framework handles all loop mechanics. *Cons:* Less flexibility for advanced use cases that need direct channel access. + +**Why Option C?** + +Option C was chosen because: +- It provides the simplest API for the common case +- It eliminates a class of bugs (forgetting to call `RunTurn`, incorrect loop handling) +- The channel is an implementation detail that users rarely need to interact with directly +- Setup and teardown remain possible before/after `Session.Run` +- Advanced users can request an escape hatch if needed, but the default should guide toward correct usage + +--- + +## 12. Future Considerations + +Out of scope for this design: + +- **Snapshot expiration**: Automatic cleanup based on age or count +- **Snapshot compression**: Delta/patch-based storage +- **Snapshot branching**: Tree-structured conversation histories +- **Snapshot annotations**: User-provided labels or descriptions +- **Tool iteration snapshots**: Mid-turn snapshots after tool execution