Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions api/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,28 @@ public static PickResult withSubchannel(Subchannel subchannel) {
return withSubchannel(subchannel, null);
}

/**
* Creates a new {@code PickResult} with the given {@code subchannel},
* but retains all other properties from this {@code PickResult}.
*
* @since 1.80.0
*/
public PickResult withSubchannelReplacement(Subchannel subchannel) {
return new PickResult(checkNotNull(subchannel, "subchannel"), streamTracerFactory,
status, drop, authorityOverride);
}

/**
* Creates a new {@code PickResult} with the given {@code streamTracerFactory},
* but retains all other properties from this {@code PickResult}.
*
* @since 1.80.0
*/
public PickResult withStreamTracerFactory(
@Nullable ClientStreamTracer.Factory streamTracerFactory) {
return new PickResult(subchannel, streamTracerFactory, status, drop, authorityOverride);
}

/**
* A decision to report a connectivity error to the RPC. If the RPC is {@link
* CallOptions#withWaitForReady wait-for-ready}, it will stay buffered. Otherwise, it will fail
Expand Down
20 changes: 20 additions & 0 deletions api/src/test/java/io/grpc/LoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,26 @@ public void pickResult_withSubchannelAndTracer() {
assertThat(result.isDrop()).isFalse();
}

@Test
public void pickResult_withSubchannelReplacement() {
PickResult result = PickResult.withSubchannel(subchannel, tracerFactory)
.withSubchannelReplacement(subchannel2);
assertThat(result.getSubchannel()).isSameInstanceAs(subchannel2);
assertThat(result.getStatus()).isSameInstanceAs(Status.OK);
assertThat(result.getStreamTracerFactory()).isSameInstanceAs(tracerFactory);
assertThat(result.isDrop()).isFalse();
}

@Test
public void pickResult_withStreamTracerFactory() {
PickResult result = PickResult.withSubchannel(subchannel)
.withStreamTracerFactory(tracerFactory);
assertThat(result.getSubchannel()).isSameInstanceAs(subchannel);
assertThat(result.getStatus()).isSameInstanceAs(Status.OK);
assertThat(result.getStreamTracerFactory()).isSameInstanceAs(tracerFactory);
assertThat(result.isDrop()).isFalse();
}

@Test
public void pickResult_withNoResult() {
PickResult result = PickResult.withNoResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,30 @@ void setHealthCheckedService(@Nullable String service) {
public String toString() {
return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
}

@Override
public void updateBalancingState(
io.grpc.ConnectivityState newState, LoadBalancer.SubchannelPicker newPicker) {
delegate().updateBalancingState(newState, new HealthCheckPicker(newPicker));
}

private final class HealthCheckPicker extends LoadBalancer.SubchannelPicker {
private final LoadBalancer.SubchannelPicker delegate;

HealthCheckPicker(LoadBalancer.SubchannelPicker delegate) {
this.delegate = delegate;
}

@Override
public LoadBalancer.PickResult pickSubchannel(LoadBalancer.PickSubchannelArgs args) {
LoadBalancer.PickResult result = delegate.pickSubchannel(args);
LoadBalancer.Subchannel subchannel = result.getSubchannel();
if (subchannel instanceof SubchannelImpl) {
return result.withSubchannelReplacement(((SubchannelImpl) subchannel).delegate());
}
return result;
}
}
}

@VisibleForTesting
Expand Down
26 changes: 26 additions & 0 deletions util/src/main/java/io/grpc/util/HealthProducerHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
Expand Down Expand Up @@ -84,6 +85,31 @@ protected LoadBalancer.Helper delegate() {
return delegate;
}

@Override
public void updateBalancingState(
ConnectivityState newState, LoadBalancer.SubchannelPicker newPicker) {
delegate.updateBalancingState(newState, new HealthProducerPicker(newPicker));
}

private static final class HealthProducerPicker extends LoadBalancer.SubchannelPicker {
private final LoadBalancer.SubchannelPicker delegate;

HealthProducerPicker(LoadBalancer.SubchannelPicker delegate) {
this.delegate = delegate;
}

@Override
public LoadBalancer.PickResult pickSubchannel(LoadBalancer.PickSubchannelArgs args) {
LoadBalancer.PickResult result = delegate.pickSubchannel(args);
LoadBalancer.Subchannel subchannel = result.getSubchannel();
if (subchannel instanceof HealthProducerSubchannel) {
return result.withSubchannelReplacement(
((HealthProducerSubchannel) subchannel).delegate());
}
return result;
}
}

// The parent subchannel in the health check producer LB chain. It duplicates subchannel state to
// both the state listener and health listener.
@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,9 +442,14 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {

Subchannel subchannel = pickResult.getSubchannel();
if (subchannel != null) {
return PickResult.withSubchannel(subchannel, new ResultCountingClientStreamTracerFactory(
subchannel.getAttributes().get(ENDPOINT_TRACKER_KEY),
pickResult.getStreamTracerFactory()));
EndpointTracker tracker = subchannel.getAttributes().get(ENDPOINT_TRACKER_KEY);
if (subchannel instanceof OutlierDetectionSubchannel) {
subchannel = ((OutlierDetectionSubchannel) subchannel).delegate();
}
return pickResult.withSubchannelReplacement(subchannel)
.withStreamTracerFactory(new ResultCountingClientStreamTracerFactory(
tracker,
pickResult.getStreamTracerFactory()));
}

return pickResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ public void delegatePick() throws Exception {
// Make sure that we can pick the single READY subchannel.
SubchannelPicker picker = pickerCaptor.getAllValues().get(2);
PickResult pickResult = picker.pickSubchannel(mock(PickSubchannelArgs.class));
Subchannel s = ((OutlierDetectionSubchannel) pickResult.getSubchannel()).delegate();
Subchannel s = pickResult.getSubchannel();
if (s instanceof HealthProducerHelper.HealthProducerSubchannel) {
s = ((HealthProducerHelper.HealthProducerSubchannel) s).delegate();
}
Expand Down
87 changes: 53 additions & 34 deletions xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -252,42 +252,55 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) {
args = args.toBuilder().setAddresses(addresses).setAttributes(attrsBuilder.build()).build();
final Subchannel subchannel = delegate().createSubchannel(args);

return new ForwardingSubchannel() {
@Override
public void start(SubchannelStateListener listener) {
delegate().start(new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo newState) {
// Do nothing if LB has been shutdown
if (xdsClient != null && newState.getState().equals(ConnectivityState.READY)) {
// Get locality based on the connected address attributes
ClusterLocality updatedClusterLocality = createClusterLocalityFromAttributes(
subchannel.getConnectedAddressAttributes());
ClusterLocality oldClusterLocality = localityAtomicReference
.getAndSet(updatedClusterLocality);
oldClusterLocality.release();
return new ClusterImplSubchannel(subchannel, localityAtomicReference);
}

private final class ClusterImplSubchannel extends ForwardingSubchannel {
private final Subchannel delegate;
private final AtomicReference<ClusterLocality> localityAtomicReference;

private ClusterImplSubchannel(
Subchannel delegate, AtomicReference<ClusterLocality> localityAtomicReference) {
this.delegate = delegate;
this.localityAtomicReference = localityAtomicReference;
}

@Override
public void start(SubchannelStateListener listener) {
delegate().start(
new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo newState) {
// Do nothing if LB has been shutdown
if (xdsClient != null && newState.getState().equals(ConnectivityState.READY)) {
// Get locality based on the connected address attributes
ClusterLocality updatedClusterLocality =
createClusterLocalityFromAttributes(
delegate.getConnectedAddressAttributes());
ClusterLocality oldClusterLocality =
localityAtomicReference.getAndSet(updatedClusterLocality);
oldClusterLocality.release();
}
listener.onSubchannelState(newState);
}
listener.onSubchannelState(newState);
}
});
}
});
}

@Override
public void shutdown() {
localityAtomicReference.get().release();
delegate().shutdown();
}
@Override
public void shutdown() {
localityAtomicReference.get().release();
delegate().shutdown();
}

@Override
public void updateAddresses(List<EquivalentAddressGroup> addresses) {
delegate().updateAddresses(withAdditionalAttributes(addresses));
}
@Override
public void updateAddresses(List<EquivalentAddressGroup> addresses) {
delegate().updateAddresses(withAdditionalAttributes(addresses));
}

@Override
protected Subchannel delegate() {
return subchannel;
}
};
@Override
protected Subchannel delegate() {
return delegate;
}
}

private List<EquivalentAddressGroup> withAdditionalAttributes(
Expand Down Expand Up @@ -411,6 +424,13 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
}
}
PickResult result = delegate.pickSubchannel(args);
Subchannel subchannel = result.getSubchannel();
if (subchannel != null) {
if (subchannel instanceof ClusterImplLbHelper.ClusterImplSubchannel) {
subchannel = ((ClusterImplLbHelper.ClusterImplSubchannel) subchannel).delegate();
result = result.withSubchannelReplacement(subchannel);
}
}
if (result.getStatus().isOk() && result.getSubchannel() != null) {
if (enableCircuitBreaking) {
if (inFlights.get() >= maxConcurrentRequests) {
Expand All @@ -437,8 +457,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
stats, inFlights, result.getStreamTracerFactory());
ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance()
.newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats));
result = PickResult.withSubchannel(result.getSubchannel(),
orcaTracerFactory);
result = result.withStreamTracerFactory(orcaTracerFactory);
}
}
if (args.getCallOptions().getOption(XdsNameResolver.AUTO_HOST_REWRITE_KEY) != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,12 +508,15 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
if (subchannel == null) {
return pickResult;
}

subchannel = ((WrrSubchannel) subchannel).delegate();
if (!enableOobLoadReport) {
return PickResult.withSubchannel(subchannel,
OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
reportListeners.get(pick)));
return pickResult.withSubchannelReplacement(subchannel)
.withStreamTracerFactory(
OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
reportListeners.get(pick)));
} else {
return PickResult.withSubchannel(subchannel);
return pickResult.withSubchannelReplacement(subchannel);
}
}

Expand Down
27 changes: 27 additions & 0 deletions xds/src/main/java/io/grpc/xds/orca/OrcaOobUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,16 @@
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ClientCall;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.ExperimentalApi;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.CreateSubchannelArgs;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.Metadata;
import io.grpc.Status;
Expand Down Expand Up @@ -236,6 +240,29 @@ protected Helper delegate() {
return delegate;
}

@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
delegate.updateBalancingState(newState, new OrcaOobPicker(newPicker));
}

private static final class OrcaOobPicker extends SubchannelPicker {
private final SubchannelPicker delegate;

OrcaOobPicker(SubchannelPicker delegate) {
this.delegate = delegate;
}

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
PickResult result = delegate.pickSubchannel(args);
Subchannel subchannel = result.getSubchannel();
if (subchannel instanceof SubchannelImpl) {
return result.withSubchannelReplacement(((SubchannelImpl) subchannel).delegate());
}
return result;
}
}

@Override
public Subchannel createSubchannel(CreateSubchannelArgs args) {
syncContext.throwIfNotInThisSynchronizationContext();
Expand Down
Loading