Skip to content

Commit 82fb908

Browse files
committed
simple wiremock test
1 parent 3d232fa commit 82fb908

File tree

3 files changed

+85
-9
lines changed

3 files changed

+85
-9
lines changed

analytics/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,14 @@
7272
<groupId>org.wiremock</groupId>
7373
<artifactId>wiremock-standalone</artifactId>
7474
<version>3.2.0</version>
75+
<scope>test</scope>
7576
</dependency>
77+
<dependency>
78+
<groupId>org.awaitility</groupId>
79+
<artifactId>awaitility</artifactId>
80+
<version>4.2.2</version>
81+
<scope>test</scope>
82+
</dependency>
7683
</dependencies>
7784

7885
<build>

analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.concurrent.ThreadFactory;
2929
import java.util.concurrent.TimeUnit;
3030
import java.util.concurrent.atomic.AtomicBoolean;
31-
import java.util.concurrent.atomic.AtomicInteger;
3231
import okhttp3.HttpUrl;
3332
import retrofit2.Call;
3433
import retrofit2.Response;
@@ -129,6 +128,7 @@ public AnalyticsClient(
129128
this.isShutDown = isShutDown;
130129
this.writeKey = writeKey;
131130
this.gsonInstance = gsonInstance;
131+
looperThread.start();
132132
}
133133

134134
public int messageSizeInBytes(Message message) {
@@ -141,7 +141,10 @@ public boolean offer(Message message) {
141141
return messageQueue.offer(message);
142142
}
143143

144-
public void enqueue(Message message) {}
144+
public void enqueue(Message message) {
145+
146+
enqueueSend(message);
147+
}
145148

146149
public void enqueueSend(Message message) {
147150
if (isShutDown.get()) {
@@ -289,7 +292,7 @@ static BatchUploadTask create(AnalyticsClient client, Batch batch, int maxRetrie
289292
private void notifyCallbacksWithException(Batch batch, Exception exception) {
290293
for (Message message : batch.batch()) {
291294
for (Callback callback : client.callbacks) {
292-
callback.failure(message, exception);
295+
callback.failure(message, exception);
293296
}
294297
}
295298
}

analytics/src/test/java/com/segment/analytics/SegmentTest.java

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,34 +7,100 @@
77
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
88

99
import com.github.tomakehurst.wiremock.junit.WireMockRule;
10+
import com.github.tomakehurst.wiremock.stubbing.ServeEvent;
11+
import com.google.gson.Gson;
12+
import com.google.gson.GsonBuilder;
13+
import com.segment.analytics.gson.AutoValueAdapterFactory;
14+
import com.segment.analytics.gson.ISO8601DateAdapter;
1015
import com.segment.analytics.messages.TrackMessage;
11-
import java.util.UUID;
16+
import java.util.ArrayList;
17+
import java.util.Arrays;
18+
import java.util.Date;
19+
import java.util.HashSet;
20+
import java.util.Iterator;
21+
import java.util.List;
22+
import java.util.concurrent.TimeUnit;
23+
import org.awaitility.Awaitility;
1224
import org.junit.Before;
1325
import org.junit.Rule;
1426
import org.junit.Test;
27+
import wiremock.com.fasterxml.jackson.core.JsonProcessingException;
28+
import wiremock.com.fasterxml.jackson.databind.JsonNode;
29+
import wiremock.com.fasterxml.jackson.databind.ObjectMapper;
1530

1631
public class SegmentTest {
1732

1833
@Rule
19-
public WireMockRule wireMockRule = new WireMockRule(wireMockConfig().dynamicPort(), false);
34+
public WireMockRule wireMockRule =
35+
new WireMockRule(wireMockConfig().dynamicPort().gzipDisabled(true), false);
2036

2137
Analytics analytics;
2238

39+
GsonBuilder gsonBuilder = new GsonBuilder()
40+
.registerTypeAdapterFactory(new AutoValueAdapterFactory())
41+
.registerTypeAdapter(Date.class, new ISO8601DateAdapter());
42+
43+
Gson gson = gsonBuilder.create();
44+
2345
@Before
2446
public void confWireMock() {
2547
stubFor(post(urlEqualTo("/v1/import/")).willReturn(okJson("{\"success\": \"true\"}")));
2648

2749
analytics = Analytics.builder("write-key")
2850
.endpoint(wireMockRule.baseUrl())
51+
.flushInterval(1, TimeUnit.SECONDS)
52+
.queueCapacity(500)
2953
// callback
3054
// http client
3155
.build();
3256
}
3357

3458
@Test
35-
public void test() {
36-
analytics.enqueue(TrackMessage.builder("my-track")
37-
.messageId(UUID.randomUUID().toString())
38-
.userId("userId"));
59+
public void test() throws Throwable {
60+
analytics.enqueue(TrackMessage.builder("my-track").messageId("m1").userId("userId"));
61+
analytics.enqueue(TrackMessage.builder("my-track").messageId("m2").userId("userId"));
62+
63+
Awaitility.await().until(() -> sentMessagesEqualsTo("m1", "m2"));
64+
}
65+
66+
@Test
67+
public void testMore() throws Throwable {
68+
System.err.println("wm at " + wireMockRule.baseUrl());
69+
int num = 100_000;
70+
String[] expectedIds = new String[num];
71+
for (int i = 0; i < num; i++) {
72+
String id = "m" + i;
73+
expectedIds[i] = id;
74+
analytics.enqueue(TrackMessage.builder("my-track").messageId(id).userId("userId"));
75+
}
76+
77+
Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> sentMessagesEqualsTo(expectedIds));
78+
}
79+
80+
private static final ObjectMapper OM = new ObjectMapper();
81+
82+
private boolean sentMessagesEqualsTo(String... msgIds) {
83+
return new HashSet<>(sentMessages()).equals(new HashSet<>(Arrays.asList(msgIds)));
84+
}
85+
86+
private List<String> sentMessages() {
87+
List<String> messageIds = new ArrayList<>();
88+
for (ServeEvent event : wireMockRule.getAllServeEvents()) {
89+
JsonNode batch;
90+
try {
91+
JsonNode json = OM.readTree(event.getRequest().getBodyAsString());
92+
batch = json.get("batch");
93+
if (batch == null) {
94+
continue;
95+
}
96+
} catch (JsonProcessingException e) {
97+
continue;
98+
}
99+
Iterator<JsonNode> msgs = batch.elements();
100+
while (msgs.hasNext()) {
101+
messageIds.add(msgs.next().get("messageId").asText());
102+
}
103+
}
104+
return messageIds;
39105
}
40106
}

0 commit comments

Comments
 (0)