Skip to content

feat(go): added DefineSessionFlow#4462

Draft
apascal07 wants to merge 6 commits intoap/go-bidifrom
ap/go-session-flow
Draft

feat(go): added DefineSessionFlow#4462
apascal07 wants to merge 6 commits intoap/go-bidifrom
ap/go-session-flow

Conversation

@apascal07
Copy link
Collaborator

@apascal07 apascal07 commented Feb 6, 2026

Adds bidirectional streaming primitives to core and a high-level SessionFlow API for multi-turn conversations with automatic snapshot management.


Bidirectional Streaming Flows

DefineBidiFlow creates a flow that accepts multiple inputs and streams multiple outputs over a persistent connection:

echoFlow := genkit.DefineBidiFlow(g, "echo",
    func(ctx context.Context, _ struct{}, inCh <-chan string, outCh chan<- string) (int, error) {
        var count int
        for msg := range inCh {
            count++
            outCh <- fmt.Sprintf("echo: %s", msg)
        }
        return count, nil
    },
)

conn, _ := echoFlow.StreamBidi(ctx, struct{}{})

go func() {
    conn.Send("hello")
    conn.Send("world")
    conn.Close()
}()

for chunk, err := range conn.Receive() {
    fmt.Println(chunk) // "echo: hello", "echo: world"
}

output, _ := conn.Output()
fmt.Println(output) // 2

Session Flows

DefineSessionFlow builds on bidi streaming to provide multi-turn conversations with managed state, token-level streaming, and automatic snapshots. The Session.Run loop handles turn boundaries — you bring your own generation logic:

chatFlow := genkit.DefineSessionFlow(g, "chat",
    func(ctx context.Context, resp aix.Responder[any], params *aix.SessionFlowParams[struct{}]) error {
        return params.Session.Run(ctx, func(ctx context.Context, input *aix.SessionFlowInput) error {
            sess := params.Session

            for chunk, err := range genkit.GenerateStream(ctx, g,
                ai.WithModelName("googleai/gemini-3-flash-preview"),
                ai.WithMessages(sess.Messages()...),
            ) {
                if err != nil {
                    return err
                }
                if chunk.Done {
                    sess.AddMessages(chunk.Response.Message)
                    break
                }
                resp.SendChunk(chunk.Chunk) // stream tokens to client
            }

            return nil
        })
    },
)

The client drives the conversation by sending messages and iterating chunks until EndTurn:

conn, _ := chatFlow.StreamBidi(ctx)

conn.SendText("What is Go?")

for chunk, err := range conn.Receive() {
    if chunk.Chunk != nil {
        fmt.Print(chunk.Chunk.Text())
    }
    if chunk.EndTurn {
        break // turn complete, ready for next input
    }
}

conn.SendText("Tell me more about its concurrency model")
// ... iterate conn.Receive() again ...

conn.Close()

Prompt-Backed Session Flows

DefineSessionFlowFromPrompt eliminates the manual generate loop entirely. Give it a prompt and it handles rendering, streaming, and history management automatically:

# prompts/chat.prompt
---
model: googleai/gemini-3-flash-preview
input:
  schema:
    personality: string
  default:
    personality: a helpful assistant
---
You are {{personality}}. Keep responses concise.
type ChatInput struct {
    Personality string `json:"personality"`
}

chatPrompt := genkit.LookupDataPrompt[ChatInput, string](g, "chat")

chatFlow := genkit.DefineSessionFlowFromPrompt[struct{}](
    g, "chat", chatPrompt, ChatInput{Personality: "a sarcastic pirate"},
)

conn, _ := chatFlow.StreamBidi(ctx)
conn.SendText("What is Go?")
for chunk, _ := range conn.Receive() {
    // tokens stream automatically
    if chunk.EndTurn { break }
}
conn.Close()

Snapshots & Resumption

Configure automatic snapshot persistence with a store and optional callback:

store := aix.NewInMemorySnapshotStore[MyState]()

chatFlow := genkit.DefineSessionFlow(g, "chat", myFunc,
    aix.WithSnapshotStore(store),
    aix.WithSnapshotCallback(aix.SnapshotOn[MyState](aix.SnapshotEventTurnEnd)),
)

Resume a conversation from a server-stored snapshot:

conn, _ := chatFlow.StreamBidi(ctx, aix.WithSnapshotID[MyState]("snapshot-abc-123"))

Or resume from client-kept state (no server store needed):

conn, _ := chatFlow.StreamBidi(ctx, aix.WithState(&aix.SessionState[MyState]{
    Messages: previousMessages,
    Custom:   MyState{Topic: "concurrency"},
}))

Custom Session State

The State type parameter lets you maintain typed state across turns:

type ChatState struct {
    TopicsDiscussed []string `json:"topicsDiscussed"`
}

chatFlow := genkit.DefineSessionFlow(g, "chat",
    func(ctx context.Context, resp aix.Responder[any], params *aix.SessionFlowParams[ChatState]) error {
        return params.Session.Run(ctx, func(ctx context.Context, input *aix.SessionFlowInput) error {
            // ... generate response ...

            params.Session.UpdateCustom(func(s ChatState) ChatState {
                s.TopicsDiscussed = append(s.TopicsDiscussed, extractTopic(input))
                return s
            })
            return nil
        })
    },
    aix.WithSnapshotStore(aix.NewInMemorySnapshotStore[ChatState]()),
)

Custom state is included in snapshots and available when resuming.

Checklist (if applicable):

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @apascal07, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the Genkit Go SDK by introducing a new SessionFlow API, specifically designed for building and managing complex, stateful, multi-turn conversational AI applications. This new API is built upon a fundamental refactoring of the core Action type to support bidirectional streaming, enabling more dynamic and interactive AI experiences. The changes provide a structured approach to handling conversational state, including message history, custom data, and generated artifacts, with built-in mechanisms for persistence and lifecycle management.

Highlights

  • Introduction of SessionFlow API: A new SessionFlow API has been introduced in go/ai/x to manage stateful, multi-turn conversational AI interactions, including automatic snapshot management and artifact handling.
  • Core Action Refactoring for Bidirectional Streaming: The core ActionDef type has been refactored to Action to natively support bidirectional streaming, which is a foundational change enabling the SessionFlow API. This includes new BidiFunc and BidiConnection types.
  • Update of Existing AI Components: All existing AI components (embedder, evaluator, model, prompt, resource, retriever) have been updated to utilize the new core.Action type and its extended capabilities.
  • Snapshot Management Features: The SessionFlow now includes robust snapshot management, allowing state to be persisted, loaded, and controlled via callbacks, with an in-memory store implementation provided.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • go/ai/embedder.go
    • Updated embedder struct to use core.Action instead of core.ActionDef.
    • Modified NewEmbedder and LookupEmbedder to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/evaluator.go
    • Updated evaluator struct to use core.Action instead of core.ActionDef.
    • Modified NewEvaluator, NewBatchEvaluator, and LookupEvaluator to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/generate.go
    • Updated model and generateAction structs to use core.Action instead of core.ActionDef.
    • Modified LookupModel, model.Generate, and model.supportsConstrained to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/prompt.go
    • Updated prompt struct to use core.Action instead of core.ActionDef.
    • Modified DefinePrompt, LookupPrompt, and prompt.Desc to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/resource.go
    • Updated resource struct to use core.Action instead of core.ActionDef.
    • Modified DefineResource, NewResource, FindMatchingResource, and LookupResource to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/retriever.go
    • Updated retriever struct to use core.Action instead of core.ActionDef.
    • Modified NewRetriever and LookupRetriever to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/x/option.go
    • Added SessionFlowOption and StreamBidiOption interfaces for configuring session flows and bidirectional streams.
    • Introduced WithSnapshotStore, WithSnapshotCallback, WithState, and WithSnapshotID functions for flexible session flow initialization.
  • go/ai/x/session_flow.go
    • Introduced SessionFlowArtifact, SessionFlowInput, SessionFlowInit, SessionFlowOutput, and SessionFlowStreamChunk data structures.
    • Defined the Session type for managing conversational state (messages, custom state, artifacts) with methods for manipulation and snapshot handling.
    • Implemented Responder for sending various types of stream chunks (generation, status, artifacts).
    • Introduced the SessionFlow type, DefineSessionFlow for registration, and StreamBidi for initiating sessions.
    • Added logic for snapshot creation, loading, and integration with tracing, including SessionFlowConnection for buffered chunk reception.
  • go/ai/x/session_flow_test.go
    • Added comprehensive unit tests for SessionFlow covering multi-turn interactions, snapshot persistence, resuming from snapshots, client-managed state, artifact handling, snapshot callbacks, and error handling.
  • go/ai/x/snapshot.go
    • Defined SessionState for portable conversation state, SnapshotEvent for trigger types, and SessionSnapshot for persisted state.
    • Introduced SnapshotContext and SnapshotCallback for custom snapshot logic.
    • Defined SnapshotStore interface and provided an InMemorySnapshotStore implementation.
    • Added SnapshotOn utility function for selective snapshotting.
  • go/core/action.go
    • Refactored ActionDef to Action and added a new Init type parameter for bidirectional actions.
    • Introduced BidiFunc for bidirectional streaming function signatures and ActionOptions for configuration.
    • Implemented NewBidiAction and DefineBidiAction for creating and registering bidirectional actions.
    • Added StreamBidi method to Action for initiating bidirectional connections.
    • Introduced BidiConnection type for managing bidirectional streaming, including Send, Close, Receive, Output, and Done methods.
    • Updated ResolveActionFor and LookupActionFor to use the new Action type and Init parameter.
    • Added wrapBidiAsStreaming to adapt BidiFunc to StreamingFunc.
  • go/core/action_test.go
    • Updated existing tests to use DefineStreamingAction and the new Init type parameter in ResolveActionFor and LookupActionFor.
    • Added new tests for BidiAction functionality, covering echo, initialization, send after close, context cancellation, and Done channel.
  • go/core/api/action.go
    • Added new ActionType constants: ActionTypeSessionFlow and ActionTypeSnapshotStore.
    • Extended ActionDesc with StreamSchema and InitSchema fields for describing bidirectional actions.
  • go/core/background_action.go
    • Updated BackgroundActionDef to use core.Action instead of core.ActionDef.
    • Modified Register, NewBackgroundAction, and LookupBackgroundAction to align with the new core.Action type and updated ResolveActionFor signature.
  • go/core/flow.go
    • Refactored Flow to be a struct embedding *Action with the new Init type parameter.
    • Updated DefineFlow and DefineStreamingFlow to use the new Flow struct and Action type.
    • Introduced NewBidiFlow and DefineBidiFlow for creating and registering bidirectional flows.
    • Updated Run and Stream methods to use the embedded Action's Run method.
  • go/core/flow_test.go
    • Updated existing tests to reflect changes in Flow type and method calls.
    • Added new tests for BidiFlow functionality, including registration, echo, and integration with core.Run.
  • go/genkit/genkit.go
    • Updated DefineFlow and DefineStreamingFlow signatures to include the new Init type parameter.
    • Added DefineBidiFlow function to expose the new bidirectional flow definition.
  • go/genkit/session_flow.go
    • Introduced DefineSessionFlow as a top-level Genkit function to define and register session flows, wrapping aix.DefineSessionFlow.
  • go/samples/basic-session-flow/main.go
    • Added a sample CLI REPL application demonstrating the usage of SessionFlow for multi-turn conversations with token-level streaming and snapshot management.
Activity
  • The pull request author, apascal07, has indicated that the PR title adheres to conventional commits.
  • The author has confirmed that the changes have been manually and unit tested.
  • Documentation updates are noted as pending in the PR description.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@apascal07 apascal07 changed the base branch from main to ap/go-bidi February 6, 2026 05:48
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This is a great pull request that introduces the SessionFlow feature and refactors the core Action type to support bidirectional streaming. The new functionality is well-structured, comes with comprehensive tests, and includes a helpful sample application. I've identified a critical race condition in the new BidiConnection implementation and a minor issue in the sample code that would prevent it from running. My comments provide suggestions to address these points.

I am having trouble creating individual review comments. Click here to see my feedback.

go/core/action.go (534-550)

high

This Send implementation has a race condition that can cause a panic. The mutex is unlocked on line 540 before the channel send on line 543. If another goroutine calls Close() in between, c.inputCh will be closed, and the send will panic.

A robust way to fix this is to use recover to handle the "send on closed channel" panic, which is a common pattern in Go for this scenario. This avoids holding a lock over a potentially blocking operation.

Here's a suggested safer implementation for Send that removes the racy mutex usage. The Close method's use of the mutex remains important to make it safe for concurrent calls.

func (c *BidiConnection[In, Out, Stream]) Send(input In) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// This recovers from a panic that occurs when sending on a closed channel.
			err = NewError(FAILED_PRECONDITION, "connection is closed")
		}
	}()

	select {
	case c.inputCh <- input:
		return nil
	case <-c.ctx.Done():
		return c.ctx.Err()
	case <-c.doneCh:
		// The recover will handle a panic if doneCh and inputCh close concurrently.
		return NewError(FAILED_PRECONDITION, "action has completed")
	}
}

go/samples/basic-session-flow/main.go (49-53)

medium

The model name googleai/gemini-3-flash-preview appears to be incorrect and will likely cause the sample to fail at runtime. Please use a valid model name, for example googleai/gemini-1.5-flash-latest.

					ai.WithModel(googlegenai.ModelRef("googleai/gemini-1.5-flash-latest", &genai.GenerateContentConfig{
						ThinkingConfig: &genai.ThinkingConfig{
							ThinkingBudget: genai.Ptr[int32](0),
						},
					})),

@apascal07 apascal07 changed the title feat(go): added SessionFlow and related feat(go): added DefineSessionFlow Feb 6, 2026
@apascal07 apascal07 mentioned this pull request Feb 6, 2026
@apascal07 apascal07 linked an issue Feb 6, 2026 that may be closed by this pull request
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

RFC: Session flows

1 participant