From afdef61b7a8d52a1ed824d814c457969d345cd79 Mon Sep 17 00:00:00 2001 From: kinsaurralde Date: Mon, 29 Dec 2025 20:49:59 +0000 Subject: [PATCH 1/6] Add GCPFallback support --- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 228 ++++++++++++++---- 1 file changed, 180 insertions(+), 48 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 08a13f2ca9..80ade813bf 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -51,12 +51,14 @@ import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.StatusCode.Code; import com.google.api.gax.rpc.StreamController; +import com.google.api.gax.rpc.TransportChannel; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.api.gax.rpc.UnaryCallable; import com.google.api.gax.rpc.UnavailableException; import com.google.api.gax.rpc.WatchdogProvider; import com.google.api.pathtemplate.PathTemplate; +import com.google.auth.Credentials; import com.google.cloud.RetryHelper; import com.google.cloud.RetryHelper.RetryHelperException; import com.google.cloud.grpc.GcpManagedChannel; @@ -64,6 +66,9 @@ import com.google.cloud.grpc.GcpManagedChannelOptions; import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions; import com.google.cloud.grpc.GrpcTransportOptions; +import com.google.cloud.grpc.fallback.GcpFallbackChannel; +import com.google.cloud.grpc.fallback.GcpFallbackChannelOptions; +import com.google.cloud.grpc.fallback.GcpFallbackOpenTelemetry; import com.google.cloud.spanner.AdminRequestsPerMinuteExceededException; import com.google.cloud.spanner.BackupId; import com.google.cloud.spanner.ErrorCode; @@ -185,9 +190,21 @@ import com.google.spanner.v1.SpannerGrpc; import com.google.spanner.v1.Transaction; import io.grpc.CallCredentials; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; import io.grpc.Context; +import io.grpc.ForwardingChannelBuilder2; +import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.MethodDescriptor; +import io.grpc.NameResolver; +import io.grpc.StatusRuntimeException; +import io.grpc.auth.MoreCallCredentials; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup; +import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioSocketChannel; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; @@ -214,6 +231,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nullable; @@ -341,65 +359,89 @@ public GapicSpannerRpc(final SpannerOptions options) { this.isDynamicChannelPoolEnabled = options.isDynamicChannelPoolEnabled(); this.baseGrpcCallContext = createBaseCallContext(); + boolean isEnableDirectAccess = options.isEnableDirectAccess(); + if (initializeStubs) { - // First check if SpannerOptions provides a TransportChannelProvider. Create one - // with information gathered from SpannerOptions if none is provided - InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder = - InstantiatingGrpcChannelProvider.newBuilder() - .setChannelConfigurator(options.getChannelConfigurator()) - .setEndpoint(options.getEndpoint()) - .setMaxInboundMessageSize(MAX_MESSAGE_SIZE) - .setMaxInboundMetadataSize(MAX_METADATA_SIZE) - .setPoolSize(options.getNumChannels()) - - // Set a keepalive time of 120 seconds to help long running - // commit GRPC calls succeed - .setKeepAliveTimeDuration(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS)) - - // Then check if SpannerOptions provides an InterceptorProvider. Create a default - // SpannerInterceptorProvider if none is provided - .setInterceptorProvider( - SpannerInterceptorProvider.create( - MoreObjects.firstNonNull( - options.getInterceptorProvider(), - SpannerInterceptorProvider.createDefault(options.getOpenTelemetry()))) - // This sets the trace context headers. - .withTraceContext(endToEndTracingEnabled, options.getOpenTelemetry()) - // This sets the response compressor (Server -> Client). - .withEncoding(compressorName)) - .setHeaderProvider(headerProviderWithUserAgent) - .setAllowNonDefaultServiceAccount(true); - boolean isEnableDirectAccess = options.isEnableDirectAccess(); - if (isEnableDirectAccess) { - defaultChannelProviderBuilder.setAttemptDirectPath(true); - if (isEnableDirectPathBoundToken()) { - // This will let the credentials try to fetch a hard-bound access token if the runtime - // environment supports it. - defaultChannelProviderBuilder.setAllowHardBoundTokenTypes( - Collections.singletonList(InstantiatingGrpcChannelProvider.HardBoundTokenTypes.ALTS)); + CredentialsProvider credentialsProvider = + GrpcTransportOptions.setUpCredentialsProvider(options); + + InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder = getDefaultChannelProviderBuilder(options, headerProviderWithUserAgent, isEnableDirectAccess); + + if (options.getChannelProvider() == null && isEnableDirectAccess && isEnableGcpFallbackEnv()) { + InstantiatingGrpcChannelProvider.Builder cloudPathProviderBuilder = getDefaultChannelProviderBuilder(options, headerProviderWithUserAgent, /*isEnableDirectAccess=*/false); + cloudPathProviderBuilder.setAttemptDirectPath(false); + + final AtomicReference cloudPathBuilderRef = new AtomicReference<>(); + cloudPathProviderBuilder.setChannelConfigurator(builder -> { + cloudPathBuilderRef.set(builder); + return builder; + }); + + try (TransportChannel ignored = cloudPathProviderBuilder.build().getTransportChannel()) { + } catch (Exception e) { + throw asSpannerException(e); } - defaultChannelProviderBuilder.setAttemptDirectPathXds(); - } - options.enablegRPCMetrics(defaultChannelProviderBuilder); + ManagedChannelBuilder cloudPathBuilder = cloudPathBuilderRef.get(); + if (cloudPathBuilder == null) { + throw new IllegalStateException("CloudPath builder was not captured."); + } - if (options.isUseVirtualThreads()) { - ExecutorService executor = - tryCreateVirtualThreadPerTaskExecutor("spanner-virtual-grpc-executor"); - if (executor != null) { - defaultChannelProviderBuilder.setExecutor(executor); + try { + Credentials credentials = credentialsProvider.getCredentials(); + if (credentials != null) { + cloudPathBuilder.intercept(new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return next.newCall(method, callOptions.withCallCredentials(MoreCallCredentials.from(credentials))); + } + }); + } + } catch (Exception e) { + throw asSpannerException(e); } + + defaultChannelProviderBuilder.setChannelConfigurator(directPathBuilder -> { + String jsonApiConfig = parseGrpcGcpApiConfig(); + GcpManagedChannelOptions gcpOptions = options.getGrpcGcpOptions(); + if (gcpOptions == null) { + gcpOptions = GcpManagedChannelOptions.newBuilder().build(); + } + + GcpManagedChannelBuilder primaryGcpBuilder = + GcpManagedChannelBuilder.forDelegateBuilder(directPathBuilder) + .withApiConfigJsonString(jsonApiConfig) + .withOptions(gcpOptions); + + GcpManagedChannelBuilder fallbackGcpBuilder = + GcpManagedChannelBuilder.forDelegateBuilder(cloudPathBuilder) + .withApiConfigJsonString(jsonApiConfig) + .withOptions(gcpOptions); + + GcpFallbackOpenTelemetry fallbackTelemetry = GcpFallbackOpenTelemetry.newBuilder() + .withSdk(options.getOpenTelemetry()) + .build(); + + return new FallbackChannelBuilder( + primaryGcpBuilder, + fallbackGcpBuilder, + GcpFallbackChannelOptions.newBuilder() + .setPrimaryChannelName("directpath") + .setFallbackChannelName("cloudpath") + .setMinFailedCalls(1) + .setGcpFallbackOpenTelemetry(fallbackTelemetry) + .build() + ); + }); } - // If it is enabled in options uses the channel pool provided by the gRPC-GCP extension. - maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options); + // First check if SpannerOptions provides a TransportChannelProvider. Create one + // with information gathered from SpannerOptions if none is provided TransportChannelProvider channelProvider = MoreObjects.firstNonNull( options.getChannelProvider(), defaultChannelProviderBuilder.build()); - CredentialsProvider credentialsProvider = - GrpcTransportOptions.setUpCredentialsProvider(options); - spannerWatchdog = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder() @@ -572,6 +614,57 @@ private static String parseGrpcGcpApiConfig() { } } + private InstantiatingGrpcChannelProvider.Builder getDefaultChannelProviderBuilder(final SpannerOptions options, final HeaderProvider headerProviderWithUserAgent, Boolean isEnableDirectAccess) { + InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder = + InstantiatingGrpcChannelProvider.newBuilder() + .setChannelConfigurator(options.getChannelConfigurator()) + .setEndpoint(options.getEndpoint()) + .setMaxInboundMessageSize(MAX_MESSAGE_SIZE) + .setMaxInboundMetadataSize(MAX_METADATA_SIZE) + .setPoolSize(options.getNumChannels()) + + // Set a keepalive time of 120 seconds to help long running + // commit GRPC calls succeed + .setKeepAliveTimeDuration(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS)) + + // Then check if SpannerOptions provides an InterceptorProvider. Create a default + // SpannerInterceptorProvider if none is provided + .setInterceptorProvider( + SpannerInterceptorProvider.create( + MoreObjects.firstNonNull( + options.getInterceptorProvider(), + SpannerInterceptorProvider.createDefault(options.getOpenTelemetry()))) + // This sets the trace context headers. + .withTraceContext(endToEndTracingEnabled, options.getOpenTelemetry()) + // This sets the response compressor (Server -> Client). + .withEncoding(compressorName)) + .setHeaderProvider(headerProviderWithUserAgent) + .setAllowNonDefaultServiceAccount(true); + if (isEnableDirectAccess) { + defaultChannelProviderBuilder.setAttemptDirectPath(true); + defaultChannelProviderBuilder.setAttemptDirectPathXds(); + if (isEnableDirectPathBoundToken()) { + // This will let the credentials try to fetch a hard-bound access token if the runtime + // environment supports it. + defaultChannelProviderBuilder.setAllowHardBoundTokenTypes( + Collections.singletonList(InstantiatingGrpcChannelProvider.HardBoundTokenTypes.ALTS)); + } + } + + options.enablegRPCMetrics(defaultChannelProviderBuilder); + + if (options.isUseVirtualThreads()) { + ExecutorService executor = + tryCreateVirtualThreadPerTaskExecutor("spanner-virtual-grpc-executor"); + if (executor != null) { + defaultChannelProviderBuilder.setExecutor(executor); + } + } + // If it is enabled in options uses the channel pool provided by the gRPC-GCP extension. + maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options); + return defaultChannelProviderBuilder; + } + // Enhance gRPC-GCP options with metrics and dynamic channel pool configuration. private static GcpManagedChannelOptions grpcGcpOptionsWithMetricsAndDcp(SpannerOptions options) { GcpManagedChannelOptions grpcGcpOptions = @@ -715,6 +808,10 @@ public static boolean isEnableDirectPathBoundToken() { return !Boolean.parseBoolean(System.getenv("GOOGLE_SPANNER_DISABLE_DIRECT_ACCESS_BOUND_TOKEN")); } + public static boolean isEnableGcpFallbackEnv() { + return Boolean.parseBoolean(System.getenv("GOOGLE_SPANNER_ENABLE_GCP_FALLBACK")); + } + private static final RetrySettings ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS = RetrySettings.newBuilder() .setInitialRetryDelayDuration(Duration.ofSeconds(5L)) @@ -2313,4 +2410,39 @@ private static Duration systemProperty(String name, int defaultValue) { String stringValue = System.getProperty(name, ""); return Duration.ofSeconds(stringValue.isEmpty() ? defaultValue : Integer.parseInt(stringValue)); } + + // Wrapper class to build the GcpFallbackChannel using GAX's configuration + private static class FallbackChannelBuilder extends ForwardingChannelBuilder2 { + private final GcpFallbackChannelOptions options; + + private final GcpManagedChannelBuilder primaryGcpBuilder; + private final GcpManagedChannelBuilder fallbackGcpBuilder; + + private FallbackChannelBuilder( + GcpManagedChannelBuilder primary, + GcpManagedChannelBuilder fallback, + GcpFallbackChannelOptions options) { + this.primaryGcpBuilder = primary; + this.fallbackGcpBuilder = fallback; + this.options = options; + } + + /** + * Delegates all configuration calls (e.g., interceptors, userAgent) to the primary builder. + * This ensures the primary channel receives all of GAX's standard configuration. + */ + @Override + protected ManagedChannelBuilder delegate() { + return primaryGcpBuilder; + } + + /** + * Overrides the build method to return our custom GcpFallbackChannel + * instead of a standard gRPC channel. + */ + @Override + public ManagedChannel build() { + return new GcpFallbackChannel(options, primaryGcpBuilder, fallbackGcpBuilder); + } + } } From d317412ff29932d48108dd3d295a88d89ce23142 Mon Sep 17 00:00:00 2001 From: kinsaurralde Date: Mon, 29 Dec 2025 23:18:27 +0000 Subject: [PATCH 2/6] formatting --- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 128 +++++++++--------- 1 file changed, 67 insertions(+), 61 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 80ade813bf..947982c6a6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -190,21 +190,12 @@ import com.google.spanner.v1.SpannerGrpc; import com.google.spanner.v1.Transaction; import io.grpc.CallCredentials; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ClientInterceptor; import io.grpc.Context; import io.grpc.ForwardingChannelBuilder2; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.MethodDescriptor; -import io.grpc.NameResolver; -import io.grpc.StatusRuntimeException; import io.grpc.auth.MoreCallCredentials; -import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; -import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup; -import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioSocketChannel; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; @@ -363,20 +354,29 @@ public GapicSpannerRpc(final SpannerOptions options) { if (initializeStubs) { CredentialsProvider credentialsProvider = - GrpcTransportOptions.setUpCredentialsProvider(options); - - InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder = getDefaultChannelProviderBuilder(options, headerProviderWithUserAgent, isEnableDirectAccess); - - if (options.getChannelProvider() == null && isEnableDirectAccess && isEnableGcpFallbackEnv()) { - InstantiatingGrpcChannelProvider.Builder cloudPathProviderBuilder = getDefaultChannelProviderBuilder(options, headerProviderWithUserAgent, /*isEnableDirectAccess=*/false); + GrpcTransportOptions.setUpCredentialsProvider(options); + + InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder = + getDefaultChannelProviderBuilder( + options, headerProviderWithUserAgent, isEnableDirectAccess); + + if (options.getChannelProvider() == null + && isEnableDirectAccess + && isEnableGcpFallbackEnv()) { + InstantiatingGrpcChannelProvider.Builder cloudPathProviderBuilder = + getDefaultChannelProviderBuilder( + options, headerProviderWithUserAgent, /* isEnableDirectAccess= */ false); cloudPathProviderBuilder.setAttemptDirectPath(false); final AtomicReference cloudPathBuilderRef = new AtomicReference<>(); - cloudPathProviderBuilder.setChannelConfigurator(builder -> { - cloudPathBuilderRef.set(builder); - return builder; - }); + cloudPathProviderBuilder.setChannelConfigurator( + builder -> { + cloudPathBuilderRef.set(builder); + return builder; + }); + // Build the cloudPathProvider to extract the builder which will be provided to + // FallbackChannelBuilder. try (TransportChannel ignored = cloudPathProviderBuilder.build().getTransportChannel()) { } catch (Exception e) { throw asSpannerException(e); @@ -390,50 +390,52 @@ public GapicSpannerRpc(final SpannerOptions options) { try { Credentials credentials = credentialsProvider.getCredentials(); if (credentials != null) { - cloudPathBuilder.intercept(new ClientInterceptor() { - @Override - public ClientCall interceptCall( - MethodDescriptor method, CallOptions callOptions, Channel next) { - return next.newCall(method, callOptions.withCallCredentials(MoreCallCredentials.from(credentials))); - } - }); + cloudPathBuilder.intercept( + new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return next.newCall( + method, + callOptions.withCallCredentials(MoreCallCredentials.from(credentials))); + } + }); } } catch (Exception e) { throw asSpannerException(e); } - defaultChannelProviderBuilder.setChannelConfigurator(directPathBuilder -> { - String jsonApiConfig = parseGrpcGcpApiConfig(); - GcpManagedChannelOptions gcpOptions = options.getGrpcGcpOptions(); - if (gcpOptions == null) { - gcpOptions = GcpManagedChannelOptions.newBuilder().build(); - } + defaultChannelProviderBuilder.setChannelConfigurator( + directPathBuilder -> { + String jsonApiConfig = parseGrpcGcpApiConfig(); + GcpManagedChannelOptions gcpOptions = options.getGrpcGcpOptions(); + if (gcpOptions == null) { + gcpOptions = GcpManagedChannelOptions.newBuilder().build(); + } - GcpManagedChannelBuilder primaryGcpBuilder = - GcpManagedChannelBuilder.forDelegateBuilder(directPathBuilder) - .withApiConfigJsonString(jsonApiConfig) - .withOptions(gcpOptions); - - GcpManagedChannelBuilder fallbackGcpBuilder = - GcpManagedChannelBuilder.forDelegateBuilder(cloudPathBuilder) - .withApiConfigJsonString(jsonApiConfig) - .withOptions(gcpOptions); - - GcpFallbackOpenTelemetry fallbackTelemetry = GcpFallbackOpenTelemetry.newBuilder() - .withSdk(options.getOpenTelemetry()) - .build(); - - return new FallbackChannelBuilder( - primaryGcpBuilder, - fallbackGcpBuilder, - GcpFallbackChannelOptions.newBuilder() - .setPrimaryChannelName("directpath") - .setFallbackChannelName("cloudpath") - .setMinFailedCalls(1) - .setGcpFallbackOpenTelemetry(fallbackTelemetry) - .build() - ); - }); + GcpManagedChannelBuilder primaryGcpBuilder = + GcpManagedChannelBuilder.forDelegateBuilder(directPathBuilder) + .withApiConfigJsonString(jsonApiConfig) + .withOptions(gcpOptions); + + GcpManagedChannelBuilder fallbackGcpBuilder = + GcpManagedChannelBuilder.forDelegateBuilder(cloudPathBuilder) + .withApiConfigJsonString(jsonApiConfig) + .withOptions(gcpOptions); + + GcpFallbackOpenTelemetry fallbackTelemetry = + GcpFallbackOpenTelemetry.newBuilder().withSdk(options.getOpenTelemetry()).build(); + + return new FallbackChannelBuilder( + primaryGcpBuilder, + fallbackGcpBuilder, + GcpFallbackChannelOptions.newBuilder() + .setPrimaryChannelName("directpath") + .setFallbackChannelName("cloudpath") + .setMinFailedCalls(1) + .setGcpFallbackOpenTelemetry(fallbackTelemetry) + .build()); + }); } // First check if SpannerOptions provides a TransportChannelProvider. Create one @@ -614,7 +616,10 @@ private static String parseGrpcGcpApiConfig() { } } - private InstantiatingGrpcChannelProvider.Builder getDefaultChannelProviderBuilder(final SpannerOptions options, final HeaderProvider headerProviderWithUserAgent, Boolean isEnableDirectAccess) { + private InstantiatingGrpcChannelProvider.Builder getDefaultChannelProviderBuilder( + final SpannerOptions options, + final HeaderProvider headerProviderWithUserAgent, + boolean isEnableDirectAccess) { InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder = InstantiatingGrpcChannelProvider.newBuilder() .setChannelConfigurator(options.getChannelConfigurator()) @@ -2412,7 +2417,8 @@ private static Duration systemProperty(String name, int defaultValue) { } // Wrapper class to build the GcpFallbackChannel using GAX's configuration - private static class FallbackChannelBuilder extends ForwardingChannelBuilder2 { + private static class FallbackChannelBuilder + extends ForwardingChannelBuilder2 { private final GcpFallbackChannelOptions options; private final GcpManagedChannelBuilder primaryGcpBuilder; @@ -2437,8 +2443,8 @@ protected ManagedChannelBuilder delegate() { } /** - * Overrides the build method to return our custom GcpFallbackChannel - * instead of a standard gRPC channel. + * Overrides the build method to return our custom GcpFallbackChannel instead of a standard gRPC + * channel. */ @Override public ManagedChannel build() { From 02a1bb1a2f1806fad3119d394cc9a7cce293cbf0 Mon Sep 17 00:00:00 2001 From: kinsaurralde Date: Mon, 5 Jan 2026 18:05:55 +0000 Subject: [PATCH 3/6] fix imports --- .../java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 947982c6a6..90f7b58c7c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -190,6 +190,10 @@ import com.google.spanner.v1.SpannerGrpc; import com.google.spanner.v1.Transaction; import io.grpc.CallCredentials; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; import io.grpc.Context; import io.grpc.ForwardingChannelBuilder2; import io.grpc.ManagedChannel; From 9eed709ac3f0a2e99aad155ff5746d4bc7731c2e Mon Sep 17 00:00:00 2001 From: kinsaurralde Date: Tue, 6 Jan 2026 21:57:23 +0000 Subject: [PATCH 4/6] Add fallback test to GapicSpannerRpcTest --- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 23 +++-- .../spanner/spi/v1/GapicSpannerRpcTest.java | 86 +++++++++++++++++++ 2 files changed, 103 insertions(+), 6 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 90f7b58c7c..a9e3554741 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -433,12 +433,7 @@ public ClientCall interceptCall( return new FallbackChannelBuilder( primaryGcpBuilder, fallbackGcpBuilder, - GcpFallbackChannelOptions.newBuilder() - .setPrimaryChannelName("directpath") - .setFallbackChannelName("cloudpath") - .setMinFailedCalls(1) - .setGcpFallbackOpenTelemetry(fallbackTelemetry) - .build()); + createFallbackChannelOptions(fallbackTelemetry)); }); } @@ -611,6 +606,17 @@ public UnaryCallable createUnaryCalla } } + @VisibleForTesting + GcpFallbackChannelOptions createFallbackChannelOptions( + GcpFallbackOpenTelemetry fallbackTelemetry) { + return GcpFallbackChannelOptions.newBuilder() + .setPrimaryChannelName("directpath") + .setFallbackChannelName("cloudpath") + .setMinFailedCalls(1) + .setGcpFallbackOpenTelemetry(fallbackTelemetry) + .build(); + } + private static String parseGrpcGcpApiConfig() { try { return Resources.toString( @@ -817,7 +823,12 @@ public static boolean isEnableDirectPathBoundToken() { return !Boolean.parseBoolean(System.getenv("GOOGLE_SPANNER_DISABLE_DIRECT_ACCESS_BOUND_TOKEN")); } + @VisibleForTesting static Boolean enableGcpFallbackEnv = null; + public static boolean isEnableGcpFallbackEnv() { + if (enableGcpFallbackEnv != null) { + return enableGcpFallbackEnv; + } return Boolean.parseBoolean(System.getenv("GOOGLE_SPANNER_ENABLE_GCP_FALLBACK")); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java index ee90f99d26..3e5b95e847 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java @@ -34,7 +34,10 @@ import com.google.api.gax.rpc.HeaderProvider; import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.OAuth2Credentials; +import com.google.cloud.NoCredentials; import com.google.cloud.ServiceOptions; +import com.google.cloud.grpc.fallback.GcpFallbackChannelOptions; +import com.google.cloud.grpc.fallback.GcpFallbackOpenTelemetry; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.DatabaseId; import com.google.cloud.spanner.Dialect; @@ -82,10 +85,15 @@ import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; import io.opentelemetry.context.propagation.ContextPropagators; import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.samplers.Sampler; import java.net.InetSocketAddress; import java.time.Duration; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -889,4 +897,82 @@ private SpannerOptions createSpannerOptions() { .setCallCredentialsProvider(() -> MoreCallCredentials.from(VARIABLE_CREDENTIALS)) .build(); } + + static class TestableGapicSpannerRpc extends GapicSpannerRpc { + public TestableGapicSpannerRpc(SpannerOptions options) { + super(options); + } + + @Override + GcpFallbackChannelOptions createFallbackChannelOptions( + GcpFallbackOpenTelemetry fallbackTelemetry) { + // Override default 1-minute period to 10ms for instant testing + return GcpFallbackChannelOptions.newBuilder() + .setPrimaryChannelName("directpath") + .setFallbackChannelName("cloudpath") + .setMinFailedCalls(1) + .setPeriod(Duration.ofMillis(10)) + .setGcpFallbackOpenTelemetry(fallbackTelemetry) + .build(); + } + } + + @Test + public void testFallbackIntegration_switchesToFallbackOnFailure() throws Exception { + GapicSpannerRpc.enableGcpFallbackEnv = true; + + // Setup OpenTelemetry to capture metrics + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + SdkMeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + OpenTelemetrySdk openTelemetry = + OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build(); + + // Setup Options with invalid host to force error + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setEnableDirectAccess(true) + .setHost("http://localhost:1") // Closed port + .setCredentials(NoCredentials.getInstance()) + .setOpenTelemetry(openTelemetry) + .build(); + + TestableGapicSpannerRpc rpc = new TestableGapicSpannerRpc(options); + + try { + // Make a call that is expected to fail + try { + rpc.executeBatchDml( + com.google.spanner.v1.ExecuteBatchDmlRequest.newBuilder() + .setSession("projects/p/instances/i/databases/d/sessions/s") + .build(), + null); + } catch (Exception expected) { + // Expect a connection error + } + + // Wait briefly for the 10ms period to trigger the fallback check + Thread.sleep(100); + + // Verify Fallback via Metrics + Collection metrics = metricReader.collectAllMetrics(); + boolean fallbackOccurred = + metrics.stream().anyMatch(md -> md.getName().contains("fallback_count") && hasValue(md)); + + assertTrue( + "Fallback metric should be present, indicating GcpFallbackChannel is active", + fallbackOccurred); + + } finally { + rpc.shutdown(); + } + } + + private boolean hasValue(MetricData metricData) { + for (LongPointData point : metricData.getLongSumData().getPoints()) { + if (point.getValue() > 0) return true; + } + return false; + } } From bd64b27363c65af8cf34ee65c094aaa17ecee282 Mon Sep 17 00:00:00 2001 From: kinsaurralde Date: Thu, 8 Jan 2026 21:40:17 +0000 Subject: [PATCH 5/6] respond to comments --- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 9 +-- .../spanner/spi/v1/GapicSpannerRpcTest.java | 71 ++++++++++++++++++- 2 files changed, 73 insertions(+), 7 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index a9e3554741..a34cd94e93 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -361,16 +361,14 @@ public GapicSpannerRpc(final SpannerOptions options) { GrpcTransportOptions.setUpCredentialsProvider(options); InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder = - getDefaultChannelProviderBuilder( - options, headerProviderWithUserAgent, isEnableDirectAccess); + createChannelProviderBuilder(options, headerProviderWithUserAgent, isEnableDirectAccess); if (options.getChannelProvider() == null && isEnableDirectAccess && isEnableGcpFallbackEnv()) { InstantiatingGrpcChannelProvider.Builder cloudPathProviderBuilder = - getDefaultChannelProviderBuilder( + createChannelProviderBuilder( options, headerProviderWithUserAgent, /* isEnableDirectAccess= */ false); - cloudPathProviderBuilder.setAttemptDirectPath(false); final AtomicReference cloudPathBuilderRef = new AtomicReference<>(); cloudPathProviderBuilder.setChannelConfigurator( @@ -612,7 +610,6 @@ GcpFallbackChannelOptions createFallbackChannelOptions( return GcpFallbackChannelOptions.newBuilder() .setPrimaryChannelName("directpath") .setFallbackChannelName("cloudpath") - .setMinFailedCalls(1) .setGcpFallbackOpenTelemetry(fallbackTelemetry) .build(); } @@ -626,7 +623,7 @@ private static String parseGrpcGcpApiConfig() { } } - private InstantiatingGrpcChannelProvider.Builder getDefaultChannelProviderBuilder( + private InstantiatingGrpcChannelProvider.Builder createChannelProviderBuilder( final SpannerOptions options, final HeaderProvider headerProviderWithUserAgent, boolean isEnableDirectAccess) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java index 3e5b95e847..56396695a8 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java @@ -903,6 +903,74 @@ public TestableGapicSpannerRpc(SpannerOptions options) { super(options); } + @Override + GcpFallbackChannelOptions createFallbackChannelOptions( + GcpFallbackOpenTelemetry fallbackTelemetry) { + // Override default 1-minute period to 10ms for instant testing + return GcpFallbackChannelOptions.newBuilder() + .setPrimaryChannelName("directpath") + .setFallbackChannelName("cloudpath") + .setPeriod(Duration.ofMillis(10)) + .setGcpFallbackOpenTelemetry(fallbackTelemetry) + .build(); + } + } + + @Test + public void testFallbackIntegration_doesNotSwitchWhenThresholdNotMet() throws Exception { + GapicSpannerRpc.enableGcpFallbackEnv = true; + + // Setup OpenTelemetry to capture metrics + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + SdkMeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + OpenTelemetrySdk openTelemetry = + OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build(); + + // Setup Options with invalid host to force error + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setEnableDirectAccess(true) + .setHost("http://localhost:1") // Closed port + .setCredentials(NoCredentials.getInstance()) + .setOpenTelemetry(openTelemetry) + .build(); + + TestableGapicSpannerRpc rpc = new TestableGapicSpannerRpc(options); + + try { + // Make a call that is expected to fail + try { + rpc.executeBatchDml( + com.google.spanner.v1.ExecuteBatchDmlRequest.newBuilder() + .setSession("projects/p/instances/i/databases/d/sessions/s") + .build(), + null); + } catch (Exception expected) { + // Expect a connection error + } + + // Wait briefly for the 10ms period to trigger the fallback check + Thread.sleep(100); + + // Verify Fallback via Metrics + Collection metrics = metricReader.collectAllMetrics(); + boolean fallbackOccurred = + metrics.stream().anyMatch(md -> md.getName().contains("fallback_count") && hasValue(md)); + + assertFalse("Fallback metric should not be present", fallbackOccurred); + + } finally { + rpc.shutdown(); + } + } + + static class TestableGapicSpannerRpcWithLowerMinFailedCalls extends GapicSpannerRpc { + public TestableGapicSpannerRpcWithLowerMinFailedCalls(SpannerOptions options) { + super(options); + } + @Override GcpFallbackChannelOptions createFallbackChannelOptions( GcpFallbackOpenTelemetry fallbackTelemetry) { @@ -938,7 +1006,8 @@ public void testFallbackIntegration_switchesToFallbackOnFailure() throws Excepti .setOpenTelemetry(openTelemetry) .build(); - TestableGapicSpannerRpc rpc = new TestableGapicSpannerRpc(options); + TestableGapicSpannerRpcWithLowerMinFailedCalls rpc = + new TestableGapicSpannerRpcWithLowerMinFailedCalls(options); try { // Make a call that is expected to fail From ca5b5b6044b264880f0e3766b78b2aa18c822981 Mon Sep 17 00:00:00 2001 From: kinsaurralde Date: Fri, 9 Jan 2026 00:06:07 +0000 Subject: [PATCH 6/6] fixes --- .../google/cloud/spanner/spi/v1/GapicSpannerRpc.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index a34cd94e93..cbc8129979 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -373,6 +373,9 @@ && isEnableGcpFallbackEnv()) { final AtomicReference cloudPathBuilderRef = new AtomicReference<>(); cloudPathProviderBuilder.setChannelConfigurator( builder -> { + if (options.getChannelConfigurator() != null) { + builder = options.getChannelConfigurator().apply(builder); + } cloudPathBuilderRef.set(builder); return builder; }); @@ -409,8 +412,12 @@ public ClientCall interceptCall( defaultChannelProviderBuilder.setChannelConfigurator( directPathBuilder -> { + if (options.getChannelConfigurator() != null) { + directPathBuilder = options.getChannelConfigurator().apply(directPathBuilder); + } + String jsonApiConfig = parseGrpcGcpApiConfig(); - GcpManagedChannelOptions gcpOptions = options.getGrpcGcpOptions(); + GcpManagedChannelOptions gcpOptions = grpcGcpOptionsWithMetricsAndDcp(options); if (gcpOptions == null) { gcpOptions = GcpManagedChannelOptions.newBuilder().build(); } @@ -610,6 +617,7 @@ GcpFallbackChannelOptions createFallbackChannelOptions( return GcpFallbackChannelOptions.newBuilder() .setPrimaryChannelName("directpath") .setFallbackChannelName("cloudpath") + .setMinFailedCalls(1) .setGcpFallbackOpenTelemetry(fallbackTelemetry) .build(); }