From 508c141a25e73879b782dc7f2271c2ea5ce17391 Mon Sep 17 00:00:00 2001 From: agravator Date: Wed, 11 Feb 2026 12:55:01 +0530 Subject: [PATCH 1/8] otel: add tcp metrics --- .../grpc/internal/ClientTransportFactory.java | 12 ++ .../java/io/grpc/internal/InternalServer.java | 6 + .../io/grpc/internal/InternalSubchannel.java | 3 + .../java/io/grpc/internal/ServerImpl.java | 7 + .../io/grpc/internal/ServerImplBuilder.java | 10 ++ .../io/grpc/internal/ServerTransport.java | 1 + .../io/grpc/netty/NettyChannelBuilder.java | 1 + .../io/grpc/netty/NettyClientHandler.java | 24 ++- .../io/grpc/netty/NettyClientTransport.java | 7 +- .../main/java/io/grpc/netty/NettyServer.java | 5 +- .../io/grpc/netty/NettyServerHandler.java | 21 ++- .../io/grpc/netty/NettyServerTransport.java | 10 +- .../main/java/io/grpc/netty/TcpMetrics.java | 147 ++++++++++++++++++ .../io/grpc/netty/NettyClientHandlerTest.java | 3 +- .../grpc/netty/NettyClientTransportTest.java | 3 + .../io/grpc/netty/NettyServerHandlerTest.java | 3 +- .../java/io/grpc/netty/TcpMetricsTest.java | 71 +++++++++ .../okhttp/OkHttpClientTransportTest.java | 1 + 18 files changed, 319 insertions(+), 16 deletions(-) create mode 100644 netty/src/main/java/io/grpc/netty/TcpMetrics.java create mode 100644 netty/src/test/java/io/grpc/netty/TcpMetricsTest.java diff --git a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java index 6c10ced4652..dffd18ae5c7 100644 --- a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java +++ b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java @@ -24,6 +24,7 @@ import io.grpc.ChannelCredentials; import io.grpc.ChannelLogger; import io.grpc.HttpConnectProxiedSocketAddress; +import io.grpc.MetricRecorder; import java.io.Closeable; import java.net.SocketAddress; import java.util.Collection; @@ -91,6 +92,7 @@ final class ClientTransportOptions { private Attributes eagAttributes = Attributes.EMPTY; @Nullable private String userAgent; @Nullable private HttpConnectProxiedSocketAddress connectProxiedSocketAddr; + @Nullable private MetricRecorder metricRecorder; public ChannelLogger getChannelLogger() { return channelLogger; @@ -101,6 +103,16 @@ public ClientTransportOptions setChannelLogger(ChannelLogger channelLogger) { return this; } + @Nullable + public MetricRecorder getMetricRecorder() { + return metricRecorder; + } + + public ClientTransportOptions setMetricRecorder(@Nullable MetricRecorder metricRecorder) { + this.metricRecorder = metricRecorder; + return this; + } + public String getAuthority() { return authority; } diff --git a/core/src/main/java/io/grpc/internal/InternalServer.java b/core/src/main/java/io/grpc/internal/InternalServer.java index a6079081233..d8cf764be15 100644 --- a/core/src/main/java/io/grpc/internal/InternalServer.java +++ b/core/src/main/java/io/grpc/internal/InternalServer.java @@ -18,6 +18,7 @@ import io.grpc.InternalChannelz.SocketStats; import io.grpc.InternalInstrumented; +import io.grpc.MetricRecorder; import java.io.IOException; import java.net.SocketAddress; import java.util.List; @@ -71,4 +72,9 @@ public interface InternalServer { */ @Nullable List> getListenSocketStatsList(); + /** + * Sets the MetricRecorder for the server. This optional method allows setting + * the MetricRecorder after construction but before start(). + */ + default void setMetricRecorder(MetricRecorder metricRecorder) {} } diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 7a48bf642fe..ce31921e316 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -80,6 +80,7 @@ final class InternalSubchannel implements InternalInstrumented, Tr private final InternalChannelz channelz; private final CallTracer callsTracer; private final ChannelTracer channelTracer; + private final MetricRecorder metricRecorder; private final ChannelLogger channelLogger; private final boolean reconnectDisabled; @@ -191,6 +192,7 @@ protected void handleNotInUse() { this.scheduledExecutor = scheduledExecutor; this.connectingTimer = stopwatchSupplier.get(); this.syncContext = syncContext; + this.metricRecorder = metricRecorder; this.callback = callback; this.channelz = channelz; this.callsTracer = callsTracer; @@ -265,6 +267,7 @@ private void startNewTransport() { .setAuthority(eagChannelAuthority != null ? eagChannelAuthority : authority) .setEagAttributes(currentEagAttributes) .setUserAgent(userAgent) + .setMetricRecorder(metricRecorder) .setHttpConnectProxiedSocketAddress(proxiedAddr); TransportLogger transportLogger = new TransportLogger(); // In case the transport logs in the constructor, use the subchannel logId diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index dc0709e1fb8..0c422d3408e 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -48,6 +48,8 @@ import io.grpc.InternalServerInterceptors; import io.grpc.InternalStatus; import io.grpc.Metadata; +import io.grpc.MetricInstrumentRegistry; +import io.grpc.MetricRecorder; import io.grpc.ServerCall; import io.grpc.ServerCallExecutorSupplier; import io.grpc.ServerCallHandler; @@ -97,6 +99,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume private final InternalLogId logId; private final ObjectPool executorPool; + private final MetricRecorder metricRecorder; /** Executor for application processing. Safe to read after {@link #start()}. */ private Executor executor; private final HandlerRegistry registry; @@ -143,6 +146,9 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume InternalServer transportServer, Context rootContext) { this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool"); + this.metricRecorder = + new MetricRecorderImpl(builder.metricSinks, MetricInstrumentRegistry.getDefaultRegistry()); + this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder"); this.fallbackRegistry = Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry"); @@ -182,6 +188,7 @@ public ServerImpl start() throws IOException { // Start and wait for any ports to actually be bound. ServerListenerImpl listener = new ServerListenerImpl(); + transportServer.setMetricRecorder(metricRecorder); transportServer.start(listener); executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); started = true; diff --git a/core/src/main/java/io/grpc/internal/ServerImplBuilder.java b/core/src/main/java/io/grpc/internal/ServerImplBuilder.java index f6566e067db..64cb238fbf4 100644 --- a/core/src/main/java/io/grpc/internal/ServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/ServerImplBuilder.java @@ -31,6 +31,7 @@ import io.grpc.HandlerRegistry; import io.grpc.InternalChannelz; import io.grpc.InternalConfiguratorRegistry; +import io.grpc.MetricSink; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.ServerCallExecutorSupplier; @@ -80,6 +81,7 @@ public static ServerBuilder forPort(int port) { final List transportFilters = new ArrayList<>(); final List interceptors = new ArrayList<>(); private final List streamTracerFactories = new ArrayList<>(); + final List metricSinks = new ArrayList<>(); private final ClientTransportServersBuilder clientTransportServersBuilder; HandlerRegistry fallbackRegistry = DEFAULT_FALLBACK_REGISTRY; ObjectPool executorPool = DEFAULT_EXECUTOR_POOL; @@ -157,6 +159,14 @@ public ServerImplBuilder intercept(ServerInterceptor interceptor) { return this; } + /** + * Adds a MetricSink to the server. + */ + public ServerImplBuilder addMetricSink(MetricSink metricSink) { + metricSinks.add(checkNotNull(metricSink, "metricSink")); + return this; + } + @Override public ServerImplBuilder addStreamTracerFactory(ServerStreamTracer.Factory factory) { streamTracerFactories.add(checkNotNull(factory, "factory")); diff --git a/core/src/main/java/io/grpc/internal/ServerTransport.java b/core/src/main/java/io/grpc/internal/ServerTransport.java index 1bda2f2b41e..fb26ddfb3f7 100644 --- a/core/src/main/java/io/grpc/internal/ServerTransport.java +++ b/core/src/main/java/io/grpc/internal/ServerTransport.java @@ -44,4 +44,5 @@ public interface ServerTransport extends InternalInstrumented { * outstanding tasks are cancelled when the transport terminates. */ ScheduledExecutorService getScheduledExecutorService(); + } diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index 258aa15b005..e64f1065681 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -856,6 +856,7 @@ public void run() { localSocketPicker, channelLogger, useGetForSafeMethods, + options.getMetricRecorder(), Ticker.systemTicker()); return transport; } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 8ebf89842ad..5c1bb509ce6 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -30,6 +30,7 @@ import io.grpc.InternalChannelz; import io.grpc.InternalStatus; import io.grpc.Metadata; +import io.grpc.MetricRecorder; import io.grpc.Status; import io.grpc.StatusException; import io.grpc.internal.ClientStreamListener.RpcProgress; @@ -123,6 +124,7 @@ class NettyClientHandler extends AbstractNettyHandler { private final Supplier stopwatchFactory; private final TransportTracer transportTracer; private final Attributes eagAttributes; + private final TcpMetrics.Tracker tcpMetrics; private final String authority; private final InUseStateAggregator inUseState = new InUseStateAggregator() { @@ -164,7 +166,8 @@ static NettyClientHandler newHandler( Attributes eagAttributes, String authority, ChannelLogger negotiationLogger, - Ticker ticker) { + Ticker ticker, + MetricRecorder metricRecorder) { Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize); Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder); @@ -194,7 +197,8 @@ static NettyClientHandler newHandler( eagAttributes, authority, negotiationLogger, - ticker); + ticker, + metricRecorder); } @VisibleForTesting @@ -214,7 +218,8 @@ static NettyClientHandler newHandler( Attributes eagAttributes, String authority, ChannelLogger negotiationLogger, - Ticker ticker) { + Ticker ticker, + MetricRecorder metricRecorder) { Preconditions.checkNotNull(connection, "connection"); Preconditions.checkNotNull(frameReader, "frameReader"); Preconditions.checkNotNull(lifecycleManager, "lifecycleManager"); @@ -269,7 +274,8 @@ static NettyClientHandler newHandler( pingCounter, ticker, maxHeaderListSize, - softLimitHeaderListSize); + softLimitHeaderListSize, + metricRecorder); } private NettyClientHandler( @@ -288,7 +294,8 @@ private NettyClientHandler( PingLimiter pingLimiter, Ticker ticker, int maxHeaderListSize, - int softLimitHeaderListSize) { + int softLimitHeaderListSize, + MetricRecorder metricRecorder) { super( /* channelUnused= */ null, decoder, @@ -350,6 +357,7 @@ public void onStreamClosed(Http2Stream stream) { } } }); + this.tcpMetrics = new TcpMetrics.Tracker(metricRecorder, "client"); } /** @@ -490,6 +498,12 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce /** * Handler for the Channel shutting down. */ + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + tcpMetrics.channelActive(); + super.channelActive(ctx); + } + @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { try { diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 53914b3c877..6585df42df3 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -34,6 +34,7 @@ import io.grpc.InternalLogId; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.MetricRecorder; import io.grpc.Status; import io.grpc.internal.ClientStream; import io.grpc.internal.ConnectionClientTransport; @@ -108,6 +109,7 @@ class NettyClientTransport implements ConnectionClientTransport, private final ChannelLogger channelLogger; private final boolean useGetForSafeMethods; private final Ticker ticker; + private final MetricRecorder metricRecorder; NettyClientTransport( @@ -132,6 +134,7 @@ class NettyClientTransport implements ConnectionClientTransport, LocalSocketPicker localSocketPicker, ChannelLogger channelLogger, boolean useGetForSafeMethods, + MetricRecorder metricRecorder, Ticker ticker) { this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator"); @@ -159,6 +162,7 @@ class NettyClientTransport implements ConnectionClientTransport, this.logId = InternalLogId.allocate(getClass(), remoteAddress.toString()); this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger"); this.useGetForSafeMethods = useGetForSafeMethods; + this.metricRecorder = metricRecorder; this.ticker = Preconditions.checkNotNull(ticker, "ticker"); } @@ -251,7 +255,8 @@ public Runnable start(Listener transportListener) { eagAttributes, authorityString, channelLogger, - ticker); + ticker, + metricRecorder); ChannelHandler negotiationHandler = negotiator.newHandler(handler); diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index 1cf67ea25ca..9314f235d87 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -31,6 +31,7 @@ import io.grpc.InternalInstrumented; import io.grpc.InternalLogId; import io.grpc.InternalWithLogId; +import io.grpc.MetricRecorder; import io.grpc.ServerStreamTracer; import io.grpc.internal.InternalServer; import io.grpc.internal.ObjectPool; @@ -93,6 +94,7 @@ class NettyServer implements InternalServer, InternalWithLogId { private final int maxMessageSize; private final int maxHeaderListSize; private final int softLimitHeaderListSize; + private MetricRecorder metricRecorder; private final long keepAliveTimeInNanos; private final long keepAliveTimeoutInNanos; private final long maxConnectionIdleInNanos; @@ -272,7 +274,8 @@ public void initChannel(Channel ch) { permitKeepAliveTimeInNanos, maxRstCount, maxRstPeriodNanos, - eagAttributes); + eagAttributes, + metricRecorder); ServerTransportListener transportListener; // This is to order callbacks on the listener, not to guard access to channel. synchronized (NettyServer.this) { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 036fde55e2c..4bbc6f42377 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -42,6 +42,7 @@ import io.grpc.InternalMetadata; import io.grpc.InternalStatus; import io.grpc.Metadata; +import io.grpc.MetricRecorder; import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.internal.GrpcUtil; @@ -127,6 +128,7 @@ class NettyServerHandler extends AbstractNettyHandler { private final Http2Connection.PropertyKey streamKey; private final ServerTransportListener transportListener; private final int maxMessageSize; + private final TcpMetrics.Tracker tcpMetrics; private final long keepAliveTimeInNanos; private final long keepAliveTimeoutInNanos; private final long maxConnectionAgeInNanos; @@ -174,7 +176,8 @@ static NettyServerHandler newHandler( long permitKeepAliveTimeInNanos, int maxRstCount, long maxRstPeriodNanos, - Attributes eagAttributes) { + Attributes eagAttributes, + MetricRecorder metricRecorder) { Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s", maxHeaderListSize); Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class); @@ -208,7 +211,8 @@ static NettyServerHandler newHandler( maxRstCount, maxRstPeriodNanos, eagAttributes, - Ticker.systemTicker()); + Ticker.systemTicker(), + metricRecorder); } static NettyServerHandler newHandler( @@ -234,7 +238,8 @@ static NettyServerHandler newHandler( int maxRstCount, long maxRstPeriodNanos, Attributes eagAttributes, - Ticker ticker) { + Ticker ticker, + MetricRecorder metricRecorder) { Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams); Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s", flowControlWindow); @@ -294,7 +299,8 @@ static NettyServerHandler newHandler( keepAliveEnforcer, autoFlowControl, rstStreamCounter, - eagAttributes, ticker); + eagAttributes, ticker, + metricRecorder); } private NettyServerHandler( @@ -318,7 +324,8 @@ private NettyServerHandler( boolean autoFlowControl, RstStreamCounter rstStreamCounter, Attributes eagAttributes, - Ticker ticker) { + Ticker ticker, + MetricRecorder metricRecorder) { super( channelUnused, decoder, @@ -362,6 +369,8 @@ public void onStreamClosed(Http2Stream stream) { checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize); this.maxMessageSize = maxMessageSize; + this.tcpMetrics = new TcpMetrics.Tracker(metricRecorder, "server"); + this.keepAliveTimeInNanos = keepAliveTimeInNanos; this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos; this.maxConnectionIdleManager = maxConnectionIdleManager; @@ -663,6 +672,8 @@ void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) { */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { + tcpMetrics.channelInactive(ctx.channel()); + try { if (keepAliveManager != null) { keepAliveManager.onTransportTermination(); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index 758ffeee5b1..8d78b8387ee 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -25,6 +25,7 @@ import io.grpc.Attributes; import io.grpc.InternalChannelz.SocketStats; import io.grpc.InternalLogId; +import io.grpc.MetricRecorder; import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.internal.ServerTransport; @@ -45,6 +46,7 @@ import java.util.logging.Level; import java.util.logging.Logger; + /** * The Netty-based server transport. */ @@ -81,6 +83,7 @@ class NettyServerTransport implements ServerTransport { private final int maxRstCount; private final long maxRstPeriodNanos; private final Attributes eagAttributes; + private final MetricRecorder metricRecorder; private final List streamTracerFactories; private final TransportTracer transportTracer; @@ -105,7 +108,8 @@ class NettyServerTransport implements ServerTransport { long permitKeepAliveTimeInNanos, int maxRstCount, long maxRstPeriodNanos, - Attributes eagAttributes) { + Attributes eagAttributes, + MetricRecorder metricRecorder) { this.channel = Preconditions.checkNotNull(channel, "channel"); this.channelUnused = channelUnused; this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator"); @@ -128,6 +132,7 @@ class NettyServerTransport implements ServerTransport { this.maxRstCount = maxRstCount; this.maxRstPeriodNanos = maxRstPeriodNanos; this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes"); + this.metricRecorder = metricRecorder; SocketAddress remote = channel.remoteAddress(); this.logId = InternalLogId.allocate(getClass(), remote != null ? remote.toString() : null); } @@ -289,6 +294,7 @@ private NettyServerHandler createHandler( permitKeepAliveTimeInNanos, maxRstCount, maxRstPeriodNanos, - eagAttributes); + eagAttributes, + metricRecorder); } } diff --git a/netty/src/main/java/io/grpc/netty/TcpMetrics.java b/netty/src/main/java/io/grpc/netty/TcpMetrics.java new file mode 100644 index 00000000000..97b86d89799 --- /dev/null +++ b/netty/src/main/java/io/grpc/netty/TcpMetrics.java @@ -0,0 +1,147 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.netty; + +import com.google.common.collect.ImmutableList; +import io.grpc.DoubleHistogramMetricInstrument; +import io.grpc.LongCounterMetricInstrument; +import io.grpc.LongUpDownCounterMetricInstrument; +import io.grpc.MetricInstrumentRegistry; +import io.grpc.MetricRecorder; +import io.netty.channel.Channel; +import java.lang.reflect.Method; +import java.util.Collections; + +final class TcpMetrics { + + static final LongCounterMetricInstrument connectionsCreated; + static final LongUpDownCounterMetricInstrument connectionCount; + static final LongCounterMetricInstrument packetsRetransmitted; + static final LongCounterMetricInstrument recurringRetransmits; + static final DoubleHistogramMetricInstrument minRtt; + + static { + MetricInstrumentRegistry registry = MetricInstrumentRegistry.getDefaultRegistry(); + ImmutableList requiredLabels = ImmutableList.of("grpc.target"); + ImmutableList optionalLabels = ImmutableList.of(); + + connectionsCreated = registry.registerLongCounter( + "grpc.tcp.connections_created", + "Number of TCP connections created.", + "{connection}", + requiredLabels, + optionalLabels, + false + ); + + connectionCount = registry.registerLongUpDownCounter( + "grpc.tcp.connection_count", + "Number of active TCP connections.", + "{connection}", + requiredLabels, + optionalLabels, + false + ); + + packetsRetransmitted = registry.registerLongCounter( + "grpc.tcp.packets_retransmitted", + "Total packets sent by TCP except those sent for the first time.", + "{packet}", + requiredLabels, + optionalLabels, + false + ); + + recurringRetransmits = registry.registerLongCounter( + "grpc.tcp.recurring_retransmits", + "The number of times the latest TCP packet was retransmitted.", + "{packet}", + requiredLabels, + optionalLabels, + false + ); + + minRtt = registry.registerDoubleHistogram( + "grpc.tcp.min_rtt", + "TCP's current estimate of minimum round trip time (RTT).", + "s", + ImmutableList.of( + 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 50.0, 100.0, 500.0, + 1000.0 + ), + requiredLabels, + optionalLabels, + false + ); + } + + + static final class Tracker { + private final MetricRecorder metricRecorder; + private final String target; + + Tracker(MetricRecorder metricRecorder, String target) { + this.metricRecorder = metricRecorder; + this.target = target; + } + + void channelActive() { + if (metricRecorder != null && target != null) { + metricRecorder.addLongCounter(TcpMetrics.connectionsCreated, 1, + Collections.singletonList(target), Collections.emptyList()); + metricRecorder.addLongUpDownCounter(TcpMetrics.connectionCount, 1, + Collections.singletonList(target), Collections.emptyList()); + } + } + + void channelInactive(Channel channel) { + if (metricRecorder != null && target != null) { + metricRecorder.addLongUpDownCounter(TcpMetrics.connectionCount, -1, + Collections.singletonList(target), Collections.emptyList()); + + try { + if (channel.getClass().getName().equals("io.netty.channel.epoll.EpollSocketChannel")) { + Method tcpInfoMethod = channel.getClass().getMethod("tcpInfo", + Class.forName("io.netty.channel.epoll.EpollTcpInfo")); + Object info = Class.forName("io.netty.channel.epoll.EpollTcpInfo") + .getDeclaredConstructor().newInstance(); + tcpInfoMethod.invoke(channel, info); + + Method totalRetransMethod = info.getClass().getMethod("totalRetrans"); + Method retransmitsMethod = info.getClass().getMethod("retransmits"); + Method rttMethod = info.getClass().getMethod("rtt"); + + long totalRetrans = (Long) totalRetransMethod.invoke(info); + long retransmits = (Long) retransmitsMethod.invoke(info); + long rtt = ((Number) rttMethod.invoke(info)).longValue(); + + metricRecorder.addLongCounter(TcpMetrics.packetsRetransmitted, totalRetrans, + Collections.singletonList(target), Collections.emptyList()); + metricRecorder.addLongCounter(TcpMetrics.recurringRetransmits, retransmits, + Collections.singletonList(target), Collections.emptyList()); + metricRecorder.recordDoubleHistogram(TcpMetrics.minRtt, rtt / 1000000.0, + Collections.singletonList(target), Collections.emptyList()); + } + } catch (Throwable t) { + // Epoll not available or error getting tcp_info, just ignore. + } + } + } + } + + private TcpMetrics() {} +} diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index 53598727efd..599aecf80b2 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -1165,7 +1165,8 @@ public Stopwatch get() { Attributes.EMPTY, "someauthority", null, - fakeClock().getTicker()); + fakeClock().getTicker(), + new io.grpc.MetricRecorder() {}); } @Override diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index db44c8f50fd..b0f9cb26594 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -251,6 +251,7 @@ public void setSoLingerChannelOption() throws IOException, GeneralSecurityExcept new SocketPicker(), new FakeChannelLogger(), false, + new io.grpc.MetricRecorder() {} , Ticker.systemTicker()); transports.add(transport); callMeMaybe(transport.start(clientTransportListener)); @@ -526,6 +527,7 @@ public void failingToConstructChannelShouldFailGracefully() throws Exception { new SocketPicker(), new FakeChannelLogger(), false, + new io.grpc.MetricRecorder() {} , Ticker.systemTicker()); transports.add(transport); @@ -1148,6 +1150,7 @@ private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int max new SocketPicker(), new FakeChannelLogger(), false, + new io.grpc.MetricRecorder() {} , Ticker.systemTicker()); transports.add(transport); return transport; diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index 0d5a9bab176..a5ab42d15a6 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -1416,7 +1416,8 @@ protected NettyServerHandler newHandler() { maxRstCount, maxRstPeriodNanos, Attributes.EMPTY, - fakeClock().getTicker()); + fakeClock().getTicker(), + new io.grpc.MetricRecorder() {}); } @Override diff --git a/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java b/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java new file mode 100644 index 00000000000..8b17fd8c736 --- /dev/null +++ b/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java @@ -0,0 +1,71 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.netty; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import io.grpc.MetricRecorder; +import io.netty.channel.Channel; +import java.util.Collections; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class TcpMetricsTest { + + @Rule public final MockitoRule mocks = MockitoJUnit.rule(); + + @Mock private MetricRecorder metricRecorder; + @Mock private Channel channel; + + private TcpMetrics.Tracker metrics; + + @Before + public void setUp() { + metrics = new TcpMetrics.Tracker(metricRecorder, "target1"); + } + + @Test + public void channelActive_incrementsCounts() { + metrics.channelActive(); + verify(metricRecorder).addLongCounter( + eq(TcpMetrics.connectionsCreated), eq(1L), eq(Collections.singletonList("target1")), + eq(Collections.emptyList())); + verify(metricRecorder).addLongUpDownCounter( + eq(TcpMetrics.connectionCount), eq(1L), eq(Collections.singletonList("target1")), + eq(Collections.emptyList())); + verifyNoMoreInteractions(metricRecorder); + } + + @Test + public void channelInactive_decrementsCount_noEpoll_noError() { + metrics.channelInactive(channel); + // It should decrement connectionCount + verify(metricRecorder).addLongUpDownCounter( + eq(TcpMetrics.connectionCount), eq(-1L), eq(Collections.singletonList("target1")), + eq(Collections.emptyList())); + verifyNoMoreInteractions(metricRecorder); + } +} diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index f87912c44ea..987ce2e5130 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -146,6 +146,7 @@ */ @RunWith(JUnit4.class) public class OkHttpClientTransportTest { + private static final int TIME_OUT_MS = 2000; private static final int INITIAL_WINDOW_SIZE = 65535; private static final String NETWORK_ISSUE_MESSAGE = "network issue"; From 5ef3b1cc72a2832ee576040320cbb2e3c4b5534b Mon Sep 17 00:00:00 2001 From: agravator Date: Wed, 11 Feb 2026 17:40:15 +0530 Subject: [PATCH 2/8] add optional labels --- .../io/grpc/netty/NettyClientHandler.java | 2 +- .../main/java/io/grpc/netty/TcpMetrics.java | 47 ++++++++-- .../java/io/grpc/netty/TcpMetricsTest.java | 93 ++++++++++++++++++- 3 files changed, 128 insertions(+), 14 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 5c1bb509ce6..ec64809469c 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -500,7 +500,7 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - tcpMetrics.channelActive(); + tcpMetrics.channelActive(ctx.channel()); super.channelActive(ctx); } diff --git a/netty/src/main/java/io/grpc/netty/TcpMetrics.java b/netty/src/main/java/io/grpc/netty/TcpMetrics.java index 97b86d89799..f353f7b8ea3 100644 --- a/netty/src/main/java/io/grpc/netty/TcpMetrics.java +++ b/netty/src/main/java/io/grpc/netty/TcpMetrics.java @@ -37,7 +37,12 @@ final class TcpMetrics { static { MetricInstrumentRegistry registry = MetricInstrumentRegistry.getDefaultRegistry(); ImmutableList requiredLabels = ImmutableList.of("grpc.target"); - ImmutableList optionalLabels = ImmutableList.of(); + ImmutableList optionalLabels = ImmutableList.of( + "network.local.address", + "network.local.port", + "network.peer.address", + "network.peer.port" + ); connectionsCreated = registry.registerLongCounter( "grpc.tcp.connections_created", @@ -99,19 +104,21 @@ static final class Tracker { this.target = target; } - void channelActive() { + void channelActive(Channel channel) { if (metricRecorder != null && target != null) { + java.util.List labelValues = getLabelValues(channel); metricRecorder.addLongCounter(TcpMetrics.connectionsCreated, 1, - Collections.singletonList(target), Collections.emptyList()); + Collections.singletonList(target), labelValues); metricRecorder.addLongUpDownCounter(TcpMetrics.connectionCount, 1, - Collections.singletonList(target), Collections.emptyList()); + Collections.singletonList(target), labelValues); } } void channelInactive(Channel channel) { if (metricRecorder != null && target != null) { + java.util.List labelValues = getLabelValues(channel); metricRecorder.addLongUpDownCounter(TcpMetrics.connectionCount, -1, - Collections.singletonList(target), Collections.emptyList()); + Collections.singletonList(target), labelValues); try { if (channel.getClass().getName().equals("io.netty.channel.epoll.EpollSocketChannel")) { @@ -130,11 +137,11 @@ void channelInactive(Channel channel) { long rtt = ((Number) rttMethod.invoke(info)).longValue(); metricRecorder.addLongCounter(TcpMetrics.packetsRetransmitted, totalRetrans, - Collections.singletonList(target), Collections.emptyList()); + Collections.singletonList(target), labelValues); metricRecorder.addLongCounter(TcpMetrics.recurringRetransmits, retransmits, - Collections.singletonList(target), Collections.emptyList()); + Collections.singletonList(target), labelValues); metricRecorder.recordDoubleHistogram(TcpMetrics.minRtt, rtt / 1000000.0, - Collections.singletonList(target), Collections.emptyList()); + Collections.singletonList(target), labelValues); } } catch (Throwable t) { // Epoll not available or error getting tcp_info, just ignore. @@ -143,5 +150,29 @@ void channelInactive(Channel channel) { } } + private static java.util.List getLabelValues(Channel channel) { + String localAddress = ""; + String localPort = ""; + String peerAddress = ""; + String peerPort = ""; + + java.net.SocketAddress local = channel.localAddress(); + if (local instanceof java.net.InetSocketAddress) { + java.net.InetSocketAddress inetLocal = (java.net.InetSocketAddress) local; + localAddress = inetLocal.getAddress().getHostAddress(); + localPort = String.valueOf(inetLocal.getPort()); + } + + java.net.SocketAddress remote = channel.remoteAddress(); + if (remote instanceof java.net.InetSocketAddress) { + java.net.InetSocketAddress inetRemote = (java.net.InetSocketAddress) remote; + peerAddress = inetRemote.getAddress().getHostAddress(); + peerPort = String.valueOf(inetRemote.getPort()); + } + + return java.util.Arrays.asList(localAddress, localPort, peerAddress, peerPort); + } + + private TcpMetrics() {} } diff --git a/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java b/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java index 8b17fd8c736..1eef3050e93 100644 --- a/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java +++ b/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java @@ -19,9 +19,14 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import io.grpc.MetricRecorder; import io.netty.channel.Channel; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Arrays; import java.util.Collections; import org.junit.Before; import org.junit.Rule; @@ -47,25 +52,103 @@ public void setUp() { metrics = new TcpMetrics.Tracker(metricRecorder, "target1"); } + + @Test + public void registeredMetrics_haveCorrectOptionalLabels() { + java.util.List expectedOptionalLabels = Arrays.asList( + "network.local.address", + "network.local.port", + "network.peer.address", + "network.peer.port" + ); + + org.junit.Assert.assertEquals( + expectedOptionalLabels, TcpMetrics.connectionsCreated.getOptionalLabelKeys()); + org.junit.Assert.assertEquals( + expectedOptionalLabels, TcpMetrics.connectionCount.getOptionalLabelKeys()); + org.junit.Assert.assertEquals( + expectedOptionalLabels, TcpMetrics.packetsRetransmitted.getOptionalLabelKeys()); + org.junit.Assert.assertEquals( + expectedOptionalLabels, TcpMetrics.recurringRetransmits.getOptionalLabelKeys()); + org.junit.Assert.assertEquals( + expectedOptionalLabels, TcpMetrics.minRtt.getOptionalLabelKeys()); + } + + @Test + public void channelActive_extractsLabels_ipv4() throws Exception { + + InetAddress localInet = InetAddress.getByAddress(new byte[] { 127, 0, 0, 1 }); + InetAddress remoteInet = InetAddress.getByAddress(new byte[] { 10, 0, 0, 1 }); + when(channel.localAddress()).thenReturn(new InetSocketAddress(localInet, 8080)); + when(channel.remoteAddress()).thenReturn(new InetSocketAddress(remoteInet, 443)); + + metrics.channelActive(channel); + + verify(metricRecorder).addLongCounter( + eq(TcpMetrics.connectionsCreated), eq(1L), eq(Collections.singletonList("target1")), + eq(Arrays.asList( + localInet.getHostAddress(), "8080", remoteInet.getHostAddress(), "443"))); + verify(metricRecorder).addLongUpDownCounter( + eq(TcpMetrics.connectionCount), eq(1L), eq(Collections.singletonList("target1")), + eq(Arrays.asList( + localInet.getHostAddress(), "8080", remoteInet.getHostAddress(), "443"))); + verifyNoMoreInteractions(metricRecorder); + } + + @Test + public void channelInactive_extractsLabels_ipv6() throws Exception { + + InetAddress localInet = InetAddress.getByAddress( + new byte[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1 }); + InetAddress remoteInet = InetAddress.getByAddress( + new byte[] { 32, 1, 13, -72, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1 }); + when(channel.localAddress()).thenReturn(new InetSocketAddress(localInet, 8080)); + when(channel.remoteAddress()).thenReturn(new InetSocketAddress(remoteInet, 443)); + + metrics.channelInactive(channel); + + verify(metricRecorder).addLongUpDownCounter( + eq(TcpMetrics.connectionCount), eq(-1L), eq(Collections.singletonList("target1")), + eq(Arrays.asList( + localInet.getHostAddress(), "8080", remoteInet.getHostAddress(), "443"))); + verifyNoMoreInteractions(metricRecorder); + } + + @Test + public void channelActive_extractsLabels_nonInetAddress() throws Exception { + SocketAddress dummyAddress = new SocketAddress() {}; + when(channel.localAddress()).thenReturn(dummyAddress); + when(channel.remoteAddress()).thenReturn(dummyAddress); + + metrics.channelActive(channel); + + verify(metricRecorder).addLongCounter( + eq(TcpMetrics.connectionsCreated), eq(1L), eq(Collections.singletonList("target1")), + eq(Arrays.asList("", "", "", ""))); + verify(metricRecorder).addLongUpDownCounter( + eq(TcpMetrics.connectionCount), eq(1L), eq(Collections.singletonList("target1")), + eq(Arrays.asList("", "", "", ""))); + verifyNoMoreInteractions(metricRecorder); + } + @Test public void channelActive_incrementsCounts() { - metrics.channelActive(); + metrics.channelActive(channel); verify(metricRecorder).addLongCounter( eq(TcpMetrics.connectionsCreated), eq(1L), eq(Collections.singletonList("target1")), - eq(Collections.emptyList())); + eq(Arrays.asList("", "", "", ""))); verify(metricRecorder).addLongUpDownCounter( eq(TcpMetrics.connectionCount), eq(1L), eq(Collections.singletonList("target1")), - eq(Collections.emptyList())); + eq(Arrays.asList("", "", "", ""))); verifyNoMoreInteractions(metricRecorder); } @Test public void channelInactive_decrementsCount_noEpoll_noError() { metrics.channelInactive(channel); - // It should decrement connectionCount verify(metricRecorder).addLongUpDownCounter( eq(TcpMetrics.connectionCount), eq(-1L), eq(Collections.singletonList("target1")), - eq(Collections.emptyList())); + eq(Arrays.asList("", "", "", ""))); verifyNoMoreInteractions(metricRecorder); } } From c4111e225631c5b4cca919fa6e4538eda2512bfa Mon Sep 17 00:00:00 2001 From: agravator Date: Fri, 13 Feb 2026 14:25:40 +0530 Subject: [PATCH 3/8] tcp: add jitter --- .../main/java/io/grpc/netty/TcpMetrics.java | 113 ++++++++++++++---- .../java/io/grpc/netty/TcpMetricsTest.java | 48 ++++++++ 2 files changed, 135 insertions(+), 26 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/TcpMetrics.java b/netty/src/main/java/io/grpc/netty/TcpMetrics.java index f353f7b8ea3..dd51daea07d 100644 --- a/netty/src/main/java/io/grpc/netty/TcpMetrics.java +++ b/netty/src/main/java/io/grpc/netty/TcpMetrics.java @@ -34,6 +34,10 @@ final class TcpMetrics { static final LongCounterMetricInstrument recurringRetransmits; static final DoubleHistogramMetricInstrument minRtt; + // Note: Metrics like delivery_rate, bytes_sent, packets_sent, + // bytes_retransmitted, etc., are not + // currently exposed by Netty's EpollTcpInfo.java wrapper around + // getSockOpt(TCP_INFO)." static { MetricInstrumentRegistry registry = MetricInstrumentRegistry.getDefaultRegistry(); ImmutableList requiredLabels = ImmutableList.of("grpc.target"); @@ -104,6 +108,24 @@ static final class Tracker { this.target = target; } + private static final long RECORD_INTERVAL_MILLIS; + + static { + long interval = 5; + try { + String flagValue = System.getProperty("io.grpc.netty.tcpMetricsRecordIntervalMinutes"); + if (flagValue != null) { + interval = Long.parseLong(flagValue); + } + } catch (NumberFormatException e) { + // Use default + } + RECORD_INTERVAL_MILLIS = java.util.concurrent.TimeUnit.MINUTES.toMillis(interval); + } + + private static final java.util.Random RANDOM = new java.util.Random(); + private io.netty.util.concurrent.ScheduledFuture reportTimer; + void channelActive(Channel channel) { if (metricRecorder != null && target != null) { java.util.List labelValues = getLabelValues(channel); @@ -111,41 +133,80 @@ void channelActive(Channel channel) { Collections.singletonList(target), labelValues); metricRecorder.addLongUpDownCounter(TcpMetrics.connectionCount, 1, Collections.singletonList(target), labelValues); + scheduleNextReport(channel); + } + } + + private void scheduleNextReport(final Channel channel) { + if (RECORD_INTERVAL_MILLIS <= 0) { + return; + } + if (!channel.isActive()) { + return; + } + + double jitter = 0.1 + RANDOM.nextDouble(); // 10% to 110% + long delayMillis = (long) (RECORD_INTERVAL_MILLIS * jitter); + + try { + reportTimer = channel.eventLoop().schedule(new Runnable() { + @Override + public void run() { + if (channel.isActive()) { + Tracker.this.recordTcpInfo(channel); // Renamed from channelInactive to recordTcpInfo + scheduleNextReport(channel); // Re-arm + } + } + }, delayMillis, java.util.concurrent.TimeUnit.MILLISECONDS); + } catch (Throwable t) { + // Channel closed, event loop shut down, etc. } } void channelInactive(Channel channel) { + if (reportTimer != null) { + reportTimer.cancel(false); + } if (metricRecorder != null && target != null) { java.util.List labelValues = getLabelValues(channel); metricRecorder.addLongUpDownCounter(TcpMetrics.connectionCount, -1, Collections.singletonList(target), labelValues); - - try { - if (channel.getClass().getName().equals("io.netty.channel.epoll.EpollSocketChannel")) { - Method tcpInfoMethod = channel.getClass().getMethod("tcpInfo", - Class.forName("io.netty.channel.epoll.EpollTcpInfo")); - Object info = Class.forName("io.netty.channel.epoll.EpollTcpInfo") - .getDeclaredConstructor().newInstance(); - tcpInfoMethod.invoke(channel, info); - - Method totalRetransMethod = info.getClass().getMethod("totalRetrans"); - Method retransmitsMethod = info.getClass().getMethod("retransmits"); - Method rttMethod = info.getClass().getMethod("rtt"); - - long totalRetrans = (Long) totalRetransMethod.invoke(info); - long retransmits = (Long) retransmitsMethod.invoke(info); - long rtt = ((Number) rttMethod.invoke(info)).longValue(); - - metricRecorder.addLongCounter(TcpMetrics.packetsRetransmitted, totalRetrans, - Collections.singletonList(target), labelValues); - metricRecorder.addLongCounter(TcpMetrics.recurringRetransmits, retransmits, - Collections.singletonList(target), labelValues); - metricRecorder.recordDoubleHistogram(TcpMetrics.minRtt, rtt / 1000000.0, - Collections.singletonList(target), labelValues); - } - } catch (Throwable t) { - // Epoll not available or error getting tcp_info, just ignore. + // Final collection on close + recordTcpInfo(channel); + } + } + + private void recordTcpInfo(Channel channel) { + if (metricRecorder == null || target == null) { + return; + } + java.util.List labelValues = getLabelValues(channel); + try { + if (channel.getClass().getName().equals("io.netty.channel.epoll.EpollSocketChannel")) { + Method tcpInfoMethod = channel.getClass().getMethod("tcpInfo", + Class.forName("io.netty.channel.epoll.EpollTcpInfo")); + Object info = Class.forName("io.netty.channel.epoll.EpollTcpInfo") + .getDeclaredConstructor().newInstance(); + tcpInfoMethod.invoke(channel, info); + + Method totalRetransMethod = info.getClass().getMethod("totalRetrans"); + Method retransmitsMethod = info.getClass().getMethod("retransmits"); + Method rttMethod = info.getClass().getMethod("rtt"); + + long totalRetrans = (Long) totalRetransMethod.invoke(info); + int retransmits = (Integer) retransmitsMethod.invoke(info); + long rtt = (Long) rttMethod.invoke(info); + + metricRecorder.addLongCounter(TcpMetrics.packetsRetransmitted, totalRetrans, + Collections.singletonList(target), labelValues); + metricRecorder.addLongCounter(TcpMetrics.recurringRetransmits, retransmits, + Collections.singletonList(target), labelValues); + metricRecorder.recordDoubleHistogram(TcpMetrics.minRtt, + rtt / 1000000.0, // Convert microseconds to seconds + Collections.singletonList(target), labelValues); } + } catch (Throwable t) { + // Epoll not available or error getting tcp_info, just ignore. } } } diff --git a/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java b/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java index 1eef3050e93..19dba67df5f 100644 --- a/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java +++ b/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java @@ -16,6 +16,8 @@ package io.grpc.netty; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -23,16 +25,20 @@ import io.grpc.MetricRecorder; import io.netty.channel.Channel; +import io.netty.channel.EventLoop; +import io.netty.util.concurrent.ScheduledFuture; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Arrays; import java.util.Collections; +import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -44,11 +50,18 @@ public class TcpMetricsTest { @Mock private MetricRecorder metricRecorder; @Mock private Channel channel; + @Mock + private EventLoop eventLoop; + @Mock + private ScheduledFuture scheduledFuture; private TcpMetrics.Tracker metrics; @Before public void setUp() { + when(channel.eventLoop()).thenReturn(eventLoop); + when(eventLoop.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + .thenAnswer(invocation -> scheduledFuture); metrics = new TcpMetrics.Tracker(metricRecorder, "target1"); } @@ -151,4 +164,39 @@ public void channelInactive_decrementsCount_noEpoll_noError() { eq(Arrays.asList("", "", "", ""))); verifyNoMoreInteractions(metricRecorder); } + + @Test + public void channelActive_schedulesReportTimer() { + when(channel.isActive()).thenReturn(true); + metrics.channelActive(channel); + + ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class); + ArgumentCaptor delayCaptor = ArgumentCaptor.forClass(Long.class); + verify(eventLoop).schedule( + runnableCaptor.capture(), delayCaptor.capture(), eq(TimeUnit.MILLISECONDS)); + + Runnable task = runnableCaptor.getValue(); + long delay = delayCaptor.getValue(); + + // Default RECORD_INTERVAL_MILLIS is 5 minutes (300,000 ms) + // Jitter is 10% to 110%, so 30,000 ms to 330,000 ms + org.junit.Assert.assertTrue("Delay should be >= 30000 but was " + delay, delay >= 30_000); + org.junit.Assert.assertTrue("Delay should be <= 330000 but was " + delay, delay <= 330_000); + + // Run the task to verify rescheduling + task.run(); + + verify(eventLoop, org.mockito.Mockito.times(2)) + .schedule(any(Runnable.class), anyLong(), eq(TimeUnit.MILLISECONDS)); + } + + @Test + public void channelInactive_cancelsReportTimer() { + when(channel.isActive()).thenReturn(true); + metrics.channelActive(channel); + + metrics.channelInactive(channel); + + verify(scheduledFuture).cancel(false); + } } From 172f6c45ad9b08ccfdba8fc054cef8a6ddf597a1 Mon Sep 17 00:00:00 2001 From: agravator Date: Fri, 13 Feb 2026 15:30:34 +0530 Subject: [PATCH 4/8] tcp: add null check --- .../test/java/io/grpc/netty/TcpMetricsTest.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java b/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java index 19dba67df5f..8ddc25bcd24 100644 --- a/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java +++ b/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java @@ -79,12 +79,15 @@ public void registeredMetrics_haveCorrectOptionalLabels() { expectedOptionalLabels, TcpMetrics.connectionsCreated.getOptionalLabelKeys()); org.junit.Assert.assertEquals( expectedOptionalLabels, TcpMetrics.connectionCount.getOptionalLabelKeys()); - org.junit.Assert.assertEquals( - expectedOptionalLabels, TcpMetrics.packetsRetransmitted.getOptionalLabelKeys()); - org.junit.Assert.assertEquals( - expectedOptionalLabels, TcpMetrics.recurringRetransmits.getOptionalLabelKeys()); - org.junit.Assert.assertEquals( - expectedOptionalLabels, TcpMetrics.minRtt.getOptionalLabelKeys()); + + if (TcpMetrics.packetsRetransmitted != null) { + org.junit.Assert.assertEquals( + expectedOptionalLabels, TcpMetrics.packetsRetransmitted.getOptionalLabelKeys()); + org.junit.Assert.assertEquals( + expectedOptionalLabels, TcpMetrics.recurringRetransmits.getOptionalLabelKeys()); + org.junit.Assert.assertEquals( + expectedOptionalLabels, TcpMetrics.minRtt.getOptionalLabelKeys()); + } } @Test From 6996afd8d67bbe0620b1e4a1958b61fb8c702e4d Mon Sep 17 00:00:00 2001 From: agravator Date: Mon, 16 Feb 2026 12:17:59 +0530 Subject: [PATCH 5/8] fix test --- .../main/java/io/grpc/netty/TcpMetrics.java | 153 +++++++++++++----- 1 file changed, 112 insertions(+), 41 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/TcpMetrics.java b/netty/src/main/java/io/grpc/netty/TcpMetrics.java index dd51daea07d..51780b41180 100644 --- a/netty/src/main/java/io/grpc/netty/TcpMetrics.java +++ b/netty/src/main/java/io/grpc/netty/TcpMetrics.java @@ -16,15 +16,17 @@ package io.grpc.netty; -import com.google.common.collect.ImmutableList; import io.grpc.DoubleHistogramMetricInstrument; import io.grpc.LongCounterMetricInstrument; import io.grpc.LongUpDownCounterMetricInstrument; +import io.grpc.MetricInstrument; import io.grpc.MetricInstrumentRegistry; import io.grpc.MetricRecorder; import io.netty.channel.Channel; import java.lang.reflect.Method; +import java.util.Arrays; import java.util.Collections; +import java.util.List; final class TcpMetrics { @@ -38,64 +40,133 @@ final class TcpMetrics { // bytes_retransmitted, etc., are not // currently exposed by Netty's EpollTcpInfo.java wrapper around // getSockOpt(TCP_INFO)." + /** + * Safe metric registration or retrieval for environments where TcpMetrics might + * be loaded multiple times (e.g., shaded and unshaded). + */ + private static LongCounterMetricInstrument safelyRegisterLongCounter( + MetricInstrumentRegistry registry, String name, String description, String unit, + List requiredLabelKeys, List optionalLabelKeys) { + try { + return registry.registerLongCounter(name, description, unit, requiredLabelKeys, + optionalLabelKeys, false); + } catch (IllegalStateException e) { + if (e.getMessage() != null && e.getMessage().contains("already exists")) { + for (MetricInstrument instrument : registry.getMetricInstruments()) { + if (instrument.getName().equals(name) + && instrument instanceof LongCounterMetricInstrument) { + return (LongCounterMetricInstrument) instrument; + } + } + } + throw e; + } + } + + private static LongUpDownCounterMetricInstrument safelyRegisterLongUpDownCounter( + MetricInstrumentRegistry registry, String name, String description, String unit, + List requiredLabelKeys, List optionalLabelKeys) { + try { + return registry.registerLongUpDownCounter(name, description, unit, requiredLabelKeys, + optionalLabelKeys, false); + } catch (IllegalStateException e) { + if (e.getMessage() != null && e.getMessage().contains("already exists")) { + for (MetricInstrument instrument : registry.getMetricInstruments()) { + if (instrument.getName().equals(name) + && instrument instanceof LongUpDownCounterMetricInstrument) { + return (LongUpDownCounterMetricInstrument) instrument; + } + } + } + throw e; + } + } + + private static DoubleHistogramMetricInstrument safelyRegisterDoubleHistogram( + MetricInstrumentRegistry registry, String name, String description, String unit, + List bucketBoundaries, List requiredLabelKeys, + List optionalLabelKeys) { + try { + return registry.registerDoubleHistogram(name, description, unit, bucketBoundaries, + requiredLabelKeys, optionalLabelKeys, false); + } catch (IllegalStateException e) { + if (e.getMessage() != null && e.getMessage().contains("already exists")) { + for (MetricInstrument instrument : registry.getMetricInstruments()) { + if (instrument.getName().equals(name) + && instrument instanceof DoubleHistogramMetricInstrument) { + return (DoubleHistogramMetricInstrument) instrument; + } + } + } + throw e; + } + } + static { MetricInstrumentRegistry registry = MetricInstrumentRegistry.getDefaultRegistry(); - ImmutableList requiredLabels = ImmutableList.of("grpc.target"); - ImmutableList optionalLabels = ImmutableList.of( + List requiredLabels = Collections.singletonList("grpc.target"); + List optionalLabels = Arrays.asList( "network.local.address", "network.local.port", "network.peer.address", "network.peer.port" ); - connectionsCreated = registry.registerLongCounter( - "grpc.tcp.connections_created", + connectionsCreated = safelyRegisterLongCounter(registry, + "grpc.tcp.connections_created", "Number of TCP connections created.", "{connection}", requiredLabels, - optionalLabels, - false + optionalLabels ); - connectionCount = registry.registerLongUpDownCounter( - "grpc.tcp.connection_count", - "Number of active TCP connections.", - "{connection}", + connectionCount = safelyRegisterLongUpDownCounter(registry, + "grpc.tcp.connection_count", + "Number of currently open TCP connections.", + "{connection}", requiredLabels, - optionalLabels, - false + optionalLabels ); - packetsRetransmitted = registry.registerLongCounter( - "grpc.tcp.packets_retransmitted", - "Total packets sent by TCP except those sent for the first time.", - "{packet}", - requiredLabels, - optionalLabels, - false - ); + boolean epollAvailable = false; + try { + Class epollClass = Class.forName("io.netty.channel.epoll.Epoll"); + Method isAvailableMethod = epollClass.getDeclaredMethod("isAvailable"); + epollAvailable = (Boolean) isAvailableMethod.invoke(null); + } catch (Throwable t) { + // Ignored + } - recurringRetransmits = registry.registerLongCounter( - "grpc.tcp.recurring_retransmits", - "The number of times the latest TCP packet was retransmitted.", - "{packet}", - requiredLabels, - optionalLabels, - false - ); + if (epollAvailable) { + packetsRetransmitted = safelyRegisterLongCounter(registry, + "grpc.tcp.packets_retransmitted", + "Total number of packets retransmitted for a single TCP connection.", + "{packet}", + requiredLabels, + optionalLabels); - minRtt = registry.registerDoubleHistogram( - "grpc.tcp.min_rtt", - "TCP's current estimate of minimum round trip time (RTT).", - "s", - ImmutableList.of( - 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 50.0, 100.0, 500.0, - 1000.0 - ), - requiredLabels, - optionalLabels, - false - ); + recurringRetransmits = safelyRegisterLongCounter(registry, + "grpc.tcp.recurring_retransmits", + "Total number of unacknowledged packets to be retransmitted " + + "since the last acknowledgment.", + "{packet}", + requiredLabels, + optionalLabels); + + minRtt = safelyRegisterDoubleHistogram(registry, + "grpc.tcp.min_rtt", + "Minimum RTT observed for a single TCP connection.", + "s", + Arrays.asList(0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, + 5.0, 10.0, 25.0, 50.0, 100.0, 250.0), + requiredLabels, + optionalLabels); + } else { + // Initialize to null if epoll is not available, as these metrics won't be used. + packetsRetransmitted = null; + recurringRetransmits = null; + minRtt = null; + } } From 42a834bca5ff774448edc2249ceb8bf5aaf93404 Mon Sep 17 00:00:00 2001 From: agravator Date: Mon, 16 Feb 2026 13:35:10 +0530 Subject: [PATCH 6/8] increase coverage --- .../main/java/io/grpc/netty/TcpMetrics.java | 215 ++++++++++-------- .../java/io/grpc/netty/TcpMetricsTest.java | 153 +++++++++++-- 2 files changed, 258 insertions(+), 110 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/TcpMetrics.java b/netty/src/main/java/io/grpc/netty/TcpMetrics.java index 51780b41180..b6052996da9 100644 --- a/netty/src/main/java/io/grpc/netty/TcpMetrics.java +++ b/netty/src/main/java/io/grpc/netty/TcpMetrics.java @@ -30,16 +30,85 @@ final class TcpMetrics { - static final LongCounterMetricInstrument connectionsCreated; - static final LongUpDownCounterMetricInstrument connectionCount; - static final LongCounterMetricInstrument packetsRetransmitted; - static final LongCounterMetricInstrument recurringRetransmits; - static final DoubleHistogramMetricInstrument minRtt; - - // Note: Metrics like delivery_rate, bytes_sent, packets_sent, - // bytes_retransmitted, etc., are not - // currently exposed by Netty's EpollTcpInfo.java wrapper around - // getSockOpt(TCP_INFO)." + private static final Metrics DEFAULT_METRICS; + + static { + boolean epollAvailable = false; + try { + Class epollClass = Class.forName("io.netty.channel.epoll.Epoll"); + Method isAvailableMethod = epollClass.getDeclaredMethod("isAvailable"); + epollAvailable = (Boolean) isAvailableMethod.invoke(null); + } catch (Throwable t) { + // Ignored + } + DEFAULT_METRICS = new Metrics(MetricInstrumentRegistry.getDefaultRegistry(), epollAvailable); + } + + static Metrics getDefaultMetrics() { + return DEFAULT_METRICS; + } + + static final class Metrics { + final LongCounterMetricInstrument connectionsCreated; + final LongUpDownCounterMetricInstrument connectionCount; + final LongCounterMetricInstrument packetsRetransmitted; + final LongCounterMetricInstrument recurringRetransmits; + final DoubleHistogramMetricInstrument minRtt; + + Metrics(MetricInstrumentRegistry registry, boolean epollAvailable) { + List requiredLabels = Collections.singletonList("grpc.target"); + List optionalLabels = Arrays.asList( + "network.local.address", + "network.local.port", + "network.peer.address", + "network.peer.port"); + + connectionsCreated = safelyRegisterLongCounter(registry, + "grpc.tcp.connections_created", + "Number of TCP connections created.", + "{connection}", + requiredLabels, + optionalLabels); + + connectionCount = safelyRegisterLongUpDownCounter(registry, + "grpc.tcp.connection_count", + "Number of currently open TCP connections.", + "{connection}", + requiredLabels, + optionalLabels); + + if (epollAvailable) { + packetsRetransmitted = safelyRegisterLongCounter(registry, + "grpc.tcp.packets_retransmitted", + "Total number of packets retransmitted for a single TCP connection.", + "{packet}", + requiredLabels, + optionalLabels); + + recurringRetransmits = safelyRegisterLongCounter(registry, + "grpc.tcp.recurring_retransmits", + "Total number of unacknowledged packets to be retransmitted " + + "since the last acknowledgment.", + "{packet}", + requiredLabels, + optionalLabels); + + minRtt = safelyRegisterDoubleHistogram(registry, + "grpc.tcp.min_rtt", + "Minimum RTT observed for a single TCP connection.", + "s", + Arrays.asList(0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, + 5.0, 10.0, 25.0, 50.0, 100.0, 250.0), + requiredLabels, + optionalLabels); + } else { + packetsRetransmitted = null; + recurringRetransmits = null; + minRtt = null; + } + } + } + /** * Safe metric registration or retrieval for environments where TcpMetrics might * be loaded multiple times (e.g., shaded and unshaded). @@ -102,81 +171,30 @@ private static DoubleHistogramMetricInstrument safelyRegisterDoubleHistogram( } } - static { - MetricInstrumentRegistry registry = MetricInstrumentRegistry.getDefaultRegistry(); - List requiredLabels = Collections.singletonList("grpc.target"); - List optionalLabels = Arrays.asList( - "network.local.address", - "network.local.port", - "network.peer.address", - "network.peer.port" - ); - - connectionsCreated = safelyRegisterLongCounter(registry, - "grpc.tcp.connections_created", - "Number of TCP connections created.", - "{connection}", - requiredLabels, - optionalLabels - ); - - connectionCount = safelyRegisterLongUpDownCounter(registry, - "grpc.tcp.connection_count", - "Number of currently open TCP connections.", - "{connection}", - requiredLabels, - optionalLabels - ); - - boolean epollAvailable = false; - try { - Class epollClass = Class.forName("io.netty.channel.epoll.Epoll"); - Method isAvailableMethod = epollClass.getDeclaredMethod("isAvailable"); - epollAvailable = (Boolean) isAvailableMethod.invoke(null); - } catch (Throwable t) { - // Ignored - } - - if (epollAvailable) { - packetsRetransmitted = safelyRegisterLongCounter(registry, - "grpc.tcp.packets_retransmitted", - "Total number of packets retransmitted for a single TCP connection.", - "{packet}", - requiredLabels, - optionalLabels); - - recurringRetransmits = safelyRegisterLongCounter(registry, - "grpc.tcp.recurring_retransmits", - "Total number of unacknowledged packets to be retransmitted " - + "since the last acknowledgment.", - "{packet}", - requiredLabels, - optionalLabels); - - minRtt = safelyRegisterDoubleHistogram(registry, - "grpc.tcp.min_rtt", - "Minimum RTT observed for a single TCP connection.", - "s", - Arrays.asList(0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, - 5.0, 10.0, 25.0, 50.0, 100.0, 250.0), - requiredLabels, - optionalLabels); - } else { - // Initialize to null if epoll is not available, as these metrics won't be used. - packetsRetransmitted = null; - recurringRetransmits = null; - minRtt = null; - } - } - - static final class Tracker { private final MetricRecorder metricRecorder; private final String target; + private final Metrics metrics; + private final String epollSocketChannelClassName; + private final String epollTcpInfoClassName; Tracker(MetricRecorder metricRecorder, String target) { + this(metricRecorder, target, DEFAULT_METRICS); + } + + Tracker(MetricRecorder metricRecorder, String target, Metrics metrics) { + this(metricRecorder, target, metrics, + "io.netty.channel.epoll.EpollSocketChannel", + "io.netty.channel.epoll.EpollTcpInfo"); + } + + Tracker(MetricRecorder metricRecorder, String target, Metrics metrics, + String epollSocketChannelClassName, String epollTcpInfoClassName) { this.metricRecorder = metricRecorder; this.target = target; + this.metrics = metrics; + this.epollSocketChannelClassName = epollSocketChannelClassName; + this.epollTcpInfoClassName = epollTcpInfoClassName; } private static final long RECORD_INTERVAL_MILLIS; @@ -200,9 +218,9 @@ static final class Tracker { void channelActive(Channel channel) { if (metricRecorder != null && target != null) { java.util.List labelValues = getLabelValues(channel); - metricRecorder.addLongCounter(TcpMetrics.connectionsCreated, 1, + metricRecorder.addLongCounter(metrics.connectionsCreated, 1, Collections.singletonList(target), labelValues); - metricRecorder.addLongUpDownCounter(TcpMetrics.connectionCount, 1, + metricRecorder.addLongUpDownCounter(metrics.connectionCount, 1, Collections.singletonList(target), labelValues); scheduleNextReport(channel); } @@ -240,7 +258,7 @@ void channelInactive(Channel channel) { } if (metricRecorder != null && target != null) { java.util.List labelValues = getLabelValues(channel); - metricRecorder.addLongUpDownCounter(TcpMetrics.connectionCount, -1, + metricRecorder.addLongUpDownCounter(metrics.connectionCount, -1, Collections.singletonList(target), labelValues); // Final collection on close recordTcpInfo(channel); @@ -253,28 +271,33 @@ private void recordTcpInfo(Channel channel) { } java.util.List labelValues = getLabelValues(channel); try { - if (channel.getClass().getName().equals("io.netty.channel.epoll.EpollSocketChannel")) { - Method tcpInfoMethod = channel.getClass().getMethod("tcpInfo", - Class.forName("io.netty.channel.epoll.EpollTcpInfo")); - Object info = Class.forName("io.netty.channel.epoll.EpollTcpInfo") - .getDeclaredConstructor().newInstance(); + if (channel.getClass().getName().equals(epollSocketChannelClassName)) { + Class tcpInfoClass = Class.forName(epollTcpInfoClassName); + Method tcpInfoMethod = channel.getClass().getMethod("tcpInfo", tcpInfoClass); + Object info = tcpInfoClass.getDeclaredConstructor().newInstance(); tcpInfoMethod.invoke(channel, info); - Method totalRetransMethod = info.getClass().getMethod("totalRetrans"); - Method retransmitsMethod = info.getClass().getMethod("retransmits"); - Method rttMethod = info.getClass().getMethod("rtt"); + Method totalRetransMethod = tcpInfoClass.getMethod("totalRetrans"); + Method retransmitsMethod = tcpInfoClass.getMethod("retransmits"); + Method rttMethod = tcpInfoClass.getMethod("rtt"); long totalRetrans = (Long) totalRetransMethod.invoke(info); int retransmits = (Integer) retransmitsMethod.invoke(info); long rtt = (Long) rttMethod.invoke(info); - metricRecorder.addLongCounter(TcpMetrics.packetsRetransmitted, totalRetrans, - Collections.singletonList(target), labelValues); - metricRecorder.addLongCounter(TcpMetrics.recurringRetransmits, retransmits, - Collections.singletonList(target), labelValues); - metricRecorder.recordDoubleHistogram(TcpMetrics.minRtt, - rtt / 1000000.0, // Convert microseconds to seconds - Collections.singletonList(target), labelValues); + if (metrics.packetsRetransmitted != null) { + metricRecorder.addLongCounter(metrics.packetsRetransmitted, totalRetrans, + Collections.singletonList(target), labelValues); + } + if (metrics.recurringRetransmits != null) { + metricRecorder.addLongCounter(metrics.recurringRetransmits, retransmits, + Collections.singletonList(target), labelValues); + } + if (metrics.minRtt != null) { + metricRecorder.recordDoubleHistogram(metrics.minRtt, + rtt / 1000000.0, // Convert microseconds to seconds + Collections.singletonList(target), labelValues); + } } } catch (Throwable t) { // Epoll not available or error getting tcp_info, just ignore. diff --git a/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java b/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java index 8ddc25bcd24..aa5ce3e9532 100644 --- a/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java +++ b/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java @@ -65,6 +65,119 @@ public void setUp() { metrics = new TcpMetrics.Tracker(metricRecorder, "target1"); } + @Test + public void metricsInitialization_epollUnavailable() { + TcpMetrics.Metrics metrics = new TcpMetrics.Metrics( + io.grpc.MetricInstrumentRegistry.getDefaultRegistry(), false); + + org.junit.Assert.assertNotNull(metrics.connectionsCreated); + org.junit.Assert.assertNotNull(metrics.connectionCount); + org.junit.Assert.assertNull(metrics.packetsRetransmitted); + org.junit.Assert.assertNull(metrics.recurringRetransmits); + org.junit.Assert.assertNull(metrics.minRtt); + } + + @Test + public void metricsInitialization_epollAvailable() { + TcpMetrics.Metrics metrics = new TcpMetrics.Metrics( + io.grpc.MetricInstrumentRegistry.getDefaultRegistry(), true); + + org.junit.Assert.assertNotNull(metrics.connectionsCreated); + org.junit.Assert.assertNotNull(metrics.connectionCount); + org.junit.Assert.assertNotNull(metrics.packetsRetransmitted); + org.junit.Assert.assertNotNull(metrics.recurringRetransmits); + org.junit.Assert.assertNotNull(metrics.minRtt); + } + + @Test + public void safelyRegister_collision() { + io.grpc.MetricInstrumentRegistry registry = + io.grpc.MetricInstrumentRegistry.getDefaultRegistry(); + + // Explicitly register one metric to ensure collision path is triggered + try { + registry.registerLongCounter("grpc.tcp.connections_created", "desc", "unit", + Collections.emptyList(), Collections.emptyList(), false); + } catch (IllegalStateException e) { + // Already exists, which is fine for this test + } + + TcpMetrics.Metrics metrics = new TcpMetrics.Metrics(registry, true); + + org.junit.Assert.assertNotNull(metrics.connectionsCreated); + org.junit.Assert.assertNotNull(metrics.connectionCount); + org.junit.Assert.assertNotNull(metrics.minRtt); + } + + public static class FakeWithTcpInfo extends io.netty.channel.embedded.EmbeddedChannel { + public void tcpInfo(FakeEpollTcpInfo info) { + info.totalRetrans = 123; + info.retransmits = 4; + info.rtt = 5000; + } + } + + public static class FakeEpollTcpInfo { + long totalRetrans; + int retransmits; + long rtt; + + public void setValues(long totalRetrans, int retransmits, long rtt) { + this.totalRetrans = totalRetrans; + this.retransmits = retransmits; + this.rtt = rtt; + } + + public long totalRetrans() { + return totalRetrans; + } + + public int retransmits() { + return retransmits; + } + + public long rtt() { + return rtt; + } + } + + @Test + public void tracker_recordTcpInfo_reflectionSuccess() throws Exception { + MetricRecorder recorder = org.mockito.Mockito.mock(MetricRecorder.class); + TcpMetrics.Metrics metrics = new TcpMetrics.Metrics( + io.grpc.MetricInstrumentRegistry.getDefaultRegistry(), true); + + String fakeChannelName = FakeWithTcpInfo.class.getName(); + String fakeInfoName = FakeEpollTcpInfo.class.getName(); + + TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder, "target", metrics, + fakeChannelName, fakeInfoName); + + FakeWithTcpInfo channel = new FakeWithTcpInfo(); + channel.writeInbound("dummy"); + + tracker.channelInactive(channel); + + verify(recorder).addLongCounter(eq(metrics.packetsRetransmitted), eq(123L), any(), any()); + verify(recorder).addLongCounter(eq(metrics.recurringRetransmits), eq(4L), any(), any()); + verify(recorder).recordDoubleHistogram(eq(metrics.minRtt), eq(0.005), any(), any()); + } + + @Test + public void tracker_recordTcpInfo_reflectionFailure() { + MetricRecorder recorder = org.mockito.Mockito.mock(MetricRecorder.class); + TcpMetrics.Metrics metrics = new TcpMetrics.Metrics( + io.grpc.MetricInstrumentRegistry.getDefaultRegistry(), true); + + TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder, "target", metrics, + "non.existent.Class", "non.existent.Info"); + + Channel channel = org.mockito.Mockito.mock(Channel.class); + when(channel.isActive()).thenReturn(true); + + // Should catch exception and ignore + tracker.channelInactive(channel); + } @Test public void registeredMetrics_haveCorrectOptionalLabels() { @@ -76,17 +189,21 @@ public void registeredMetrics_haveCorrectOptionalLabels() { ); org.junit.Assert.assertEquals( - expectedOptionalLabels, TcpMetrics.connectionsCreated.getOptionalLabelKeys()); + expectedOptionalLabels, + TcpMetrics.getDefaultMetrics().connectionsCreated.getOptionalLabelKeys()); org.junit.Assert.assertEquals( - expectedOptionalLabels, TcpMetrics.connectionCount.getOptionalLabelKeys()); + expectedOptionalLabels, + TcpMetrics.getDefaultMetrics().connectionCount.getOptionalLabelKeys()); - if (TcpMetrics.packetsRetransmitted != null) { + if (TcpMetrics.getDefaultMetrics().packetsRetransmitted != null) { org.junit.Assert.assertEquals( - expectedOptionalLabels, TcpMetrics.packetsRetransmitted.getOptionalLabelKeys()); + expectedOptionalLabels, + TcpMetrics.getDefaultMetrics().packetsRetransmitted.getOptionalLabelKeys()); org.junit.Assert.assertEquals( - expectedOptionalLabels, TcpMetrics.recurringRetransmits.getOptionalLabelKeys()); + expectedOptionalLabels, + TcpMetrics.getDefaultMetrics().recurringRetransmits.getOptionalLabelKeys()); org.junit.Assert.assertEquals( - expectedOptionalLabels, TcpMetrics.minRtt.getOptionalLabelKeys()); + expectedOptionalLabels, TcpMetrics.getDefaultMetrics().minRtt.getOptionalLabelKeys()); } } @@ -101,11 +218,13 @@ public void channelActive_extractsLabels_ipv4() throws Exception { metrics.channelActive(channel); verify(metricRecorder).addLongCounter( - eq(TcpMetrics.connectionsCreated), eq(1L), eq(Collections.singletonList("target1")), + eq(TcpMetrics.getDefaultMetrics().connectionsCreated), eq(1L), + eq(Collections.singletonList("target1")), eq(Arrays.asList( localInet.getHostAddress(), "8080", remoteInet.getHostAddress(), "443"))); verify(metricRecorder).addLongUpDownCounter( - eq(TcpMetrics.connectionCount), eq(1L), eq(Collections.singletonList("target1")), + eq(TcpMetrics.getDefaultMetrics().connectionCount), eq(1L), + eq(Collections.singletonList("target1")), eq(Arrays.asList( localInet.getHostAddress(), "8080", remoteInet.getHostAddress(), "443"))); verifyNoMoreInteractions(metricRecorder); @@ -124,7 +243,8 @@ public void channelInactive_extractsLabels_ipv6() throws Exception { metrics.channelInactive(channel); verify(metricRecorder).addLongUpDownCounter( - eq(TcpMetrics.connectionCount), eq(-1L), eq(Collections.singletonList("target1")), + eq(TcpMetrics.getDefaultMetrics().connectionCount), eq(-1L), + eq(Collections.singletonList("target1")), eq(Arrays.asList( localInet.getHostAddress(), "8080", remoteInet.getHostAddress(), "443"))); verifyNoMoreInteractions(metricRecorder); @@ -139,10 +259,12 @@ public void channelActive_extractsLabels_nonInetAddress() throws Exception { metrics.channelActive(channel); verify(metricRecorder).addLongCounter( - eq(TcpMetrics.connectionsCreated), eq(1L), eq(Collections.singletonList("target1")), + eq(TcpMetrics.getDefaultMetrics().connectionsCreated), eq(1L), + eq(Collections.singletonList("target1")), eq(Arrays.asList("", "", "", ""))); verify(metricRecorder).addLongUpDownCounter( - eq(TcpMetrics.connectionCount), eq(1L), eq(Collections.singletonList("target1")), + eq(TcpMetrics.getDefaultMetrics().connectionCount), eq(1L), + eq(Collections.singletonList("target1")), eq(Arrays.asList("", "", "", ""))); verifyNoMoreInteractions(metricRecorder); } @@ -151,10 +273,12 @@ public void channelActive_extractsLabels_nonInetAddress() throws Exception { public void channelActive_incrementsCounts() { metrics.channelActive(channel); verify(metricRecorder).addLongCounter( - eq(TcpMetrics.connectionsCreated), eq(1L), eq(Collections.singletonList("target1")), + eq(TcpMetrics.getDefaultMetrics().connectionsCreated), eq(1L), + eq(Collections.singletonList("target1")), eq(Arrays.asList("", "", "", ""))); verify(metricRecorder).addLongUpDownCounter( - eq(TcpMetrics.connectionCount), eq(1L), eq(Collections.singletonList("target1")), + eq(TcpMetrics.getDefaultMetrics().connectionCount), eq(1L), + eq(Collections.singletonList("target1")), eq(Arrays.asList("", "", "", ""))); verifyNoMoreInteractions(metricRecorder); } @@ -163,7 +287,8 @@ public void channelActive_incrementsCounts() { public void channelInactive_decrementsCount_noEpoll_noError() { metrics.channelInactive(channel); verify(metricRecorder).addLongUpDownCounter( - eq(TcpMetrics.connectionCount), eq(-1L), eq(Collections.singletonList("target1")), + eq(TcpMetrics.getDefaultMetrics().connectionCount), eq(-1L), + eq(Collections.singletonList("target1")), eq(Arrays.asList("", "", "", ""))); verifyNoMoreInteractions(metricRecorder); } From 65487c638042244b8c3090baa84cf42eb5922f38 Mon Sep 17 00:00:00 2001 From: agravator Date: Mon, 16 Feb 2026 13:57:51 +0530 Subject: [PATCH 7/8] formatting changes --- core/src/main/java/io/grpc/internal/ServerTransport.java | 1 - netty/src/main/java/io/grpc/netty/NettyServerHandler.java | 2 -- netty/src/main/java/io/grpc/netty/NettyServerTransport.java | 1 - .../src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java | 1 - 4 files changed, 5 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerTransport.java b/core/src/main/java/io/grpc/internal/ServerTransport.java index fb26ddfb3f7..1bda2f2b41e 100644 --- a/core/src/main/java/io/grpc/internal/ServerTransport.java +++ b/core/src/main/java/io/grpc/internal/ServerTransport.java @@ -44,5 +44,4 @@ public interface ServerTransport extends InternalInstrumented { * outstanding tasks are cancelled when the transport terminates. */ ScheduledExecutorService getScheduledExecutorService(); - } diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 4bbc6f42377..fff816dcd6e 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -370,7 +370,6 @@ public void onStreamClosed(Http2Stream stream) { checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize); this.maxMessageSize = maxMessageSize; this.tcpMetrics = new TcpMetrics.Tracker(metricRecorder, "server"); - this.keepAliveTimeInNanos = keepAliveTimeInNanos; this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos; this.maxConnectionIdleManager = maxConnectionIdleManager; @@ -673,7 +672,6 @@ void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { tcpMetrics.channelInactive(ctx.channel()); - try { if (keepAliveManager != null) { keepAliveManager.onTransportTermination(); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index 8d78b8387ee..c0e52b75876 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -46,7 +46,6 @@ import java.util.logging.Level; import java.util.logging.Logger; - /** * The Netty-based server transport. */ diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index 987ce2e5130..f87912c44ea 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -146,7 +146,6 @@ */ @RunWith(JUnit4.class) public class OkHttpClientTransportTest { - private static final int TIME_OUT_MS = 2000; private static final int INITIAL_WINDOW_SIZE = 65535; private static final String NETWORK_ISSUE_MESSAGE = "network issue"; From 2b448891c403a3228c8b66c7eaa391c717daff48 Mon Sep 17 00:00:00 2001 From: agravator Date: Tue, 17 Feb 2026 14:11:01 +0530 Subject: [PATCH 8/8] add missing metric collection when channel becomes inactive --- netty/src/main/java/io/grpc/netty/NettyClientHandler.java | 1 + 1 file changed, 1 insertion(+) diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index ec64809469c..28d5e719903 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -508,6 +508,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception { try { logger.fine("Network channel is closed"); + tcpMetrics.channelInactive(ctx.channel()); Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason"); lifecycleManager.notifyShutdown(status, SimpleDisconnectError.UNKNOWN); final Status streamStatus;