Skip to content

Commit a64e3b4

Browse files
committed
feat: RetryHelper to report all previous exceptions
1 parent fa0a12e commit a64e3b4

File tree

2 files changed

+216
-9
lines changed

2 files changed

+216
-9
lines changed

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryRetryHelper.java

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import io.opentelemetry.api.trace.Tracer;
3030
import io.opentelemetry.context.Scope;
3131
import java.io.IOException;
32+
import java.util.ArrayList;
33+
import java.util.List;
3234
import java.util.concurrent.Callable;
3335
import java.util.concurrent.ExecutionException;
3436
import java.util.logging.Level;
@@ -54,6 +56,8 @@ public static <V> V runWithRetries(
5456
.spanBuilder("com.google.cloud.bigquery.BigQueryRetryHelper.runWithRetries")
5557
.startSpan();
5658
}
59+
final List<Throwable> attemptFailures = new ArrayList<>();
60+
5761
try (Scope runWithRetriesScope = runWithRetries != null ? runWithRetries.makeCurrent() : null) {
5862
// Suppressing should be ok as a workaraund. Current and only ResultRetryAlgorithm
5963
// implementation does not use response at all, so ignoring its type is ok.
@@ -63,14 +67,29 @@ public static <V> V runWithRetries(
6367
callable,
6468
new ExponentialRetryAlgorithm(retrySettings, clock),
6569
algorithm,
66-
bigQueryRetryConfig);
70+
bigQueryRetryConfig,
71+
attemptFailures);
72+
6773
} catch (Exception e) {
68-
// Checks for IOException and translate it into BigQueryException. The BigQueryException
69-
// constructor parses the IOException and translate it into internal code.
70-
if (e.getCause() instanceof IOException) {
71-
throw new BigQueryRetryHelperException(new BigQueryException((IOException) e.getCause()));
74+
Throwable cause = e.getCause() != null ? e.getCause() : e;
75+
76+
// Attach previous retry failures (the terminal cause is not added to its own suppressed list).
77+
for (Throwable prev : attemptFailures) {
78+
if (prev != cause) {
79+
cause.addSuppressed(prev);
80+
}
81+
}
82+
83+
if (cause instanceof IOException) {
84+
BigQueryException bq = new BigQueryException((IOException) cause);
85+
// Preserve suppressed info after wrapping.
86+
for (Throwable s : cause.getSuppressed()) {
87+
bq.addSuppressed(s);
88+
}
89+
throw new BigQueryRetryHelperException(bq);
7290
}
73-
throw new BigQueryRetryHelperException(e.getCause());
91+
92+
throw new BigQueryRetryHelperException(cause);
7493
} finally {
7594
if (runWithRetries != null) {
7695
runWithRetries.end();
@@ -82,7 +101,8 @@ private static <V> V run(
82101
Callable<V> callable,
83102
TimedRetryAlgorithm timedAlgorithm,
84103
ResultRetryAlgorithm<V> resultAlgorithm,
85-
BigQueryRetryConfig bigQueryRetryConfig)
104+
BigQueryRetryConfig bigQueryRetryConfig,
105+
List<Throwable> attemptFailures)
86106
throws ExecutionException, InterruptedException {
87107
RetryAlgorithm<V> retryAlgorithm =
88108
new BigQueryRetryAlgorithm<>(
@@ -93,7 +113,16 @@ private static <V> V run(
93113
// BigQueryRetryAlgorithm retries considering bigQueryRetryConfig
94114
RetryingExecutor<V> executor = new DirectRetryingExecutor<>(retryAlgorithm);
95115

96-
// Log retry info
116+
Callable<V> recordingCallable =
117+
() -> {
118+
try {
119+
return callable.call();
120+
} catch (Throwable t) {
121+
attemptFailures.add(t);
122+
throw t;
123+
}
124+
};
125+
97126
if (LOG.isLoggable(Level.FINEST)) {
98127
LOG.log(
99128
Level.FINEST,
@@ -104,7 +133,7 @@ private static <V> V run(
104133
});
105134
}
106135

107-
RetryingFuture<V> retryingFuture = executor.createFuture(callable);
136+
RetryingFuture<V> retryingFuture = executor.createFuture(recordingCallable);
108137
executor.submit(retryingFuture);
109138
return retryingFuture.get();
110139
}
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
package com.google.cloud.bigquery;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertNotNull;
5+
import static org.junit.Assert.assertSame;
6+
import static org.junit.Assert.fail;
7+
8+
import com.google.api.core.ApiClock;
9+
import com.google.api.core.NanoClock;
10+
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
11+
import com.google.api.gax.retrying.ResultRetryAlgorithm;
12+
import com.google.api.gax.retrying.RetrySettings;
13+
import io.opentelemetry.api.trace.Tracer;
14+
import java.util.concurrent.Callable;
15+
import java.util.concurrent.atomic.AtomicInteger;
16+
import org.junit.Test;
17+
import org.threeten.bp.Duration;
18+
19+
public class BigQueryRetryHelperTest {
20+
21+
private static final ApiClock CLOCK = NanoClock.getDefaultClock();
22+
23+
@Test
24+
public void runWithRetries_happyPath_noRetries_success() {
25+
AtomicInteger calls = new AtomicInteger(0);
26+
27+
Callable<String> ok =
28+
() -> {
29+
calls.incrementAndGet();
30+
return "OK";
31+
};
32+
33+
String result =
34+
BigQueryRetryHelper.runWithRetries(
35+
ok,
36+
retrySettingsMaxAttempts(3),
37+
retryAlgorithm(),
38+
CLOCK,
39+
defaultRetryConfig(),
40+
/* isOpenTelemetryEnabled= */ false,
41+
/* openTelemetryTracer= */ (Tracer) null);
42+
43+
assertEquals("OK", result);
44+
assertEquals("Callable should be invoked exactly once", 1, calls.get());
45+
}
46+
47+
@Test
48+
public void runWithRetries_oneFail_thenSuccess_succeeds() {
49+
AtomicInteger calls = new AtomicInteger(0);
50+
51+
RuntimeException first = new RuntimeException("A");
52+
53+
Callable<String> flaky =
54+
() -> {
55+
int n = calls.incrementAndGet();
56+
if (n == 1) {
57+
throw first;
58+
}
59+
return "OK";
60+
};
61+
62+
String result =
63+
BigQueryRetryHelper.runWithRetries(
64+
flaky,
65+
retrySettingsMaxAttempts(3),
66+
retryAlgorithm(),
67+
CLOCK,
68+
defaultRetryConfig(),
69+
/* isOpenTelemetryEnabled= */ false,
70+
/* openTelemetryTracer= */ null);
71+
72+
assertEquals("OK", result);
73+
assertEquals("Expected exactly 2 calls (1 fail + 1 success)", 2, calls.get());
74+
}
75+
76+
@Test
77+
public void runWithRetries_twoFails_thenSuccess_succeedsWithinThreshold() {
78+
AtomicInteger calls = new AtomicInteger(0);
79+
80+
RuntimeException exA = new RuntimeException("A");
81+
RuntimeException exB = new RuntimeException("B");
82+
83+
Callable<String> flaky =
84+
() -> {
85+
int n = calls.incrementAndGet();
86+
if (n == 1) {
87+
throw exA;
88+
}
89+
if (n == 2) {
90+
throw exB;
91+
}
92+
return "OK";
93+
};
94+
95+
String result =
96+
BigQueryRetryHelper.runWithRetries(
97+
flaky,
98+
retrySettingsMaxAttempts(3),
99+
retryAlgorithm(),
100+
CLOCK,
101+
defaultRetryConfig(),
102+
/* isOpenTelemetryEnabled= */ false,
103+
/* openTelemetryTracer= */ null);
104+
105+
assertEquals("OK", result);
106+
assertEquals("Expected 3 calls (A fail, B fail, then success)", 3, calls.get());
107+
}
108+
109+
@Test
110+
public void runWithRetries_threeFails_threshold3_throws_withSuppressedHistory() {
111+
AtomicInteger calls = new AtomicInteger(0);
112+
113+
RuntimeException exA = new RuntimeException("A");
114+
RuntimeException exB = new RuntimeException("B");
115+
RuntimeException exC = new RuntimeException("C");
116+
117+
Callable<String> alwaysFail3Times =
118+
() -> {
119+
int n = calls.incrementAndGet();
120+
if (n == 1) {
121+
throw exA;
122+
}
123+
if (n == 2) {
124+
throw exB;
125+
}
126+
throw exC; // 3rd attempt fails and should be terminal at maxAttempts=3
127+
};
128+
129+
try {
130+
BigQueryRetryHelper.runWithRetries(
131+
alwaysFail3Times,
132+
retrySettingsMaxAttempts(3),
133+
retryAlgorithm(),
134+
CLOCK,
135+
defaultRetryConfig(),
136+
/* isOpenTelemetryEnabled= */ false,
137+
/* openTelemetryTracer= */ null);
138+
fail("Expected BigQueryRetryHelperException");
139+
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
140+
assertEquals("Expected exactly 3 attempts", 3, calls.get());
141+
142+
Throwable terminal = e.getCause();
143+
assertNotNull(terminal);
144+
145+
// Terminal cause should be exactly Exception C (identity check).
146+
assertSame(exC, terminal);
147+
148+
// Suppressed should contain exactly A and B (identity + order).
149+
Throwable[] suppressed = terminal.getSuppressed();
150+
assertEquals("Expected 2 suppressed exceptions (A,B)", 2, suppressed.length);
151+
assertSame(exA, suppressed[0]);
152+
assertSame(exB, suppressed[1]);
153+
}
154+
}
155+
156+
private RetrySettings retrySettingsMaxAttempts(int maxAttempts) {
157+
// Keep delays tiny so tests run fast.
158+
return RetrySettings.newBuilder()
159+
.setMaxAttempts(maxAttempts)
160+
.setInitialRetryDelay(Duration.ofMillis(1))
161+
.setRetryDelayMultiplier(1.0)
162+
.setMaxRetryDelay(Duration.ofMillis(5))
163+
.setInitialRpcTimeout(Duration.ofMillis(50))
164+
.setRpcTimeoutMultiplier(1.0)
165+
.setMaxRpcTimeout(Duration.ofMillis(50))
166+
.setTotalTimeout(Duration.ofSeconds(2))
167+
.build();
168+
}
169+
170+
private BigQueryRetryConfig defaultRetryConfig() {
171+
return BigQueryRetryConfig.newBuilder().build();
172+
}
173+
174+
@SuppressWarnings("unchecked")
175+
private <V> ResultRetryAlgorithm<V> retryAlgorithm() {
176+
return new BasicResultRetryAlgorithm<>();
177+
}
178+
}

0 commit comments

Comments
 (0)