Skip to content

Commit 721fc2d

Browse files
authored
Merge pull request #555 from splitio/rbs-fix-segment-sync
Fixed segment sync
2 parents 1765280 + 58a5be5 commit 721fc2d

File tree

10 files changed

+107
-53
lines changed

10 files changed

+107
-53
lines changed

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import io.split.storages.SplitCacheProducer;
7070
import io.split.storages.RuleBasedSegmentCache;
7171
import io.split.storages.RuleBasedSegmentCacheProducer;
72+
import io.split.storages.RuleBasedSegmentCacheConsumer;
7273
import io.split.storages.enums.OperationMode;
7374
import io.split.storages.memory.InMemoryCacheImp;
7475
import io.split.storages.memory.SegmentCacheInMemoryImpl;
@@ -218,7 +219,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
218219
splitCache, _segmentCache, telemetryStorage, _startTime);
219220

220221
// Segments
221-
_segmentSynchronizationTaskImp = buildSegments(config, segmentCache, splitCache);
222+
_segmentSynchronizationTaskImp = buildSegments(config, segmentCache, splitCache, ruleBasedSegmentCache);
222223

223224
SplitParser splitParser = new SplitParser();
224225
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
@@ -420,7 +421,8 @@ protected SplitFactoryImpl(SplitClientConfig config) {
420421
segmentCache,
421422
_telemetryStorageProducer,
422423
_splitCache,
423-
config.getThreadFactory());
424+
config.getThreadFactory(),
425+
_ruleBasedSegmentCache);
424426

425427
// SplitFetcher
426428
SplitChangeFetcher splitChangeFetcher = createSplitChangeFetcher(config);
@@ -607,7 +609,7 @@ private static HttpClientBuilder setupProxy(HttpClientBuilder httpClientbuilder,
607609

608610
private SegmentSynchronizationTaskImp buildSegments(SplitClientConfig config,
609611
SegmentCacheProducer segmentCacheProducer,
610-
SplitCacheConsumer splitCacheConsumer) throws URISyntaxException {
612+
SplitCacheConsumer splitCacheConsumer, RuleBasedSegmentCacheConsumer ruleBasedSegmentCache) throws URISyntaxException {
611613
SegmentChangeFetcher segmentChangeFetcher = HttpSegmentChangeFetcher.create(_splitHttpClient, _rootTarget,
612614
_telemetryStorageProducer);
613615

@@ -617,7 +619,8 @@ private SegmentSynchronizationTaskImp buildSegments(SplitClientConfig config,
617619
segmentCacheProducer,
618620
_telemetryStorageProducer,
619621
splitCacheConsumer,
620-
config.getThreadFactory());
622+
config.getThreadFactory(),
623+
ruleBasedSegmentCache);
621624
}
622625

623626
private SplitFetcher buildSplitFetcher(SplitCacheProducer splitCacheProducer, SplitParser splitParser,

client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,18 @@
33
import com.google.common.collect.Maps;
44
import io.split.client.utils.SplitExecutorFactory;
55
import io.split.engine.common.FetchOptions;
6+
import io.split.storages.RuleBasedSegmentCacheConsumer;
67
import io.split.storages.SegmentCacheProducer;
78
import io.split.storages.SplitCacheConsumer;
89
import io.split.telemetry.storage.TelemetryRuntimeProducer;
910
import org.slf4j.Logger;
1011
import org.slf4j.LoggerFactory;
1112

1213
import java.io.Closeable;
14+
import java.util.HashSet;
1315
import java.util.List;
1416
import java.util.Map;
17+
import java.util.Set;
1518
import java.util.concurrent.ConcurrentMap;
1619
import java.util.concurrent.ExecutionException;
1720
import java.util.concurrent.Future;
@@ -38,12 +41,14 @@ public class SegmentSynchronizationTaskImp implements SegmentSynchronizationTask
3841
private final ScheduledExecutorService _scheduledExecutorService;
3942
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
4043
private final SplitCacheConsumer _splitCacheConsumer;
44+
private final RuleBasedSegmentCacheConsumer _ruleBasedSegmentCacheConsumer;
4145

4246
private ScheduledFuture<?> _scheduledFuture;
4347

4448
public SegmentSynchronizationTaskImp(SegmentChangeFetcher segmentChangeFetcher, long refreshEveryNSeconds, int numThreads,
4549
SegmentCacheProducer segmentCacheProducer, TelemetryRuntimeProducer telemetryRuntimeProducer,
46-
SplitCacheConsumer splitCacheConsumer, ThreadFactory threadFactory) {
50+
SplitCacheConsumer splitCacheConsumer, ThreadFactory threadFactory,
51+
RuleBasedSegmentCacheConsumer ruleBasedSegmentCacheConsumer) {
4752
_segmentChangeFetcher = checkNotNull(segmentChangeFetcher);
4853

4954
checkArgument(refreshEveryNSeconds >= 0L);
@@ -54,6 +59,7 @@ public SegmentSynchronizationTaskImp(SegmentChangeFetcher segmentChangeFetcher,
5459
_segmentCacheProducer = checkNotNull(segmentCacheProducer);
5560
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
5661
_splitCacheConsumer = checkNotNull(splitCacheConsumer);
62+
_ruleBasedSegmentCacheConsumer = checkNotNull(ruleBasedSegmentCacheConsumer);
5763
}
5864

5965
public void initializeSegment(String segmentName) {
@@ -136,7 +142,8 @@ public boolean isRunning() {
136142
}
137143

138144
public void fetchAll(boolean addCacheHeader) {
139-
_splitCacheConsumer.getSegments().forEach(this::initialize);
145+
Set<String> names = getSegmentNames();
146+
names.forEach(this::initialize);
140147
for (Map.Entry<String, SegmentFetcher> entry : _segmentFetchers.entrySet()) {
141148
SegmentFetcher fetcher = entry.getValue();
142149

@@ -155,6 +162,7 @@ public void fetchAll(boolean addCacheHeader) {
155162

156163
public boolean fetchAllSynchronous() {
157164
_splitCacheConsumer.getSegments().forEach(this::initialize);
165+
_ruleBasedSegmentCacheConsumer.getSegments().forEach(this::initialize);
158166
List<Future<Boolean>> segmentFetchExecutions = _segmentFetchers.entrySet()
159167
.stream().map(e -> _scheduledExecutorService.submit(e.getValue()::runWhitCacheHeader))
160168
.collect(Collectors.toList());
@@ -192,4 +200,11 @@ private void initialize(String segmentName) {
192200
_segmentFetchers.putIfAbsent(segmentName, segment);
193201
}
194202
}
203+
204+
private Set<String> getSegmentNames() {
205+
Set<String> names = new HashSet<>(_splitCacheConsumer.getSegments());
206+
names.addAll(_ruleBasedSegmentCacheConsumer.getSegments());
207+
208+
return names;
209+
}
195210
}

client/src/main/java/io/split/engine/sse/NotificationProcessorImp.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package io.split.engine.sse;
22

33
import com.google.common.annotations.VisibleForTesting;
4+
import io.split.client.dtos.Split;
45
import io.split.engine.sse.dtos.GenericNotificationData;
56
import io.split.engine.sse.dtos.IncomingNotification;
67
import io.split.engine.sse.dtos.SplitKillNotification;
78
import io.split.engine.sse.dtos.StatusNotification;
89
import io.split.engine.sse.dtos.SegmentQueueDto;
10+
import io.split.engine.sse.dtos.CommonChangeNotification;
911
import io.split.engine.sse.workers.FeatureFlagsWorker;
1012
import io.split.engine.sse.workers.Worker;
1113

@@ -42,10 +44,10 @@ public void process(IncomingNotification notification) {
4244
@Override
4345
public void processSplitKill(SplitKillNotification splitKillNotification) {
4446
_featureFlagsWorker.kill(splitKillNotification);
45-
_featureFlagsWorker.addToQueue(new SplitKillNotification(GenericNotificationData.builder()
47+
_featureFlagsWorker.addToQueue(new CommonChangeNotification<>(GenericNotificationData.builder()
4648
.changeNumber(splitKillNotification.getChangeNumber())
4749
.channel(splitKillNotification.getChannel())
48-
.build()));
50+
.build(), Split.class));
4951
}
5052

5153
@Override

client/src/test/java/io/split/engine/common/LocalhostSynchronizerTest.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@
99
import io.split.engine.experiments.*;
1010
import io.split.engine.segments.SegmentChangeFetcher;
1111
import io.split.engine.segments.SegmentSynchronizationTaskImp;
12-
import io.split.storages.RuleBasedSegmentCacheProducer;
13-
import io.split.storages.SegmentCacheProducer;
14-
import io.split.storages.SplitCache;
15-
import io.split.storages.SplitCacheProducer;
12+
import io.split.storages.*;
1613
import io.split.storages.memory.InMemoryCacheImp;
1714
import io.split.storages.memory.RuleBasedSegmentCacheInMemoryImp;
1815
import io.split.storages.memory.SegmentCacheInMemoryImpl;
@@ -36,19 +33,19 @@ public void testSyncAll(){
3633
InputStreamProvider inputStreamProvider = new FileInputStreamProvider("src/test/resources/split_init.json");
3734
SplitChangeFetcher splitChangeFetcher = new JsonLocalhostSplitChangeFetcher(inputStreamProvider);
3835
SplitParser splitParser = new SplitParser();
39-
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp();
36+
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
4037
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
4138

4239
SplitFetcher splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheProducer, TELEMETRY_STORAGE_NOOP, FLAG_SETS_FILTER,
43-
ruleBasedSegmentParser, ruleBasedSegmentCacheProducer);
40+
ruleBasedSegmentParser, ruleBasedSegmentCache);
4441

4542
SplitSynchronizationTask splitSynchronizationTask = new SplitSynchronizationTask(splitFetcher, splitCacheProducer, 1000L, null);
4643

4744
SegmentChangeFetcher segmentChangeFetcher = new LocalhostSegmentChangeFetcher("src/test/resources/");
4845
SegmentCacheProducer segmentCacheProducer = new SegmentCacheInMemoryImpl();
4946

5047
SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1000, 1, segmentCacheProducer,
51-
TELEMETRY_STORAGE_NOOP, splitCacheProducer, null);
48+
TELEMETRY_STORAGE_NOOP, splitCacheProducer, null, ruleBasedSegmentCache);
5249
SplitTasks splitTasks = SplitTasks.build(splitSynchronizationTask, segmentSynchronizationTaskImp, null, null, null, null);
5350

5451
LocalhostSynchronizer localhostSynchronizer = new LocalhostSynchronizer(splitTasks, splitFetcher, false);
@@ -62,11 +59,11 @@ public void testPeriodicFetching() throws InterruptedException {
6259

6360
SplitChangeFetcher splitChangeFetcher = Mockito.mock(JsonLocalhostSplitChangeFetcher.class);
6461
SplitParser splitParser = new SplitParser();
65-
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp();
62+
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
6663
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
6764

6865
SplitFetcher splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheProducer, TELEMETRY_STORAGE_NOOP, FLAG_SETS_FILTER,
69-
ruleBasedSegmentParser, ruleBasedSegmentCacheProducer);
66+
ruleBasedSegmentParser, ruleBasedSegmentCache);
7067

7168
SplitSynchronizationTask splitSynchronizationTask = new SplitSynchronizationTask(splitFetcher, splitCacheProducer, 1000L, null);
7269
FetchOptions fetchOptions = new FetchOptions.Builder().build();
@@ -75,7 +72,7 @@ public void testPeriodicFetching() throws InterruptedException {
7572
SegmentCacheProducer segmentCacheProducer = new SegmentCacheInMemoryImpl();
7673

7774
SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1000, 1, segmentCacheProducer,
78-
TELEMETRY_STORAGE_NOOP, splitCacheProducer, null);
75+
TELEMETRY_STORAGE_NOOP, splitCacheProducer, null, ruleBasedSegmentCache);
7976

8077
SplitTasks splitTasks = SplitTasks.build(splitSynchronizationTask, segmentSynchronizationTaskImp, null, null, null, null);
8178
LocalhostSynchronizer localhostSynchronizer = new LocalhostSynchronizer(splitTasks, splitFetcher, true);

client/src/test/java/io/split/engine/common/SynchronizerTest.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,7 @@
77
import io.split.client.interceptors.FlagSetsFilterImpl;
88
import io.split.engine.segments.SegmentChangeFetcher;
99
import io.split.engine.segments.SegmentSynchronizationTaskImp;
10-
import io.split.storages.SegmentCache;
11-
import io.split.storages.SegmentCacheProducer;
12-
import io.split.storages.SplitCache;
13-
import io.split.storages.SplitCacheConsumer;
14-
import io.split.storages.SplitCacheProducer;
15-
import io.split.storages.RuleBasedSegmentCacheProducer;
10+
import io.split.storages.*;
1611
import io.split.storages.memory.InMemoryCacheImp;
1712
import io.split.engine.experiments.FetchResult;
1813
import io.split.engine.experiments.SplitFetcherImp;
@@ -88,7 +83,7 @@ public void syncAll() throws InterruptedException {
8883
public void testSyncAllSegments() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
8984
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(Mockito.mock(SegmentChangeFetcher.class),
9085
20L, 1, _segmentCacheProducer, Mockito.mock(TelemetryRuntimeProducer.class),
91-
Mockito.mock(SplitCacheConsumer.class), null);
86+
Mockito.mock(SplitCacheConsumer.class), null, Mockito.mock(RuleBasedSegmentCache.class));
9287
Field synchronizerSegmentFetcher = SynchronizerImp.class.getDeclaredField("_segmentSynchronizationTaskImp");
9388
synchronizerSegmentFetcher.setAccessible(true);
9489
Field modifiersField = Field.class.getDeclaredField("modifiers");

client/src/test/java/io/split/engine/experiments/SplitFetcherImpTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ public void testLocalHost() {
190190

191191
FetchResult fetchResult = splitFetcher.forceRefresh(fetchOptions);
192192

193-
Assert.assertEquals(1, fetchResult.getSegments().size());
193+
Assert.assertEquals(2, fetchResult.getSegments().size());
194194
}
195195

196196
@Test

client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,10 @@
33
import com.google.common.collect.Lists;
44
import io.split.client.interceptors.FlagSetsFilter;
55
import io.split.client.interceptors.FlagSetsFilterImpl;
6-
import io.split.storages.RuleBasedSegmentCacheProducer;
6+
import io.split.storages.*;
77
import io.split.storages.memory.InMemoryCacheImp;
8-
import io.split.storages.SegmentCache;
98
import io.split.storages.memory.RuleBasedSegmentCacheInMemoryImp;
109
import io.split.storages.memory.SegmentCacheInMemoryImpl;
11-
import io.split.storages.SplitCache;
1210
import io.split.client.dtos.*;
1311
import io.split.engine.ConditionsTestUtil;
1412
import io.split.engine.common.FetchOptions;
@@ -157,14 +155,14 @@ public void whenParserFailsWeRemoveTheExperiment() throws InterruptedException {
157155

158156
SegmentCache segmentCache = new SegmentCacheInMemoryImpl();
159157
SplitCache cache = new InMemoryCacheImp(-1, FLAG_SETS_FILTER);
160-
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp();
158+
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
161159
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
162160

163161
SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class);
164-
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, TELEMETRY_STORAGE, cache, null);
162+
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, TELEMETRY_STORAGE, cache, null, ruleBasedSegmentCache);
165163
segmentSynchronizationTask.start();
166164
SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, FLAG_SETS_FILTER,
167-
ruleBasedSegmentParser, ruleBasedSegmentCacheProducer);
165+
ruleBasedSegmentParser, ruleBasedSegmentCache);
168166

169167

170168
// execute the fetcher for a little bit.
@@ -182,14 +180,14 @@ public void ifThereIsAProblemTalkingToSplitChangeCountDownLatchIsNotDecremented(
182180
SplitChangeFetcher splitChangeFetcher = mock(SplitChangeFetcher.class);
183181
when(splitChangeFetcher.fetch(-1L, -1, new FetchOptions.Builder().build())).thenThrow(new RuntimeException());
184182
SegmentCache segmentCache = new SegmentCacheInMemoryImpl();
185-
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp();
183+
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
186184
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
187185

188186
SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class);
189-
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, TELEMETRY_STORAGE, cache, null);
187+
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, TELEMETRY_STORAGE, cache, null, ruleBasedSegmentCache);
190188
segmentSynchronizationTask.start();
191189
SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, FLAG_SETS_FILTER,
192-
ruleBasedSegmentParser, ruleBasedSegmentCacheProducer);
190+
ruleBasedSegmentParser, ruleBasedSegmentCache);
193191

194192
// execute the fetcher for a little bit.
195193
executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS);
@@ -224,11 +222,11 @@ public void addFeatureFlags() throws InterruptedException {
224222
SplitChangeFetcher splitChangeFetcher = mock(SplitChangeFetcher.class);
225223
when(splitChangeFetcher.fetch(Mockito.eq(-1L), Mockito.eq(-1L), Mockito.any())).thenReturn(validReturn);
226224

227-
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp();
225+
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
228226
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
229227
FlagSetsFilter flagSetsFilter = new FlagSetsFilterImpl(new HashSet<>(Arrays.asList("set_1", "set_2")));
230228
SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, flagSetsFilter,
231-
ruleBasedSegmentParser, ruleBasedSegmentCacheProducer);
229+
ruleBasedSegmentParser, ruleBasedSegmentCache);
232230

233231
executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS);
234232

@@ -282,16 +280,16 @@ public void worksWithUserDefinedSegments() throws Exception {
282280
AChangePerCallSplitChangeFetcher experimentChangeFetcher = new AChangePerCallSplitChangeFetcher(segmentName);
283281
SplitCache cache = new InMemoryCacheImp(startingChangeNumber, FLAG_SETS_FILTER);
284282
SegmentCache segmentCache = new SegmentCacheInMemoryImpl();
285-
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp();
283+
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
286284
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
287285

288286
SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class);
289287
SegmentChange segmentChange = getSegmentChange(0L, 0L, segmentName);
290288
when(segmentChangeFetcher.fetch(anyString(), anyLong(), any())).thenReturn(segmentChange);
291-
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, Mockito.mock(TelemetryStorage.class), cache, null);
289+
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, Mockito.mock(TelemetryStorage.class), cache, null, ruleBasedSegmentCache);
292290
segmentSynchronizationTask.start();
293291
SplitFetcherImp fetcher = new SplitFetcherImp(experimentChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, FLAG_SETS_FILTER,
294-
ruleBasedSegmentParser, ruleBasedSegmentCacheProducer);
292+
ruleBasedSegmentParser, ruleBasedSegmentCache);
295293

296294
// execute the fetcher for a little bit.
297295
executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS);

0 commit comments

Comments
 (0)