Skip to content

Commit 3d232fa

Browse files
committed
remove
1 parent 8d81420 commit 3d232fa

File tree

4 files changed

+35
-142
lines changed

4 files changed

+35
-142
lines changed

analytics/src/main/java/com/segment/analytics/Analytics.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,6 @@ public boolean offer(MessageBuilder builder) {
9090
return client.offer(message);
9191
}
9292

93-
/** Flush events in the message queue. */
94-
public void flush() {
95-
client.flush();
96-
}
97-
9893
/** Stops this instance from processing further requests. */
9994
public void shutdown() {
10095
client.shutdown();

analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java

Lines changed: 29 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@
2424
import java.util.UUID;
2525
import java.util.concurrent.BlockingQueue;
2626
import java.util.concurrent.ExecutorService;
27-
import java.util.concurrent.Executors;
2827
import java.util.concurrent.LinkedBlockingQueue;
29-
import java.util.concurrent.ScheduledExecutorService;
3028
import java.util.concurrent.ThreadFactory;
3129
import java.util.concurrent.TimeUnit;
3230
import java.util.concurrent.atomic.AtomicBoolean;
@@ -54,17 +52,17 @@ public class AnalyticsClient {
5452
}
5553

5654
private final BlockingQueue<Message> messageQueue;
55+
private final BlockingQueue<Message> pendingQueue;
5756
private final HttpUrl uploadUrl;
5857
private final SegmentService service;
5958
private final int size;
59+
private final long flushIntervalInMillis;
6060
private final int maximumRetries;
6161
private final int maximumQueueByteSize;
62-
private int currentQueueSizeInBytes;
6362
private final Log log;
6463
private final List<Callback> callbacks;
6564
private final ExecutorService networkExecutor;
66-
private final ExecutorService looperExecutor;
67-
private final ScheduledExecutorService flushScheduler;
65+
private final Thread looperThread;
6866
private final AtomicBoolean isShutDown;
6967
private final String writeKey;
7068

@@ -83,6 +81,7 @@ public static AnalyticsClient create(
8381
String writeKey,
8482
Gson gsonInstance) {
8583
return new AnalyticsClient(
84+
new LinkedBlockingQueue<Message>(queueCapacity),
8685
new LinkedBlockingQueue<Message>(queueCapacity),
8786
uploadUrl,
8887
segmentService,
@@ -101,6 +100,7 @@ public static AnalyticsClient create(
101100

102101
public AnalyticsClient(
103102
BlockingQueue<Message> messageQueue,
103+
BlockingQueue<Message> pendingQueue,
104104
HttpUrl uploadUrl,
105105
SegmentService service,
106106
int maxQueueSize,
@@ -115,34 +115,20 @@ public AnalyticsClient(
115115
String writeKey,
116116
Gson gsonInstance) {
117117
this.messageQueue = messageQueue;
118+
this.pendingQueue = pendingQueue;
118119
this.uploadUrl = uploadUrl;
119120
this.service = service;
120121
this.size = maxQueueSize;
122+
this.flushIntervalInMillis = flushIntervalInMillis;
121123
this.maximumRetries = maximumRetries;
122124
this.maximumQueueByteSize = maximumQueueSizeInBytes;
123125
this.log = log;
124126
this.callbacks = callbacks;
125-
this.looperExecutor = Executors.newSingleThreadExecutor(threadFactory);
127+
this.looperThread = threadFactory.newThread(new Looper());
126128
this.networkExecutor = networkExecutor;
127129
this.isShutDown = isShutDown;
128130
this.writeKey = writeKey;
129131
this.gsonInstance = gsonInstance;
130-
131-
this.currentQueueSizeInBytes = 0;
132-
133-
if (!isShutDown.get()) looperExecutor.submit(new Looper());
134-
135-
flushScheduler = Executors.newScheduledThreadPool(1, threadFactory);
136-
flushScheduler.scheduleAtFixedRate(
137-
new Runnable() {
138-
@Override
139-
public void run() {
140-
flush();
141-
}
142-
},
143-
flushIntervalInMillis,
144-
flushIntervalInMillis,
145-
TimeUnit.MILLISECONDS);
146132
}
147133

148134
public int messageSizeInBytes(Message message) {
@@ -151,74 +137,33 @@ public int messageSizeInBytes(Message message) {
151137
return stringifiedMessage.getBytes(ENCODING).length;
152138
}
153139

154-
private Boolean isBackPressuredAfterSize(int incomingSize) {
155-
int POISON_BYTE_SIZE = messageSizeInBytes(FlushMessage.POISON);
156-
int sizeAfterAdd = this.currentQueueSizeInBytes + incomingSize + POISON_BYTE_SIZE;
157-
// Leave a 10% buffer since the unsynchronized enqueue could add multiple at a time
158-
return sizeAfterAdd >= Math.min(this.maximumQueueByteSize, BATCH_MAX_SIZE) * 0.9;
159-
}
160-
161140
public boolean offer(Message message) {
162141
return messageQueue.offer(message);
163142
}
164143

165-
public void enqueue(Message message) {
166-
if (message != StopMessage.STOP && isShutDown.get()) {
144+
public void enqueue(Message message) {}
145+
146+
public void enqueueSend(Message message) {
147+
if (isShutDown.get()) {
167148
log.print(ERROR, "Attempt to enqueue a message when shutdown has been called %s.", message);
168149
return;
169150
}
170151

171152
try {
172-
// @jorgen25 message here could be regular msg, POISON or STOP. Only do regular logic if its
173-
// valid message
174-
if (message != StopMessage.STOP && message != FlushMessage.POISON) {
175-
int messageByteSize = messageSizeInBytes(message);
176-
177-
// @jorgen25 check if message is below 32kb limit for individual messages, no need to check
178-
// for extra characters
179-
if (messageByteSize <= MSG_MAX_SIZE) {
180-
if (isBackPressuredAfterSize(messageByteSize)) {
181-
this.currentQueueSizeInBytes = messageByteSize;
182-
messageQueue.put(FlushMessage.POISON);
183-
messageQueue.put(message);
184-
185-
log.print(VERBOSE, "Maximum storage size has been hit Flushing...");
186-
} else {
187-
messageQueue.put(message);
188-
this.currentQueueSizeInBytes += messageByteSize;
189-
}
190-
} else {
191-
log.print(
192-
ERROR, "Message was above individual limit. MessageId: %s", message.messageId());
193-
throw new IllegalArgumentException(
194-
"Message was above individual limit. MessageId: " + message.messageId());
195-
}
196-
} else {
197-
messageQueue.put(message);
198-
}
153+
messageQueue.put(message);
199154
} catch (InterruptedException e) {
200155
log.print(ERROR, e, "Interrupted while adding message %s.", message);
201156
Thread.currentThread().interrupt();
202157
}
203158
}
204159

205-
public void flush() {
206-
if (!isShutDown.get()) {
207-
enqueue(FlushMessage.POISON);
208-
}
209-
}
210-
211160
public void shutdown() {
212161
if (isShutDown.compareAndSet(false, true)) {
213162
final long start = System.currentTimeMillis();
214163

215164
// first let's tell the system to stop
216-
enqueue(StopMessage.STOP);
217-
218-
// we can shutdown the flush scheduler without worrying
219-
flushScheduler.shutdownNow();
165+
looperThread.interrupt();
220166

221-
shutdownAndWait(looperExecutor, "looper");
222167
shutdownAndWait(networkExecutor, "network");
223168

224169
log.print(
@@ -247,49 +192,44 @@ public void shutdownAndWait(ExecutorService executor, String name) {
247192
* messages, it triggers a flush.
248193
*/
249194
class Looper implements Runnable {
250-
private boolean stop;
251195

252196
public Looper() {
253-
this.stop = false;
254197
}
255198

256199
@Override
257200
public void run() {
258201
LinkedList<Message> messages = new LinkedList<>();
259-
AtomicInteger currentBatchSize = new AtomicInteger();
202+
int currentBatchSize = 0;
260203
boolean batchSizeLimitReached = false;
261204
int contextSize = gsonInstance.toJson(CONTEXT).getBytes(ENCODING).length;
262205
try {
263-
while (!stop) {
264-
Message message = messageQueue.take();
265-
266-
if (message == StopMessage.STOP) {
267-
log.print(VERBOSE, "Stopping the Looper");
268-
stop = true;
269-
} else if (message == FlushMessage.POISON) {
270-
if (!messages.isEmpty()) {
271-
log.print(VERBOSE, "Flushing messages.");
272-
}
273-
} else {
206+
while (!Thread.currentThread().isInterrupted()) {
207+
Message message = messageQueue.poll(flushIntervalInMillis, TimeUnit.MILLISECONDS);
208+
209+
if (message != null) {
274210
// we do +1 because we are accounting for this new message we just took from the queue
275211
// which is not in list yet
276212
// need to check if this message is going to make us go over the limit considering
277213
// default batch size as well
278214
int defaultBatchSize =
279215
BatchUtility.getBatchDefaultSize(contextSize, messages.size() + 1);
280216
int msgSize = messageSizeInBytes(message);
281-
if (currentBatchSize.get() + msgSize + defaultBatchSize <= BATCH_MAX_SIZE) {
217+
if (currentBatchSize + msgSize + defaultBatchSize <= BATCH_MAX_SIZE) {
282218
messages.add(message);
283-
currentBatchSize.addAndGet(msgSize);
219+
currentBatchSize+=msgSize;
284220
} else {
285221
// put message that did not make the cut this time back on the queue, we already took
286222
// this message if we dont put it back its lost
287223
// we take care of that after submitting the batch
288224
batchSizeLimitReached = true;
289225
}
290226
}
227+
228+
if (messages.isEmpty()) {
229+
continue;
230+
}
291231

292-
Boolean isBlockingSignal = message == FlushMessage.POISON || message == StopMessage.STOP;
232+
Boolean isBlockingSignal = message == null;
293233
Boolean isOverflow = messages.size() >= size;
294234

295235
if (!messages.isEmpty() && (isOverflow || isBlockingSignal || batchSizeLimitReached)) {
@@ -302,7 +242,7 @@ public void run() {
302242
networkExecutor.submit(
303243
BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));
304244

305-
currentBatchSize.set(0);
245+
currentBatchSize=0;
306246
messages.clear();
307247
if (batchSizeLimitReached) {
308248
// If this is true that means the last message that would make us go over the limit
@@ -315,8 +255,9 @@ public void run() {
315255
}
316256
} catch (InterruptedException e) {
317257
log.print(DEBUG, "Looper interrupted while polling for messages.");
318-
Thread.currentThread().interrupt();
258+
Thread.currentThread().interrupt(); //XXX
319259
}
260+
// SEND pending
320261
log.print(VERBOSE, "Looper stopped");
321262
}
322263
}

analytics/src/test/java/com/segment/analytics/AnalyticsTest.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,6 @@ public void shutdownIsDispatched() {
9191
verify(client).shutdown();
9292
}
9393

94-
@Test
95-
public void flushIsDispatched() {
96-
analytics.flush();
97-
98-
verify(client).flush();
99-
}
100-
10194
@Test
10295
public void offerIsDispatched(MessageBuilderTest builder) {
10396
MessageBuilder messageBuilder = builder.get().userId("dummy");

analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java

Lines changed: 6 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public class AnalyticsClientTest {
7575

7676
ThreadFactory threadFactory;
7777
@Spy LinkedBlockingQueue<Message> messageQueue;
78+
@Spy LinkedBlockingQueue<Message> pendingQueue;
7879
@Mock SegmentService segmentService;
7980
@Mock ExecutorService networkExecutor;
8081
@Mock Callback callback;
@@ -95,6 +96,7 @@ public void setUp() {
9596
AnalyticsClient newClient() {
9697
return new AnalyticsClient(
9798
messageQueue,
99+
pendingQueue,
98100
null,
99101
segmentService,
100102
50,
@@ -131,15 +133,6 @@ public void shutdown() throws InterruptedException {
131133
verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS);
132134
}
133135

134-
@Test
135-
public void flushInsertsPoison() throws InterruptedException {
136-
AnalyticsClient client = newClient();
137-
138-
client.flush();
139-
140-
verify(messageQueue).put(FlushMessage.POISON);
141-
}
142-
143136
/** Wait until the queue is drained. */
144137
static void wait(Queue<?> queue) {
145138
// noinspection StatementWithEmptyBody
@@ -198,21 +191,6 @@ private static String generateDataOfSizeSpecialChars(
198191
return builder.toString();
199192
}
200193

201-
@Test
202-
public void flushSubmitsToExecutor() {
203-
messageQueue = new LinkedBlockingQueue<>();
204-
AnalyticsClient client = newClient();
205-
206-
TrackMessage first = TrackMessage.builder("foo").userId("bar").build();
207-
TrackMessage second = TrackMessage.builder("qaz").userId("qux").build();
208-
client.enqueue(first);
209-
client.enqueue(second);
210-
client.flush();
211-
wait(messageQueue);
212-
213-
assertThat(captureBatch(networkExecutor).batch()).containsExactly(first, second);
214-
}
215-
216194
@Test
217195
public void enqueueMaxTriggersFlush() {
218196
messageQueue = new LinkedBlockingQueue<>();
@@ -284,6 +262,7 @@ public void flushHowManyTimesNecessaryToStayWithinLimit() throws InterruptedExce
284262
AnalyticsClient client =
285263
new AnalyticsClient(
286264
messageQueue,
265+
pendingQueue,
287266
null,
288267
segmentService,
289268
50,
@@ -544,24 +523,6 @@ public boolean matches(IOException exception) {
544523
}));
545524
}
546525

547-
@Test
548-
public void flushWhenNotShutDown() throws InterruptedException {
549-
AnalyticsClient client = newClient();
550-
551-
client.flush();
552-
verify(messageQueue).put(POISON);
553-
}
554-
555-
@Test
556-
public void flushWhenShutDown() throws InterruptedException {
557-
AnalyticsClient client = newClient();
558-
isShutDown.set(true);
559-
560-
client.flush();
561-
562-
verify(messageQueue, times(0)).put(any(Message.class));
563-
}
564-
565526
@Test
566527
public void enqueueWithRegularMessageWhenNotShutdown(MessageBuilderTest builder)
567528
throws InterruptedException {
@@ -860,6 +821,7 @@ public void submitBatchBelowThreshold() throws InterruptedException, IllegalArgu
860821
AnalyticsClient client =
861822
new AnalyticsClient(
862823
messageQueue,
824+
pendingQueue,
863825
null,
864826
segmentService,
865827
50,
@@ -902,6 +864,7 @@ public void submitBatchAboveThreshold() throws InterruptedException, IllegalArgu
902864
AnalyticsClient client =
903865
new AnalyticsClient(
904866
messageQueue,
867+
pendingQueue,
905868
null,
906869
segmentService,
907870
50,
@@ -937,6 +900,7 @@ public void submitManySmallMessagesBatchAboveThreshold() throws InterruptedExcep
937900
AnalyticsClient client =
938901
new AnalyticsClient(
939902
messageQueue,
903+
pendingQueue,
940904
null,
941905
segmentService,
942906
50,

0 commit comments

Comments
 (0)