Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.ClientContext;
import com.google.auth.Credentials;
Expand Down Expand Up @@ -83,9 +84,8 @@ public static BigtableClientContext create(EnhancedBigtableStubSettings settings
boolean shouldAutoClose = settings.getBackgroundExecutorProvider().shouldAutoClose();
ScheduledExecutorService backgroundExecutor =
settings.getBackgroundExecutorProvider().getExecutor();
// TODO: after gax change is merged, migrate to use gax's FixedExecutorProvider
BigtableExecutorProvider executorProvider =
BigtableExecutorProvider.create(backgroundExecutor, shouldAutoClose);
FixedExecutorProvider executorProvider =
FixedExecutorProvider.create(backgroundExecutor, shouldAutoClose);
builder.setBackgroundExecutorProvider(executorProvider);

// Set up OpenTelemetry
Expand Down Expand Up @@ -153,7 +153,8 @@ public static BigtableClientContext create(EnhancedBigtableStubSettings settings
BigtableTransportChannelProvider.create(
(InstantiatingGrpcChannelProvider) transportProvider.build(),
channelPrimer,
channelPoolMetricsTracer);
channelPoolMetricsTracer,
backgroundExecutor);

builder.setTransportChannelProvider(btTransportProvider);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import java.util.Random;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -75,14 +75,16 @@ public class BigtableChannelPool extends ManagedChannel implements BigtableChann
private final String authority;
private final Random rng = new Random();
private final Supplier<Integer> picker;
private ScheduledFuture<?> resizeFuture = null;
private ScheduledFuture<?> refreshFuture = null;

public static BigtableChannelPool create(
BigtableChannelPoolSettings settings,
ChannelFactory channelFactory,
ChannelPrimer channelPrimer)
ChannelPrimer channelPrimer,
ScheduledExecutorService backgroundExecutor)
throws IOException {
return new BigtableChannelPool(
settings, channelFactory, channelPrimer, Executors.newSingleThreadScheduledExecutor());
return new BigtableChannelPool(settings, channelFactory, channelPrimer, backgroundExecutor);
}

/**
Expand Down Expand Up @@ -137,18 +139,20 @@ public static BigtableChannelPool create(
this.executor = executor;

if (!settings.isStaticSize()) {
executor.scheduleAtFixedRate(
this::resizeSafely,
BigtableChannelPoolSettings.RESIZE_INTERVAL.getSeconds(),
BigtableChannelPoolSettings.RESIZE_INTERVAL.getSeconds(),
TimeUnit.SECONDS);
this.resizeFuture =
executor.scheduleAtFixedRate(
this::resizeSafely,
BigtableChannelPoolSettings.RESIZE_INTERVAL.getSeconds(),
BigtableChannelPoolSettings.RESIZE_INTERVAL.getSeconds(),
TimeUnit.SECONDS);
}
if (settings.isPreemptiveRefreshEnabled()) {
executor.scheduleAtFixedRate(
this::refreshSafely,
REFRESH_PERIOD.getSeconds(),
REFRESH_PERIOD.getSeconds(),
TimeUnit.SECONDS);
this.refreshFuture =
executor.scheduleAtFixedRate(
this::refreshSafely,
REFRESH_PERIOD.getSeconds(),
REFRESH_PERIOD.getSeconds(),
TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -234,14 +238,21 @@ Channel getChannel(int index) {
public ManagedChannel shutdown() {
LOG.fine("Initiating graceful shutdown due to explicit request");

// Resize and refresh tasks can block on channel priming. We don't need
// to wait for the channels to be ready since we're shutting down the
// pool. Allowing interrupt to speed it up.
// Background executor lifecycle is managed by BigtableClientContext.
// Do not shut it down here.
if (resizeFuture != null) {
resizeFuture.cancel(true);
}
if (refreshFuture != null) {
refreshFuture.cancel(true);
}
List<Entry> localEntries = entries.get();
for (Entry entry : localEntries) {
entry.channel.shutdown();
}
if (executor != null) {
// shutdownNow will cancel scheduled tasks
executor.shutdownNow();
}
return this;
}

Expand All @@ -254,7 +265,7 @@ public boolean isShutdown() {
return false;
}
}
return executor == null || executor.isShutdown();
return true;
}

/** {@inheritDoc} */
Expand All @@ -267,21 +278,26 @@ public boolean isTerminated() {
}
}

return executor == null || executor.isTerminated();
return true;
}

/** {@inheritDoc} */
@Override
public ManagedChannel shutdownNow() {
LOG.fine("Initiating immediate shutdown due to explicit request");

if (resizeFuture != null) {
resizeFuture.cancel(true);
}
if (refreshFuture != null) {
refreshFuture.cancel(true);
}

List<Entry> localEntries = entries.get();
for (Entry entry : localEntries) {
entry.channel.shutdownNow();
}
if (executor != null) {
executor.shutdownNow();
}

return this;
}

Expand All @@ -297,10 +313,6 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
}
entry.channel.awaitTermination(awaitTimeNanos, TimeUnit.NANOSECONDS);
}
if (executor != null) {
long awaitTimeNanos = endTimeNanos - System.nanoTime();
executor.awaitTermination(awaitTimeNanos, TimeUnit.NANOSECONDS);
}
return isTerminated();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,17 @@ public final class BigtableTransportChannelProvider implements TransportChannelP
private final InstantiatingGrpcChannelProvider delegate;
private final ChannelPrimer channelPrimer;
@Nullable private final ChannelPoolMetricsTracer channelPoolMetricsTracer;
@Nullable private final ScheduledExecutorService backgroundExecutor;

private BigtableTransportChannelProvider(
InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider,
ChannelPrimer channelPrimer,
ChannelPoolMetricsTracer channelPoolMetricsTracer) {
ChannelPoolMetricsTracer channelPoolMetricsTracer,
ScheduledExecutorService backgroundExecutor) {
delegate = Preconditions.checkNotNull(instantiatingGrpcChannelProvider);
this.channelPrimer = channelPrimer;
this.channelPoolMetricsTracer = channelPoolMetricsTracer;
this.backgroundExecutor = backgroundExecutor;
}

@Override
Expand All @@ -67,12 +70,27 @@ public BigtableTransportChannelProvider withExecutor(ScheduledExecutorService ex
return withExecutor((Executor) executor);
}

// This executor if set is for handling rpc callbacks so we can't use it as the background
// executor
@Override
public BigtableTransportChannelProvider withExecutor(Executor executor) {
InstantiatingGrpcChannelProvider newChannelProvider =
(InstantiatingGrpcChannelProvider) delegate.withExecutor(executor);
return new BigtableTransportChannelProvider(
newChannelProvider, channelPrimer, channelPoolMetricsTracer);
newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor);
}

@Override
public boolean needsBackgroundExecutor() {
return delegate.needsBackgroundExecutor();
}

@Override
public TransportChannelProvider withBackgroundExecutor(ScheduledExecutorService executor) {
InstantiatingGrpcChannelProvider newChannelProvider =
(InstantiatingGrpcChannelProvider) delegate.withBackgroundExecutor(executor);
return new BigtableTransportChannelProvider(
newChannelProvider, channelPrimer, channelPoolMetricsTracer, executor);
}

@Override
Expand All @@ -85,7 +103,7 @@ public BigtableTransportChannelProvider withHeaders(Map<String, String> headers)
InstantiatingGrpcChannelProvider newChannelProvider =
(InstantiatingGrpcChannelProvider) delegate.withHeaders(headers);
return new BigtableTransportChannelProvider(
newChannelProvider, channelPrimer, channelPoolMetricsTracer);
newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor);
}

@Override
Expand All @@ -98,7 +116,7 @@ public TransportChannelProvider withEndpoint(String endpoint) {
InstantiatingGrpcChannelProvider newChannelProvider =
(InstantiatingGrpcChannelProvider) delegate.withEndpoint(endpoint);
return new BigtableTransportChannelProvider(
newChannelProvider, channelPrimer, channelPoolMetricsTracer);
newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor);
}

@Deprecated
Expand All @@ -113,7 +131,7 @@ public TransportChannelProvider withPoolSize(int size) {
InstantiatingGrpcChannelProvider newChannelProvider =
(InstantiatingGrpcChannelProvider) delegate.withPoolSize(size);
return new BigtableTransportChannelProvider(
newChannelProvider, channelPrimer, channelPoolMetricsTracer);
newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor);
}

/** Expected to only be called once when BigtableClientContext is created */
Expand Down Expand Up @@ -143,7 +161,8 @@ public TransportChannel getTransportChannel() throws IOException {
BigtableChannelPoolSettings.copyFrom(delegate.getChannelPoolSettings());

BigtableChannelPool btChannelPool =
BigtableChannelPool.create(btPoolSettings, channelFactory, channelPrimer);
BigtableChannelPool.create(
btPoolSettings, channelFactory, channelPrimer, backgroundExecutor);

if (channelPoolMetricsTracer != null) {
channelPoolMetricsTracer.registerChannelInsightsProvider(btChannelPool::getChannelInfos);
Expand All @@ -169,15 +188,19 @@ public TransportChannelProvider withCredentials(Credentials credentials) {
InstantiatingGrpcChannelProvider newChannelProvider =
(InstantiatingGrpcChannelProvider) delegate.withCredentials(credentials);
return new BigtableTransportChannelProvider(
newChannelProvider, channelPrimer, channelPoolMetricsTracer);
newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor);
}

/** Creates a BigtableTransportChannelProvider. */
public static BigtableTransportChannelProvider create(
InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider,
ChannelPrimer channelPrimer,
ChannelPoolMetricsTracer outstandingRpcsMetricTracke) {
ChannelPoolMetricsTracer outstandingRpcsMetricTracker,
ScheduledExecutorService backgroundExecutor) {
return new BigtableTransportChannelProvider(
instantiatingGrpcChannelProvider, channelPrimer, outstandingRpcsMetricTracke);
instantiatingGrpcChannelProvider,
channelPrimer,
outstandingRpcsMetricTracker,
backgroundExecutor);
}
}
Loading