Skip to content

Commit c92748c

Browse files
authored
Merge branch 'master' into release/3.5.2
2 parents 0beea9a + 4326150 commit c92748c

File tree

4 files changed

+142
-10
lines changed

4 files changed

+142
-10
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.segment.analytics.gson;
2+
3+
import com.google.gson.*;
4+
import com.google.gson.stream.JsonReader;
5+
import com.google.gson.stream.JsonWriter;
6+
import java.io.IOException;
7+
import java.lang.reflect.Type;
8+
import java.time.Instant;
9+
10+
/**
11+
* A {@link JsonSerializer} that formats {@link Instant} objects into iso8601 formatted strings, and
12+
* {@link JsonDeserializer} that parses iso8601 formatted strings into {@link Instant} objects.
13+
*/
14+
public class ISO8601InstantAdapter extends TypeAdapter<Instant>
15+
implements JsonSerializer<Instant>, JsonDeserializer<Instant> {
16+
@Override
17+
public JsonElement serialize(Instant src, Type typeOfSrc, JsonSerializationContext context) {
18+
return new JsonPrimitive(Iso8601Utils.format(src)); // ISO 8601 format
19+
}
20+
21+
@Override
22+
public Instant deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
23+
throws JsonParseException {
24+
return Instant.parse(json.getAsString());
25+
}
26+
27+
@Override
28+
public void write(JsonWriter out, Instant value) throws IOException {
29+
out.value(value == null ? null : value.toString());
30+
}
31+
32+
@Override
33+
public Instant read(JsonReader in) throws IOException {
34+
String str = in.nextString();
35+
return str == null ? null : Instant.parse(str);
36+
}
37+
}

analytics-core/src/main/java/com/segment/analytics/gson/Iso8601Utils.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.segment.analytics.gson;
1717

1818
import com.google.gson.JsonParseException;
19+
import java.time.Instant;
1920
import java.util.Calendar;
2021
import java.util.Date;
2122
import java.util.GregorianCalendar;
@@ -277,4 +278,10 @@ private static int indexOfNonDigit(String string, int offset) {
277278
}
278279
return string.length();
279280
}
281+
282+
/** Returns {@code date} formatted as yyyy-MM-ddThh:mm:ss.sssZ */
283+
static String format(Instant instant) {
284+
// Format the instant
285+
return String.valueOf(instant.getEpochSecond());
286+
}
280287
}

analytics/src/main/java/com/segment/analytics/Analytics.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
import com.google.gson.GsonBuilder;
55
import com.segment.analytics.gson.AutoValueAdapterFactory;
66
import com.segment.analytics.gson.ISO8601DateAdapter;
7+
import com.segment.analytics.gson.ISO8601InstantAdapter;
78
import com.segment.analytics.http.SegmentService;
89
import com.segment.analytics.internal.AnalyticsClient;
910
import com.segment.analytics.internal.AnalyticsVersion;
1011
import com.segment.analytics.messages.Message;
1112
import com.segment.analytics.messages.MessageBuilder;
13+
import java.time.Instant;
1214
import java.util.ArrayList;
1315
import java.util.Arrays;
1416
import java.util.Collections;
@@ -362,6 +364,7 @@ public Analytics build() {
362364

363365
gsonBuilder
364366
.registerTypeAdapterFactory(new AutoValueAdapterFactory())
367+
.registerTypeAdapter(Instant.class, new ISO8601InstantAdapter())
365368
.registerTypeAdapter(Date.class, new ISO8601DateAdapter());
366369

367370
Gson gson = gsonBuilder.create();

analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java

Lines changed: 95 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import java.util.concurrent.BlockingQueue;
2626
import java.util.concurrent.ExecutorService;
2727
import java.util.concurrent.Executors;
28+
import java.util.concurrent.Future;
2829
import java.util.concurrent.LinkedBlockingQueue;
30+
import java.util.concurrent.RejectedExecutionException;
2931
import java.util.concurrent.ScheduledExecutorService;
3032
import java.util.concurrent.ThreadFactory;
3133
import java.util.concurrent.TimeUnit;
@@ -42,6 +44,8 @@ public class AnalyticsClient {
4244
private static final Charset ENCODING = StandardCharsets.UTF_8;
4345
private Gson gsonInstance;
4446
private static final String instanceId = UUID.randomUUID().toString();
47+
private static final int WAIT_FOR_THREAD_COMPLETE_S = 5;
48+
private static final int TERMINATION_TIMEOUT_S = 1;
4549

4650
static {
4751
Map<String, String> library = new LinkedHashMap<>();
@@ -67,6 +71,7 @@ public class AnalyticsClient {
6771
private final ScheduledExecutorService flushScheduler;
6872
private final AtomicBoolean isShutDown;
6973
private final String writeKey;
74+
private volatile Future<?> looperFuture;
7075

7176
public static AnalyticsClient create(
7277
HttpUrl uploadUrl,
@@ -130,7 +135,9 @@ public AnalyticsClient(
130135

131136
this.currentQueueSizeInBytes = 0;
132137

133-
if (!isShutDown.get()) looperExecutor.submit(new Looper());
138+
if (!isShutDown.get()) {
139+
this.looperFuture = looperExecutor.submit(new Looper());
140+
}
134141

135142
flushScheduler = Executors.newScheduledThreadPool(1, threadFactory);
136143
flushScheduler.scheduleAtFixedRate(
@@ -218,6 +225,8 @@ public void shutdown() {
218225
// we can shutdown the flush scheduler without worrying
219226
flushScheduler.shutdownNow();
220227

228+
// Wait for the looper to complete processing before shutting down executors
229+
waitForLooperCompletion();
221230
shutdownAndWait(looperExecutor, "looper");
222231
shutdownAndWait(networkExecutor, "network");
223232

@@ -226,19 +235,81 @@ public void shutdown() {
226235
}
227236
}
228237

238+
/**
239+
* Wait for the looper to complete processing all messages before proceeding with shutdown. This
240+
* prevents the race condition where the network executor is shut down before the looper finishes
241+
* submitting all batches.
242+
*/
243+
private void waitForLooperCompletion() {
244+
if (looperFuture != null) {
245+
try {
246+
// Wait for the looper to complete processing the STOP message and finish
247+
// Use a reasonable timeout to avoid hanging indefinitely
248+
looperFuture.get(WAIT_FOR_THREAD_COMPLETE_S, TimeUnit.SECONDS);
249+
log.print(VERBOSE, "Looper completed successfully.");
250+
} catch (Exception e) {
251+
log.print(ERROR, e, "Error waiting for looper to complete.");
252+
// Cancel the looper if it's taking too long or if there's an error
253+
if (!looperFuture.isDone()) {
254+
looperFuture.cancel(true);
255+
log.print(VERBOSE, "Looper was cancelled due to timeout or error.");
256+
}
257+
}
258+
}
259+
}
260+
229261
public void shutdownAndWait(ExecutorService executor, String name) {
262+
boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper");
230263
try {
231264
executor.shutdown();
232-
final boolean executorTerminated = executor.awaitTermination(1, TimeUnit.SECONDS);
233-
234-
log.print(
235-
VERBOSE,
236-
"%s executor %s.",
237-
name,
238-
executorTerminated ? "terminated normally" : "timed out");
265+
boolean terminated = executor.awaitTermination(TERMINATION_TIMEOUT_S, TimeUnit.SECONDS);
266+
if (terminated) {
267+
log.print(VERBOSE, "%s executor terminated normally.", name);
268+
return;
269+
}
270+
if (isLooperExecutor) { // Handle looper - network should finish on its own
271+
// not terminated within timeout -> force shutdown
272+
log.print(
273+
VERBOSE,
274+
"%s did not terminate in %d seconds; requesting shutdownNow().",
275+
name,
276+
TERMINATION_TIMEOUT_S);
277+
List<Runnable> dropped = executor.shutdownNow(); // interrupts running tasks
278+
log.print(
279+
VERBOSE,
280+
"%s shutdownNow returned %d queued tasks that never started.",
281+
name,
282+
dropped.size());
283+
284+
// optional short wait to give interrupted tasks a chance to exit
285+
boolean terminatedAfterForce =
286+
executor.awaitTermination(TERMINATION_TIMEOUT_S, TimeUnit.SECONDS);
287+
log.print(
288+
VERBOSE,
289+
"%s executor %s after shutdownNow().",
290+
name,
291+
terminatedAfterForce ? "terminated" : "still running (did not terminate)");
292+
293+
if (!terminatedAfterForce) {
294+
// final warning — investigate tasks that ignore interrupts
295+
log.print(
296+
ERROR,
297+
"%s executor still did not terminate; tasks may be ignoring interrupts.",
298+
name);
299+
}
300+
}
239301
} catch (InterruptedException e) {
302+
// Preserve interrupt status and attempt forceful shutdown
240303
log.print(ERROR, e, "Interrupted while stopping %s executor.", name);
241304
Thread.currentThread().interrupt();
305+
if (isLooperExecutor) {
306+
List<Runnable> dropped = executor.shutdownNow();
307+
log.print(
308+
VERBOSE,
309+
"%s shutdownNow invoked after interrupt; %d tasks returned.",
310+
name,
311+
dropped.size());
312+
}
242313
}
243314
}
244315

@@ -299,8 +370,22 @@ public void run() {
299370
"Batching %s message(s) into batch %s.",
300371
batch.batch().size(),
301372
batch.sequence());
302-
networkExecutor.submit(
303-
BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));
373+
try {
374+
networkExecutor.submit(
375+
BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));
376+
} catch (RejectedExecutionException e) {
377+
log.print(
378+
ERROR,
379+
e,
380+
"Failed to submit batch %s to network executor during shutdown. Batch will be lost.",
381+
batch.sequence());
382+
// Notify callbacks about the failure
383+
for (Message msg : batch.batch()) {
384+
for (Callback callback : callbacks) {
385+
callback.failure(msg, e);
386+
}
387+
}
388+
}
304389

305390
currentBatchSize.set(0);
306391
messages.clear();

0 commit comments

Comments
 (0)