Skip to content

Commit 6c5dd25

Browse files
Patch for Github issue #524
1 parent c57a583 commit 6c5dd25

File tree

1 file changed

+45
-3
lines changed

1 file changed

+45
-3
lines changed

analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import java.util.concurrent.BlockingQueue;
2626
import java.util.concurrent.ExecutorService;
2727
import java.util.concurrent.Executors;
28+
import java.util.concurrent.Future;
2829
import java.util.concurrent.LinkedBlockingQueue;
30+
import java.util.concurrent.RejectedExecutionException;
2931
import java.util.concurrent.ScheduledExecutorService;
3032
import java.util.concurrent.ThreadFactory;
3133
import java.util.concurrent.TimeUnit;
@@ -67,6 +69,7 @@ public class AnalyticsClient {
6769
private final ScheduledExecutorService flushScheduler;
6870
private final AtomicBoolean isShutDown;
6971
private final String writeKey;
72+
private volatile Future<?> looperFuture;
7073

7174
public static AnalyticsClient create(
7275
HttpUrl uploadUrl,
@@ -130,7 +133,9 @@ public AnalyticsClient(
130133

131134
this.currentQueueSizeInBytes = 0;
132135

133-
if (!isShutDown.get()) looperExecutor.submit(new Looper());
136+
if (!isShutDown.get()) {
137+
this.looperFuture = looperExecutor.submit(new Looper());
138+
}
134139

135140
flushScheduler = Executors.newScheduledThreadPool(1, threadFactory);
136141
flushScheduler.scheduleAtFixedRate(
@@ -218,6 +223,8 @@ public void shutdown() {
218223
// we can shutdown the flush scheduler without worrying
219224
flushScheduler.shutdownNow();
220225

226+
// Wait for the looper to complete processing before shutting down executors
227+
waitForLooperCompletion();
221228
shutdownAndWait(looperExecutor, "looper");
222229
shutdownAndWait(networkExecutor, "network");
223230

@@ -226,6 +233,29 @@ public void shutdown() {
226233
}
227234
}
228235

236+
/**
237+
* Wait for the looper to complete processing all messages before proceeding with shutdown.
238+
* This prevents the race condition where the network executor is shut down before the looper
239+
* finishes submitting all batches.
240+
*/
241+
private void waitForLooperCompletion() {
242+
if (looperFuture != null) {
243+
try {
244+
// Wait for the looper to complete processing the STOP message and finish
245+
// Use a reasonable timeout to avoid hanging indefinitely
246+
looperFuture.get(5, TimeUnit.SECONDS);
247+
log.print(VERBOSE, "Looper completed successfully.");
248+
} catch (Exception e) {
249+
log.print(ERROR, e, "Error waiting for looper to complete: %s", e.getMessage());
250+
// Cancel the looper if it's taking too long or if there's an error
251+
if (!looperFuture.isDone()) {
252+
looperFuture.cancel(true);
253+
log.print(VERBOSE, "Looper was cancelled due to timeout or error.");
254+
}
255+
}
256+
}
257+
}
258+
229259
public void shutdownAndWait(ExecutorService executor, String name) {
230260
try {
231261
executor.shutdown();
@@ -299,8 +329,20 @@ public void run() {
299329
"Batching %s message(s) into batch %s.",
300330
batch.batch().size(),
301331
batch.sequence());
302-
networkExecutor.submit(
303-
BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));
332+
try {
333+
networkExecutor.submit(
334+
BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));
335+
} catch (RejectedExecutionException e) {
336+
log.print(ERROR, e,
337+
"Failed to submit batch %s to network executor during shutdown. Batch will be lost.",
338+
batch.sequence());
339+
// Notify callbacks about the failure
340+
for (Message msg : batch.batch()) {
341+
for (Callback callback : callbacks) {
342+
callback.failure(msg, e);
343+
}
344+
}
345+
}
304346

305347
currentBatchSize.set(0);
306348
messages.clear();

0 commit comments

Comments
 (0)