16 changes: 13 additions & 3 deletions cmd/epp/runner/runner.go
@@ -30,6 +30,7 @@ import (
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
@@ -328,6 +329,8 @@ func (r *Runner) Run(ctx context.Context) error {
saturationDetector := saturationdetector.NewDetector(eppConfig.SaturationDetectorConfig, setupLog)

// --- Admission Control Initialization ---
locator := requestcontrol.NewDatastorePodLocator(ds)
cachedLocator := requestcontrol.NewCachedPodLocator(ctx, locator, time.Millisecond*50)
var admissionController requestcontrol.AdmissionController
if r.featureGates[flowcontrol.FeatureGate] {
setupLog.Info("Initializing experimental Flow Control layer")
@@ -341,21 +344,28 @@ func (r *Runner) Run(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to initialize Flow Registry: %w", err)
}
fc, err := fccontroller.NewFlowController(ctx, fcCfg.Controller, registry, saturationDetector, setupLog)
fc, err := fccontroller.NewFlowController(
ctx,
fcCfg.Controller,
registry, saturationDetector,
cachedLocator,
setupLog,
)
if err != nil {
return fmt.Errorf("failed to initialize Flow Controller: %w", err)
}
go registry.Run(ctx)
admissionController = requestcontrol.NewFlowControlAdmissionController(saturationDetector, fc)
admissionController = requestcontrol.NewFlowControlAdmissionController(fc)
} else {
setupLog.Info("Experimental Flow Control layer is disabled, using legacy admission control")
admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector)
admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector, cachedLocator)
}

director := requestcontrol.NewDirectorWithConfig(
ds,
scheduler,
admissionController,
cachedLocator,
r.requestControlConfig)

// --- Setup ExtProc Server Runner ---
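For orientation, here is a minimal sketch of what a TTL-caching decorator over a PodLocator could look like. This is an assumption-laden illustration of the decorator pattern wired above (NewDatastorePodLocator wrapped by NewCachedPodLocator with a 50ms TTL), not the actual implementation; a production cache would likely key entries by the subset metadata rather than sharing a single entry.

// Hypothetical sketch only; imports assumed: context, sync, time, plus the
// contracts and metrics packages referenced elsewhere in this PR.
type cachedPodLocator struct {
	mu       sync.Mutex
	inner    contracts.PodLocator
	ttl      time.Duration
	cachedAt time.Time
	pods     []metrics.PodMetrics
}

func (c *cachedPodLocator) Locate(ctx context.Context, md map[string]any) []metrics.PodMetrics {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.pods != nil && time.Since(c.cachedAt) < c.ttl {
		return c.pods // serve the cached snapshot while it is still fresh
	}
	c.pods = c.inner.Locate(ctx, md)
	c.cachedAt = time.Now()
	return c.pods
}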
@@ -22,6 +22,16 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
)

// PodLocator defines the contract for a component that resolves the set of candidate pods for a request based on its
// metadata (e.g., subsetting).
//
// This interface allows the Flow Controller to fetch a fresh list of pods dynamically during the dispatch cycle,
// enabling support for "Scale-from-Zero" scenarios where pods may not exist when the request is first enqueued.
type PodLocator interface {
// Locate returns a list of pod metrics that match the criteria defined in the request metadata.
Locate(ctx context.Context, requestMetadata map[string]any) []metrics.PodMetrics
}
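
As a concrete illustration of this contract, a hypothetical locator that honors an assumed "subset" metadata key might look like the following; the key name, filtering logic, and pod-name accessor are illustrative, not taken from this PR.

type staticSubsetLocator struct {
	all []metrics.PodMetrics // full candidate set, maintained elsewhere
}

func (l *staticSubsetLocator) Locate(_ context.Context, md map[string]any) []metrics.PodMetrics {
	names, ok := md["subset"].([]string) // "subset" is an illustrative key
	if !ok {
		return l.all // no filter present: every pod is a candidate
	}
	want := make(map[string]struct{}, len(names))
	for _, n := range names {
		want[n] = struct{}{}
	}
	var out []metrics.PodMetrics
	for _, p := range l.all {
		if _, hit := want[p.GetPod().NamespacedName.Name]; hit { // accessor assumed
			out = append(out, p)
		}
	}
	return out
}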

// SaturationDetector defines the contract for a component that provides real-time load signals to the
// `controller.FlowController`.
//
28 changes: 28 additions & 0 deletions pkg/epp/flowcontrol/contracts/mocks/mocks.go
@@ -41,6 +41,8 @@ import (
typesmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks"
)

// --- RegistryShard Mocks ---

// MockRegistryShard is a simple "stub-style" mock for testing.
// Its methods are implemented as function fields (e.g., `IDFunc`). A test can inject behavior by setting the desired
// function field in the test setup. If a func is nil, the method will return a zero value.
@@ -111,6 +113,8 @@ func (m *MockRegistryShard) Stats() contracts.ShardStats {
return contracts.ShardStats{}
}

// --- Dependency Mocks ---

// MockSaturationDetector is a simple "stub-style" mock for testing.
type MockSaturationDetector struct {
IsSaturatedFunc func(ctx context.Context, candidatePods []metrics.PodMetrics) bool
@@ -123,6 +127,30 @@ func (m *MockSaturationDetector) IsSaturated(ctx context.Context, candidatePods
return false
}

// MockPodLocator provides a mock implementation of the contracts.PodLocator interface.
// It allows tests to control the exact set of pods returned for a given request.
type MockPodLocator struct {
// LocateFunc allows injecting custom logic.
LocateFunc func(ctx context.Context, requestMetadata map[string]any) []metrics.PodMetrics
// Pods is a static return value used if LocateFunc is nil.
Pods []metrics.PodMetrics
}

func (m *MockPodLocator) Locate(ctx context.Context, requestMetadata map[string]any) []metrics.PodMetrics {
if m.LocateFunc != nil {
return m.LocateFunc(ctx, requestMetadata)
}
// Return a copy so callers cannot mutate the mock's configured Pods slice.
if m.Pods == nil {
return nil
}
result := make([]metrics.PodMetrics, len(m.Pods))
copy(result, m.Pods)
return result
}
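
A typical (assumed) usage in a test: pin the candidate set so dispatch-path assertions are deterministic.

locator := &mocks.MockPodLocator{Pods: []metrics.PodMetrics{&metrics.FakePodMetrics{}}}
pods := locator.Locate(context.Background(), nil) // returns a defensive copy of Pods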

// --- ManagedQueue Mock ---

// MockManagedQueue is a high-fidelity, thread-safe mock of the `contracts.ManagedQueue` interface, designed
// specifically for testing the concurrent `controller/internal.ShardProcessor`.
//
7 changes: 7 additions & 0 deletions pkg/epp/flowcontrol/controller/controller.go
@@ -60,6 +60,7 @@ type shardProcessorFactory func(
ctx context.Context,
shard contracts.RegistryShard,
saturationDetector contracts.SaturationDetector,
podLocator contracts.PodLocator,
clock clock.WithTicker,
cleanupSweepInterval time.Duration,
enqueueChannelBufferSize int,
@@ -95,6 +96,7 @@ type FlowController struct {
config Config
registry registryClient
saturationDetector contracts.SaturationDetector
podLocator contracts.PodLocator
clock clock.WithTicker
logger logr.Logger
shardProcessorFactory shardProcessorFactory
@@ -126,13 +128,15 @@ func NewFlowController(
config Config,
registry contracts.FlowRegistry,
sd contracts.SaturationDetector,
podLocator contracts.PodLocator,
logger logr.Logger,
opts ...flowControllerOption,
) (*FlowController, error) {
fc := &FlowController{
config: config,
registry: registry,
saturationDetector: sd,
podLocator: podLocator,
clock: clock.RealClock{},
logger: logger.WithName("flow-controller"),
parentCtx: ctx,
@@ -142,6 +146,7 @@
ctx context.Context,
shard contracts.RegistryShard,
saturationDetector contracts.SaturationDetector,
podLocator contracts.PodLocator,
clock clock.WithTicker,
cleanupSweepInterval time.Duration,
enqueueChannelBufferSize int,
@@ -151,6 +156,7 @@
ctx,
shard,
saturationDetector,
podLocator,
clock,
cleanupSweepInterval,
enqueueChannelBufferSize,
@@ -448,6 +454,7 @@ func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *manag
processorCtx,
shard,
fc.saturationDetector,
fc.podLocator,
fc.clock,
fc.config.ExpiryCleanupInterval,
fc.config.EnqueueChannelBufferSize,
13 changes: 7 additions & 6 deletions pkg/epp/flowcontrol/controller/controller_test.go
@@ -79,7 +79,6 @@ type testHarness struct {
// clock is the clock interface used by the controller.
clock clock.WithTicker
mockRegistry *mockRegistryClient
mockDetector *mocks.MockSaturationDetector
// mockClock provides access to FakeClock methods (Step, HasWaiters) if and only if the underlying clock is a
// FakeClock.
mockClock *testclock.FakeClock
@@ -91,6 +90,7 @@ func newUnitHarness(t *testing.T, ctx context.Context, cfg Config, registry *moc
func newUnitHarness(t *testing.T, ctx context.Context, cfg Config, registry *mockRegistryClient) *testHarness {
t.Helper()
mockDetector := &mocks.MockSaturationDetector{}
mockPodLocator := &mocks.MockPodLocator{}

// Initialize the FakeClock with the current system time.
// The controller implementation uses the injected clock to calculate the deadline timestamp, but uses the standard
@@ -113,15 +113,14 @@
withClock(mockClock),
withShardProcessorFactory(mockProcessorFactory.new),
}
fc, err := NewFlowController(ctx, cfg, registry, mockDetector, logr.Discard(), opts...)
fc, err := NewFlowController(ctx, cfg, registry, mockDetector, mockPodLocator, logr.Discard(), opts...)
require.NoError(t, err, "failed to create FlowController for unit test harness")

h := &testHarness{
fc: fc,
cfg: cfg,
clock: mockClock,
mockRegistry: registry,
mockDetector: mockDetector,
mockClock: mockClock,
mockProcessorFactory: mockProcessorFactory,
}
@@ -133,8 +132,9 @@ func newUnitHarness(t *testing.T, ctx context.Context, cfg Config, registry *moc
func newIntegrationHarness(t *testing.T, ctx context.Context, cfg Config, registry *mockRegistryClient) *testHarness {
t.Helper()
mockDetector := &mocks.MockSaturationDetector{}
// Align FakeClock with system time. See explanation in newUnitHarness.
mockPodLocator := &mocks.MockPodLocator{}

// Align FakeClock with system time. See explanation in newUnitHarness.
mockClock := testclock.NewFakeClock(time.Now())
if registry == nil {
registry = &mockRegistryClient{}
@@ -144,15 +144,14 @@
withRegistryClient(registry),
withClock(mockClock),
}
fc, err := NewFlowController(ctx, cfg, registry, mockDetector, logr.Discard(), opts...)
fc, err := NewFlowController(ctx, cfg, registry, mockDetector, mockPodLocator, logr.Discard(), opts...)
require.NoError(t, err, "failed to create FlowController for integration test harness")

h := &testHarness{
fc: fc,
cfg: cfg,
clock: mockClock,
mockRegistry: registry,
mockDetector: mockDetector,
mockClock: mockClock,
}
return h
@@ -247,6 +246,7 @@ func (f *mockShardProcessorFactory) new(
_ context.Context, // The factory does not use the lifecycle context; it's passed to the processor's Run method later.
shard contracts.RegistryShard,
_ contracts.SaturationDetector,
_ contracts.PodLocator,
_ clock.WithTicker,
_ time.Duration,
_ int,
@@ -1001,6 +1001,7 @@ func TestFlowController_WorkerManagement(t *testing.T) {
ctx context.Context, // The context created by getOrStartWorker for the potential new processor.
shard contracts.RegistryShard,
_ contracts.SaturationDetector,
_ contracts.PodLocator,
_ clock.WithTicker,
_ time.Duration,
_ int,
7 changes: 5 additions & 2 deletions pkg/epp/flowcontrol/controller/internal/processor.go
@@ -65,6 +65,7 @@ var ErrProcessorBusy = errors.New("shard processor is busy")
type ShardProcessor struct {
shard contracts.RegistryShard
saturationDetector contracts.SaturationDetector
podLocator contracts.PodLocator
clock clock.WithTicker
cleanupSweepInterval time.Duration
logger logr.Logger
@@ -86,6 +87,7 @@ func NewShardProcessor(
ctx context.Context,
shard contracts.RegistryShard,
saturationDetector contracts.SaturationDetector,
podLocator contracts.PodLocator,
clock clock.WithTicker,
cleanupSweepInterval time.Duration,
enqueueChannelBufferSize int,
@@ -94,6 +96,7 @@
return &ShardProcessor{
shard: shard,
saturationDetector: saturationDetector,
podLocator: podLocator,
clock: clock,
cleanupSweepInterval: cleanupSweepInterval,
logger: logger,
@@ -307,8 +310,8 @@ func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool {

// --- Viability Check (Saturation/HoL Blocking) ---
req := item.OriginalRequest()
candidatePods := req.CandidatePodsForScheduling()
if sp.saturationDetector.IsSaturated(ctx, candidatePods) {
candidates := sp.podLocator.Locate(ctx, req.GetMetadata())
if sp.saturationDetector.IsSaturated(ctx, candidates) {
sp.logger.V(logutil.DEBUG).Info("Policy's chosen item is saturated; enforcing HoL blocking.",
"flowKey", req.FlowKey(), "reqID", req.ID(), "priorityName", originalBand.PriorityName())
// Stop the dispatch cycle entirely to respect strict policy decision and prevent priority inversion where
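
Read together, the two changed lines above move candidate resolution from a snapshot captured at enqueue time (the removed CandidatePodsForScheduling) to a fresh PodLocator lookup on every dispatch attempt, which is what allows a request queued before any pods existed to eventually dispatch. A condensed, partly assumed view of the control flow:

candidates := sp.podLocator.Locate(ctx, req.GetMetadata()) // fresh per cycle: pods absent at enqueue time can appear here
if sp.saturationDetector.IsSaturated(ctx, candidates) {
	// Strict head-of-line blocking: end the cycle rather than dispatch a
	// lower-priority item past the blocked one.
	return false // assumed: the rest of the function body is elided from this hunk
}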
3 changes: 3 additions & 0 deletions pkg/epp/flowcontrol/controller/internal/processor_test.go
@@ -75,6 +75,7 @@ type testHarness struct {
clock *testclock.FakeClock
logger logr.Logger
saturationDetector *mocks.MockSaturationDetector
podLocator *mocks.MockPodLocator

// --- Centralized Mock State ---
// The harness's mutex protects the single source of truth for all mock state.
@@ -96,6 +97,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn
clock: testclock.NewFakeClock(time.Now()),
logger: logr.Discard(),
saturationDetector: &mocks.MockSaturationDetector{},
podLocator: &mocks.MockPodLocator{Pods: []metrics.PodMetrics{&metrics.FakePodMetrics{}}},
startSignal: make(chan struct{}),
queues: make(map[types.FlowKey]*mocks.MockManagedQueue),
priorityFlows: make(map[int][]types.FlowKey),
@@ -123,6 +125,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn
h.ctx,
h,
h.saturationDetector,
h.podLocator,
h.clock,
expiryCleanupInterval,
100,
25 changes: 9 additions & 16 deletions pkg/epp/flowcontrol/types/mocks/mocks.go
@@ -21,20 +21,19 @@ package mocks
import (
"time"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
)

// MockFlowControlRequest provides a mock implementation of the `types.FlowControlRequest` interface.
// MockFlowControlRequest provides a mock implementation of the types.FlowControlRequest interface.
type MockFlowControlRequest struct {
FlowKeyV types.FlowKey
ByteSizeV uint64
InitialEffectiveTTLV time.Duration
IDV string
CandidatePodsForSchedulingV []*metrics.FakePodMetrics
FlowKeyV types.FlowKey
ByteSizeV uint64
InitialEffectiveTTLV time.Duration
IDV string
MetadataV map[string]any
}

// NewMockFlowControlRequest creates a new `MockFlowControlRequest` instance.
// NewMockFlowControlRequest creates a new MockFlowControlRequest instance.
func NewMockFlowControlRequest(
byteSize uint64,
id string,
@@ -44,21 +43,15 @@ func NewMockFlowControlRequest(
ByteSizeV: byteSize,
IDV: id,
FlowKeyV: key,
MetadataV: make(map[string]any),
}
}

func (m *MockFlowControlRequest) FlowKey() types.FlowKey { return m.FlowKeyV }
func (m *MockFlowControlRequest) ByteSize() uint64 { return m.ByteSizeV }
func (m *MockFlowControlRequest) InitialEffectiveTTL() time.Duration { return m.InitialEffectiveTTLV }
func (m *MockFlowControlRequest) ID() string { return m.IDV }

func (m *MockFlowControlRequest) CandidatePodsForScheduling() []metrics.PodMetrics {
pods := make([]metrics.PodMetrics, 0, len(m.CandidatePodsForSchedulingV))
for i, pod := range m.CandidatePodsForSchedulingV {
pods[i] = pod
}
return pods
}
func (m *MockFlowControlRequest) GetMetadata() map[string]any { return m.MetadataV }

var _ types.FlowControlRequest = &MockFlowControlRequest{}
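
With CandidatePodsForSchedulingV replaced by MetadataV, a test would seed metadata roughly like this (the key is illustrative; the constructor signature is as shown above):

req := mocks.NewMockFlowControlRequest(1024, "req-1", types.FlowKey{}) // zero-value key for brevity
req.MetadataV["subset"] = []string{"pod-a"} // safe: the constructor initializes MetadataV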

12 changes: 5 additions & 7 deletions pkg/epp/flowcontrol/types/request.go
@@ -18,8 +18,6 @@ package types

import (
"time"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
)

// FlowControlRequest is the contract for an incoming request submitted to the `controller.FlowController`. It
@@ -45,15 +43,15 @@ type FlowControlRequest interface {
// applied.
InitialEffectiveTTL() time.Duration

// CandidatePodsForScheduling passes through a set of candidate pods a request may be admitted to.
// This is necessary for invoking `contracts.SaturationDetector.IsSaturated`, but it is otherwise unused in the Flow
// Control system.
CandidatePodsForScheduling() []metrics.PodMetrics

// ID returns an optional, user-facing unique identifier for this specific request. It is intended for logging,
// tracing, and observability. The `controller.FlowController` does not use this ID for dispatching decisions; it uses
// the internal, opaque `QueueItemHandle`.
ID() string

// GetMetadata returns the opaque metadata associated with the request (e.g., header-derived context, subset filters).
// This data is passed transparently to components like the contracts.PodLocator to resolve resources (candidate pods)
// lazily during the dispatch cycle.
GetMetadata() map[string]any

@lioraron This incidentally provides the plumbing necessary for #1863. Let me know if this is what you were expecting. This is populated from reqCtx.Request.Metadata.

IntraFlowDispatchPolicy.SelectItem gets access to QueueItemAccessor which exposes OriginalRequest() FlowControlRequest. Now your custom plugin impl can extract whatever it wants from here.

Is this sufficient to resolve #1863 or are you also seeking an extension point to intercept and augment reqCtx.Request.Metadata with additional key-value pairs?

}
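
Following the review thread above, a custom IntraFlowDispatchPolicy could consume this metadata roughly as follows. The method name and signature are paraphrased from the discussion, not verified against the framework package:

func (p *hintAwarePolicy) SelectItem(items []framework.QueueItemAccessor) framework.QueueItemAccessor {
	for _, it := range items {
		md := it.OriginalRequest().GetMetadata() // populated from reqCtx.Request.Metadata
		if _, ok := md["x-dispatch-hint"]; ok {  // illustrative header-derived key
			return it
		}
	}
	if len(items) > 0 {
		return items[0] // fall back to first-queued behavior
	}
	return nil
}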

// QueueItemHandle is an opaque handle to an item that has been successfully added to a `framework.SafeQueue`. It acts