diff --git a/apix/config/v1alpha1/endpointpickerconfig_types.go b/apix/config/v1alpha1/endpointpickerconfig_types.go index d30f2084c..18898ce88 100644 --- a/apix/config/v1alpha1/endpointpickerconfig_types.go +++ b/apix/config/v1alpha1/endpointpickerconfig_types.go @@ -30,6 +30,11 @@ import ( type EndpointPickerConfig struct { metav1.TypeMeta `json:",inline"` + // +optional + // FeatureGates is a set of flags that enable various experimental features with the EPP. + // If omitted non of these experimental features will be enabled. + FeatureGates FeatureGates `json:"featureGates,omitempty"` + // +required // +kubebuilder:validation:Required // Plugins is the list of plugins that will be instantiated. @@ -41,23 +46,23 @@ type EndpointPickerConfig struct { // that will be created. SchedulingProfiles []SchedulingProfile `json:"schedulingProfiles"` - // +optional - // FeatureGates is a set of flags that enable various experimental features with the EPP. - // If omitted non of these experimental features will be enabled. - FeatureGates FeatureGates `json:"featureGates,omitempty"` - // +optional // SaturationDetector when present specifies the configuration of the // Saturation detector. If not present, default values are used. SaturationDetector *SaturationDetector `json:"saturationDetector,omitempty"` + + // +optional + // Data configures the DataLayer. It is required if the new DataLayer is enabled. + Data *DataLayerConfig `json:"data"` } func (cfg EndpointPickerConfig) String() string { return fmt.Sprintf( - "{Plugins: %v, SchedulingProfiles: %v, FeatureGates: %v, SaturationDetector: %v}", + "{FeatureGates: %v, Plugins: %v, SchedulingProfiles: %v, Data: %v, SaturationDetector: %v}", + cfg.FeatureGates, cfg.Plugins, cfg.SchedulingProfiles, - cfg.FeatureGates, + cfg.Data, cfg.SaturationDetector, ) } @@ -193,3 +198,50 @@ func (sd *SaturationDetector) String() string { } return "{" + result + "}" } + +// DataLayerConfig contains the configuration of the DataLayer feature +type DataLayerConfig struct { + // +required + // +kubebuilder:validation:Required + // Sources is the list of sources to define to the DataLayer + Sources []DataLayerSource `json:"sources"` +} + +func (dlc DataLayerConfig) String() string { + return fmt.Sprintf("{Sources: %v}", dlc.Sources) +} + +// DataLayerSource contains the configuration of a DataSource of the DataLayer feature +type DataLayerSource struct { + // +required + // +kubebuilder:validation:Required + // PluginRef specifies a partiular Plugin instance to be associated with + // this Source. The reference is to the name of an entry of the Plugins + // defined in the configuration's Plugins section + PluginRef string `json:"pluginRef"` + + // +required + // +kubebuilder:validation:Required + // Extractors specifies the list of Plugin instances to be associated with + // this Source. The entries are references to the names of entries of the Plugins + // defined in the configuration's Plugins section + Extractors []DataLayerExtractor `json:"extractors"` +} + +func (dls DataLayerSource) String() string { + return fmt.Sprintf("{PluginRef: %s, Extractors: %v}", dls.PluginRef, dls.Extractors) +} + +// DataLayerExtractor contains the configuration of an Extractor of the DataLayer feature +type DataLayerExtractor struct { + // +required + // +kubebuilder:validation:Required + // PluginRef specifies a partiular Plugin instance to be associated with + // this Extractor. The reference is to the name of an entry of the Plugins + // defined in the configuration's Plugins section + PluginRef string `json:"pluginRef"` +} + +func (dle DataLayerExtractor) String() string { + return "{PluginRef: " + dle.PluginRef + "}" +} diff --git a/apix/config/v1alpha1/zz_generated.deepcopy.go b/apix/config/v1alpha1/zz_generated.deepcopy.go index 701f73793..dec218e87 100644 --- a/apix/config/v1alpha1/zz_generated.deepcopy.go +++ b/apix/config/v1alpha1/zz_generated.deepcopy.go @@ -25,10 +25,72 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DataLayerConfig) DeepCopyInto(out *DataLayerConfig) { + *out = *in + if in.Sources != nil { + in, out := &in.Sources, &out.Sources + *out = make([]DataLayerSource, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataLayerConfig. +func (in *DataLayerConfig) DeepCopy() *DataLayerConfig { + if in == nil { + return nil + } + out := new(DataLayerConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DataLayerExtractor) DeepCopyInto(out *DataLayerExtractor) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataLayerExtractor. +func (in *DataLayerExtractor) DeepCopy() *DataLayerExtractor { + if in == nil { + return nil + } + out := new(DataLayerExtractor) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DataLayerSource) DeepCopyInto(out *DataLayerSource) { + *out = *in + if in.Extractors != nil { + in, out := &in.Extractors, &out.Extractors + *out = make([]DataLayerExtractor, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataLayerSource. +func (in *DataLayerSource) DeepCopy() *DataLayerSource { + if in == nil { + return nil + } + out := new(DataLayerSource) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EndpointPickerConfig) DeepCopyInto(out *EndpointPickerConfig) { *out = *in out.TypeMeta = in.TypeMeta + if in.FeatureGates != nil { + in, out := &in.FeatureGates, &out.FeatureGates + *out = make(FeatureGates, len(*in)) + copy(*out, *in) + } if in.Plugins != nil { in, out := &in.Plugins, &out.Plugins *out = make([]PluginSpec, len(*in)) @@ -43,16 +105,16 @@ func (in *EndpointPickerConfig) DeepCopyInto(out *EndpointPickerConfig) { (*in)[i].DeepCopyInto(&(*out)[i]) } } - if in.FeatureGates != nil { - in, out := &in.FeatureGates, &out.FeatureGates - *out = make(FeatureGates, len(*in)) - copy(*out, *in) - } if in.SaturationDetector != nil { in, out := &in.SaturationDetector, &out.SaturationDetector *out = new(SaturationDetector) **out = **in } + if in.Data != nil { + in, out := &in.Data, &out.Data + *out = new(DataLayerConfig) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EndpointPickerConfig. diff --git a/pkg/epp/config/config.go b/pkg/epp/config/config.go index d966e3c0b..e36812879 100644 --- a/pkg/epp/config/config.go +++ b/pkg/epp/config/config.go @@ -17,6 +17,7 @@ limitations under the License. package config import ( + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling" ) @@ -25,4 +26,5 @@ import ( type Config struct { SchedulerConfig *scheduling.SchedulerConfig SaturationDetectorConfig *saturationdetector.Config + DataConfig *datalayer.Config } diff --git a/pkg/epp/config/loader/configloader.go b/pkg/epp/config/loader/configloader.go index 56f05dcfc..bdee45528 100644 --- a/pkg/epp/config/loader/configloader.go +++ b/pkg/epp/config/loader/configloader.go @@ -28,6 +28,7 @@ import ( configapi "sigs.k8s.io/gateway-api-inference-extension/apix/config/v1alpha1" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling" @@ -87,6 +88,11 @@ func LoadConfigPhaseTwo(rawConfig *configapi.EndpointPickerConfig, handle plugin } config.SaturationDetectorConfig = loadSaturationDetectorConfig(rawConfig.SaturationDetector) + config.DataConfig, err = loadDataLayerConfig(rawConfig.Data, rawConfig.FeatureGates, handle) + if err != nil { + return nil, err + } + return config, nil } @@ -170,6 +176,44 @@ func loadSaturationDetectorConfig(sd *configapi.SaturationDetector) *saturationd return &sdConfig } +func loadDataLayerConfig(rawDataConfig *configapi.DataLayerConfig, rawFeatureGates configapi.FeatureGates, handle plugins.Handle) (*datalayer.Config, error) { + featureGates := loadFeatureConfig(rawFeatureGates) + if !featureGates[datalayer.FeatureGate] { + if rawDataConfig != nil { + return nil, errors.New("the Datalayer has not been enabled, but you specified a configuration for it") + } + return nil, nil + } + + if rawDataConfig == nil { + return nil, errors.New("the Datalayer has been enabled. You must specify the Data section in the configuration") + } + + dataConfig := datalayer.Config{ + Sources: []datalayer.DataSourceConfig{}, + } + for _, source := range rawDataConfig.Sources { + if sourcePlugin, ok := handle.Plugin(source.PluginRef).(datalayer.DataSource); ok { + sourceConfig := datalayer.DataSourceConfig{ + Plugin: sourcePlugin, + Extractors: []datalayer.Extractor{}, + } + for _, extractor := range source.Extractors { + if extractorPlugin, ok := handle.Plugin(extractor.PluginRef).(datalayer.Extractor); ok { + sourceConfig.Extractors = append(sourceConfig.Extractors, extractorPlugin) + } else { + return nil, fmt.Errorf("the plugin %s is not a datalayer.Extractor", source.PluginRef) + } + } + dataConfig.Sources = append(dataConfig.Sources, sourceConfig) + } else { + return nil, fmt.Errorf("the plugin %s is not a datalayer.Source", source.PluginRef) + } + } + + return &dataConfig, nil +} + func instantiatePlugins(configuredPlugins []configapi.PluginSpec, handle plugins.Handle) error { pluginNames := sets.New[string]() // set of plugin names, a name must be unique diff --git a/pkg/epp/config/loader/configloader_test.go b/pkg/epp/config/loader/configloader_test.go index a23f22316..53e95dbe5 100644 --- a/pkg/epp/config/loader/configloader_test.go +++ b/pkg/epp/config/loader/configloader_test.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "os" + "reflect" "testing" "time" @@ -46,6 +47,8 @@ const ( test1Type = "test-one" test2Type = "test-two" testPickerType = "test-picker" + testSourceType = "test-source" + testExtractorType = "test-extractor" ) type testStruct struct { @@ -82,6 +85,12 @@ func TestLoadRawConfiguration(t *testing.T) { Name: "testPicker", Type: testPickerType, }, + { + Type: testSourceType, + }, + { + Type: testExtractorType, + }, }, SchedulingProfiles: []configapi.SchedulingProfile{ { @@ -100,6 +109,18 @@ func TestLoadRawConfiguration(t *testing.T) { }, }, }, + Data: &configapi.DataLayerConfig{ + Sources: []configapi.DataLayerSource{ + { + PluginRef: "test-source", + Extractors: []configapi.DataLayerExtractor{ + { + PluginRef: "test-extractor", + }, + }, + }, + }, + }, FeatureGates: configapi.FeatureGates{datalayer.FeatureGate}, SaturationDetector: &configapi.SaturationDetector{ MetricsStalenessThreshold: metav1.Duration{Duration: 150 * time.Millisecond}, @@ -188,6 +209,14 @@ func TestLoadRawConfigurationWithDefaults(t *testing.T) { Name: "testPicker", Type: testPickerType, }, + { + Name: testSourceType, + Type: testSourceType, + }, + { + Name: testExtractorType, + Type: testExtractorType, + }, }, SchedulingProfiles: []configapi.SchedulingProfile{ { @@ -206,6 +235,18 @@ func TestLoadRawConfigurationWithDefaults(t *testing.T) { }, }, }, + Data: &configapi.DataLayerConfig{ + Sources: []configapi.DataLayerSource{ + { + PluginRef: "test-source", + Extractors: []configapi.DataLayerExtractor{ + { + PluginRef: "test-extractor", + }, + }, + }, + }, + }, FeatureGates: configapi.FeatureGates{datalayer.FeatureGate}, SaturationDetector: &configapi.SaturationDetector{ QueueDepthThreshold: saturationdetector.DefaultQueueDepthThreshold, @@ -358,7 +399,7 @@ func checkError(t *testing.T, function string, test testStruct, err error) { if !test.wantErr { t.Fatalf("In test '%s' %s returned unexpected error: %v, want %v", test.name, function, err, test.wantErr) } - t.Logf("error was %s", err) + t.Logf("error in %s was %s", test.name, err) } else if test.wantErr { t.Fatalf("In test %s %s did not return an expected error", test.name, function) } @@ -459,6 +500,21 @@ func TestLoadConfig(t *testing.T) { configText: errorUnknownFeatureGateText, wantErr: true, }, + { + name: "errorMissingDataConfig", + configText: errorMissingDataConfigText, + wantErr: true, + }, + { + name: "errorBadSourceReference", + configText: errorBadSourceReferenceText, + wantErr: true, + }, + { + name: "errorBadExtractorReference", + configText: errorBadExtractorReferenceText, + wantErr: true, + }, } registerNeededFeatureGates() @@ -577,6 +633,8 @@ plugins: blockSize: 32 - name: testPicker type: test-picker +- type: test-source +- type: test-extractor schedulingProfiles: - name: default plugins: @@ -584,6 +642,11 @@ schedulingProfiles: - pluginRef: test-two weight: 50 - pluginRef: testPicker +data: + sources: + - pluginRef: test-source + extractors: + - pluginRef: test-extractor featureGates: - dataLayer saturationDetector: @@ -768,6 +831,72 @@ featureGates: - qwerty ` +// datalayer enabled without config +// +//nolint:dupword +const errorMissingDataConfigText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: test1 + type: test-one + parameters: + threshold: 10 +schedulingProfiles: +- name: default + plugins: + - pluginRef: test1 +featureGates: +- dataLayer +` + +// error bad DataSource plugin reference +// +//nolint:dupword +const errorBadSourceReferenceText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: test1 + type: test-one + parameters: + threshold: 10 +schedulingProfiles: +- name: default + plugins: + - pluginRef: test1 +data: + sources: + - pluginRef: test-one +featureGates: +- dataLayer +` + +// error bad Extractor plugin reference +// +//nolint:dupword +const errorBadExtractorReferenceText = ` +apiVersion: inference.networking.x-k8s.io/v1alpha1 +kind: EndpointPickerConfig +plugins: +- name: test1 + type: test-one + parameters: + threshold: 10 +- type: test-source +schedulingProfiles: +- name: default + plugins: + - pluginRef: test1 +data: + sources: + - pluginRef: test-source + extractors: + - pluginRef: test-one +featureGates: +- dataLayer +` + // compile-time type validation var _ framework.Filter = &test1{} @@ -858,6 +987,60 @@ func (p *testProfileHandler) ProcessResults(_ context.Context, _ *types.CycleSta return nil, nil } +// compile-time type validation +var _ datalayer.DataSource = &testSource{} + +type testSource struct { + typedName plugins.TypedName +} + +func newTestSource() *testSource { + return &testSource{ + typedName: plugins.TypedName{Type: testSourceType, Name: "test-source"}, + } +} + +func (s *testSource) TypedName() plugins.TypedName { + return s.typedName +} + +func (s *testSource) AddExtractor(_ datalayer.Extractor) error { + return nil +} + +func (s *testSource) Collect(ctx context.Context, ep datalayer.Endpoint) error { + return nil +} + +func (s *testSource) Extractors() []string { + return []string{} +} + +// compile-time type validation +var _ datalayer.Extractor = &testExtractor{} + +type testExtractor struct { + typedName plugins.TypedName +} + +func newTestExtractor() *testExtractor { + return &testExtractor{ + typedName: plugins.TypedName{Type: testExtractorType, Name: "test-extractor"}, + } +} + +func (e *testExtractor) TypedName() plugins.TypedName { + return e.typedName +} + +func (e *testExtractor) ExpectedInputType() reflect.Type { + return reflect.TypeOf("") +} + +func (e *testExtractor) Extract(ctx context.Context, data any, ep datalayer.Endpoint) error { + return nil +} + func registerTestPlugins() { plugins.Register(test1Type, func(_ string, parameters json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { @@ -884,6 +1067,18 @@ func registerTestPlugins() { return newTestProfileHandler(), nil }, ) + + plugins.Register(testSourceType, + func(_ string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + return newTestSource(), nil + }, + ) + + plugins.Register(testExtractorType, + func(_ string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + return newTestExtractor(), nil + }, + ) } // valid configuration diff --git a/site-src/guides/epp-configuration/config-text.md b/site-src/guides/epp-configuration/config-text.md index 252609102..88c6e32dc 100644 --- a/site-src/guides/epp-configuration/config-text.md +++ b/site-src/guides/epp-configuration/config-text.md @@ -26,22 +26,27 @@ schedulingProfiles: - .... saturationDetector: ... +data: + ... featureGates: ... ``` The first two lines of the configuration are constant and must appear as is. -The plugins section defines the set of plugins that will be instantiated and their parameters. This section is described in more detail in the section [Configuring Plugins via text](#configuring-plugins-via-text) +The `featureGates` section allows the enablement of experimental features of the IGW. This section is +described in more detail in the section [Feature Gates](#feature-gates) + +The `plugins` section defines the set of plugins that will be instantiated and their parameters. This section is described in more detail in the section [Configuring Plugins via text](#configuring-plugins-via-text) -The schedulingProfiles section defines the set of scheduling profiles that can be used in scheduling +The `schedulingProfiles` section defines the set of scheduling profiles that can be used in scheduling requests to pods. This section is described in more detail in the section [Configuring Plugins via YAML](#configuring-plugins-via-yaml) -The saturationDetector section configures the saturation detector, which is used to determine if special +The `saturationDetector` section configures the saturation detector, which is used to determine if special action needs to eb taken due to the system being overloaded or saturated. This section is described in more detail in the section [Saturation Detector configuration](#saturation-detector-configuration) -The featureGates sections allows the enablement of experimental features of the IGW. This section is -described in more detail in the section [Feature Gates](#feature-gates) +The `data` section configures the data layer, which is used to gather information (such as metrics) used in making scheduling +decisions. This section is described in more detail in the section [Data Layer configuration](#data-layer-configuration) ## Configuring Plugins via YAML @@ -64,7 +69,7 @@ In addition, the set of instantiated plugins can also include a picker, which ch the request is scheduled after filtering and scoring. If one is not referenced in a SchedulingProfile, an instance of `MaxScorePicker` will be added to the SchedulingProfile in question. -The plugins section defines the set of plugins that will be instantiated and their parameters. +The `plugins` section defines the set of plugins that will be instantiated and their parameters. Each entry in this section has the following form: ```yaml @@ -83,7 +88,7 @@ field is omitted, the plugin's type will be used as its name. - *parameters* which is optional, defines the set of parameters used to configure the plugin in question. The actual set of parameters varies from plugin to plugin. -The schedulingProfiles section defines the set of scheduling profiles that can be used in scheduling +The `schedulingProfiles` section defines the set of scheduling profiles that can be used in scheduling requests to pods. The number of scheduling profiles one defines, depends on the use case. For simple serving of requests, one is enough. For disaggregated prefill, two profiles are required. Each entry in this section has the following form: @@ -308,7 +313,7 @@ The Saturation Detector determines that the cluster is saturated by looking at t - KV cache utilization - Metrics staleness -The Saturation Detector is configured via the saturationDetector section of the overall configuration. +The Saturation Detector is configured via the `saturationDetector` section of the overall configuration. It has the following form: ```yaml @@ -318,7 +323,7 @@ saturationDetector: metricsStalenessThreshold: 150ms ``` -The various sub-fields of the saturationDetector section are: +The various sub-fields of the `saturationDetector` section are: - The `queueDepthThreshold` field which defines the backend waiting queue size above which a pod is considered to have insufficient capacity for new requests. This field is optional, if @@ -330,6 +335,36 @@ a value of `0.8` will be used. metrics are older than this, it might be excluded from "good capacity" considerations or treated as having no capacity for safety. This field is optional, if omitted a value of `200ms` will be used. +## Data Layer configuration + +The Data Layer collects metrics and other data used in scheduling decisions made by the various configured +plugins. The exact data collected varies by the DataSource and Extractors configured. The baseline +provided in GAIE collect Prometheus metrics from the Model Servers in the InferencePool. + +The Data Layer is configured via the `data` section of the overall configuration. It has the following form: + +```yaml +data: + sources: + - pluginRef: source1 + extractors: + - pluginRef: extractor1 + - pluginRef: extractor2 +``` + +The data section has one field *sources* which configures the set of DataSources to be used to gather the metrics +and other data used for scheduling. + +Each entry in the sources list has the following fields: + +- *pluginRef* is a reference to the name of the plugin instance to be used. +- *extractors* specifies the list of the extractors to be used with this DataSource. Each entry in the extractors +list has the following field: + - *pluginRef* is a reference to the name of the plugin instances to be used. + +**Note**: The names of the plugin instances mentioned above, refer to plugin instances defined in the plugins section +of the configuration. + ## Feature Gates The Feature Gates section allows for the enabling of experimental features of the IGW. These experimental diff --git a/test/testdata/configloader_1_test.yaml b/test/testdata/configloader_1_test.yaml index f43684158..2d724d7b7 100644 --- a/test/testdata/configloader_1_test.yaml +++ b/test/testdata/configloader_1_test.yaml @@ -12,6 +12,8 @@ plugins: blockSize: 32 - name: testPicker type: test-picker +- type: test-source +- type: test-extractor schedulingProfiles: - name: default plugins: @@ -19,6 +21,11 @@ schedulingProfiles: - pluginRef: test-two weight: 50 - pluginRef: testPicker +data: + sources: + - pluginRef: test-source + extractors: + - pluginRef: test-extractor featureGates: - dataLayer saturationDetector: