Skip to content

Commit ab780a4

Browse files
committed
values
1 parent 85ef909 commit ab780a4

File tree

3 files changed

+35
-23
lines changed

3 files changed

+35
-23
lines changed

analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -119,14 +119,17 @@ public AnalyticsClient(
119119
looperThread.start();
120120

121121
CircuitBreaker<Response<UploadResponse>> breaker = CircuitBreaker.<Response<UploadResponse>>builder()
122-
// 5 failure in 2 minute open the circuit
123-
.withFailureThreshold(5, Duration.ofMinutes(2))
122+
// 10 failure in 2 minute open the circuit
123+
.withFailureThreshold(10, Duration.ofMinutes(2))
124124
// once open wait 30 seconds to be half-open
125125
.withDelay(Duration.ofSeconds(30))
126126
// after 1 success the circuit is closed
127127
.withSuccessThreshold(1)
128128
// 5xx or rate limit is an error
129129
.handleResultIf(response -> is5xx(response.code()) || response.code() == 429)
130+
.onOpen(el -> System.err.println("***\nOPEN\n***"))
131+
.onHalfOpen(el -> System.err.println("***\nHALF OPEN\n***"))
132+
.onClose(el -> System.err.println("***\nCLOSED\n***"))
130133
.build();
131134

132135
RetryPolicy<Response<UploadResponse>> retry = RetryPolicy.<Response<UploadResponse>>builder()
@@ -137,12 +140,6 @@ public AnalyticsClient(
137140
.handle(IOException.class)
138141
// retry on 5xx or rate limit
139142
.handleResultIf(response -> is5xx(response.code()) || response.code() == 429)
140-
.onRetriesExceeded(context -> {
141-
throw new RuntimeException("retries");
142-
})
143-
.onAbort(context -> {
144-
throw new RuntimeException("aborted");
145-
})
146143
.build();
147144

148145
this.failsafe = Failsafe.with(retry, breaker).with(networkExecutor);
@@ -167,6 +164,9 @@ public void enqueue(Message message) {
167164
if (!messageQueue.offer(message)) {
168165
handleError(message);
169166
}
167+
else {
168+
System.err.println("enqueued " + message.messageId());
169+
}
170170
}
171171

172172

analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public void close() {
6969
// block !!!
7070
public void add(Message msg) {
7171
try {
72+
System.err.println("failed " + msg.messageId());
7273
queue.put(msg);
7374
} catch (InterruptedException e) {
7475
Thread.currentThread().interrupt();
@@ -105,17 +106,15 @@ public void run() {
105106

106107
// FIXME batch
107108
while (!msgs.isEmpty()) {
108-
int reenqueued = 0;
109109
boolean canEnqueue = true;
110110
for (int i = msgs.size() - 1; canEnqueue && i >= 0; i--) {
111111
Message msg = msgs.get(i);
112112
canEnqueue = client.offer(msg);
113113
if (canEnqueue) {
114114
msgs.remove(i);
115-
reenqueued++;
115+
System.err.println("reenqueued " + msg.messageId());
116116
}
117117
}
118-
System.err.println("reenqueued " + reenqueued);
119118
try {
120119
Thread.sleep(1_000);
121120
} catch (InterruptedException e) {

analytics/src/test/java/com/segment/analytics/SegmentTest.java

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
import java.util.Iterator;
1717
import java.util.List;
1818
import java.util.Set;
19+
import java.util.concurrent.ExecutorService;
20+
import java.util.concurrent.Executors;
1921
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicInteger;
2023
import org.awaitility.Awaitility;
2124
import org.junit.After;
2225
import org.junit.Before;
@@ -64,23 +67,30 @@ public void test() throws Throwable {
6467
.willReturn(
6568
WireMock.aResponse().withStatus(503).withBody("fail").withUniformRandomDelay(100, 1_000)));
6669

70+
int requestsPerSecond = 10;
71+
int numClients = 10;
72+
int timeToRun = 90_000;
73+
int timeToRestore = 30_000;
74+
6775
long start = System.currentTimeMillis();
6876
boolean upAgain = false;
69-
int id = 0;
77+
final AtomicInteger id = new AtomicInteger(0);
7078
List<String> ids = new ArrayList<>();
71-
RateLimiter rate = RateLimiter.create(5);
72-
while (System.currentTimeMillis() - start < 60_000) {
79+
80+
RateLimiter rate = RateLimiter.create(requestsPerSecond);
81+
ExecutorService exec = Executors.newWorkStealingPool(numClients);
82+
83+
while (System.currentTimeMillis() - start < timeToRun) {
7384
if (rate.tryAcquire()) {
74-
String msgid = "m" + id++;
75-
ids.add(msgid);
76-
analytics.enqueue(
77-
TrackMessage.builder("my-track").messageId(msgid).userId("userId"));
78-
System.err.println("enqued " + msgid);
85+
exec.submit(() -> {
86+
String msgid = "m" + id.getAndIncrement();
87+
ids.add(msgid);
88+
analytics.enqueue(
89+
TrackMessage.builder("my-track").messageId(msgid).userId("userId"));
90+
});
7991
}
80-
81-
Thread.sleep(50);
82-
83-
if (!upAgain && System.currentTimeMillis() - start > 20_000) {
92+
Thread.sleep(1);
93+
if (!upAgain && System.currentTimeMillis() - start > timeToRestore) {
8494
upAgain = true;
8595
stubFor(post(urlEqualTo("/v1/import/"))
8696
.willReturn(okJson("{\"success\": \"true\"}").withUniformRandomDelay(100, 1_000)));
@@ -92,6 +102,9 @@ public void test() throws Throwable {
92102
.atMost(10, TimeUnit.MINUTES)
93103
.pollInterval(1, TimeUnit.SECONDS)
94104
.until(() -> sentMessagesEqualsTo(ids.toArray(new String[ids.size()])));
105+
106+
exec.shutdownNow();
107+
exec.awaitTermination(10, TimeUnit.SECONDS);
95108
}
96109

97110
private static final ObjectMapper OM = new ObjectMapper();

0 commit comments

Comments
 (0)