Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
Expand All @@ -54,6 +56,8 @@ public static <V> V runWithRetries(
.spanBuilder("com.google.cloud.bigquery.BigQueryRetryHelper.runWithRetries")
.startSpan();
}
final List<Throwable> attemptFailures = new ArrayList<>();

try (Scope runWithRetriesScope = runWithRetries != null ? runWithRetries.makeCurrent() : null) {
// Suppressing should be ok as a workaraund. Current and only ResultRetryAlgorithm
// implementation does not use response at all, so ignoring its type is ok.
Expand All @@ -63,14 +67,29 @@ public static <V> V runWithRetries(
callable,
new ExponentialRetryAlgorithm(retrySettings, clock),
algorithm,
bigQueryRetryConfig);
bigQueryRetryConfig,
attemptFailures);

} catch (Exception e) {
// Checks for IOException and translate it into BigQueryException. The BigQueryException
// constructor parses the IOException and translate it into internal code.
if (e.getCause() instanceof IOException) {
throw new BigQueryRetryHelperException(new BigQueryException((IOException) e.getCause()));
Throwable cause = e.getCause() != null ? e.getCause() : e;

// Attach previous retry failures (the terminal cause is not added to its own suppressed list).
for (Throwable prev : attemptFailures) {
if (prev != cause) {
cause.addSuppressed(prev);
}
}

if (cause instanceof IOException) {
BigQueryException bq = new BigQueryException((IOException) cause);
// Preserve suppressed info after wrapping.
for (Throwable s : cause.getSuppressed()) {
bq.addSuppressed(s);
}
throw new BigQueryRetryHelperException(bq);
}
throw new BigQueryRetryHelperException(e.getCause());

throw new BigQueryRetryHelperException(cause);
} finally {
if (runWithRetries != null) {
runWithRetries.end();
Expand All @@ -82,7 +101,8 @@ private static <V> V run(
Callable<V> callable,
TimedRetryAlgorithm timedAlgorithm,
ResultRetryAlgorithm<V> resultAlgorithm,
BigQueryRetryConfig bigQueryRetryConfig)
BigQueryRetryConfig bigQueryRetryConfig,
List<Throwable> attemptFailures)
throws ExecutionException, InterruptedException {
RetryAlgorithm<V> retryAlgorithm =
new BigQueryRetryAlgorithm<>(
Expand All @@ -93,7 +113,16 @@ private static <V> V run(
// BigQueryRetryAlgorithm retries considering bigQueryRetryConfig
RetryingExecutor<V> executor = new DirectRetryingExecutor<>(retryAlgorithm);

// Log retry info
Callable<V> recordingCallable =
() -> {
try {
return callable.call();
} catch (Throwable t) {
attemptFailures.add(t);
throw t;
}
};

if (LOG.isLoggable(Level.FINEST)) {
LOG.log(
Level.FINEST,
Expand All @@ -104,7 +133,7 @@ private static <V> V run(
});
}

RetryingFuture<V> retryingFuture = executor.createFuture(callable);
RetryingFuture<V> retryingFuture = executor.createFuture(recordingCallable);
executor.submit(retryingFuture);
return retryingFuture.get();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigquery;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;

import com.google.api.core.ApiClock;
import com.google.api.core.NanoClock;
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import io.opentelemetry.api.trace.Tracer;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.threeten.bp.Duration;

public class BigQueryRetryHelperTest {

private static final ApiClock CLOCK = NanoClock.getDefaultClock();

@Test
public void runWithRetries_happyPath_noRetries_success() {
AtomicInteger calls = new AtomicInteger(0);

Callable<String> ok =
() -> {
calls.incrementAndGet();
return "OK";
};

String result =
BigQueryRetryHelper.runWithRetries(
ok,
retrySettingsMaxAttempts(3),
retryAlgorithm(),
CLOCK,
defaultRetryConfig(),
/* isOpenTelemetryEnabled= */ false,
/* openTelemetryTracer= */ (Tracer) null);

assertEquals("OK", result);
assertEquals("Callable should be invoked exactly once", 1, calls.get());
}

@Test
public void runWithRetries_oneFail_thenSuccess_succeeds() {
AtomicInteger calls = new AtomicInteger(0);

RuntimeException first = new RuntimeException("A");

Callable<String> flaky =
() -> {
int n = calls.incrementAndGet();
if (n == 1) {
throw first;
}
return "OK";
};

String result =
BigQueryRetryHelper.runWithRetries(
flaky,
retrySettingsMaxAttempts(3),
retryAlgorithm(),
CLOCK,
defaultRetryConfig(),
/* isOpenTelemetryEnabled= */ false,
/* openTelemetryTracer= */ null);

assertEquals("OK", result);
assertEquals("Expected exactly 2 calls (1 fail + 1 success)", 2, calls.get());
}

@Test
public void runWithRetries_twoFails_thenSuccess_succeedsWithinThreshold() {
AtomicInteger calls = new AtomicInteger(0);

RuntimeException exA = new RuntimeException("A");
RuntimeException exB = new RuntimeException("B");

Callable<String> flaky =
() -> {
int n = calls.incrementAndGet();
if (n == 1) {
throw exA;
}
if (n == 2) {
throw exB;
}
return "OK";
};

String result =
BigQueryRetryHelper.runWithRetries(
flaky,
retrySettingsMaxAttempts(3),
retryAlgorithm(),
CLOCK,
defaultRetryConfig(),
/* isOpenTelemetryEnabled= */ false,
/* openTelemetryTracer= */ null);

assertEquals("OK", result);
assertEquals("Expected 3 calls (A fail, B fail, then success)", 3, calls.get());
}

@Test
public void runWithRetries_threeFails_threshold3_throws_withSuppressedHistory() {
AtomicInteger calls = new AtomicInteger(0);

RuntimeException exA = new RuntimeException("A");
RuntimeException exB = new RuntimeException("B");
RuntimeException exC = new RuntimeException("C");

Callable<String> alwaysFail3Times =
() -> {
int n = calls.incrementAndGet();
if (n == 1) {
throw exA;
}
if (n == 2) {
throw exB;
}
throw exC; // 3rd attempt fails and should be terminal at maxAttempts=3
};

try {
BigQueryRetryHelper.runWithRetries(
alwaysFail3Times,
retrySettingsMaxAttempts(3),
retryAlgorithm(),
CLOCK,
defaultRetryConfig(),
/* isOpenTelemetryEnabled= */ false,
/* openTelemetryTracer= */ null);
fail("Expected BigQueryRetryHelperException");
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
assertEquals("Expected exactly 3 attempts", 3, calls.get());

Throwable terminal = e.getCause();
assertNotNull(terminal);

// Terminal cause should be exactly Exception C (identity check).
assertSame(exC, terminal);

// Suppressed should contain exactly A and B (identity + order).
Throwable[] suppressed = terminal.getSuppressed();
assertEquals("Expected 2 suppressed exceptions (A,B)", 2, suppressed.length);
assertSame(exA, suppressed[0]);
assertSame(exB, suppressed[1]);
}
}

private RetrySettings retrySettingsMaxAttempts(int maxAttempts) {
// Keep delays tiny so tests run fast.
return RetrySettings.newBuilder()
.setMaxAttempts(maxAttempts)
.setInitialRetryDelay(Duration.ofMillis(1))
.setRetryDelayMultiplier(1.0)
.setMaxRetryDelay(Duration.ofMillis(5))
.setInitialRpcTimeout(Duration.ofMillis(50))
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeout(Duration.ofMillis(50))
.setTotalTimeout(Duration.ofSeconds(2))
.build();
}

private BigQueryRetryConfig defaultRetryConfig() {
return BigQueryRetryConfig.newBuilder().build();
}

@SuppressWarnings("unchecked")
private <V> ResultRetryAlgorithm<V> retryAlgorithm() {
return new BasicResultRetryAlgorithm<>();
}
}
Loading