diff --git a/go.mod b/go.mod index 7b6e93da..3384116d 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/gin-gonic/gin v1.10.1 github.com/google/uuid v1.3.0 github.com/splitio/gincache v1.0.1 - github.com/splitio/go-split-commons/v8 v8.0.0-20251022154508-1ea26a26874f + github.com/splitio/go-split-commons/v8 v8.0.0-20251028203151-2b6d18a2f657 github.com/splitio/go-toolkit/v5 v5.4.1 github.com/stretchr/testify v1.11.1 go.etcd.io/bbolt v1.3.6 diff --git a/go.sum b/go.sum index 0bc211a1..0f54bde1 100644 --- a/go.sum +++ b/go.sum @@ -74,8 +74,8 @@ github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUA github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/splitio/gincache v1.0.1 h1:dLYdANY/BqH4KcUMCe/LluLyV5WtuE/LEdQWRE06IXU= github.com/splitio/gincache v1.0.1/go.mod h1:CcgJDSM9Af75kyBH0724v55URVwMBuSj5x1eCWIOECY= -github.com/splitio/go-split-commons/v8 v8.0.0-20251022154508-1ea26a26874f h1:2o8Hu3G4jAoF6Y0Ceptr4Bwp3x9wFDenp494Cu/V5nU= -github.com/splitio/go-split-commons/v8 v8.0.0-20251022154508-1ea26a26874f/go.mod h1:vgRGPn0s4RC9/zp1nIn4KeeIEj/K3iXE2fxYQbCk/WI= +github.com/splitio/go-split-commons/v8 v8.0.0-20251028203151-2b6d18a2f657 h1:FYT0P+uFnXzALLgWOTIAJS6P4J1NpMGNi+rWsv2ZIkU= +github.com/splitio/go-split-commons/v8 v8.0.0-20251028203151-2b6d18a2f657/go.mod h1:vgRGPn0s4RC9/zp1nIn4KeeIEj/K3iXE2fxYQbCk/WI= github.com/splitio/go-toolkit/v5 v5.4.1 h1:srTyvDBJZMUcJ/KiiQDMyjCuELVgTBh2TGRVn0sOXEE= github.com/splitio/go-toolkit/v5 v5.4.1/go.mod h1:SifzysrOVDbzMcOE8zjX02+FG5az4FrR3Us/i5SeStw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/splitio/admin/common/config.go b/splitio/admin/common/config.go index b576cc96..63cb4376 100644 --- a/splitio/admin/common/config.go +++ b/splitio/admin/common/config.go @@ -1,14 +1,28 @@ package common -import "github.com/splitio/go-split-commons/v8/storage" +import ( + "github.com/splitio/go-split-commons/v8/engine/grammar/constants" + "github.com/splitio/go-split-commons/v8/storage" +) + +var ProducerFeatureFlagsRules = []string{constants.MatcherTypeAllKeys, constants.MatcherTypeInSegment, constants.MatcherTypeWhitelist, constants.MatcherTypeEqualTo, constants.MatcherTypeGreaterThanOrEqualTo, constants.MatcherTypeLessThanOrEqualTo, constants.MatcherTypeBetween, + constants.MatcherTypeEqualToSet, constants.MatcherTypePartOfSet, constants.MatcherTypeContainsAllOfSet, constants.MatcherTypeContainsAnyOfSet, constants.MatcherTypeStartsWith, constants.MatcherTypeEndsWith, constants.MatcherTypeContainsString, constants.MatcherTypeInSplitTreatment, + constants.MatcherTypeEqualToBoolean, constants.MatcherTypeMatchesString, constants.MatcherEqualToSemver, constants.MatcherTypeGreaterThanOrEqualToSemver, constants.MatcherTypeLessThanOrEqualToSemver, constants.MatcherTypeBetweenSemver, constants.MatcherTypeInListSemver, + constants.MatcherTypeInRuleBasedSegment} + +var ProducerRuleBasedSegmentRules = []string{constants.MatcherTypeAllKeys, constants.MatcherTypeInSegment, constants.MatcherTypeWhitelist, constants.MatcherTypeEqualTo, constants.MatcherTypeGreaterThanOrEqualTo, constants.MatcherTypeLessThanOrEqualTo, constants.MatcherTypeBetween, + constants.MatcherTypeEqualToSet, constants.MatcherTypePartOfSet, constants.MatcherTypeContainsAllOfSet, constants.MatcherTypeContainsAnyOfSet, constants.MatcherTypeStartsWith, constants.MatcherTypeEndsWith, constants.MatcherTypeContainsString, + constants.MatcherTypeEqualToBoolean, 
constants.MatcherTypeMatchesString, constants.MatcherEqualToSemver, constants.MatcherTypeGreaterThanOrEqualToSemver, constants.MatcherTypeLessThanOrEqualToSemver, constants.MatcherTypeBetweenSemver, constants.MatcherTypeInListSemver, + constants.MatcherTypeInRuleBasedSegment} // Storages wraps storages in one struct type Storages struct { - SplitStorage storage.SplitStorage - SegmentStorage storage.SegmentStorage - LocalTelemetryStorage storage.TelemetryRuntimeConsumer - EventStorage storage.EventMultiSdkConsumer - ImpressionStorage storage.ImpressionMultiSdkConsumer - UniqueKeysStorage storage.UniqueKeysMultiSdkConsumer - LargeSegmentStorage storage.LargeSegmentsStorage + SplitStorage storage.SplitStorage + SegmentStorage storage.SegmentStorage + LocalTelemetryStorage storage.TelemetryRuntimeConsumer + EventStorage storage.EventMultiSdkConsumer + ImpressionStorage storage.ImpressionMultiSdkConsumer + UniqueKeysStorage storage.UniqueKeysMultiSdkConsumer + LargeSegmentStorage storage.LargeSegmentsStorage + RuleBasedSegmentsStorage storage.RuleBasedSegmentsStorage } diff --git a/splitio/commitversion.go b/splitio/commitversion.go index b182b237..03f6673b 100644 --- a/splitio/commitversion.go +++ b/splitio/commitversion.go @@ -5,4 +5,4 @@ This file is created automatically, please do not edit */ // CommitVersion is the version of the last commit previous to release -const CommitVersion = "5e4d9e1" +const CommitVersion = "1807589" diff --git a/splitio/producer/conf/sections.go b/splitio/producer/conf/sections.go index b306e71f..649635f6 100644 --- a/splitio/producer/conf/sections.go +++ b/splitio/producer/conf/sections.go @@ -18,7 +18,7 @@ type Main struct { Integrations conf.Integrations `json:"integrations" s-nested:"true"` Logging conf.Logging `json:"logging" s-nested:"true"` Healthcheck Healthcheck `json:"healthcheck" s-nested:"true"` - FlagSpecVersion string `json:"flagSpecVersion" s-cli:"flag-spec-version" s-def:"1.1" s-desc:"Spec version for flags"` + FlagSpecVersion string `json:"flagSpecVersion" s-cli:"flag-spec-version" s-def:"1.3" s-desc:"Spec version for flags"` } // BuildAdvancedConfig generates a commons-compatible advancedconfig with default + overriden parameters diff --git a/splitio/producer/initialization.go b/splitio/producer/initialization.go index 080f3442..46d7f161 100644 --- a/splitio/producer/initialization.go +++ b/splitio/producer/initialization.go @@ -103,12 +103,13 @@ func Start(logger logging.LoggerInterface, cfg *conf.Main) error { return fmt.Errorf("error instantiating observable segment storage: %w", err) } storages := adminCommon.Storages{ - SplitStorage: splitStorage, - SegmentStorage: segmentStorage, - LocalTelemetryStorage: syncTelemetryStorage, - ImpressionStorage: redis.NewImpressionStorage(redisClient, dtos.Metadata{}, logger), - EventStorage: redis.NewEventsStorage(redisClient, dtos.Metadata{}, logger), - UniqueKeysStorage: redis.NewUniqueKeysMultiSdkConsumer(redisClient, logger), + SplitStorage: splitStorage, + SegmentStorage: segmentStorage, + LocalTelemetryStorage: syncTelemetryStorage, + ImpressionStorage: redis.NewImpressionStorage(redisClient, dtos.Metadata{}, logger), + EventStorage: redis.NewEventsStorage(redisClient, dtos.Metadata{}, logger), + UniqueKeysStorage: redis.NewUniqueKeysMultiSdkConsumer(redisClient, logger), + RuleBasedSegmentsStorage: redis.NewRuleBasedStorage(redisClient, logger), } // Healcheck Monitor @@ -125,11 +126,19 @@ func Start(logger logging.LoggerInterface, cfg *conf.Main) error { // Creating Workers and Tasks 
eventEvictionMonitor := evcalc.New(1) + ruleBuilder := grammar.NewRuleBuilder( + storages.SegmentStorage, + storages.RuleBasedSegmentsStorage, + storages.LargeSegmentStorage, + adminCommon.ProducerFeatureFlagsRules, + adminCommon.ProducerRuleBasedSegmentRules, + logger, + nil) + workers := synchronizer.Workers{ - // TODO add ruleBasedSegmentStorage, ruleBuilder, sdkOverrides - SplitUpdater: split.NewSplitUpdater(storages.SplitStorage, nil, splitAPI.SplitFetcher, logger, syncTelemetryStorage, appMonitor, flagSetsFilter, grammar.RuleBuilder{}, false, cfg.FlagSpecVersion), - // TODO add ruleBasedSegmentStorage - SegmentUpdater: segment.NewSegmentUpdater(storages.SplitStorage, storages.SegmentStorage, nil, splitAPI.SegmentFetcher, + // TODO add sdkOverrides + SplitUpdater: split.NewSplitUpdater(storages.SplitStorage, storages.RuleBasedSegmentsStorage, splitAPI.SplitFetcher, logger, syncTelemetryStorage, appMonitor, flagSetsFilter, ruleBuilder, false, cfg.FlagSpecVersion), + SegmentUpdater: segment.NewSegmentUpdater(storages.SplitStorage, storages.SegmentStorage, storages.RuleBasedSegmentsStorage, splitAPI.SegmentFetcher, logger, syncTelemetryStorage, appMonitor), ImpressionsCountRecorder: impressionscount.NewRecorderSingle(impressionsCounter, splitAPI.ImpressionRecorder, metadata, logger, syncTelemetryStorage), diff --git a/splitio/proxy/caching/workers.go b/splitio/proxy/caching/workers.go index d73e18ec..c7614475 100644 --- a/splitio/proxy/caching/workers.go +++ b/splitio/proxy/caching/workers.go @@ -17,6 +17,7 @@ import ( // CacheAwareSplitSynchronizer wraps a SplitSynchronizer and flushes cache when an update happens type CacheAwareSplitSynchronizer struct { splitStorage storage.SplitStorage + rbStorage storage.RuleBasedSegmentsStorage wrapped split.Updater cacheFlusher gincache.CacheFlusher } @@ -24,6 +25,7 @@ type CacheAwareSplitSynchronizer struct { // NewCacheAwareSplitSync constructs a split-sync wrapper that evicts cache on updates func NewCacheAwareSplitSync( splitStorage storage.SplitStorage, + ruleBasedStorage storage.RuleBasedSegmentsStorage, splitFetcher service.SplitFetcher, logger logging.LoggerInterface, runtimeTelemetry storage.TelemetryRuntimeProducer, @@ -31,11 +33,12 @@ func NewCacheAwareSplitSync( appMonitor application.MonitorProducerInterface, flagSetsFilter flagsets.FlagSetFilter, specVersion string, + ruleBuilder grammar.RuleBuilder, ) *CacheAwareSplitSynchronizer { return &CacheAwareSplitSynchronizer{ - // TODO add ruleBasedSegmentStorage, ruleBuilder, increase FLAG SPEC when we support RUleBased - wrapped: split.NewSplitUpdater(splitStorage, nil, splitFetcher, logger, runtimeTelemetry, appMonitor, flagSetsFilter, grammar.RuleBuilder{}, false, specVersion), + wrapped: split.NewSplitUpdater(splitStorage, ruleBasedStorage, splitFetcher, logger, runtimeTelemetry, appMonitor, flagSetsFilter, ruleBuilder, false, specVersion), splitStorage: splitStorage, + rbStorage: ruleBasedStorage, cacheFlusher: cacheFlusher, } } @@ -43,8 +46,12 @@ func NewCacheAwareSplitSync( // SynchronizeSplits synchronizes feature flags and if something changes, purges the cache appropriately func (c *CacheAwareSplitSynchronizer) SynchronizeSplits(till *int64) (*split.UpdateResult, error) { previous, _ := c.splitStorage.ChangeNumber() + previousRB, _ := c.rbStorage.ChangeNumber() + result, err := c.wrapped.SynchronizeSplits(till) - if current, _ := c.splitStorage.ChangeNumber(); current > previous || (previous != -1 && current == -1) { + current, _ := c.splitStorage.ChangeNumber() + currentRB, _ := 
c.rbStorage.ChangeNumber() + if current > previous || (previous != -1 && current == -1) || currentRB > previousRB || (previousRB != -1 && currentRB == -1) { // if the changenumber was updated, evict splitChanges responses from cache c.cacheFlusher.EvictBySurrogate(SplitSurrogate) } @@ -61,8 +68,12 @@ func (c *CacheAwareSplitSynchronizer) LocalKill(splitName string, defaultTreatme // SynchronizeFeatureFlags synchronizes feature flags and if something changes, purges the cache appropriately func (c *CacheAwareSplitSynchronizer) SynchronizeFeatureFlags(ffChange *dtos.SplitChangeUpdate) (*split.UpdateResult, error) { previous, _ := c.splitStorage.ChangeNumber() + previousRB, _ := c.rbStorage.ChangeNumber() + result, err := c.wrapped.SynchronizeFeatureFlags(ffChange) - if current, _ := c.splitStorage.ChangeNumber(); current > previous || (previous != -1 && current == -1) { + current, _ := c.splitStorage.ChangeNumber() + currentRB, _ := c.rbStorage.ChangeNumber() + if current > previous || (previous != -1 && current == -1) || currentRB > previousRB || (previousRB != -1 && currentRB == -1) { // if the changenumber was updated, evict splitChanges responses from cache c.cacheFlusher.EvictBySurrogate(SplitSurrogate) } @@ -81,6 +92,7 @@ type CacheAwareSegmentSynchronizer struct { func NewCacheAwareSegmentSync( splitStorage storage.SplitStorage, segmentStorage storage.SegmentStorage, + ruleBasedStorage storage.RuleBasedSegmentsStorage, segmentFetcher service.SegmentFetcher, logger logging.LoggerInterface, runtimeTelemetry storage.TelemetryRuntimeProducer, @@ -88,8 +100,7 @@ func NewCacheAwareSegmentSync( appMonitor application.MonitorProducerInterface, ) *CacheAwareSegmentSynchronizer { return &CacheAwareSegmentSynchronizer{ - // TODO add ruleBasedSegmentStorage - wrapped: segment.NewSegmentUpdater(splitStorage, segmentStorage, nil, segmentFetcher, logger, runtimeTelemetry, appMonitor), + wrapped: segment.NewSegmentUpdater(splitStorage, segmentStorage, ruleBasedStorage, segmentFetcher, logger, runtimeTelemetry, appMonitor), cacheFlusher: cacheFlusher, splitStorage: splitStorage, segmentStorage: segmentStorage, diff --git a/splitio/proxy/conf/sections.go b/splitio/proxy/conf/sections.go index 2e288b09..1e131785 100644 --- a/splitio/proxy/conf/sections.go +++ b/splitio/proxy/conf/sections.go @@ -22,7 +22,7 @@ type Main struct { Logging conf.Logging `json:"logging" s-nested:"true"` Healthcheck Healthcheck `json:"healthcheck" s-nested:"true"` Observability Observability `json:"observability" s-nested:"true"` - FlagSpecVersion string `json:"flagSpecVersion" s-cli:"flag-spec-version" s-def:"1.2" s-desc:"Spec version for flags"` + FlagSpecVersion string `json:"flagSpecVersion" s-cli:"flag-spec-version" s-def:"1.3" s-desc:"Spec version for flags"` } // BuildAdvancedConfig generates a commons-compatible advancedconfig with default + overriden parameters diff --git a/splitio/proxy/controllers/sdk.go b/splitio/proxy/controllers/sdk.go index 0e2238af..5e156180 100644 --- a/splitio/proxy/controllers/sdk.go +++ b/splitio/proxy/controllers/sdk.go @@ -25,14 +25,15 @@ import ( // SdkServerController bundles all request handler for sdk-server apis type SdkServerController struct { - logger logging.LoggerInterface - fetcher service.SplitFetcher - proxySplitStorage storage.ProxySplitStorage - proxySegmentStorage storage.ProxySegmentStorage - fsmatcher flagsets.FlagSetMatcher - versionFilter specs.SplitVersionFilter - largeSegmentStorage cmnStorage.LargeSegmentsStorage - specVersion string + logger logging.LoggerInterface 
+	fetcher               service.SplitFetcher
+	proxySplitStorage     storage.ProxySplitStorage
+	proxyRBSegmentStorage storage.ProxyRuleBasedSegmentsStorage
+	proxySegmentStorage   storage.ProxySegmentStorage
+	fsmatcher             flagsets.FlagSetMatcher
+	versionFilter         specs.SplitVersionFilter
+	largeSegmentStorage   cmnStorage.LargeSegmentsStorage
+	specVersion           string
 }
 
 // NewSdkServerController instantiates a new sdk server controller
@@ -41,19 +42,21 @@ func NewSdkServerController(
 	fetcher service.SplitFetcher,
 	proxySplitStorage storage.ProxySplitStorage,
 	proxySegmentStorage storage.ProxySegmentStorage,
+	proxyRBSegmentStorage storage.ProxyRuleBasedSegmentsStorage,
 	fsmatcher flagsets.FlagSetMatcher,
 	largeSegmentStorage cmnStorage.LargeSegmentsStorage,
 	specVersion string,
 ) *SdkServerController {
 	return &SdkServerController{
-		logger:              logger,
-		fetcher:             fetcher,
-		proxySplitStorage:   proxySplitStorage,
-		proxySegmentStorage: proxySegmentStorage,
-		fsmatcher:           fsmatcher,
-		versionFilter:       specs.NewSplitVersionFilter(),
-		largeSegmentStorage: largeSegmentStorage,
-		specVersion:         specVersion,
+		logger:                logger,
+		fetcher:               fetcher,
+		proxySplitStorage:     proxySplitStorage,
+		proxySegmentStorage:   proxySegmentStorage,
+		proxyRBSegmentStorage: proxyRBSegmentStorage,
+		fsmatcher:             fsmatcher,
+		versionFilter:         specs.NewSplitVersionFilter(),
+		largeSegmentStorage:   largeSegmentStorage,
+		specVersion:           specVersion,
 	}
 }
 
@@ -107,6 +110,11 @@ func (c *SdkServerController) SplitChanges(ctx *gin.Context) {
 		since = -1
 	}
 
+	rbsince, err := strconv.ParseInt(ctx.DefaultQuery("rbSince", "-1"), 10, 64)
+	if err != nil {
+		rbsince = -1
+	}
+
 	var rawSets []string
 	if fq, ok := ctx.GetQuery("sets"); ok {
 		rawSets = strings.Split(fq, ",")
@@ -116,9 +124,9 @@
 		c.logger.Warning(fmt.Sprintf("SDK [%s] is sending flagsets unordered or with duplicates.", ctx.Request.Header.Get("SplitSDKVersion")))
 	}
 
-	c.logger.Debug(fmt.Sprintf("SDK Fetches Feature Flags Since: %d", since))
+	c.logger.Debug(fmt.Sprintf("SDK Fetches Feature Flags Since: %d, RBSince: %d", since, rbsince))
 
-	splits, err := c.fetchSplitChangesSince(since, sets)
+	rules, err := c.fetchRulesSince(since, rbsince, sets)
 	if err != nil {
 		c.logger.Error("error fetching splitChanges payload from storage: ", err)
 		ctx.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
@@ -133,9 +141,19 @@
 		return
 	}
 
-	splits.Splits = c.patchUnsupportedMatchers(splits.Splits, spec)
+	rules.FeatureFlags.Splits = c.patchUnsupportedMatchers(rules.FeatureFlags.Splits, spec)
 
-	ctx.JSON(http.StatusOK, splits)
+	if spec == specs.FLAG_V1_3 {
+		ctx.JSON(http.StatusOK, rules)
+		ctx.Set(caching.SurrogateContextKey, []string{caching.SplitSurrogate})
+		ctx.Set(caching.StickyContextKey, true)
+		return
+	}
+	ctx.JSON(http.StatusOK, dtos.SplitChangesDTO{
+		Splits: rules.FeatureFlags.Splits,
+		Since:  rules.FeatureFlags.Since,
+		Till:   rules.FeatureFlags.Till,
+	})
 	ctx.Set(caching.SurrogateContextKey, []string{caching.SplitSurrogate})
 	ctx.Set(caching.StickyContextKey, true)
 }
@@ -187,26 +205,41 @@
 	ctx.Set(caching.SurrogateContextKey, caching.MakeSurrogateForMySegments(mySegments))
 }
 
-func (c *SdkServerController) fetchSplitChangesSince(since int64, sets []string) (*dtos.SplitChangesDTO, error) {
+func (c *SdkServerController) fetchRulesSince(since int64, rbsince int64, sets []string) (*dtos.RuleChangesDTO, error) {
 	splits, err := c.proxySplitStorage.ChangesSince(since, sets)
-	if err == nil {
-		return splits, nil
+	rbs, rbsErr := c.proxyRBSegmentStorage.ChangesSince(rbsince)
+	if err == nil && rbsErr == nil {
+		return &dtos.RuleChangesDTO{
+			FeatureFlags: dtos.FeatureFlagsDTO{
+				Splits: splits.Splits,
+				Till:   splits.Till,
+				Since:  splits.Since,
+			},
+			RuleBasedSegments: *rbs,
+		}, nil
 	}
-	if !errors.Is(err, storage.ErrSinceParamTooOld) {
+	if err != nil && !errors.Is(err, storage.ErrSinceParamTooOld) {
 		return nil, fmt.Errorf("unexpected error fetching feature flag changes from storage: %w", err)
 	}
+	if rbsErr != nil && !errors.Is(rbsErr, storage.ErrSinceParamTooOld) {
+		return nil, fmt.Errorf("unexpected error fetching rule-based segments changes from storage: %w", rbsErr)
+	}
+
 	// perform a fetch to the BE using the supplied `since`, have the storage process it's response &, retry
 	// TODO(mredolatti): implement basic collapsing here to avoid flooding the BE with requests
-	fetchOptions := service.MakeFlagRequestParams().WithSpecVersion(common.StringRef(c.specVersion)).WithChangeNumber(since).WithFlagSetsFilter(strings.Join(sets, ",")) // at this point the sets have been sanitized & sorted
+	fetchOptions := service.MakeFlagRequestParams().WithSpecVersion(common.StringRef(c.specVersion)).WithChangeNumber(since).WithChangeNumberRB(rbsince).WithFlagSetsFilter(strings.Join(sets, ",")) // at this point the sets have been sanitized & sorted
 	ruleChanges, err := c.fetcher.Fetch(fetchOptions)
 	if err != nil {
 		return nil, err
 	}
-	return &dtos.SplitChangesDTO{
-		Since:  ruleChanges.FFSince(),
-		Till:   ruleChanges.FFTill(),
-		Splits: ruleChanges.FeatureFlags(),
+	return &dtos.RuleChangesDTO{
+		FeatureFlags: dtos.FeatureFlagsDTO{
+			Splits: ruleChanges.FeatureFlags(),
+			Till:   ruleChanges.FFTill(),
+			Since:  ruleChanges.FFSince(),
+		},
+		RuleBasedSegments: dtos.RuleBasedSegmentsDTO{
+			RuleBasedSegments: ruleChanges.RuleBasedSegments(),
+			Till:              ruleChanges.RBTill(),
+			Since:             ruleChanges.RBSince(),
+		},
 	}, nil
 }
diff --git a/splitio/proxy/controllers/sdk_test.go b/splitio/proxy/controllers/sdk_test.go
index 6be1ecf9..a8d7fecf 100644
--- a/splitio/proxy/controllers/sdk_test.go
+++ b/splitio/proxy/controllers/sdk_test.go
@@ -48,6 +48,7 @@ func TestSplitChangesImpressionsDisabled(t *testing.T) {
 		splitFetcher,
 		&splitStorage,
 		nil,
+		nil,
 		flagsets.NewMatcher(false, nil),
 		&largeSegmentStorageMock,
 		specs.FLAG_V1_2,
@@ -99,6 +100,7 @@ func TestSplitChangesRecentSince(t *testing.T) {
 		splitFetcher,
 		&splitStorage,
 		nil,
+		nil,
 		flagsets.NewMatcher(false, nil),
 		&largeSegmentStorageMock,
 		specs.FLAG_V1_2,
@@ -157,6 +159,7 @@ func TestSplitChangesOlderSince(t *testing.T) {
 		splitFetcher,
 		&splitStorage,
 		nil,
+		nil,
 		flagsets.NewMatcher(false, nil),
 		&largeSegmentStorageMock,
 		specs.FLAG_V1_2,
@@ -212,6 +215,7 @@ func TestSplitChangesOlderSinceFetchFails(t *testing.T) {
 		splitFetcher,
 		&splitStorage,
 		nil,
+		nil,
 		flagsets.NewMatcher(false, nil),
 		&largeSegmentStorageMock,
 		specs.FLAG_V1_2,
@@ -253,6 +257,7 @@ func TestSplitChangesWithFlagSets(t
*testing.T) { splitFetcher, &splitStorage, nil, + nil, flagsets.NewMatcher(false, nil), &largeSegmentStorageMock, specs.FLAG_V1_2, @@ -303,6 +308,7 @@ func TestSplitChangesWithFlagSetsStrict(t *testing.T) { splitFetcher, &splitStorage, nil, + nil, flagsets.NewMatcher(true, []string{"a", "c"}), &largeSegmentStorageMock, specs.FLAG_V1_2, @@ -371,6 +377,7 @@ func TestSplitChangesNewMatcherOldSpec(t *testing.T) { splitFetcher, &splitStorage, nil, + nil, flagsets.NewMatcher(false, nil), &largeSegmentStorageMock, specs.FLAG_V1_2, @@ -442,6 +449,7 @@ func TestSplitChangesNewMatcherNewSpec(t *testing.T) { splitFetcher, &splitStorage, nil, + nil, flagsets.NewMatcher(false, nil), &largeSegmentStorageMock, specs.FLAG_V1_2, @@ -497,7 +505,7 @@ func TestSegmentChanges(t *testing.T) { logger := logging.NewLogger(nil) group := router.Group("/api") - controller := NewSdkServerController(logger, splitFetcher, &splitStorage, &segmentStorage, flagsets.NewMatcher(false, nil), &largeSegmentStorageMock, specs.FLAG_V1_2) + controller := NewSdkServerController(logger, splitFetcher, &splitStorage, &segmentStorage, nil, flagsets.NewMatcher(false, nil), &largeSegmentStorageMock, specs.FLAG_V1_2) controller.Register(group) ctx.Request, _ = http.NewRequest(http.MethodGet, "/api/segmentChanges/someSegment?since=-1", nil) @@ -541,7 +549,7 @@ func TestSegmentChangesNotFound(t *testing.T) { logger := logging.NewLogger(nil) group := router.Group("/api") - controller := NewSdkServerController(logger, splitFetcher, &splitStorage, &segmentStorage, flagsets.NewMatcher(false, nil), &largeSegmentStorageMock, specs.FLAG_V1_2) + controller := NewSdkServerController(logger, splitFetcher, &splitStorage, &segmentStorage, nil, flagsets.NewMatcher(false, nil), &largeSegmentStorageMock, specs.FLAG_V1_2) controller.Register(group) ctx.Request, _ = http.NewRequest(http.MethodGet, "/api/segmentChanges/someSegment?since=-1", nil) @@ -575,7 +583,7 @@ func TestMySegments(t *testing.T) { logger := logging.NewLogger(nil) group := router.Group("/api") - controller := NewSdkServerController(logger, splitFetcher, &splitStorage, &segmentStorage, flagsets.NewMatcher(false, nil), &largeSegmentStorageMock, specs.FLAG_V1_2) + controller := NewSdkServerController(logger, splitFetcher, &splitStorage, &segmentStorage, nil, flagsets.NewMatcher(false, nil), &largeSegmentStorageMock, specs.FLAG_V1_2) controller.Register(group) ctx.Request, _ = http.NewRequest(http.MethodGet, "/api/mySegments/someKey", nil) @@ -618,7 +626,7 @@ func TestMySegmentsError(t *testing.T) { logger := logging.NewLogger(nil) group := router.Group("/api") - controller := NewSdkServerController(logger, splitFetcher, &splitStorage, &segmentStorage, flagsets.NewMatcher(false, nil), &largeSegmentStorageMock, specs.FLAG_V1_2) + controller := NewSdkServerController(logger, splitFetcher, &splitStorage, &segmentStorage, nil, flagsets.NewMatcher(false, nil), &largeSegmentStorageMock, specs.FLAG_V1_2) controller.Register(group) ctx.Request, _ = http.NewRequest(http.MethodGet, "/api/mySegments/someKey", nil) @@ -655,7 +663,7 @@ func TestMemberships(t *testing.T) { logger := logging.NewLogger(nil) group := router.Group("/api") - controller := NewSdkServerController(logger, splitFetcher, &splitStorage, &segmentStorage, flagsets.NewMatcher(false, nil), &largeSegmentStorageMock, specs.FLAG_V1_2) + controller := NewSdkServerController(logger, splitFetcher, &splitStorage, &segmentStorage, nil, flagsets.NewMatcher(false, nil), &largeSegmentStorageMock, specs.FLAG_V1_2) controller.Register(group) 
ctx.Request, _ = http.NewRequest(http.MethodGet, "/api/memberships/keyTest", nil) @@ -705,7 +713,7 @@ func TestMembershipsError(t *testing.T) { logger := logging.NewLogger(nil) group := router.Group("/api") - controller := NewSdkServerController(logger, splitFetcher, &splitStorage, &segmentStorage, flagsets.NewMatcher(false, nil), &largeSegmentStorageMock, specs.FLAG_V1_2) + controller := NewSdkServerController(logger, splitFetcher, &splitStorage, &segmentStorage, nil, flagsets.NewMatcher(false, nil), &largeSegmentStorageMock, specs.FLAG_V1_2) controller.Register(group) ctx.Request, _ = http.NewRequest(http.MethodGet, "/api/memberships/keyTest", nil) diff --git a/splitio/proxy/initialization.go b/splitio/proxy/initialization.go index ad667c1f..60a129a4 100644 --- a/splitio/proxy/initialization.go +++ b/splitio/proxy/initialization.go @@ -26,6 +26,7 @@ import ( "github.com/splitio/split-synchronizer/v5/splitio/util" "github.com/splitio/go-split-commons/v8/conf" + "github.com/splitio/go-split-commons/v8/engine/grammar" "github.com/splitio/go-split-commons/v8/flagsets" "github.com/splitio/go-split-commons/v8/service/api" inmemory "github.com/splitio/go-split-commons/v8/storage/inmemory/mutexmap" @@ -83,6 +84,7 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error { // Proxy storages already implement the observable interface, so no need to wrap them splitStorage := storage.NewProxySplitStorage(dbInstance, logger, flagsets.NewFlagSetFilter(cfg.FlagSetsFilter), cfg.Initialization.Snapshot != "") + ruleBasedStorage := storage.NewProxyRuleBasedSegmentsStorage(logger) segmentStorage := storage.NewProxySegmentStorage(dbInstance, logger, cfg.Initialization.Snapshot != "") largeSegmentStorage := inmemory.NewLargeSegmentsStorage() @@ -117,10 +119,19 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error { eventsRecorder := api.NewHTTPEventsRecorder(cfg.Apikey, *advanced, logger) eventsTask := pTasks.NewEventsFlushTask(eventsRecorder, logger, 1, int(cfg.Sync.Advanced.EventsBuffer), int(cfg.Sync.Advanced.EventsWorkers)) + ruleBuilder := grammar.NewRuleBuilder( + segmentStorage, + ruleBasedStorage, + largeSegmentStorage, + adminCommon.ProducerFeatureFlagsRules, + adminCommon.ProducerRuleBasedSegmentRules, + logger, + nil) + // setup feature flags, segments & local telemetry API interactions workers := synchronizer.Workers{ - SplitUpdater: caching.NewCacheAwareSplitSync(splitStorage, splitAPI.SplitFetcher, logger, localTelemetryStorage, httpCache, appMonitor, flagSetsFilter, advanced.FlagsSpecVersion), - SegmentUpdater: caching.NewCacheAwareSegmentSync(splitStorage, segmentStorage, splitAPI.SegmentFetcher, logger, localTelemetryStorage, httpCache, + SplitUpdater: caching.NewCacheAwareSplitSync(splitStorage, ruleBasedStorage, splitAPI.SplitFetcher, logger, localTelemetryStorage, httpCache, appMonitor, flagSetsFilter, advanced.FlagsSpecVersion, ruleBuilder), + SegmentUpdater: caching.NewCacheAwareSegmentSync(splitStorage, segmentStorage, ruleBasedStorage, splitAPI.SegmentFetcher, logger, localTelemetryStorage, httpCache, appMonitor), TelemetryRecorder: telemetry.NewTelemetrySynchronizer(localTelemetryStorage, telemetryRecorder, splitStorage, segmentStorage, logger, metadata, localTelemetryStorage), @@ -165,7 +176,7 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error { // health monitors are only started after successful init (otherwise they'll fail if the app doesn't sync correctly within the /// specified refresh period) before := time.Now() - err = 
startBGSyng(syncManager, mstatus, cfg.Initialization.Snapshot != "", func() { + err = startBGSync(syncManager, mstatus, cfg.Initialization.Snapshot != "", func() { logger.Info("Synchronizer tasks started") appMonitor.Start() servicesMonitor.Start() @@ -249,6 +260,7 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error { SplitFetcher: splitAPI.SplitFetcher, ProxySplitStorage: splitStorage, ProxySegmentStorage: segmentStorage, + ProxyRBSegmentStorage: ruleBasedStorage, ImpressionsSink: impressionTask, ImpressionCountSink: impressionCountTask, EventsSink: eventsTask, @@ -262,6 +274,7 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error { FlagSets: cfg.FlagSetsFilter, FlagSetsStrictMatching: cfg.FlagSetStrictMatching, ProxyLargeSegmentStorage: largeSegmentStorage, + SpecVersion: cfg.FlagSpecVersion, } if ilcfg := cfg.Integrations.ImpressionListener; ilcfg.Endpoint != "" { @@ -286,8 +299,7 @@ var ( errUnrecoverable = errors.New("error and no snapshot available") ) -func startBGSyng(m synchronizer.Manager, mstatus chan int, haveSnapshot bool, onReady func()) error { - +func startBGSync(m synchronizer.Manager, mstatus chan int, haveSnapshot bool, onReady func()) error { attemptInit := func() bool { go m.Start() status := <-mstatus diff --git a/splitio/proxy/initialization_test.go b/splitio/proxy/initialization_test.go index 06066f15..4a6d4d7e 100644 --- a/splitio/proxy/initialization_test.go +++ b/splitio/proxy/initialization_test.go @@ -25,7 +25,7 @@ func (m *syncManagerMock) Start() { } func (m *syncManagerMock) Stop() { panic("unimplemented") } -func (m *syncManagerMock) StartBGSyng(mstatus chan int, shouldRetry bool, onReady func()) error { +func (m *syncManagerMock) StartBGSync(mstatus chan int, shouldRetry bool, onReady func()) error { panic("unimplemented") } @@ -37,7 +37,7 @@ func TestSyncManagerInitializationRetriesWithSnapshot(t *testing.T) { // No snapshot and error complete := make(chan struct{}, 1) - err := startBGSyng(sm, sm.c, false, func() { complete <- struct{}{} }) + err := startBGSync(sm, sm.c, false, func() { complete <- struct{}{} }) if err != errUnrecoverable { t.Error("should be an unrecoverable error. Got: ", err) } @@ -51,7 +51,7 @@ func TestSyncManagerInitializationRetriesWithSnapshot(t *testing.T) { // Snapshot and error atomic.StoreInt64(&sm.execCount, 0) - err = startBGSyng(sm, sm.c, true, func() { complete <- struct{}{} }) + err = startBGSync(sm, sm.c, true, func() { complete <- struct{}{} }) if err != errRetrying { t.Error("should be a retrying error. 
Got: ", err)
 	}
diff --git a/splitio/proxy/proxy.go b/splitio/proxy/proxy.go
index 3f3c9b30..8a76e94c 100644
--- a/splitio/proxy/proxy.go
+++ b/splitio/proxy/proxy.go
@@ -51,6 +51,9 @@ type Options struct {
 	// used to resolve segmentChanges & mySegments requests
 	ProxySegmentStorage storage.ProxySegmentStorage
 
+	// used to resolve splitChanges with rule-based segments requests
+	ProxyRBSegmentStorage storage.ProxyRuleBasedSegmentsStorage
+
 	// ProxyLargeSegmentStorage
 	ProxyLargeSegmentStorage cmnStorage.LargeSegmentsStorage
 
@@ -165,6 +168,7 @@ func setupSdkController(options *Options) *controllers.SdkServerController {
 		options.SplitFetcher,
 		options.ProxySplitStorage,
 		options.ProxySegmentStorage,
+		options.ProxyRBSegmentStorage,
 		flagsets.NewMatcher(options.FlagSetsStrictMatching, options.FlagSets),
 		options.ProxyLargeSegmentStorage,
 		options.SpecVersion,
diff --git a/splitio/proxy/storage/rulebasedsegments.go b/splitio/proxy/storage/rulebasedsegments.go
new file mode 100644
index 00000000..8e9736e3
--- /dev/null
+++ b/splitio/proxy/storage/rulebasedsegments.go
@@ -0,0 +1,109 @@
+package storage
+
+import (
+	"github.com/splitio/go-split-commons/v8/dtos"
+	"github.com/splitio/go-split-commons/v8/storage"
+	"github.com/splitio/go-split-commons/v8/storage/inmemory/mutexmap"
+	"github.com/splitio/go-toolkit/v5/datastructures/set"
+	"github.com/splitio/go-toolkit/v5/logging"
+)
+
+// ProxyRuleBasedSegmentsStorage defines the interface of a storage that can be used for serving payloads
+// for different requested `since` parameters
+type ProxyRuleBasedSegmentsStorage interface {
+	ChangesSince(since int64) (*dtos.RuleBasedSegmentsDTO, error)
+}
+
+// ProxyRuleBasedSegmentsStorageImpl implements the ProxyRuleBasedSegmentsStorage and storage.RuleBasedSegmentsStorage interfaces
+type ProxyRuleBasedSegmentsStorageImpl struct {
+	snapshot mutexmap.RuleBasedSegmentsStorageImpl
+	logger   logging.LoggerInterface
+	// mtx sync.Mutex
+}
+
+// NewProxyRuleBasedSegmentsStorage instantiates a new proxy storage that wraps an in-memory snapshot of the last known
+// rule-based segment configuration
+func NewProxyRuleBasedSegmentsStorage(logger logging.LoggerInterface) *ProxyRuleBasedSegmentsStorageImpl {
+	snapshot := mutexmap.NewRuleBasedSegmentsStorage()
+
+	return &ProxyRuleBasedSegmentsStorageImpl{
+		snapshot: *snapshot,
+		logger:   logger,
+	}
+}
+
+// ChangesSince retrieves the rule-based segments changes since the given change number
+func (p *ProxyRuleBasedSegmentsStorageImpl) ChangesSince(since int64) (*dtos.RuleBasedSegmentsDTO, error) {
+	if since > -1 {
+		return &dtos.RuleBasedSegmentsDTO{Since: since, Till: since, RuleBasedSegments: []dtos.RuleBasedSegmentDTO{}}, nil
+	}
+	cn, _ := p.snapshot.ChangeNumber()
+	return &dtos.RuleBasedSegmentsDTO{Since: since, Till: cn, RuleBasedSegments: p.snapshot.All()}, nil
+}
+
+// All call is forwarded to the snapshot
+func (p *ProxyRuleBasedSegmentsStorageImpl) All() []dtos.RuleBasedSegmentDTO {
+	return p.snapshot.All()
+}
+
+// ChangeNumber returns the current change number
+func (p *ProxyRuleBasedSegmentsStorageImpl) ChangeNumber() (int64, error) {
+	return p.snapshot.ChangeNumber()
+}
+
+// Contains checks if the given rule-based segments are present in storage
+func (p *ProxyRuleBasedSegmentsStorageImpl) Contains(rbs []string) bool {
+	return p.snapshot.Contains(rbs)
+}
+
+// GetRuleBasedSegmentByName retrieves a rule-based segment by name
+func (p *ProxyRuleBasedSegmentsStorageImpl) GetRuleBasedSegmentByName(name string) (*dtos.RuleBasedSegmentDTO, error) {
+	return p.snapshot.GetRuleBasedSegmentByName(name)
+}
+
+// LargeSegments call is forwarded to the snapshot
+func (p *ProxyRuleBasedSegmentsStorageImpl) LargeSegments() *set.ThreadUnsafeSet {
+	return p.snapshot.LargeSegments()
+}
+
+// ReplaceAll replaces all rule-based segments in storage
+func (p *ProxyRuleBasedSegmentsStorageImpl) ReplaceAll(rbs []dtos.RuleBasedSegmentDTO, cn int64) error {
+	return p.snapshot.ReplaceAll(rbs, cn)
+}
+
+// RuleBasedSegmentNames retrieves the names of all rule-based segments
+func (p *ProxyRuleBasedSegmentsStorageImpl) RuleBasedSegmentNames() ([]string, error) {
+	return p.snapshot.RuleBasedSegmentNames()
+}
+
+// Segments retrieves the names of all segments used in rule-based segments
+func (p *ProxyRuleBasedSegmentsStorageImpl) Segments() *set.ThreadUnsafeSet {
+	return p.snapshot.Segments()
+}
+
+// SetChangeNumber sets the change number
+func (p *ProxyRuleBasedSegmentsStorageImpl) SetChangeNumber(cn int64) error {
+	return p.snapshot.SetChangeNumber(cn)
+}
+
+// Update applies the given additions and removals to the in-memory snapshot and updates the change number
+func (p *ProxyRuleBasedSegmentsStorageImpl) Update(toAdd []dtos.RuleBasedSegmentDTO, toRemove []dtos.RuleBasedSegmentDTO, cn int64) error {
+	// TODO Add the other logic
+	// p.setStartingPoint(changeNumber) // will be executed only the first time this method is called
+
+	// if len(toAdd) == 0 && len(toRemove) == 0 {
+	// 	return
+	// }
+
+	// p.mtx.Lock()
+	// p.snapshot.Update(toAdd, toRemove, changeNumber)
+	// p.historic.Update(toAdd, toRemove, changeNumber)
+	// p.db.Update(toAdd, toRemove, changeNumber)
+	// p.mtx.Unlock()
+
+	p.snapshot.Update(toAdd, toRemove, cn)
+	return nil
+}
+
+var _ ProxyRuleBasedSegmentsStorage = (*ProxyRuleBasedSegmentsStorageImpl)(nil)
+var _ storage.RuleBasedSegmentsStorage = (*ProxyRuleBasedSegmentsStorageImpl)(nil)
diff --git a/splitio/proxy/storage/splits.go b/splitio/proxy/storage/splits.go
index b8b79627..cefc7688 100644
--- a/splitio/proxy/storage/splits.go
+++ b/splitio/proxy/storage/splits.go
@@ -17,10 +17,6 @@ import (
 	"github.com/splitio/go-toolkit/v5/logging"
 )
 
-const (
-	maxRecipes = 1000
-)
-
 // ErrSinceParamTooOld is returned when a summary is not cached for a requested change number
 var ErrSinceParamTooOld = errors.New("summary for requested change number not cached")
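For reference: the CacheAwareSplitSynchronizer changes in splitio/proxy/caching/workers.go apply the same eviction predicate to two change numbers, one for feature flags and one for rule-based segments. A minimal sketch of that predicate, using a hypothetical helper name (the diff keeps the condition inline):

package caching

// changeNumberMoved reports whether a change number advanced, or was reset to -1
// after having held a value (e.g. the storage was wiped); both cases warrant
// evicting cached splitChanges responses.
func changeNumberMoved(previous, current int64) bool {
	return current > previous || (previous != -1 && current == -1)
}

// Inside SynchronizeSplits / SynchronizeFeatureFlags the check would then read:
//   if changeNumberMoved(previous, current) || changeNumberMoved(previousRB, currentRB) {
//       c.cacheFlusher.EvictBySurrogate(SplitSurrogate)
//   }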
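Also for reference, the splitChanges handler now branches on the spec version: clients on spec 1.3 receive the combined dtos.RuleChangesDTO (feature flags plus rule-based segments), while older specs keep the legacy dtos.SplitChangesDTO built from the feature-flag portion only. A sketch of that down-conversion as a standalone helper, with a hypothetical name (the handler inlines it):

package controllers

import "github.com/splitio/go-split-commons/v8/dtos"

// legacyPayload drops the rule-based segment stream and keeps only the
// feature-flag portion, matching the pre-1.3 response shape.
func legacyPayload(rules *dtos.RuleChangesDTO) dtos.SplitChangesDTO {
	return dtos.SplitChangesDTO{
		Splits: rules.FeatureFlags.Splits,
		Since:  rules.FeatureFlags.Since,
		Till:   rules.FeatureFlags.Till,
	}
}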
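Finally, note that ProxyRuleBasedSegmentsStorageImpl.ChangesSince only serves full snapshots for now: any since greater than -1 yields an empty payload with Till equal to since, and since == -1 returns everything known, with Till taken from the stored change number. An illustrative test sketch under that reading (it also assumes the wrapped mutexmap storage records the change number passed to Update):

package storage

import (
	"testing"

	"github.com/splitio/go-split-commons/v8/dtos"
	"github.com/splitio/go-toolkit/v5/logging"
)

func TestRuleBasedChangesSinceSnapshotOnly(t *testing.T) {
	st := NewProxyRuleBasedSegmentsStorage(logging.NewLogger(nil))
	st.Update([]dtos.RuleBasedSegmentDTO{{}}, nil, 42) // one (empty) rule-based segment at cn=42

	full, _ := st.ChangesSince(-1)
	if full.Till != 42 || len(full.RuleBasedSegments) != 1 {
		t.Error("since=-1 should return the full snapshot")
	}

	delta, _ := st.ChangesSince(42)
	if delta.Till != 42 || len(delta.RuleBasedSegments) != 0 {
		t.Error("since>-1 currently returns an empty delta")
	}
}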