Skip to content

Commit 09f292f

Browse files
Reverted changes
1 parent 90c2539 commit 09f292f

File tree

1 file changed

+10
-74
lines changed

1 file changed

+10
-74
lines changed

analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java

Lines changed: 10 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@
2525
import java.util.concurrent.BlockingQueue;
2626
import java.util.concurrent.ExecutorService;
2727
import java.util.concurrent.Executors;
28-
import java.util.concurrent.Future;
2928
import java.util.concurrent.LinkedBlockingQueue;
30-
import java.util.concurrent.RejectedExecutionException;
3129
import java.util.concurrent.ScheduledExecutorService;
3230
import java.util.concurrent.ThreadFactory;
3331
import java.util.concurrent.TimeUnit;
@@ -69,7 +67,6 @@ public class AnalyticsClient {
6967
private final ScheduledExecutorService flushScheduler;
7068
private final AtomicBoolean isShutDown;
7169
private final String writeKey;
72-
private volatile Future<?> looperFuture;
7370

7471
public static AnalyticsClient create(
7572
HttpUrl uploadUrl,
@@ -133,9 +130,7 @@ public AnalyticsClient(
133130

134131
this.currentQueueSizeInBytes = 0;
135132

136-
if (!isShutDown.get()) {
137-
this.looperFuture = looperExecutor.submit(new Looper());
138-
}
133+
if (!isShutDown.get()) looperExecutor.submit(new Looper());
139134

140135
flushScheduler = Executors.newScheduledThreadPool(1, threadFactory);
141136
flushScheduler.scheduleAtFixedRate(
@@ -223,8 +218,6 @@ public void shutdown() {
223218
// we can shutdown the flush scheduler without worrying
224219
flushScheduler.shutdownNow();
225220

226-
// Wait for the looper to complete processing before shutting down executors
227-
waitForLooperCompletion();
228221
shutdownAndWait(looperExecutor, "looper");
229222
shutdownAndWait(networkExecutor, "network");
230223

@@ -233,62 +226,19 @@ public void shutdown() {
233226
}
234227
}
235228

236-
/**
237-
* Wait for the looper to complete processing all messages before proceeding with shutdown. This
238-
* prevents the race condition where the network executor is shut down before the looper finishes
239-
* submitting all batches.
240-
*/
241-
private void waitForLooperCompletion() {
242-
if (looperFuture != null) {
243-
try {
244-
// Wait for the looper to complete processing the STOP message and finish
245-
// Use a reasonable timeout to avoid hanging indefinitely
246-
looperFuture.get(5, TimeUnit.SECONDS);
247-
log.print(VERBOSE, "Looper completed successfully.");
248-
} catch (Exception e) {
249-
log.print(ERROR, e, "Error waiting for looper to complete: %s", e.getMessage());
250-
// Cancel the looper if it's taking too long or if there's an error
251-
if (!looperFuture.isDone()) {
252-
looperFuture.cancel(true);
253-
log.print(VERBOSE, "Looper was cancelled due to timeout or error.");
254-
}
255-
}
256-
}
257-
}
258-
259229
public void shutdownAndWait(ExecutorService executor, String name) {
260-
boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper");
261230
try {
262231
executor.shutdown();
263-
boolean terminated = executor.awaitTermination(1, TimeUnit.MILLISECONDS);
264-
if (terminated) {
265-
log.print(VERBOSE, "%s executor terminated normally.", name);
266-
return;
267-
}
268-
if (isLooperExecutor) {
269-
// not terminated within timeout -> force shutdown
270-
log.print(VERBOSE, "%s did not terminate in %d ms; requesting shutdownNow().", name, 1);
271-
List<Runnable> dropped = executor.shutdownNow(); // interrupts running tasks
272-
log.print(VERBOSE, "%s shutdownNow returned %d queued tasks that never started.", name, dropped.size());
273-
274-
// optional short wait to give interrupted tasks a chance to exit
275-
boolean terminatedAfterForce = executor.awaitTermination(1, TimeUnit.MILLISECONDS);
276-
log.print(VERBOSE, "%s executor %s after shutdownNow().", name,
277-
terminatedAfterForce ? "terminated" : "still running (did not terminate)");
278-
279-
if (!terminatedAfterForce) {
280-
// final warning — investigate tasks that ignore interrupts
281-
log.print(ERROR, "%s executor still did not terminate; tasks may be ignoring interrupts.", name);
282-
}
283-
}
232+
final boolean executorTerminated = executor.awaitTermination(1, TimeUnit.SECONDS);
233+
234+
log.print(
235+
VERBOSE,
236+
"%s executor %s.",
237+
name,
238+
executorTerminated ? "terminated normally" : "timed out");
284239
} catch (InterruptedException e) {
285-
// Preserve interrupt status and attempt forceful shutdown
286240
log.print(ERROR, e, "Interrupted while stopping %s executor.", name);
287241
Thread.currentThread().interrupt();
288-
if (isLooperExecutor) {
289-
List<Runnable> dropped = executor.shutdownNow();
290-
log.print(VERBOSE, "%s shutdownNow invoked after interrupt; %d tasks returned.", name, dropped.size());
291-
}
292242
}
293243
}
294244

@@ -349,22 +299,8 @@ public void run() {
349299
"Batching %s message(s) into batch %s.",
350300
batch.batch().size(),
351301
batch.sequence());
352-
try {
353-
networkExecutor.submit(
354-
BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));
355-
} catch (RejectedExecutionException e) {
356-
log.print(
357-
ERROR,
358-
e,
359-
"Failed to submit batch %s to network executor during shutdown. Batch will be lost.",
360-
batch.sequence());
361-
// Notify callbacks about the failure
362-
for (Message msg : batch.batch()) {
363-
for (Callback callback : callbacks) {
364-
callback.failure(msg, e);
365-
}
366-
}
367-
}
302+
networkExecutor.submit(
303+
BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));
368304

369305
currentBatchSize.set(0);
370306
messages.clear();

0 commit comments

Comments
 (0)