Skip to content

Commit 85ef909

Browse files
committed
fallback appender
1 parent af74b24 commit 85ef909

File tree

8 files changed

+280
-1252
lines changed

8 files changed

+280
-1252
lines changed

analytics/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
<artifactId>failsafe-retrofit</artifactId>
5151
<version>3.3.2</version>
5252
</dependency>
53-
5453
<dependency>
5554
<groupId>junit</groupId>
5655
<artifactId>junit</artifactId>

analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
public class AnalyticsClient {
4242
private static final Map<String, ?> CONTEXT;
4343
private static final int BATCH_MAX_SIZE = 1024 * 500;
44-
private static final int MSG_MAX_SIZE = 1024 * 32;
4544
private static final Charset ENCODING = StandardCharsets.UTF_8;
4645
private Gson gsonInstance;
4746
private static final String instanceId = UUID.randomUUID().toString(); // TODO configurable ?
@@ -67,6 +66,7 @@ public class AnalyticsClient {
6766
private final AtomicBoolean isShutDown;
6867
private final String writeKey;
6968
private final FailsafeExecutor<Response<UploadResponse>> failsafe;
69+
private final FallbackAppender fallback;
7070

7171
public static AnalyticsClient create(
7272
HttpUrl uploadUrl,
@@ -119,20 +119,20 @@ public AnalyticsClient(
119119
looperThread.start();
120120

121121
CircuitBreaker<Response<UploadResponse>> breaker = CircuitBreaker.<Response<UploadResponse>>builder()
122-
// 2 failure in 5 minute open the circuit
123-
.withFailureThreshold(2, Duration.ofMinutes(5))
124-
// once open wait 1 minute to be half-open
125-
.withDelay(Duration.ofMinutes(1))
122+
// 5 failure in 2 minute open the circuit
123+
.withFailureThreshold(5, Duration.ofMinutes(2))
124+
// once open wait 30 seconds to be half-open
125+
.withDelay(Duration.ofSeconds(30))
126126
// after 1 success the circuit is closed
127127
.withSuccessThreshold(1)
128128
// 5xx or rate limit is an error
129129
.handleResultIf(response -> is5xx(response.code()) || response.code() == 429)
130130
.build();
131131

132132
RetryPolicy<Response<UploadResponse>> retry = RetryPolicy.<Response<UploadResponse>>builder()
133-
.withMaxAttempts(3)
133+
.withMaxAttempts(5)
134134
.withBackoff(1, 300, ChronoUnit.SECONDS)
135-
.withJitter(.1)
135+
.withJitter(.2)
136136
// retry on IOException
137137
.handle(IOException.class)
138138
// retry on 5xx or rate limit
@@ -146,6 +146,7 @@ public AnalyticsClient(
146146
.build();
147147

148148
this.failsafe = Failsafe.with(retry, breaker).with(networkExecutor);
149+
this.fallback = new FallbackAppender(this);
149150
}
150151

151152
public int messageSizeInBytes(Message message) {
@@ -167,13 +168,16 @@ public void enqueue(Message message) {
167168
handleError(message);
168169
}
169170
}
171+
170172

173+
// FIXME closeable
171174
public void shutdown() {
172175
if (isShutDown.compareAndSet(false, true)) {
173176
final long start = System.currentTimeMillis();
174177

175178
// first let's tell the system to stop
176179
looperThread.interrupt();
180+
fallback.close();
177181

178182
shutdownAndWait(networkExecutor, "network");
179183

@@ -182,10 +186,8 @@ public void shutdown() {
182186
}
183187
}
184188

185-
public void shutdownAndWait(ExecutorService executor, String name) {
189+
private void shutdownAndWait(ExecutorService executor, String name) {
186190
try {
187-
this.looperThread.interrupt();
188-
189191
executor.shutdown();
190192
final boolean executorTerminated = executor.awaitTermination(1, TimeUnit.SECONDS);
191193

@@ -291,14 +293,18 @@ public void run() {
291293
}
292294

293295
void handleError(Batch batch, Throwable t) {
294-
if(t instanceof CompletionException && t.getCause() instanceof CircuitBreakerOpenException) {
295-
System.err.println("OPEN");
296+
if(t instanceof CompletionException ) {
297+
if(t.getCause() instanceof CircuitBreakerOpenException) {
298+
System.err.println("OPEN");
299+
}
300+
}
301+
for(Message msg : batch.batch()) {
302+
fallback.add(msg);
296303
}
297-
298-
System.err.println("" + batch);
299304
}
300-
void handleError(Message message) {
301-
System.err.println("" + message);
305+
306+
void handleError(Message msg) {
307+
fallback.add(msg);
302308
}
303309

304310
private static boolean is5xx(int status) {
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
package com.segment.analytics.internal;
2+
3+
import com.google.gson.Gson;
4+
import com.google.gson.GsonBuilder;
5+
import com.segment.analytics.gson.AutoValueAdapterFactory;
6+
import com.segment.analytics.gson.ISO8601DateAdapter;
7+
import com.segment.analytics.messages.Message;
8+
import com.segment.analytics.messages.TrackMessage;
9+
import java.io.File;
10+
import java.io.IOException;
11+
import java.io.OutputStream;
12+
import java.nio.channels.Channels;
13+
import java.nio.channels.FileChannel;
14+
import java.nio.charset.StandardCharsets;
15+
import java.nio.file.StandardOpenOption;
16+
import java.util.ArrayList;
17+
import java.util.Arrays;
18+
import java.util.Collections;
19+
import java.util.Date;
20+
import java.util.List;
21+
import java.util.Objects;
22+
import java.util.concurrent.ArrayBlockingQueue;
23+
import java.util.concurrent.BlockingQueue;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.locks.Lock;
26+
import java.util.concurrent.locks.ReentrantLock;
27+
import java.util.stream.Collectors;
28+
29+
public class FallbackAppender {
30+
31+
private static final int FLUSH_MS = 100;
32+
private static final int BATCH = 20;
33+
private static final int LASTMESSAGE_RETRY_MS = 10_000;
34+
private static final String PATH = "pending";
35+
36+
private final AnalyticsClient client;
37+
private final BlockingQueue<Message> queue;
38+
private final File file;
39+
private final Lock lock = new ReentrantLock();
40+
private final Thread writer;
41+
private final Thread reader;
42+
private final Gson gson;
43+
44+
private transient long lastMessage;
45+
46+
public FallbackAppender(AnalyticsClient client) {
47+
this.client = client;
48+
this.file = new File(PATH);
49+
this.queue = new ArrayBlockingQueue<Message>(100);
50+
this.writer = new Thread(new FileWriter()); // XXX threadFactory daemon
51+
this.reader = new Thread(new FileReader()); // XXX threadFactory daemon
52+
this.gson = new GsonBuilder()
53+
.registerTypeAdapterFactory(new AutoValueAdapterFactory())
54+
.registerTypeAdapter(Date.class, new ISO8601DateAdapter())
55+
.create();
56+
57+
file.delete(); // FIXME do not remove on start
58+
59+
this.lastMessage = System.currentTimeMillis();
60+
this.writer.start();
61+
this.reader.start();
62+
}
63+
64+
public void close() {
65+
reader.interrupt();
66+
writer.interrupt();
67+
}
68+
69+
// block !!!
70+
public void add(Message msg) {
71+
try {
72+
queue.put(msg);
73+
} catch (InterruptedException e) {
74+
Thread.currentThread().interrupt();
75+
}
76+
}
77+
78+
class FileReader implements Runnable {
79+
@Override
80+
public void run() {
81+
while (!Thread.currentThread().isInterrupted()) {
82+
if (queue.isEmpty() && System.currentTimeMillis() - lastMessage > LASTMESSAGE_RETRY_MS) {
83+
if (file.length() == 0) {
84+
continue;
85+
}
86+
87+
List<Message> msgs;
88+
lock.lock();
89+
try {
90+
msgs = read();
91+
if (msgs.isEmpty()) {
92+
continue;
93+
}
94+
// FIXME now its reading all the msgs and waits until all is processed
95+
// it will be better to work with batch and truncate the file
96+
file.delete();
97+
} catch (IOException e) {
98+
// TODO Auto-generated catch block
99+
e.printStackTrace();
100+
lastMessage = System.currentTimeMillis();
101+
continue;
102+
} finally {
103+
lock.unlock();
104+
}
105+
106+
// FIXME batch
107+
while (!msgs.isEmpty()) {
108+
int reenqueued = 0;
109+
boolean canEnqueue = true;
110+
for (int i = msgs.size() - 1; canEnqueue && i >= 0; i--) {
111+
Message msg = msgs.get(i);
112+
canEnqueue = client.offer(msg);
113+
if (canEnqueue) {
114+
msgs.remove(i);
115+
reenqueued++;
116+
}
117+
}
118+
System.err.println("reenqueued " + reenqueued);
119+
try {
120+
Thread.sleep(1_000);
121+
} catch (InterruptedException e) {
122+
Thread.currentThread().interrupt();
123+
}
124+
}
125+
126+
lastMessage = System.currentTimeMillis();
127+
}
128+
129+
try {
130+
Thread.sleep(1_000);
131+
} catch (InterruptedException e) {
132+
Thread.currentThread().interrupt();
133+
}
134+
}
135+
}
136+
}
137+
138+
class FileWriter implements Runnable {
139+
@Override
140+
public void run() {
141+
final List<Message> batch = new ArrayList<>();
142+
while (!Thread.currentThread().isInterrupted()) {
143+
try {
144+
final Message msg = queue.poll(FLUSH_MS, TimeUnit.MILLISECONDS);
145+
if (msg == null) {
146+
if (!batch.isEmpty()) {
147+
write(batch);
148+
}
149+
} else {
150+
batch.add(msg);
151+
if (batch.size() >= BATCH) {
152+
write(batch);
153+
}
154+
}
155+
} catch (InterruptedException e) {
156+
Thread.currentThread().interrupt();
157+
}
158+
}
159+
if (!batch.isEmpty()) {
160+
write(batch);
161+
}
162+
}
163+
}
164+
165+
List<Message> read() throws IOException {
166+
if (file.exists()) {
167+
try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
168+
fileChannel.lock(0, Long.MAX_VALUE, true);
169+
170+
final String[] lines = new String(
171+
Channels.newInputStream(fileChannel).readAllBytes(), StandardCharsets.UTF_8)
172+
.split(System.lineSeparator());
173+
return Arrays.stream(lines)
174+
.map(m -> fromJson(m))
175+
.filter(Objects::nonNull)
176+
.collect(Collectors.toList());
177+
}
178+
} else {
179+
return Collections.emptyList();
180+
}
181+
}
182+
183+
private void write(List<Message> batch) {
184+
lock.lock();
185+
try (FileChannel fileChannel = FileChannel.open(
186+
file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE)) {
187+
fileChannel.lock();
188+
189+
final String lines = batch.stream()
190+
.map(this::toJson)
191+
.filter(Objects::nonNull)
192+
.collect(Collectors.joining(System.lineSeparator()));
193+
194+
OutputStream os = Channels.newOutputStream(fileChannel);
195+
os.write(lines.getBytes(StandardCharsets.UTF_8));
196+
os.write(System.lineSeparator().getBytes(StandardCharsets.UTF_8));
197+
fileChannel.force(true);
198+
199+
batch.clear();
200+
201+
lastMessage = System.currentTimeMillis();
202+
} catch (IOException e) {
203+
e.printStackTrace(); // FIXME
204+
} finally {
205+
lock.unlock();
206+
}
207+
}
208+
209+
private String toJson(final Message msg) {
210+
try {
211+
return gson.toJson(msg);
212+
} catch (Exception e) {
213+
e.printStackTrace();
214+
return null;
215+
}
216+
}
217+
218+
private Message fromJson(final String msg) {
219+
try {
220+
// FIXME only track
221+
return gson.fromJson(msg, TrackMessage.class);
222+
} catch (Exception e) {
223+
e.printStackTrace();
224+
return null;
225+
}
226+
}
227+
}

0 commit comments

Comments
 (0)