diff --git a/go/core/flow.go b/go/core/flow.go index ea514365c2..a2dad316e6 100644 --- a/go/core/flow.go +++ b/go/core/flow.go @@ -45,9 +45,9 @@ type flowContext struct { flowName string } -// DefineFlow creates a Flow that runs fn, and registers it as an action. fn takes an input of type In and returns an output of type Out. -func DefineFlow[In, Out any](r api.Registry, name string, fn Func[In, Out]) *Flow[In, Out, struct{}] { - return (*Flow[In, Out, struct{}])(DefineAction(r, name, api.ActionTypeFlow, nil, nil, func(ctx context.Context, input In) (Out, error) { +// NewFlow creates a Flow that runs fn without registering it. fn takes an input of type In and returns an output of type Out. +func NewFlow[In, Out any](name string, fn Func[In, Out]) *Flow[In, Out, struct{}] { + return (*Flow[In, Out, struct{}])(NewAction(name, api.ActionTypeFlow, nil, nil, func(ctx context.Context, input In) (Out, error) { fc := &flowContext{ flowName: name, } @@ -56,6 +56,27 @@ func DefineFlow[In, Out any](r api.Registry, name string, fn Func[In, Out]) *Flo })) } +// NewStreamingFlow creates a streaming Flow that runs fn without registering it. +func NewStreamingFlow[In, Out, Stream any](name string, fn StreamingFunc[In, Out, Stream]) *Flow[In, Out, Stream] { + return (*Flow[In, Out, Stream])(NewStreamingAction(name, api.ActionTypeFlow, nil, nil, func(ctx context.Context, input In, cb func(context.Context, Stream) error) (Out, error) { + fc := &flowContext{ + flowName: name, + } + ctx = flowContextKey.NewContext(ctx, fc) + if cb == nil { + cb = func(context.Context, Stream) error { return nil } + } + return fn(ctx, input, cb) + })) +} + +// DefineFlow creates a Flow that runs fn, and registers it as an action. fn takes an input of type In and returns an output of type Out. +func DefineFlow[In, Out any](r api.Registry, name string, fn Func[In, Out]) *Flow[In, Out, struct{}] { + f := NewFlow(name, fn) + f.Register(r) + return f +} + // DefineStreamingFlow creates a streaming Flow that runs fn, and registers it as an action. // // fn takes an input of type In and returns an output of type Out, optionally @@ -66,16 +87,9 @@ func DefineFlow[In, Out any](r api.Registry, name string, fn Func[In, Out]) *Flo // with a final return value that includes all the streamed data. // Otherwise, it should ignore the callback and just return a result. func DefineStreamingFlow[In, Out, Stream any](r api.Registry, name string, fn StreamingFunc[In, Out, Stream]) *Flow[In, Out, Stream] { - return (*Flow[In, Out, Stream])(DefineStreamingAction(r, name, api.ActionTypeFlow, nil, nil, func(ctx context.Context, input In, cb func(context.Context, Stream) error) (Out, error) { - fc := &flowContext{ - flowName: name, - } - ctx = flowContextKey.NewContext(ctx, fc) - if cb == nil { - cb = func(context.Context, Stream) error { return nil } - } - return fn(ctx, input, cb) - })) + f := NewStreamingFlow(name, fn) + f.Register(r) + return f } // Run runs the function f in the context of the current flow diff --git a/go/genkit/genkit.go b/go/genkit/genkit.go index 377fb5e836..468d10fd5b 100644 --- a/go/genkit/genkit.go +++ b/go/genkit/genkit.go @@ -359,6 +359,18 @@ func DefineStreamingFlow[In, Out, Stream any](g *Genkit, name string, fn core.St return core.DefineStreamingFlow(g.reg, name, fn) } +// NewFlow creates a [core.Flow] without registering it as an action. +// To register the flow later, call [RegisterAction]. +func NewFlow[In, Out any](name string, fn core.Func[In, Out]) *core.Flow[In, Out, struct{}] { + return core.NewFlow(name, fn) +} + +// NewStreamingFlow creates a streaming [core.Flow] without registering it as an action. +// To register the flow later, call [RegisterAction]. +func NewStreamingFlow[In, Out, Stream any](name string, fn core.StreamingFunc[In, Out, Stream]) *core.Flow[In, Out, Stream] { + return core.NewStreamingFlow(name, fn) +} + // Run executes the given function `fn` within the context of the current flow run, // creating a distinct trace span for this step. It's used to add observability // to specific sub-operations within a flow defined by [DefineFlow] or [DefineStreamingFlow].