Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/gin-gonic/gin v1.10.1
github.com/google/uuid v1.3.0
github.com/splitio/gincache v1.0.1
github.com/splitio/go-split-commons/v8 v8.0.0-20251022154508-1ea26a26874f
github.com/splitio/go-split-commons/v8 v8.0.0-20251028203151-2b6d18a2f657
github.com/splitio/go-toolkit/v5 v5.4.1
github.com/stretchr/testify v1.11.1
go.etcd.io/bbolt v1.3.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUA
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
github.com/splitio/gincache v1.0.1 h1:dLYdANY/BqH4KcUMCe/LluLyV5WtuE/LEdQWRE06IXU=
github.com/splitio/gincache v1.0.1/go.mod h1:CcgJDSM9Af75kyBH0724v55URVwMBuSj5x1eCWIOECY=
github.com/splitio/go-split-commons/v8 v8.0.0-20251022154508-1ea26a26874f h1:2o8Hu3G4jAoF6Y0Ceptr4Bwp3x9wFDenp494Cu/V5nU=
github.com/splitio/go-split-commons/v8 v8.0.0-20251022154508-1ea26a26874f/go.mod h1:vgRGPn0s4RC9/zp1nIn4KeeIEj/K3iXE2fxYQbCk/WI=
github.com/splitio/go-split-commons/v8 v8.0.0-20251028203151-2b6d18a2f657 h1:FYT0P+uFnXzALLgWOTIAJS6P4J1NpMGNi+rWsv2ZIkU=
github.com/splitio/go-split-commons/v8 v8.0.0-20251028203151-2b6d18a2f657/go.mod h1:vgRGPn0s4RC9/zp1nIn4KeeIEj/K3iXE2fxYQbCk/WI=
github.com/splitio/go-toolkit/v5 v5.4.1 h1:srTyvDBJZMUcJ/KiiQDMyjCuELVgTBh2TGRVn0sOXEE=
github.com/splitio/go-toolkit/v5 v5.4.1/go.mod h1:SifzysrOVDbzMcOE8zjX02+FG5az4FrR3Us/i5SeStw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
30 changes: 22 additions & 8 deletions splitio/admin/common/config.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
package common

import "github.com/splitio/go-split-commons/v8/storage"
import (
"github.com/splitio/go-split-commons/v8/engine/grammar/constants"
"github.com/splitio/go-split-commons/v8/storage"
)

var ProducerFeatureFlagsRules = []string{constants.MatcherTypeAllKeys, constants.MatcherTypeInSegment, constants.MatcherTypeWhitelist, constants.MatcherTypeEqualTo, constants.MatcherTypeGreaterThanOrEqualTo, constants.MatcherTypeLessThanOrEqualTo, constants.MatcherTypeBetween,
constants.MatcherTypeEqualToSet, constants.MatcherTypePartOfSet, constants.MatcherTypeContainsAllOfSet, constants.MatcherTypeContainsAnyOfSet, constants.MatcherTypeStartsWith, constants.MatcherTypeEndsWith, constants.MatcherTypeContainsString, constants.MatcherTypeInSplitTreatment,
constants.MatcherTypeEqualToBoolean, constants.MatcherTypeMatchesString, constants.MatcherEqualToSemver, constants.MatcherTypeGreaterThanOrEqualToSemver, constants.MatcherTypeLessThanOrEqualToSemver, constants.MatcherTypeBetweenSemver, constants.MatcherTypeInListSemver,
constants.MatcherTypeInRuleBasedSegment}

var ProducerRuleBasedSegmentRules = []string{constants.MatcherTypeAllKeys, constants.MatcherTypeInSegment, constants.MatcherTypeWhitelist, constants.MatcherTypeEqualTo, constants.MatcherTypeGreaterThanOrEqualTo, constants.MatcherTypeLessThanOrEqualTo, constants.MatcherTypeBetween,
constants.MatcherTypeEqualToSet, constants.MatcherTypePartOfSet, constants.MatcherTypeContainsAllOfSet, constants.MatcherTypeContainsAnyOfSet, constants.MatcherTypeStartsWith, constants.MatcherTypeEndsWith, constants.MatcherTypeContainsString,
constants.MatcherTypeEqualToBoolean, constants.MatcherTypeMatchesString, constants.MatcherEqualToSemver, constants.MatcherTypeGreaterThanOrEqualToSemver, constants.MatcherTypeLessThanOrEqualToSemver, constants.MatcherTypeBetweenSemver, constants.MatcherTypeInListSemver,
constants.MatcherTypeInRuleBasedSegment}

// Storages wraps storages in one struct
type Storages struct {
SplitStorage storage.SplitStorage
SegmentStorage storage.SegmentStorage
LocalTelemetryStorage storage.TelemetryRuntimeConsumer
EventStorage storage.EventMultiSdkConsumer
ImpressionStorage storage.ImpressionMultiSdkConsumer
UniqueKeysStorage storage.UniqueKeysMultiSdkConsumer
LargeSegmentStorage storage.LargeSegmentsStorage
SplitStorage storage.SplitStorage
SegmentStorage storage.SegmentStorage
LocalTelemetryStorage storage.TelemetryRuntimeConsumer
EventStorage storage.EventMultiSdkConsumer
ImpressionStorage storage.ImpressionMultiSdkConsumer
UniqueKeysStorage storage.UniqueKeysMultiSdkConsumer
LargeSegmentStorage storage.LargeSegmentsStorage
RuleBasedSegmentsStorage storage.RuleBasedSegmentsStorage
}
2 changes: 1 addition & 1 deletion splitio/commitversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ This file is created automatically, please do not edit
*/

// CommitVersion is the version of the last commit previous to release
const CommitVersion = "5e4d9e1"
const CommitVersion = "1807589"
2 changes: 1 addition & 1 deletion splitio/producer/conf/sections.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Main struct {
Integrations conf.Integrations `json:"integrations" s-nested:"true"`
Logging conf.Logging `json:"logging" s-nested:"true"`
Healthcheck Healthcheck `json:"healthcheck" s-nested:"true"`
FlagSpecVersion string `json:"flagSpecVersion" s-cli:"flag-spec-version" s-def:"1.1" s-desc:"Spec version for flags"`
FlagSpecVersion string `json:"flagSpecVersion" s-cli:"flag-spec-version" s-def:"1.3" s-desc:"Spec version for flags"`
}

// BuildAdvancedConfig generates a commons-compatible advancedconfig with default + overriden parameters
Expand Down
29 changes: 19 additions & 10 deletions splitio/producer/initialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,13 @@ func Start(logger logging.LoggerInterface, cfg *conf.Main) error {
return fmt.Errorf("error instantiating observable segment storage: %w", err)
}
storages := adminCommon.Storages{
SplitStorage: splitStorage,
SegmentStorage: segmentStorage,
LocalTelemetryStorage: syncTelemetryStorage,
ImpressionStorage: redis.NewImpressionStorage(redisClient, dtos.Metadata{}, logger),
EventStorage: redis.NewEventsStorage(redisClient, dtos.Metadata{}, logger),
UniqueKeysStorage: redis.NewUniqueKeysMultiSdkConsumer(redisClient, logger),
SplitStorage: splitStorage,
SegmentStorage: segmentStorage,
LocalTelemetryStorage: syncTelemetryStorage,
ImpressionStorage: redis.NewImpressionStorage(redisClient, dtos.Metadata{}, logger),
EventStorage: redis.NewEventsStorage(redisClient, dtos.Metadata{}, logger),
UniqueKeysStorage: redis.NewUniqueKeysMultiSdkConsumer(redisClient, logger),
RuleBasedSegmentsStorage: redis.NewRuleBasedStorage(redisClient, logger),
}

// Healcheck Monitor
Expand All @@ -125,11 +126,19 @@ func Start(logger logging.LoggerInterface, cfg *conf.Main) error {
// Creating Workers and Tasks
eventEvictionMonitor := evcalc.New(1)

ruleBuilder := grammar.NewRuleBuilder(
storages.SegmentStorage,
storages.RuleBasedSegmentsStorage,
storages.LargeSegmentStorage,
adminCommon.ProducerFeatureFlagsRules,
adminCommon.ProducerRuleBasedSegmentRules,
logger,
nil)

workers := synchronizer.Workers{
// TODO add ruleBasedSegmentStorage, ruleBuilder, sdkOverrides
SplitUpdater: split.NewSplitUpdater(storages.SplitStorage, nil, splitAPI.SplitFetcher, logger, syncTelemetryStorage, appMonitor, flagSetsFilter, grammar.RuleBuilder{}, false, cfg.FlagSpecVersion),
// TODO add ruleBasedSegmentStorage
SegmentUpdater: segment.NewSegmentUpdater(storages.SplitStorage, storages.SegmentStorage, nil, splitAPI.SegmentFetcher,
// TODO add sdkOverrides
SplitUpdater: split.NewSplitUpdater(storages.SplitStorage, storages.RuleBasedSegmentsStorage, splitAPI.SplitFetcher, logger, syncTelemetryStorage, appMonitor, flagSetsFilter, ruleBuilder, false, cfg.FlagSpecVersion),
SegmentUpdater: segment.NewSegmentUpdater(storages.SplitStorage, storages.SegmentStorage, storages.RuleBasedSegmentsStorage, splitAPI.SegmentFetcher,
logger, syncTelemetryStorage, appMonitor),
ImpressionsCountRecorder: impressionscount.NewRecorderSingle(impressionsCounter, splitAPI.ImpressionRecorder,
metadata, logger, syncTelemetryStorage),
Expand Down
23 changes: 17 additions & 6 deletions splitio/proxy/caching/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,41 @@ import (
// CacheAwareSplitSynchronizer wraps a SplitSynchronizer and flushes cache when an update happens
type CacheAwareSplitSynchronizer struct {
splitStorage storage.SplitStorage
rbStorage storage.RuleBasedSegmentsStorage
wrapped split.Updater
cacheFlusher gincache.CacheFlusher
}

// NewCacheAwareSplitSync constructs a split-sync wrapper that evicts cache on updates
func NewCacheAwareSplitSync(
splitStorage storage.SplitStorage,
ruleBasedStorage storage.RuleBasedSegmentsStorage,
splitFetcher service.SplitFetcher,
logger logging.LoggerInterface,
runtimeTelemetry storage.TelemetryRuntimeProducer,
cacheFlusher gincache.CacheFlusher,
appMonitor application.MonitorProducerInterface,
flagSetsFilter flagsets.FlagSetFilter,
specVersion string,
ruleBuilder grammar.RuleBuilder,
) *CacheAwareSplitSynchronizer {
return &CacheAwareSplitSynchronizer{
// TODO add ruleBasedSegmentStorage, ruleBuilder, increase FLAG SPEC when we support RUleBased
wrapped: split.NewSplitUpdater(splitStorage, nil, splitFetcher, logger, runtimeTelemetry, appMonitor, flagSetsFilter, grammar.RuleBuilder{}, false, specVersion),
wrapped: split.NewSplitUpdater(splitStorage, ruleBasedStorage, splitFetcher, logger, runtimeTelemetry, appMonitor, flagSetsFilter, ruleBuilder, false, specVersion),
splitStorage: splitStorage,
rbStorage: ruleBasedStorage,
cacheFlusher: cacheFlusher,
}
}

// SynchronizeSplits synchronizes feature flags and if something changes, purges the cache appropriately
func (c *CacheAwareSplitSynchronizer) SynchronizeSplits(till *int64) (*split.UpdateResult, error) {
previous, _ := c.splitStorage.ChangeNumber()
previousRB, _ := c.rbStorage.ChangeNumber()

result, err := c.wrapped.SynchronizeSplits(till)
if current, _ := c.splitStorage.ChangeNumber(); current > previous || (previous != -1 && current == -1) {
current, _ := c.splitStorage.ChangeNumber()
currentRB, _ := c.rbStorage.ChangeNumber()
if current > previous || (previous != -1 && current == -1) || currentRB > previousRB || (previousRB != -1 && currentRB == -1) {
// if the changenumber was updated, evict splitChanges responses from cache
c.cacheFlusher.EvictBySurrogate(SplitSurrogate)
}
Expand All @@ -61,8 +68,12 @@ func (c *CacheAwareSplitSynchronizer) LocalKill(splitName string, defaultTreatme
// SynchronizeFeatureFlags synchronizes feature flags and if something changes, purges the cache appropriately
func (c *CacheAwareSplitSynchronizer) SynchronizeFeatureFlags(ffChange *dtos.SplitChangeUpdate) (*split.UpdateResult, error) {
previous, _ := c.splitStorage.ChangeNumber()
previousRB, _ := c.rbStorage.ChangeNumber()

result, err := c.wrapped.SynchronizeFeatureFlags(ffChange)
if current, _ := c.splitStorage.ChangeNumber(); current > previous || (previous != -1 && current == -1) {
current, _ := c.splitStorage.ChangeNumber()
currentRB, _ := c.rbStorage.ChangeNumber()
if current > previous || (previous != -1 && current == -1) || currentRB > previousRB || (previousRB != -1 && currentRB == -1) {
// if the changenumber was updated, evict splitChanges responses from cache
c.cacheFlusher.EvictBySurrogate(SplitSurrogate)
}
Expand All @@ -81,15 +92,15 @@ type CacheAwareSegmentSynchronizer struct {
func NewCacheAwareSegmentSync(
splitStorage storage.SplitStorage,
segmentStorage storage.SegmentStorage,
ruleBasedStorage storage.RuleBasedSegmentsStorage,
segmentFetcher service.SegmentFetcher,
logger logging.LoggerInterface,
runtimeTelemetry storage.TelemetryRuntimeProducer,
cacheFlusher gincache.CacheFlusher,
appMonitor application.MonitorProducerInterface,
) *CacheAwareSegmentSynchronizer {
return &CacheAwareSegmentSynchronizer{
// TODO add ruleBasedSegmentStorage
wrapped: segment.NewSegmentUpdater(splitStorage, segmentStorage, nil, segmentFetcher, logger, runtimeTelemetry, appMonitor),
wrapped: segment.NewSegmentUpdater(splitStorage, segmentStorage, ruleBasedStorage, segmentFetcher, logger, runtimeTelemetry, appMonitor),
cacheFlusher: cacheFlusher,
splitStorage: splitStorage,
segmentStorage: segmentStorage,
Expand Down
2 changes: 1 addition & 1 deletion splitio/proxy/conf/sections.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Main struct {
Logging conf.Logging `json:"logging" s-nested:"true"`
Healthcheck Healthcheck `json:"healthcheck" s-nested:"true"`
Observability Observability `json:"observability" s-nested:"true"`
FlagSpecVersion string `json:"flagSpecVersion" s-cli:"flag-spec-version" s-def:"1.2" s-desc:"Spec version for flags"`
FlagSpecVersion string `json:"flagSpecVersion" s-cli:"flag-spec-version" s-def:"1.3" s-desc:"Spec version for flags"`
}

// BuildAdvancedConfig generates a commons-compatible advancedconfig with default + overriden parameters
Expand Down
103 changes: 74 additions & 29 deletions splitio/proxy/controllers/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ import (

// SdkServerController bundles all request handler for sdk-server apis
type SdkServerController struct {
logger logging.LoggerInterface
fetcher service.SplitFetcher
proxySplitStorage storage.ProxySplitStorage
proxySegmentStorage storage.ProxySegmentStorage
fsmatcher flagsets.FlagSetMatcher
versionFilter specs.SplitVersionFilter
largeSegmentStorage cmnStorage.LargeSegmentsStorage
specVersion string
logger logging.LoggerInterface
fetcher service.SplitFetcher
proxySplitStorage storage.ProxySplitStorage
proxyRBSegmentStorage storage.ProxyRuleBasedSegmentsStorage
proxySegmentStorage storage.ProxySegmentStorage
fsmatcher flagsets.FlagSetMatcher
versionFilter specs.SplitVersionFilter
largeSegmentStorage cmnStorage.LargeSegmentsStorage
specVersion string
}

// NewSdkServerController instantiates a new sdk server controller
Expand All @@ -41,19 +42,21 @@ func NewSdkServerController(
fetcher service.SplitFetcher,
proxySplitStorage storage.ProxySplitStorage,
proxySegmentStorage storage.ProxySegmentStorage,
proxyRBSegmentStorage storage.ProxyRuleBasedSegmentsStorage,
fsmatcher flagsets.FlagSetMatcher,
largeSegmentStorage cmnStorage.LargeSegmentsStorage,
specVersion string,
) *SdkServerController {
return &SdkServerController{
logger: logger,
fetcher: fetcher,
proxySplitStorage: proxySplitStorage,
proxySegmentStorage: proxySegmentStorage,
fsmatcher: fsmatcher,
versionFilter: specs.NewSplitVersionFilter(),
largeSegmentStorage: largeSegmentStorage,
specVersion: specVersion,
logger: logger,
fetcher: fetcher,
proxySplitStorage: proxySplitStorage,
proxySegmentStorage: proxySegmentStorage,
proxyRBSegmentStorage: proxyRBSegmentStorage,
fsmatcher: fsmatcher,
versionFilter: specs.NewSplitVersionFilter(),
largeSegmentStorage: largeSegmentStorage,
specVersion: specVersion,
}
}

Expand Down Expand Up @@ -107,6 +110,11 @@ func (c *SdkServerController) SplitChanges(ctx *gin.Context) {
since = -1
}

rbsince, err := strconv.ParseInt(ctx.DefaultQuery("rbSince", "-1"), 10, 64)
if err != nil {
rbsince = -1
}

var rawSets []string
if fq, ok := ctx.GetQuery("sets"); ok {
rawSets = strings.Split(fq, ",")
Expand All @@ -116,9 +124,9 @@ func (c *SdkServerController) SplitChanges(ctx *gin.Context) {
c.logger.Warning(fmt.Sprintf("SDK [%s] is sending flagsets unordered or with duplicates.", ctx.Request.Header.Get("SplitSDKVersion")))
}

c.logger.Debug(fmt.Sprintf("SDK Fetches Feature Flags Since: %d", since))
c.logger.Debug(fmt.Sprintf("SDK Fetches Feature Flags Since: %d, RBSince: %d", since, rbsince))

splits, err := c.fetchSplitChangesSince(since, sets)
rules, err := c.fetchRulesSince(since, rbsince, sets)
if err != nil {
c.logger.Error("error fetching splitChanges payload from storage: ", err)
ctx.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
Expand All @@ -133,9 +141,23 @@ func (c *SdkServerController) SplitChanges(ctx *gin.Context) {
return
}

splits.Splits = c.patchUnsupportedMatchers(splits.Splits, spec)
rules.FeatureFlags.Splits = c.patchUnsupportedMatchers(rules.FeatureFlags.Splits, spec)

ctx.JSON(http.StatusOK, splits)
if spec == specs.FLAG_V1_3 {
fmt.Println("1.3 1.3 1.3 1.3 1.3 ")
fmt.Println("cantidad de ff:", len(rules.FeatureFlags.Splits), rules.FeatureFlags.Since, rules.FeatureFlags.Till)
fmt.Println("cantidad de rb:", len(rules.RuleBasedSegments.RuleBasedSegments), rules.RuleBasedSegments.Since, rules.RuleBasedSegments.Till)
ctx.JSON(http.StatusOK, rules)
ctx.Set(caching.SurrogateContextKey, []string{caching.SplitSurrogate})
ctx.Set(caching.StickyContextKey, true)
return
}
fmt.Println("otra otra otra otra")
ctx.JSON(http.StatusOK, dtos.SplitChangesDTO{
Splits: rules.FeatureFlags.Splits,
Since: rules.FeatureFlags.Since,
Till: rules.FeatureFlags.Till,
})
ctx.Set(caching.SurrogateContextKey, []string{caching.SplitSurrogate})
ctx.Set(caching.StickyContextKey, true)
}
Expand Down Expand Up @@ -187,26 +209,49 @@ func (c *SdkServerController) MySegments(ctx *gin.Context) {
ctx.Set(caching.SurrogateContextKey, caching.MakeSurrogateForMySegments(mySegments))
}

func (c *SdkServerController) fetchSplitChangesSince(since int64, sets []string) (*dtos.SplitChangesDTO, error) {
func (c *SdkServerController) fetchRulesSince(since int64, rbsince int64, sets []string) (*dtos.RuleChangesDTO, error) {
fmt.Println("split change since: ", since)
splits, err := c.proxySplitStorage.ChangesSince(since, sets)
if err == nil {
return splits, nil
fmt.Println("split result: ", splits, err)
fmt.Println("rule baseed since: ", rbsince)
rbs, rbsErr := c.proxyRBSegmentStorage.ChangesSince(rbsince)
fmt.Println("rulebased result: ", rbs, rbsErr)
if err == nil && rbsErr == nil {
return &dtos.RuleChangesDTO{
FeatureFlags: dtos.FeatureFlagsDTO{
Splits: splits.Splits,
Till: splits.Till,
Since: splits.Since,
},
RuleBasedSegments: *rbs,
}, err
}
if !errors.Is(err, storage.ErrSinceParamTooOld) {
if err != nil && !errors.Is(err, storage.ErrSinceParamTooOld) {
return nil, fmt.Errorf("unexpected error fetching feature flag changes from storage: %w", err)
}

if rbsErr != nil && !errors.Is(rbsErr, storage.ErrSinceParamTooOld) {
return nil, fmt.Errorf("unexpected error fetching rule-based segments changes from storage: %w", rbsErr)
}

// perform a fetch to the BE using the supplied `since`, have the storage process it's response &, retry
// TODO(mredolatti): implement basic collapsing here to avoid flooding the BE with requests
fetchOptions := service.MakeFlagRequestParams().WithSpecVersion(common.StringRef(c.specVersion)).WithChangeNumber(since).WithFlagSetsFilter(strings.Join(sets, ",")) // at this point the sets have been sanitized & sorted
fetchOptions := service.MakeFlagRequestParams().WithSpecVersion(common.StringRef(c.specVersion)).WithChangeNumber(since).WithChangeNumberRB(rbsince).WithFlagSetsFilter(strings.Join(sets, ",")) // at this point the sets have been sanitized & sorted
ruleChanges, err := c.fetcher.Fetch(fetchOptions)
if err != nil {
return nil, err
}
return &dtos.SplitChangesDTO{
Since: ruleChanges.FFSince(),
Till: ruleChanges.FFTill(),
Splits: ruleChanges.FeatureFlags(),
return &dtos.RuleChangesDTO{
FeatureFlags: dtos.FeatureFlagsDTO{
Splits: ruleChanges.FeatureFlags(),
Till: ruleChanges.FFTill(),
Since: ruleChanges.FFSince(),
},
RuleBasedSegments: dtos.RuleBasedSegmentsDTO{
RuleBasedSegments: ruleChanges.RuleBasedSegments(),
Till: ruleChanges.RBTill(),
Since: ruleChanges.RBSince(),
},
}, nil
}

Expand Down
Loading
Loading