From 6f9763f3a936cd7b042fa125a35780cd1e9f7451 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Tue, 13 Jan 2026 21:35:44 -0500 Subject: [PATCH] fix: update BigtableChannelPool to use the background executor --- .../data/v2/stub/BigtableClientContext.java | 9 +-- .../v2/stub/BigtableExecutorProvider.java | 47 ------------- .../gaxx/grpc/BigtableChannelPool.java | 66 +++++++++++-------- .../BigtableTransportChannelProvider.java | 41 +++++++++--- 4 files changed, 76 insertions(+), 87 deletions(-) delete mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableExecutorProvider.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java index f366190eb6..0d27f60d4e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java @@ -21,6 +21,7 @@ import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.core.FixedExecutorProvider; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.rpc.ClientContext; import com.google.auth.Credentials; @@ -83,9 +84,8 @@ public static BigtableClientContext create(EnhancedBigtableStubSettings settings boolean shouldAutoClose = settings.getBackgroundExecutorProvider().shouldAutoClose(); ScheduledExecutorService backgroundExecutor = settings.getBackgroundExecutorProvider().getExecutor(); - // TODO: after gax change is merged, migrate to use gax's FixedExecutorProvider - BigtableExecutorProvider executorProvider = - BigtableExecutorProvider.create(backgroundExecutor, shouldAutoClose); + FixedExecutorProvider executorProvider = + FixedExecutorProvider.create(backgroundExecutor, shouldAutoClose); builder.setBackgroundExecutorProvider(executorProvider); // Set up OpenTelemetry @@ -153,7 +153,8 @@ public static BigtableClientContext create(EnhancedBigtableStubSettings settings BigtableTransportChannelProvider.create( (InstantiatingGrpcChannelProvider) transportProvider.build(), channelPrimer, - channelPoolMetricsTracer); + channelPoolMetricsTracer, + backgroundExecutor); builder.setTransportChannelProvider(btTransportProvider); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableExecutorProvider.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableExecutorProvider.java deleted file mode 100644 index 6b38b92909..0000000000 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableExecutorProvider.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2025 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigtable.data.v2.stub; - -import com.google.api.gax.core.ExecutorProvider; -import java.util.concurrent.ScheduledExecutorService; - -// TODO: migrate to gax's FixedExecutorProvider once the change is merged -class BigtableExecutorProvider implements ExecutorProvider { - - private final ScheduledExecutorService executorService; - private final boolean shouldAutoClose; - - @Override - public boolean shouldAutoClose() { - return shouldAutoClose; - } - - @Override - public ScheduledExecutorService getExecutor() { - return executorService; - } - - static BigtableExecutorProvider create( - ScheduledExecutorService executor, boolean shouldAutoClose) { - return new BigtableExecutorProvider(executor, shouldAutoClose); - } - - private BigtableExecutorProvider( - ScheduledExecutorService executorService, boolean shouldAutoClose) { - this.shouldAutoClose = shouldAutoClose; - this.executorService = executorService; - } -} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java index 5f1f1677ef..f5f1928c2a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java @@ -37,8 +37,8 @@ import java.util.Random; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -75,14 +75,16 @@ public class BigtableChannelPool extends ManagedChannel implements BigtableChann private final String authority; private final Random rng = new Random(); private final Supplier picker; + private ScheduledFuture resizeFuture = null; + private ScheduledFuture refreshFuture = null; public static BigtableChannelPool create( BigtableChannelPoolSettings settings, ChannelFactory channelFactory, - ChannelPrimer channelPrimer) + ChannelPrimer channelPrimer, + ScheduledExecutorService backgroundExecutor) throws IOException { - return new BigtableChannelPool( - settings, channelFactory, channelPrimer, Executors.newSingleThreadScheduledExecutor()); + return new BigtableChannelPool(settings, channelFactory, channelPrimer, backgroundExecutor); } /** @@ -137,18 +139,20 @@ public static BigtableChannelPool create( this.executor = executor; if (!settings.isStaticSize()) { - executor.scheduleAtFixedRate( - this::resizeSafely, - BigtableChannelPoolSettings.RESIZE_INTERVAL.getSeconds(), - BigtableChannelPoolSettings.RESIZE_INTERVAL.getSeconds(), - TimeUnit.SECONDS); + this.resizeFuture = + executor.scheduleAtFixedRate( + this::resizeSafely, + BigtableChannelPoolSettings.RESIZE_INTERVAL.getSeconds(), + BigtableChannelPoolSettings.RESIZE_INTERVAL.getSeconds(), + TimeUnit.SECONDS); } if (settings.isPreemptiveRefreshEnabled()) { - executor.scheduleAtFixedRate( - this::refreshSafely, - REFRESH_PERIOD.getSeconds(), - REFRESH_PERIOD.getSeconds(), - TimeUnit.SECONDS); + this.refreshFuture = + executor.scheduleAtFixedRate( + this::refreshSafely, + REFRESH_PERIOD.getSeconds(), + REFRESH_PERIOD.getSeconds(), + TimeUnit.SECONDS); } } @@ -234,14 +238,21 @@ Channel getChannel(int index) { public ManagedChannel shutdown() { LOG.fine("Initiating graceful shutdown due to explicit request"); + // Resize and refresh tasks can block on channel priming. We don't need + // to wait for the channels to be ready since we're shutting down the + // pool. Allowing interrupt to speed it up. + // Background executor lifecycle is managed by BigtableClientContext. + // Do not shut it down here. + if (resizeFuture != null) { + resizeFuture.cancel(true); + } + if (refreshFuture != null) { + refreshFuture.cancel(true); + } List localEntries = entries.get(); for (Entry entry : localEntries) { entry.channel.shutdown(); } - if (executor != null) { - // shutdownNow will cancel scheduled tasks - executor.shutdownNow(); - } return this; } @@ -254,7 +265,7 @@ public boolean isShutdown() { return false; } } - return executor == null || executor.isShutdown(); + return true; } /** {@inheritDoc} */ @@ -267,7 +278,7 @@ public boolean isTerminated() { } } - return executor == null || executor.isTerminated(); + return true; } /** {@inheritDoc} */ @@ -275,13 +286,18 @@ public boolean isTerminated() { public ManagedChannel shutdownNow() { LOG.fine("Initiating immediate shutdown due to explicit request"); + if (resizeFuture != null) { + resizeFuture.cancel(true); + } + if (refreshFuture != null) { + refreshFuture.cancel(true); + } + List localEntries = entries.get(); for (Entry entry : localEntries) { entry.channel.shutdownNow(); } - if (executor != null) { - executor.shutdownNow(); - } + return this; } @@ -297,10 +313,6 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE } entry.channel.awaitTermination(awaitTimeNanos, TimeUnit.NANOSECONDS); } - if (executor != null) { - long awaitTimeNanos = endTimeNanos - System.nanoTime(); - executor.awaitTermination(awaitTimeNanos, TimeUnit.NANOSECONDS); - } return isTerminated(); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java index 13340c4086..a38e8ad602 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java @@ -42,14 +42,17 @@ public final class BigtableTransportChannelProvider implements TransportChannelP private final InstantiatingGrpcChannelProvider delegate; private final ChannelPrimer channelPrimer; @Nullable private final ChannelPoolMetricsTracer channelPoolMetricsTracer; + @Nullable private final ScheduledExecutorService backgroundExecutor; private BigtableTransportChannelProvider( InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider, ChannelPrimer channelPrimer, - ChannelPoolMetricsTracer channelPoolMetricsTracer) { + ChannelPoolMetricsTracer channelPoolMetricsTracer, + ScheduledExecutorService backgroundExecutor) { delegate = Preconditions.checkNotNull(instantiatingGrpcChannelProvider); this.channelPrimer = channelPrimer; this.channelPoolMetricsTracer = channelPoolMetricsTracer; + this.backgroundExecutor = backgroundExecutor; } @Override @@ -67,12 +70,27 @@ public BigtableTransportChannelProvider withExecutor(ScheduledExecutorService ex return withExecutor((Executor) executor); } + // This executor if set is for handling rpc callbacks so we can't use it as the background + // executor @Override public BigtableTransportChannelProvider withExecutor(Executor executor) { InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withExecutor(executor); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer); + newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor); + } + + @Override + public boolean needsBackgroundExecutor() { + return delegate.needsBackgroundExecutor(); + } + + @Override + public TransportChannelProvider withBackgroundExecutor(ScheduledExecutorService executor) { + InstantiatingGrpcChannelProvider newChannelProvider = + (InstantiatingGrpcChannelProvider) delegate.withBackgroundExecutor(executor); + return new BigtableTransportChannelProvider( + newChannelProvider, channelPrimer, channelPoolMetricsTracer, executor); } @Override @@ -85,7 +103,7 @@ public BigtableTransportChannelProvider withHeaders(Map headers) InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withHeaders(headers); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer); + newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor); } @Override @@ -98,7 +116,7 @@ public TransportChannelProvider withEndpoint(String endpoint) { InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withEndpoint(endpoint); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer); + newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor); } @Deprecated @@ -113,7 +131,7 @@ public TransportChannelProvider withPoolSize(int size) { InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withPoolSize(size); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer); + newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor); } /** Expected to only be called once when BigtableClientContext is created */ @@ -143,7 +161,8 @@ public TransportChannel getTransportChannel() throws IOException { BigtableChannelPoolSettings.copyFrom(delegate.getChannelPoolSettings()); BigtableChannelPool btChannelPool = - BigtableChannelPool.create(btPoolSettings, channelFactory, channelPrimer); + BigtableChannelPool.create( + btPoolSettings, channelFactory, channelPrimer, backgroundExecutor); if (channelPoolMetricsTracer != null) { channelPoolMetricsTracer.registerChannelInsightsProvider(btChannelPool::getChannelInfos); @@ -169,15 +188,19 @@ public TransportChannelProvider withCredentials(Credentials credentials) { InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withCredentials(credentials); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer); + newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor); } /** Creates a BigtableTransportChannelProvider. */ public static BigtableTransportChannelProvider create( InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider, ChannelPrimer channelPrimer, - ChannelPoolMetricsTracer outstandingRpcsMetricTracke) { + ChannelPoolMetricsTracer outstandingRpcsMetricTracker, + ScheduledExecutorService backgroundExecutor) { return new BigtableTransportChannelProvider( - instantiatingGrpcChannelProvider, channelPrimer, outstandingRpcsMetricTracke); + instantiatingGrpcChannelProvider, + channelPrimer, + outstandingRpcsMetricTracker, + backgroundExecutor); } }