Skip to content

Commit d633f0a

Browse files
committed
Fixed segment sync
1 parent df822cd commit d633f0a

File tree

9 files changed

+93
-50
lines changed

9 files changed

+93
-50
lines changed

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import io.split.storages.SplitCacheProducer;
7070
import io.split.storages.RuleBasedSegmentCache;
7171
import io.split.storages.RuleBasedSegmentCacheProducer;
72+
import io.split.storages.RuleBasedSegmentCacheConsumer;
7273
import io.split.storages.enums.OperationMode;
7374
import io.split.storages.memory.InMemoryCacheImp;
7475
import io.split.storages.memory.SegmentCacheInMemoryImpl;
@@ -218,7 +219,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
218219
splitCache, _segmentCache, telemetryStorage, _startTime);
219220

220221
// Segments
221-
_segmentSynchronizationTaskImp = buildSegments(config, segmentCache, splitCache);
222+
_segmentSynchronizationTaskImp = buildSegments(config, segmentCache, splitCache, ruleBasedSegmentCache);
222223

223224
SplitParser splitParser = new SplitParser();
224225
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
@@ -420,7 +421,8 @@ protected SplitFactoryImpl(SplitClientConfig config) {
420421
segmentCache,
421422
_telemetryStorageProducer,
422423
_splitCache,
423-
config.getThreadFactory());
424+
config.getThreadFactory(),
425+
_ruleBasedSegmentCache);
424426

425427
// SplitFetcher
426428
SplitChangeFetcher splitChangeFetcher = createSplitChangeFetcher(config);
@@ -607,7 +609,7 @@ private static HttpClientBuilder setupProxy(HttpClientBuilder httpClientbuilder,
607609

608610
private SegmentSynchronizationTaskImp buildSegments(SplitClientConfig config,
609611
SegmentCacheProducer segmentCacheProducer,
610-
SplitCacheConsumer splitCacheConsumer) throws URISyntaxException {
612+
SplitCacheConsumer splitCacheConsumer, RuleBasedSegmentCacheConsumer ruleBasedSegmentCache) throws URISyntaxException {
611613
SegmentChangeFetcher segmentChangeFetcher = HttpSegmentChangeFetcher.create(_splitHttpClient, _rootTarget,
612614
_telemetryStorageProducer);
613615

@@ -617,7 +619,8 @@ private SegmentSynchronizationTaskImp buildSegments(SplitClientConfig config,
617619
segmentCacheProducer,
618620
_telemetryStorageProducer,
619621
splitCacheConsumer,
620-
config.getThreadFactory());
622+
config.getThreadFactory(),
623+
ruleBasedSegmentCache);
621624
}
622625

623626
private SplitFetcher buildSplitFetcher(SplitCacheProducer splitCacheProducer, SplitParser splitParser,

client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.google.common.collect.Maps;
44
import io.split.client.utils.SplitExecutorFactory;
55
import io.split.engine.common.FetchOptions;
6+
import io.split.storages.RuleBasedSegmentCacheConsumer;
67
import io.split.storages.SegmentCacheProducer;
78
import io.split.storages.SplitCacheConsumer;
89
import io.split.telemetry.storage.TelemetryRuntimeProducer;
@@ -38,12 +39,14 @@ public class SegmentSynchronizationTaskImp implements SegmentSynchronizationTask
3839
private final ScheduledExecutorService _scheduledExecutorService;
3940
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
4041
private final SplitCacheConsumer _splitCacheConsumer;
42+
private final RuleBasedSegmentCacheConsumer _ruleBasedSegmentCacheConsumer;
4143

4244
private ScheduledFuture<?> _scheduledFuture;
4345

4446
public SegmentSynchronizationTaskImp(SegmentChangeFetcher segmentChangeFetcher, long refreshEveryNSeconds, int numThreads,
4547
SegmentCacheProducer segmentCacheProducer, TelemetryRuntimeProducer telemetryRuntimeProducer,
46-
SplitCacheConsumer splitCacheConsumer, ThreadFactory threadFactory) {
48+
SplitCacheConsumer splitCacheConsumer, ThreadFactory threadFactory,
49+
RuleBasedSegmentCacheConsumer ruleBasedSegmentCacheConsumer) {
4750
_segmentChangeFetcher = checkNotNull(segmentChangeFetcher);
4851

4952
checkArgument(refreshEveryNSeconds >= 0L);
@@ -54,6 +57,7 @@ public SegmentSynchronizationTaskImp(SegmentChangeFetcher segmentChangeFetcher,
5457
_segmentCacheProducer = checkNotNull(segmentCacheProducer);
5558
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
5659
_splitCacheConsumer = checkNotNull(splitCacheConsumer);
60+
_ruleBasedSegmentCacheConsumer = checkNotNull(ruleBasedSegmentCacheConsumer);
5761
}
5862

5963
public void initializeSegment(String segmentName) {
@@ -137,6 +141,7 @@ public boolean isRunning() {
137141

138142
public void fetchAll(boolean addCacheHeader) {
139143
_splitCacheConsumer.getSegments().forEach(this::initialize);
144+
_ruleBasedSegmentCacheConsumer.getSegments().forEach(this::initialize);
140145
for (Map.Entry<String, SegmentFetcher> entry : _segmentFetchers.entrySet()) {
141146
SegmentFetcher fetcher = entry.getValue();
142147

@@ -155,6 +160,7 @@ public void fetchAll(boolean addCacheHeader) {
155160

156161
public boolean fetchAllSynchronous() {
157162
_splitCacheConsumer.getSegments().forEach(this::initialize);
163+
_ruleBasedSegmentCacheConsumer.getSegments().forEach(this::initialize);
158164
List<Future<Boolean>> segmentFetchExecutions = _segmentFetchers.entrySet()
159165
.stream().map(e -> _scheduledExecutorService.submit(e.getValue()::runWhitCacheHeader))
160166
.collect(Collectors.toList());

client/src/test/java/io/split/engine/common/LocalhostSynchronizerTest.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@
99
import io.split.engine.experiments.*;
1010
import io.split.engine.segments.SegmentChangeFetcher;
1111
import io.split.engine.segments.SegmentSynchronizationTaskImp;
12-
import io.split.storages.RuleBasedSegmentCacheProducer;
13-
import io.split.storages.SegmentCacheProducer;
14-
import io.split.storages.SplitCache;
15-
import io.split.storages.SplitCacheProducer;
12+
import io.split.storages.*;
1613
import io.split.storages.memory.InMemoryCacheImp;
1714
import io.split.storages.memory.RuleBasedSegmentCacheInMemoryImp;
1815
import io.split.storages.memory.SegmentCacheInMemoryImpl;
@@ -36,19 +33,19 @@ public void testSyncAll(){
3633
InputStreamProvider inputStreamProvider = new FileInputStreamProvider("src/test/resources/split_init.json");
3734
SplitChangeFetcher splitChangeFetcher = new JsonLocalhostSplitChangeFetcher(inputStreamProvider);
3835
SplitParser splitParser = new SplitParser();
39-
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp();
36+
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
4037
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
4138

4239
SplitFetcher splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheProducer, TELEMETRY_STORAGE_NOOP, FLAG_SETS_FILTER,
43-
ruleBasedSegmentParser, ruleBasedSegmentCacheProducer);
40+
ruleBasedSegmentParser, ruleBasedSegmentCache);
4441

4542
SplitSynchronizationTask splitSynchronizationTask = new SplitSynchronizationTask(splitFetcher, splitCacheProducer, 1000L, null);
4643

4744
SegmentChangeFetcher segmentChangeFetcher = new LocalhostSegmentChangeFetcher("src/test/resources/");
4845
SegmentCacheProducer segmentCacheProducer = new SegmentCacheInMemoryImpl();
4946

5047
SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1000, 1, segmentCacheProducer,
51-
TELEMETRY_STORAGE_NOOP, splitCacheProducer, null);
48+
TELEMETRY_STORAGE_NOOP, splitCacheProducer, null, ruleBasedSegmentCache);
5249
SplitTasks splitTasks = SplitTasks.build(splitSynchronizationTask, segmentSynchronizationTaskImp, null, null, null, null);
5350

5451
LocalhostSynchronizer localhostSynchronizer = new LocalhostSynchronizer(splitTasks, splitFetcher, false);
@@ -62,11 +59,11 @@ public void testPeriodicFetching() throws InterruptedException {
6259

6360
SplitChangeFetcher splitChangeFetcher = Mockito.mock(JsonLocalhostSplitChangeFetcher.class);
6461
SplitParser splitParser = new SplitParser();
65-
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp();
62+
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
6663
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
6764

6865
SplitFetcher splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheProducer, TELEMETRY_STORAGE_NOOP, FLAG_SETS_FILTER,
69-
ruleBasedSegmentParser, ruleBasedSegmentCacheProducer);
66+
ruleBasedSegmentParser, ruleBasedSegmentCache);
7067

7168
SplitSynchronizationTask splitSynchronizationTask = new SplitSynchronizationTask(splitFetcher, splitCacheProducer, 1000L, null);
7269
FetchOptions fetchOptions = new FetchOptions.Builder().build();
@@ -75,7 +72,7 @@ public void testPeriodicFetching() throws InterruptedException {
7572
SegmentCacheProducer segmentCacheProducer = new SegmentCacheInMemoryImpl();
7673

7774
SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1000, 1, segmentCacheProducer,
78-
TELEMETRY_STORAGE_NOOP, splitCacheProducer, null);
75+
TELEMETRY_STORAGE_NOOP, splitCacheProducer, null, ruleBasedSegmentCache);
7976

8077
SplitTasks splitTasks = SplitTasks.build(splitSynchronizationTask, segmentSynchronizationTaskImp, null, null, null, null);
8178
LocalhostSynchronizer localhostSynchronizer = new LocalhostSynchronizer(splitTasks, splitFetcher, true);

client/src/test/java/io/split/engine/common/SynchronizerTest.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,7 @@
77
import io.split.client.interceptors.FlagSetsFilterImpl;
88
import io.split.engine.segments.SegmentChangeFetcher;
99
import io.split.engine.segments.SegmentSynchronizationTaskImp;
10-
import io.split.storages.SegmentCache;
11-
import io.split.storages.SegmentCacheProducer;
12-
import io.split.storages.SplitCache;
13-
import io.split.storages.SplitCacheConsumer;
14-
import io.split.storages.SplitCacheProducer;
15-
import io.split.storages.RuleBasedSegmentCacheProducer;
10+
import io.split.storages.*;
1611
import io.split.storages.memory.InMemoryCacheImp;
1712
import io.split.engine.experiments.FetchResult;
1813
import io.split.engine.experiments.SplitFetcherImp;
@@ -88,7 +83,7 @@ public void syncAll() throws InterruptedException {
8883
public void testSyncAllSegments() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
8984
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(Mockito.mock(SegmentChangeFetcher.class),
9085
20L, 1, _segmentCacheProducer, Mockito.mock(TelemetryRuntimeProducer.class),
91-
Mockito.mock(SplitCacheConsumer.class), null);
86+
Mockito.mock(SplitCacheConsumer.class), null, Mockito.mock(RuleBasedSegmentCache.class));
9287
Field synchronizerSegmentFetcher = SynchronizerImp.class.getDeclaredField("_segmentSynchronizationTaskImp");
9388
synchronizerSegmentFetcher.setAccessible(true);
9489
Field modifiersField = Field.class.getDeclaredField("modifiers");

client/src/test/java/io/split/engine/experiments/SplitFetcherImpTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ public void testLocalHost() {
190190

191191
FetchResult fetchResult = splitFetcher.forceRefresh(fetchOptions);
192192

193-
Assert.assertEquals(1, fetchResult.getSegments().size());
193+
Assert.assertEquals(2, fetchResult.getSegments().size());
194194
}
195195

196196
@Test

client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,10 @@
33
import com.google.common.collect.Lists;
44
import io.split.client.interceptors.FlagSetsFilter;
55
import io.split.client.interceptors.FlagSetsFilterImpl;
6-
import io.split.storages.RuleBasedSegmentCacheProducer;
6+
import io.split.storages.*;
77
import io.split.storages.memory.InMemoryCacheImp;
8-
import io.split.storages.SegmentCache;
98
import io.split.storages.memory.RuleBasedSegmentCacheInMemoryImp;
109
import io.split.storages.memory.SegmentCacheInMemoryImpl;
11-
import io.split.storages.SplitCache;
1210
import io.split.client.dtos.*;
1311
import io.split.engine.ConditionsTestUtil;
1412
import io.split.engine.common.FetchOptions;
@@ -157,14 +155,14 @@ public void whenParserFailsWeRemoveTheExperiment() throws InterruptedException {
157155

158156
SegmentCache segmentCache = new SegmentCacheInMemoryImpl();
159157
SplitCache cache = new InMemoryCacheImp(-1, FLAG_SETS_FILTER);
160-
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp();
158+
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
161159
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
162160

163161
SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class);
164-
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, TELEMETRY_STORAGE, cache, null);
162+
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, TELEMETRY_STORAGE, cache, null, ruleBasedSegmentCache);
165163
segmentSynchronizationTask.start();
166164
SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, FLAG_SETS_FILTER,
167-
ruleBasedSegmentParser, ruleBasedSegmentCacheProducer);
165+
ruleBasedSegmentParser, ruleBasedSegmentCache);
168166

169167

170168
// execute the fetcher for a little bit.
@@ -182,14 +180,14 @@ public void ifThereIsAProblemTalkingToSplitChangeCountDownLatchIsNotDecremented(
182180
SplitChangeFetcher splitChangeFetcher = mock(SplitChangeFetcher.class);
183181
when(splitChangeFetcher.fetch(-1L, -1, new FetchOptions.Builder().build())).thenThrow(new RuntimeException());
184182
SegmentCache segmentCache = new SegmentCacheInMemoryImpl();
185-
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp();
183+
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
186184
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
187185

188186
SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class);
189-
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, TELEMETRY_STORAGE, cache, null);
187+
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, TELEMETRY_STORAGE, cache, null, ruleBasedSegmentCache);
190188
segmentSynchronizationTask.start();
191189
SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, FLAG_SETS_FILTER,
192-
ruleBasedSegmentParser, ruleBasedSegmentCacheProducer);
190+
ruleBasedSegmentParser, ruleBasedSegmentCache);
193191

194192
// execute the fetcher for a little bit.
195193
executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS);
@@ -224,11 +222,11 @@ public void addFeatureFlags() throws InterruptedException {
224222
SplitChangeFetcher splitChangeFetcher = mock(SplitChangeFetcher.class);
225223
when(splitChangeFetcher.fetch(Mockito.eq(-1L), Mockito.eq(-1L), Mockito.any())).thenReturn(validReturn);
226224

227-
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp();
225+
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
228226
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
229227
FlagSetsFilter flagSetsFilter = new FlagSetsFilterImpl(new HashSet<>(Arrays.asList("set_1", "set_2")));
230228
SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, flagSetsFilter,
231-
ruleBasedSegmentParser, ruleBasedSegmentCacheProducer);
229+
ruleBasedSegmentParser, ruleBasedSegmentCache);
232230

233231
executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS);
234232

@@ -282,16 +280,16 @@ public void worksWithUserDefinedSegments() throws Exception {
282280
AChangePerCallSplitChangeFetcher experimentChangeFetcher = new AChangePerCallSplitChangeFetcher(segmentName);
283281
SplitCache cache = new InMemoryCacheImp(startingChangeNumber, FLAG_SETS_FILTER);
284282
SegmentCache segmentCache = new SegmentCacheInMemoryImpl();
285-
RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer = new RuleBasedSegmentCacheInMemoryImp();
283+
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
286284
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();
287285

288286
SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class);
289287
SegmentChange segmentChange = getSegmentChange(0L, 0L, segmentName);
290288
when(segmentChangeFetcher.fetch(anyString(), anyLong(), any())).thenReturn(segmentChange);
291-
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, Mockito.mock(TelemetryStorage.class), cache, null);
289+
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, Mockito.mock(TelemetryStorage.class), cache, null, ruleBasedSegmentCache);
292290
segmentSynchronizationTask.start();
293291
SplitFetcherImp fetcher = new SplitFetcherImp(experimentChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, FLAG_SETS_FILTER,
294-
ruleBasedSegmentParser, ruleBasedSegmentCacheProducer);
292+
ruleBasedSegmentParser, ruleBasedSegmentCache);
295293

296294
// execute the fetcher for a little bit.
297295
executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS);

0 commit comments

Comments
 (0)