Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 27 additions & 13 deletions go/core/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ type flowContext struct {
flowName string
}

// DefineFlow creates a Flow that runs fn, and registers it as an action. fn takes an input of type In and returns an output of type Out.
func DefineFlow[In, Out any](r api.Registry, name string, fn Func[In, Out]) *Flow[In, Out, struct{}] {
return (*Flow[In, Out, struct{}])(DefineAction(r, name, api.ActionTypeFlow, nil, nil, func(ctx context.Context, input In) (Out, error) {
// NewFlow creates a Flow that runs fn without registering it. fn takes an input of type In and returns an output of type Out.
func NewFlow[In, Out any](name string, fn Func[In, Out]) *Flow[In, Out, struct{}] {
return (*Flow[In, Out, struct{}])(NewAction(name, api.ActionTypeFlow, nil, nil, func(ctx context.Context, input In) (Out, error) {
fc := &flowContext{
flowName: name,
}
Expand All @@ -56,6 +56,27 @@ func DefineFlow[In, Out any](r api.Registry, name string, fn Func[In, Out]) *Flo
}))
}

// NewStreamingFlow creates a streaming Flow that runs fn without registering it.
func NewStreamingFlow[In, Out, Stream any](name string, fn StreamingFunc[In, Out, Stream]) *Flow[In, Out, Stream] {
return (*Flow[In, Out, Stream])(NewStreamingAction(name, api.ActionTypeFlow, nil, nil, func(ctx context.Context, input In, cb func(context.Context, Stream) error) (Out, error) {
fc := &flowContext{
flowName: name,
}
ctx = flowContextKey.NewContext(ctx, fc)
if cb == nil {
cb = func(context.Context, Stream) error { return nil }
}
return fn(ctx, input, cb)
}))
}
Comment on lines +60 to +71
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similar to NewFlow, we should add a nil check for the fn argument in NewStreamingFlow. This ensures that we fail early with a descriptive error message if a nil function is provided, preventing a runtime panic that is harder to trace.

func NewStreamingFlow[In, Out, Stream any](name string, fn StreamingFunc[In, Out, Stream]) *Flow[In, Out, Stream] {
	if fn == nil {
		panic("core.NewStreamingFlow: fn cannot be nil")
	}
	return (*Flow[In, Out, Stream])(NewStreamingAction(name, api.ActionTypeFlow, nil, nil, func(ctx context.Context, input In, cb func(context.Context, Stream) error) (Out, error) {
		fc := &flowContext{
			flowName: name,
		}
		ctx = flowContextKey.NewContext(ctx, fc)
		if cb == nil {
			cb = func(context.Context, Stream) error { return nil }
		}
		return fn(ctx, input, cb)
	}))
}


// DefineFlow creates a Flow that runs fn, and registers it as an action. fn takes an input of type In and returns an output of type Out.
func DefineFlow[In, Out any](r api.Registry, name string, fn Func[In, Out]) *Flow[In, Out, struct{}] {
f := NewFlow(name, fn)
f.Register(r)
return f
}

// DefineStreamingFlow creates a streaming Flow that runs fn, and registers it as an action.
//
// fn takes an input of type In and returns an output of type Out, optionally
Expand All @@ -66,16 +87,9 @@ func DefineFlow[In, Out any](r api.Registry, name string, fn Func[In, Out]) *Flo
// with a final return value that includes all the streamed data.
// Otherwise, it should ignore the callback and just return a result.
func DefineStreamingFlow[In, Out, Stream any](r api.Registry, name string, fn StreamingFunc[In, Out, Stream]) *Flow[In, Out, Stream] {
return (*Flow[In, Out, Stream])(DefineStreamingAction(r, name, api.ActionTypeFlow, nil, nil, func(ctx context.Context, input In, cb func(context.Context, Stream) error) (Out, error) {
fc := &flowContext{
flowName: name,
}
ctx = flowContextKey.NewContext(ctx, fc)
if cb == nil {
cb = func(context.Context, Stream) error { return nil }
}
return fn(ctx, input, cb)
}))
f := NewStreamingFlow(name, fn)
f.Register(r)
return f
}

// Run runs the function f in the context of the current flow
Expand Down
12 changes: 12 additions & 0 deletions go/genkit/genkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,18 @@ func DefineStreamingFlow[In, Out, Stream any](g *Genkit, name string, fn core.St
return core.DefineStreamingFlow(g.reg, name, fn)
}

// NewFlow creates a [core.Flow] without registering it as an action.
// To register the flow later, call [RegisterAction].
func NewFlow[In, Out any](name string, fn core.Func[In, Out]) *core.Flow[In, Out, struct{}] {
return core.NewFlow(name, fn)
}

// NewStreamingFlow creates a streaming [core.Flow] without registering it as an action.
// To register the flow later, call [RegisterAction].
func NewStreamingFlow[In, Out, Stream any](name string, fn core.StreamingFunc[In, Out, Stream]) *core.Flow[In, Out, Stream] {
return core.NewStreamingFlow(name, fn)
}

// Run executes the given function `fn` within the context of the current flow run,
// creating a distinct trace span for this step. It's used to add observability
// to specific sub-operations within a flow defined by [DefineFlow] or [DefineStreamingFlow].
Expand Down
Loading