diff --git a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java index 6c10ced4652..dffd18ae5c7 100644 --- a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java +++ b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java @@ -24,6 +24,7 @@ import io.grpc.ChannelCredentials; import io.grpc.ChannelLogger; import io.grpc.HttpConnectProxiedSocketAddress; +import io.grpc.MetricRecorder; import java.io.Closeable; import java.net.SocketAddress; import java.util.Collection; @@ -91,6 +92,7 @@ final class ClientTransportOptions { private Attributes eagAttributes = Attributes.EMPTY; @Nullable private String userAgent; @Nullable private HttpConnectProxiedSocketAddress connectProxiedSocketAddr; + @Nullable private MetricRecorder metricRecorder; public ChannelLogger getChannelLogger() { return channelLogger; @@ -101,6 +103,16 @@ public ClientTransportOptions setChannelLogger(ChannelLogger channelLogger) { return this; } + @Nullable + public MetricRecorder getMetricRecorder() { + return metricRecorder; + } + + public ClientTransportOptions setMetricRecorder(@Nullable MetricRecorder metricRecorder) { + this.metricRecorder = metricRecorder; + return this; + } + public String getAuthority() { return authority; } diff --git a/core/src/main/java/io/grpc/internal/InternalServer.java b/core/src/main/java/io/grpc/internal/InternalServer.java index a6079081233..d8cf764be15 100644 --- a/core/src/main/java/io/grpc/internal/InternalServer.java +++ b/core/src/main/java/io/grpc/internal/InternalServer.java @@ -18,6 +18,7 @@ import io.grpc.InternalChannelz.SocketStats; import io.grpc.InternalInstrumented; +import io.grpc.MetricRecorder; import java.io.IOException; import java.net.SocketAddress; import java.util.List; @@ -71,4 +72,9 @@ public interface InternalServer { */ @Nullable List> getListenSocketStatsList(); + /** + * Sets the MetricRecorder for the server. This optional method allows setting + * the MetricRecorder after construction but before start(). + */ + default void setMetricRecorder(MetricRecorder metricRecorder) {} } diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 7a48bf642fe..ce31921e316 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -80,6 +80,7 @@ final class InternalSubchannel implements InternalInstrumented, Tr private final InternalChannelz channelz; private final CallTracer callsTracer; private final ChannelTracer channelTracer; + private final MetricRecorder metricRecorder; private final ChannelLogger channelLogger; private final boolean reconnectDisabled; @@ -191,6 +192,7 @@ protected void handleNotInUse() { this.scheduledExecutor = scheduledExecutor; this.connectingTimer = stopwatchSupplier.get(); this.syncContext = syncContext; + this.metricRecorder = metricRecorder; this.callback = callback; this.channelz = channelz; this.callsTracer = callsTracer; @@ -265,6 +267,7 @@ private void startNewTransport() { .setAuthority(eagChannelAuthority != null ? eagChannelAuthority : authority) .setEagAttributes(currentEagAttributes) .setUserAgent(userAgent) + .setMetricRecorder(metricRecorder) .setHttpConnectProxiedSocketAddress(proxiedAddr); TransportLogger transportLogger = new TransportLogger(); // In case the transport logs in the constructor, use the subchannel logId diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index dc0709e1fb8..0c422d3408e 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -48,6 +48,8 @@ import io.grpc.InternalServerInterceptors; import io.grpc.InternalStatus; import io.grpc.Metadata; +import io.grpc.MetricInstrumentRegistry; +import io.grpc.MetricRecorder; import io.grpc.ServerCall; import io.grpc.ServerCallExecutorSupplier; import io.grpc.ServerCallHandler; @@ -97,6 +99,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume private final InternalLogId logId; private final ObjectPool executorPool; + private final MetricRecorder metricRecorder; /** Executor for application processing. Safe to read after {@link #start()}. */ private Executor executor; private final HandlerRegistry registry; @@ -143,6 +146,9 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume InternalServer transportServer, Context rootContext) { this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool"); + this.metricRecorder = + new MetricRecorderImpl(builder.metricSinks, MetricInstrumentRegistry.getDefaultRegistry()); + this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder"); this.fallbackRegistry = Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry"); @@ -182,6 +188,7 @@ public ServerImpl start() throws IOException { // Start and wait for any ports to actually be bound. ServerListenerImpl listener = new ServerListenerImpl(); + transportServer.setMetricRecorder(metricRecorder); transportServer.start(listener); executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); started = true; diff --git a/core/src/main/java/io/grpc/internal/ServerImplBuilder.java b/core/src/main/java/io/grpc/internal/ServerImplBuilder.java index f6566e067db..64cb238fbf4 100644 --- a/core/src/main/java/io/grpc/internal/ServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/ServerImplBuilder.java @@ -31,6 +31,7 @@ import io.grpc.HandlerRegistry; import io.grpc.InternalChannelz; import io.grpc.InternalConfiguratorRegistry; +import io.grpc.MetricSink; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.ServerCallExecutorSupplier; @@ -80,6 +81,7 @@ public static ServerBuilder forPort(int port) { final List transportFilters = new ArrayList<>(); final List interceptors = new ArrayList<>(); private final List streamTracerFactories = new ArrayList<>(); + final List metricSinks = new ArrayList<>(); private final ClientTransportServersBuilder clientTransportServersBuilder; HandlerRegistry fallbackRegistry = DEFAULT_FALLBACK_REGISTRY; ObjectPool executorPool = DEFAULT_EXECUTOR_POOL; @@ -157,6 +159,14 @@ public ServerImplBuilder intercept(ServerInterceptor interceptor) { return this; } + /** + * Adds a MetricSink to the server. + */ + public ServerImplBuilder addMetricSink(MetricSink metricSink) { + metricSinks.add(checkNotNull(metricSink, "metricSink")); + return this; + } + @Override public ServerImplBuilder addStreamTracerFactory(ServerStreamTracer.Factory factory) { streamTracerFactories.add(checkNotNull(factory, "factory")); diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index 258aa15b005..e64f1065681 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -856,6 +856,7 @@ public void run() { localSocketPicker, channelLogger, useGetForSafeMethods, + options.getMetricRecorder(), Ticker.systemTicker()); return transport; } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 8ebf89842ad..28d5e719903 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -30,6 +30,7 @@ import io.grpc.InternalChannelz; import io.grpc.InternalStatus; import io.grpc.Metadata; +import io.grpc.MetricRecorder; import io.grpc.Status; import io.grpc.StatusException; import io.grpc.internal.ClientStreamListener.RpcProgress; @@ -123,6 +124,7 @@ class NettyClientHandler extends AbstractNettyHandler { private final Supplier stopwatchFactory; private final TransportTracer transportTracer; private final Attributes eagAttributes; + private final TcpMetrics.Tracker tcpMetrics; private final String authority; private final InUseStateAggregator inUseState = new InUseStateAggregator() { @@ -164,7 +166,8 @@ static NettyClientHandler newHandler( Attributes eagAttributes, String authority, ChannelLogger negotiationLogger, - Ticker ticker) { + Ticker ticker, + MetricRecorder metricRecorder) { Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize); Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder); @@ -194,7 +197,8 @@ static NettyClientHandler newHandler( eagAttributes, authority, negotiationLogger, - ticker); + ticker, + metricRecorder); } @VisibleForTesting @@ -214,7 +218,8 @@ static NettyClientHandler newHandler( Attributes eagAttributes, String authority, ChannelLogger negotiationLogger, - Ticker ticker) { + Ticker ticker, + MetricRecorder metricRecorder) { Preconditions.checkNotNull(connection, "connection"); Preconditions.checkNotNull(frameReader, "frameReader"); Preconditions.checkNotNull(lifecycleManager, "lifecycleManager"); @@ -269,7 +274,8 @@ static NettyClientHandler newHandler( pingCounter, ticker, maxHeaderListSize, - softLimitHeaderListSize); + softLimitHeaderListSize, + metricRecorder); } private NettyClientHandler( @@ -288,7 +294,8 @@ private NettyClientHandler( PingLimiter pingLimiter, Ticker ticker, int maxHeaderListSize, - int softLimitHeaderListSize) { + int softLimitHeaderListSize, + MetricRecorder metricRecorder) { super( /* channelUnused= */ null, decoder, @@ -350,6 +357,7 @@ public void onStreamClosed(Http2Stream stream) { } } }); + this.tcpMetrics = new TcpMetrics.Tracker(metricRecorder, "client"); } /** @@ -490,10 +498,17 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce /** * Handler for the Channel shutting down. */ + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + tcpMetrics.channelActive(ctx.channel()); + super.channelActive(ctx); + } + @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { try { logger.fine("Network channel is closed"); + tcpMetrics.channelInactive(ctx.channel()); Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason"); lifecycleManager.notifyShutdown(status, SimpleDisconnectError.UNKNOWN); final Status streamStatus; diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 53914b3c877..6585df42df3 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -34,6 +34,7 @@ import io.grpc.InternalLogId; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.MetricRecorder; import io.grpc.Status; import io.grpc.internal.ClientStream; import io.grpc.internal.ConnectionClientTransport; @@ -108,6 +109,7 @@ class NettyClientTransport implements ConnectionClientTransport, private final ChannelLogger channelLogger; private final boolean useGetForSafeMethods; private final Ticker ticker; + private final MetricRecorder metricRecorder; NettyClientTransport( @@ -132,6 +134,7 @@ class NettyClientTransport implements ConnectionClientTransport, LocalSocketPicker localSocketPicker, ChannelLogger channelLogger, boolean useGetForSafeMethods, + MetricRecorder metricRecorder, Ticker ticker) { this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator"); @@ -159,6 +162,7 @@ class NettyClientTransport implements ConnectionClientTransport, this.logId = InternalLogId.allocate(getClass(), remoteAddress.toString()); this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger"); this.useGetForSafeMethods = useGetForSafeMethods; + this.metricRecorder = metricRecorder; this.ticker = Preconditions.checkNotNull(ticker, "ticker"); } @@ -251,7 +255,8 @@ public Runnable start(Listener transportListener) { eagAttributes, authorityString, channelLogger, - ticker); + ticker, + metricRecorder); ChannelHandler negotiationHandler = negotiator.newHandler(handler); diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index 1cf67ea25ca..9314f235d87 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -31,6 +31,7 @@ import io.grpc.InternalInstrumented; import io.grpc.InternalLogId; import io.grpc.InternalWithLogId; +import io.grpc.MetricRecorder; import io.grpc.ServerStreamTracer; import io.grpc.internal.InternalServer; import io.grpc.internal.ObjectPool; @@ -93,6 +94,7 @@ class NettyServer implements InternalServer, InternalWithLogId { private final int maxMessageSize; private final int maxHeaderListSize; private final int softLimitHeaderListSize; + private MetricRecorder metricRecorder; private final long keepAliveTimeInNanos; private final long keepAliveTimeoutInNanos; private final long maxConnectionIdleInNanos; @@ -272,7 +274,8 @@ public void initChannel(Channel ch) { permitKeepAliveTimeInNanos, maxRstCount, maxRstPeriodNanos, - eagAttributes); + eagAttributes, + metricRecorder); ServerTransportListener transportListener; // This is to order callbacks on the listener, not to guard access to channel. synchronized (NettyServer.this) { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 036fde55e2c..fff816dcd6e 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -42,6 +42,7 @@ import io.grpc.InternalMetadata; import io.grpc.InternalStatus; import io.grpc.Metadata; +import io.grpc.MetricRecorder; import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.internal.GrpcUtil; @@ -127,6 +128,7 @@ class NettyServerHandler extends AbstractNettyHandler { private final Http2Connection.PropertyKey streamKey; private final ServerTransportListener transportListener; private final int maxMessageSize; + private final TcpMetrics.Tracker tcpMetrics; private final long keepAliveTimeInNanos; private final long keepAliveTimeoutInNanos; private final long maxConnectionAgeInNanos; @@ -174,7 +176,8 @@ static NettyServerHandler newHandler( long permitKeepAliveTimeInNanos, int maxRstCount, long maxRstPeriodNanos, - Attributes eagAttributes) { + Attributes eagAttributes, + MetricRecorder metricRecorder) { Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s", maxHeaderListSize); Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class); @@ -208,7 +211,8 @@ static NettyServerHandler newHandler( maxRstCount, maxRstPeriodNanos, eagAttributes, - Ticker.systemTicker()); + Ticker.systemTicker(), + metricRecorder); } static NettyServerHandler newHandler( @@ -234,7 +238,8 @@ static NettyServerHandler newHandler( int maxRstCount, long maxRstPeriodNanos, Attributes eagAttributes, - Ticker ticker) { + Ticker ticker, + MetricRecorder metricRecorder) { Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams); Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s", flowControlWindow); @@ -294,7 +299,8 @@ static NettyServerHandler newHandler( keepAliveEnforcer, autoFlowControl, rstStreamCounter, - eagAttributes, ticker); + eagAttributes, ticker, + metricRecorder); } private NettyServerHandler( @@ -318,7 +324,8 @@ private NettyServerHandler( boolean autoFlowControl, RstStreamCounter rstStreamCounter, Attributes eagAttributes, - Ticker ticker) { + Ticker ticker, + MetricRecorder metricRecorder) { super( channelUnused, decoder, @@ -362,6 +369,7 @@ public void onStreamClosed(Http2Stream stream) { checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize); this.maxMessageSize = maxMessageSize; + this.tcpMetrics = new TcpMetrics.Tracker(metricRecorder, "server"); this.keepAliveTimeInNanos = keepAliveTimeInNanos; this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos; this.maxConnectionIdleManager = maxConnectionIdleManager; @@ -663,6 +671,7 @@ void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) { */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { + tcpMetrics.channelInactive(ctx.channel()); try { if (keepAliveManager != null) { keepAliveManager.onTransportTermination(); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index 758ffeee5b1..c0e52b75876 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -25,6 +25,7 @@ import io.grpc.Attributes; import io.grpc.InternalChannelz.SocketStats; import io.grpc.InternalLogId; +import io.grpc.MetricRecorder; import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.internal.ServerTransport; @@ -81,6 +82,7 @@ class NettyServerTransport implements ServerTransport { private final int maxRstCount; private final long maxRstPeriodNanos; private final Attributes eagAttributes; + private final MetricRecorder metricRecorder; private final List streamTracerFactories; private final TransportTracer transportTracer; @@ -105,7 +107,8 @@ class NettyServerTransport implements ServerTransport { long permitKeepAliveTimeInNanos, int maxRstCount, long maxRstPeriodNanos, - Attributes eagAttributes) { + Attributes eagAttributes, + MetricRecorder metricRecorder) { this.channel = Preconditions.checkNotNull(channel, "channel"); this.channelUnused = channelUnused; this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator"); @@ -128,6 +131,7 @@ class NettyServerTransport implements ServerTransport { this.maxRstCount = maxRstCount; this.maxRstPeriodNanos = maxRstPeriodNanos; this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes"); + this.metricRecorder = metricRecorder; SocketAddress remote = channel.remoteAddress(); this.logId = InternalLogId.allocate(getClass(), remote != null ? remote.toString() : null); } @@ -289,6 +293,7 @@ private NettyServerHandler createHandler( permitKeepAliveTimeInNanos, maxRstCount, maxRstPeriodNanos, - eagAttributes); + eagAttributes, + metricRecorder); } } diff --git a/netty/src/main/java/io/grpc/netty/TcpMetrics.java b/netty/src/main/java/io/grpc/netty/TcpMetrics.java new file mode 100644 index 00000000000..b6052996da9 --- /dev/null +++ b/netty/src/main/java/io/grpc/netty/TcpMetrics.java @@ -0,0 +1,333 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.netty; + +import io.grpc.DoubleHistogramMetricInstrument; +import io.grpc.LongCounterMetricInstrument; +import io.grpc.LongUpDownCounterMetricInstrument; +import io.grpc.MetricInstrument; +import io.grpc.MetricInstrumentRegistry; +import io.grpc.MetricRecorder; +import io.netty.channel.Channel; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +final class TcpMetrics { + + private static final Metrics DEFAULT_METRICS; + + static { + boolean epollAvailable = false; + try { + Class epollClass = Class.forName("io.netty.channel.epoll.Epoll"); + Method isAvailableMethod = epollClass.getDeclaredMethod("isAvailable"); + epollAvailable = (Boolean) isAvailableMethod.invoke(null); + } catch (Throwable t) { + // Ignored + } + DEFAULT_METRICS = new Metrics(MetricInstrumentRegistry.getDefaultRegistry(), epollAvailable); + } + + static Metrics getDefaultMetrics() { + return DEFAULT_METRICS; + } + + static final class Metrics { + final LongCounterMetricInstrument connectionsCreated; + final LongUpDownCounterMetricInstrument connectionCount; + final LongCounterMetricInstrument packetsRetransmitted; + final LongCounterMetricInstrument recurringRetransmits; + final DoubleHistogramMetricInstrument minRtt; + + Metrics(MetricInstrumentRegistry registry, boolean epollAvailable) { + List requiredLabels = Collections.singletonList("grpc.target"); + List optionalLabels = Arrays.asList( + "network.local.address", + "network.local.port", + "network.peer.address", + "network.peer.port"); + + connectionsCreated = safelyRegisterLongCounter(registry, + "grpc.tcp.connections_created", + "Number of TCP connections created.", + "{connection}", + requiredLabels, + optionalLabels); + + connectionCount = safelyRegisterLongUpDownCounter(registry, + "grpc.tcp.connection_count", + "Number of currently open TCP connections.", + "{connection}", + requiredLabels, + optionalLabels); + + if (epollAvailable) { + packetsRetransmitted = safelyRegisterLongCounter(registry, + "grpc.tcp.packets_retransmitted", + "Total number of packets retransmitted for a single TCP connection.", + "{packet}", + requiredLabels, + optionalLabels); + + recurringRetransmits = safelyRegisterLongCounter(registry, + "grpc.tcp.recurring_retransmits", + "Total number of unacknowledged packets to be retransmitted " + + "since the last acknowledgment.", + "{packet}", + requiredLabels, + optionalLabels); + + minRtt = safelyRegisterDoubleHistogram(registry, + "grpc.tcp.min_rtt", + "Minimum RTT observed for a single TCP connection.", + "s", + Arrays.asList(0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, + 5.0, 10.0, 25.0, 50.0, 100.0, 250.0), + requiredLabels, + optionalLabels); + } else { + packetsRetransmitted = null; + recurringRetransmits = null; + minRtt = null; + } + } + } + + /** + * Safe metric registration or retrieval for environments where TcpMetrics might + * be loaded multiple times (e.g., shaded and unshaded). + */ + private static LongCounterMetricInstrument safelyRegisterLongCounter( + MetricInstrumentRegistry registry, String name, String description, String unit, + List requiredLabelKeys, List optionalLabelKeys) { + try { + return registry.registerLongCounter(name, description, unit, requiredLabelKeys, + optionalLabelKeys, false); + } catch (IllegalStateException e) { + if (e.getMessage() != null && e.getMessage().contains("already exists")) { + for (MetricInstrument instrument : registry.getMetricInstruments()) { + if (instrument.getName().equals(name) + && instrument instanceof LongCounterMetricInstrument) { + return (LongCounterMetricInstrument) instrument; + } + } + } + throw e; + } + } + + private static LongUpDownCounterMetricInstrument safelyRegisterLongUpDownCounter( + MetricInstrumentRegistry registry, String name, String description, String unit, + List requiredLabelKeys, List optionalLabelKeys) { + try { + return registry.registerLongUpDownCounter(name, description, unit, requiredLabelKeys, + optionalLabelKeys, false); + } catch (IllegalStateException e) { + if (e.getMessage() != null && e.getMessage().contains("already exists")) { + for (MetricInstrument instrument : registry.getMetricInstruments()) { + if (instrument.getName().equals(name) + && instrument instanceof LongUpDownCounterMetricInstrument) { + return (LongUpDownCounterMetricInstrument) instrument; + } + } + } + throw e; + } + } + + private static DoubleHistogramMetricInstrument safelyRegisterDoubleHistogram( + MetricInstrumentRegistry registry, String name, String description, String unit, + List bucketBoundaries, List requiredLabelKeys, + List optionalLabelKeys) { + try { + return registry.registerDoubleHistogram(name, description, unit, bucketBoundaries, + requiredLabelKeys, optionalLabelKeys, false); + } catch (IllegalStateException e) { + if (e.getMessage() != null && e.getMessage().contains("already exists")) { + for (MetricInstrument instrument : registry.getMetricInstruments()) { + if (instrument.getName().equals(name) + && instrument instanceof DoubleHistogramMetricInstrument) { + return (DoubleHistogramMetricInstrument) instrument; + } + } + } + throw e; + } + } + + static final class Tracker { + private final MetricRecorder metricRecorder; + private final String target; + private final Metrics metrics; + private final String epollSocketChannelClassName; + private final String epollTcpInfoClassName; + + Tracker(MetricRecorder metricRecorder, String target) { + this(metricRecorder, target, DEFAULT_METRICS); + } + + Tracker(MetricRecorder metricRecorder, String target, Metrics metrics) { + this(metricRecorder, target, metrics, + "io.netty.channel.epoll.EpollSocketChannel", + "io.netty.channel.epoll.EpollTcpInfo"); + } + + Tracker(MetricRecorder metricRecorder, String target, Metrics metrics, + String epollSocketChannelClassName, String epollTcpInfoClassName) { + this.metricRecorder = metricRecorder; + this.target = target; + this.metrics = metrics; + this.epollSocketChannelClassName = epollSocketChannelClassName; + this.epollTcpInfoClassName = epollTcpInfoClassName; + } + + private static final long RECORD_INTERVAL_MILLIS; + + static { + long interval = 5; + try { + String flagValue = System.getProperty("io.grpc.netty.tcpMetricsRecordIntervalMinutes"); + if (flagValue != null) { + interval = Long.parseLong(flagValue); + } + } catch (NumberFormatException e) { + // Use default + } + RECORD_INTERVAL_MILLIS = java.util.concurrent.TimeUnit.MINUTES.toMillis(interval); + } + + private static final java.util.Random RANDOM = new java.util.Random(); + private io.netty.util.concurrent.ScheduledFuture reportTimer; + + void channelActive(Channel channel) { + if (metricRecorder != null && target != null) { + java.util.List labelValues = getLabelValues(channel); + metricRecorder.addLongCounter(metrics.connectionsCreated, 1, + Collections.singletonList(target), labelValues); + metricRecorder.addLongUpDownCounter(metrics.connectionCount, 1, + Collections.singletonList(target), labelValues); + scheduleNextReport(channel); + } + } + + private void scheduleNextReport(final Channel channel) { + if (RECORD_INTERVAL_MILLIS <= 0) { + return; + } + if (!channel.isActive()) { + return; + } + + double jitter = 0.1 + RANDOM.nextDouble(); // 10% to 110% + long delayMillis = (long) (RECORD_INTERVAL_MILLIS * jitter); + + try { + reportTimer = channel.eventLoop().schedule(new Runnable() { + @Override + public void run() { + if (channel.isActive()) { + Tracker.this.recordTcpInfo(channel); // Renamed from channelInactive to recordTcpInfo + scheduleNextReport(channel); // Re-arm + } + } + }, delayMillis, java.util.concurrent.TimeUnit.MILLISECONDS); + } catch (Throwable t) { + // Channel closed, event loop shut down, etc. + } + } + + void channelInactive(Channel channel) { + if (reportTimer != null) { + reportTimer.cancel(false); + } + if (metricRecorder != null && target != null) { + java.util.List labelValues = getLabelValues(channel); + metricRecorder.addLongUpDownCounter(metrics.connectionCount, -1, + Collections.singletonList(target), labelValues); + // Final collection on close + recordTcpInfo(channel); + } + } + + private void recordTcpInfo(Channel channel) { + if (metricRecorder == null || target == null) { + return; + } + java.util.List labelValues = getLabelValues(channel); + try { + if (channel.getClass().getName().equals(epollSocketChannelClassName)) { + Class tcpInfoClass = Class.forName(epollTcpInfoClassName); + Method tcpInfoMethod = channel.getClass().getMethod("tcpInfo", tcpInfoClass); + Object info = tcpInfoClass.getDeclaredConstructor().newInstance(); + tcpInfoMethod.invoke(channel, info); + + Method totalRetransMethod = tcpInfoClass.getMethod("totalRetrans"); + Method retransmitsMethod = tcpInfoClass.getMethod("retransmits"); + Method rttMethod = tcpInfoClass.getMethod("rtt"); + + long totalRetrans = (Long) totalRetransMethod.invoke(info); + int retransmits = (Integer) retransmitsMethod.invoke(info); + long rtt = (Long) rttMethod.invoke(info); + + if (metrics.packetsRetransmitted != null) { + metricRecorder.addLongCounter(metrics.packetsRetransmitted, totalRetrans, + Collections.singletonList(target), labelValues); + } + if (metrics.recurringRetransmits != null) { + metricRecorder.addLongCounter(metrics.recurringRetransmits, retransmits, + Collections.singletonList(target), labelValues); + } + if (metrics.minRtt != null) { + metricRecorder.recordDoubleHistogram(metrics.minRtt, + rtt / 1000000.0, // Convert microseconds to seconds + Collections.singletonList(target), labelValues); + } + } + } catch (Throwable t) { + // Epoll not available or error getting tcp_info, just ignore. + } + } + } + + private static java.util.List getLabelValues(Channel channel) { + String localAddress = ""; + String localPort = ""; + String peerAddress = ""; + String peerPort = ""; + + java.net.SocketAddress local = channel.localAddress(); + if (local instanceof java.net.InetSocketAddress) { + java.net.InetSocketAddress inetLocal = (java.net.InetSocketAddress) local; + localAddress = inetLocal.getAddress().getHostAddress(); + localPort = String.valueOf(inetLocal.getPort()); + } + + java.net.SocketAddress remote = channel.remoteAddress(); + if (remote instanceof java.net.InetSocketAddress) { + java.net.InetSocketAddress inetRemote = (java.net.InetSocketAddress) remote; + peerAddress = inetRemote.getAddress().getHostAddress(); + peerPort = String.valueOf(inetRemote.getPort()); + } + + return java.util.Arrays.asList(localAddress, localPort, peerAddress, peerPort); + } + + + private TcpMetrics() {} +} diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index 53598727efd..599aecf80b2 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -1165,7 +1165,8 @@ public Stopwatch get() { Attributes.EMPTY, "someauthority", null, - fakeClock().getTicker()); + fakeClock().getTicker(), + new io.grpc.MetricRecorder() {}); } @Override diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index db44c8f50fd..b0f9cb26594 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -251,6 +251,7 @@ public void setSoLingerChannelOption() throws IOException, GeneralSecurityExcept new SocketPicker(), new FakeChannelLogger(), false, + new io.grpc.MetricRecorder() {} , Ticker.systemTicker()); transports.add(transport); callMeMaybe(transport.start(clientTransportListener)); @@ -526,6 +527,7 @@ public void failingToConstructChannelShouldFailGracefully() throws Exception { new SocketPicker(), new FakeChannelLogger(), false, + new io.grpc.MetricRecorder() {} , Ticker.systemTicker()); transports.add(transport); @@ -1148,6 +1150,7 @@ private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int max new SocketPicker(), new FakeChannelLogger(), false, + new io.grpc.MetricRecorder() {} , Ticker.systemTicker()); transports.add(transport); return transport; diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index 0d5a9bab176..a5ab42d15a6 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -1416,7 +1416,8 @@ protected NettyServerHandler newHandler() { maxRstCount, maxRstPeriodNanos, Attributes.EMPTY, - fakeClock().getTicker()); + fakeClock().getTicker(), + new io.grpc.MetricRecorder() {}); } @Override diff --git a/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java b/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java new file mode 100644 index 00000000000..aa5ce3e9532 --- /dev/null +++ b/netty/src/test/java/io/grpc/netty/TcpMetricsTest.java @@ -0,0 +1,330 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.netty; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import io.grpc.MetricRecorder; +import io.netty.channel.Channel; +import io.netty.channel.EventLoop; +import io.netty.util.concurrent.ScheduledFuture; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class TcpMetricsTest { + + @Rule public final MockitoRule mocks = MockitoJUnit.rule(); + + @Mock private MetricRecorder metricRecorder; + @Mock private Channel channel; + @Mock + private EventLoop eventLoop; + @Mock + private ScheduledFuture scheduledFuture; + + private TcpMetrics.Tracker metrics; + + @Before + public void setUp() { + when(channel.eventLoop()).thenReturn(eventLoop); + when(eventLoop.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) + .thenAnswer(invocation -> scheduledFuture); + metrics = new TcpMetrics.Tracker(metricRecorder, "target1"); + } + + @Test + public void metricsInitialization_epollUnavailable() { + TcpMetrics.Metrics metrics = new TcpMetrics.Metrics( + io.grpc.MetricInstrumentRegistry.getDefaultRegistry(), false); + + org.junit.Assert.assertNotNull(metrics.connectionsCreated); + org.junit.Assert.assertNotNull(metrics.connectionCount); + org.junit.Assert.assertNull(metrics.packetsRetransmitted); + org.junit.Assert.assertNull(metrics.recurringRetransmits); + org.junit.Assert.assertNull(metrics.minRtt); + } + + @Test + public void metricsInitialization_epollAvailable() { + TcpMetrics.Metrics metrics = new TcpMetrics.Metrics( + io.grpc.MetricInstrumentRegistry.getDefaultRegistry(), true); + + org.junit.Assert.assertNotNull(metrics.connectionsCreated); + org.junit.Assert.assertNotNull(metrics.connectionCount); + org.junit.Assert.assertNotNull(metrics.packetsRetransmitted); + org.junit.Assert.assertNotNull(metrics.recurringRetransmits); + org.junit.Assert.assertNotNull(metrics.minRtt); + } + + @Test + public void safelyRegister_collision() { + io.grpc.MetricInstrumentRegistry registry = + io.grpc.MetricInstrumentRegistry.getDefaultRegistry(); + + // Explicitly register one metric to ensure collision path is triggered + try { + registry.registerLongCounter("grpc.tcp.connections_created", "desc", "unit", + Collections.emptyList(), Collections.emptyList(), false); + } catch (IllegalStateException e) { + // Already exists, which is fine for this test + } + + TcpMetrics.Metrics metrics = new TcpMetrics.Metrics(registry, true); + + org.junit.Assert.assertNotNull(metrics.connectionsCreated); + org.junit.Assert.assertNotNull(metrics.connectionCount); + org.junit.Assert.assertNotNull(metrics.minRtt); + } + + public static class FakeWithTcpInfo extends io.netty.channel.embedded.EmbeddedChannel { + public void tcpInfo(FakeEpollTcpInfo info) { + info.totalRetrans = 123; + info.retransmits = 4; + info.rtt = 5000; + } + } + + public static class FakeEpollTcpInfo { + long totalRetrans; + int retransmits; + long rtt; + + public void setValues(long totalRetrans, int retransmits, long rtt) { + this.totalRetrans = totalRetrans; + this.retransmits = retransmits; + this.rtt = rtt; + } + + public long totalRetrans() { + return totalRetrans; + } + + public int retransmits() { + return retransmits; + } + + public long rtt() { + return rtt; + } + } + + @Test + public void tracker_recordTcpInfo_reflectionSuccess() throws Exception { + MetricRecorder recorder = org.mockito.Mockito.mock(MetricRecorder.class); + TcpMetrics.Metrics metrics = new TcpMetrics.Metrics( + io.grpc.MetricInstrumentRegistry.getDefaultRegistry(), true); + + String fakeChannelName = FakeWithTcpInfo.class.getName(); + String fakeInfoName = FakeEpollTcpInfo.class.getName(); + + TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder, "target", metrics, + fakeChannelName, fakeInfoName); + + FakeWithTcpInfo channel = new FakeWithTcpInfo(); + channel.writeInbound("dummy"); + + tracker.channelInactive(channel); + + verify(recorder).addLongCounter(eq(metrics.packetsRetransmitted), eq(123L), any(), any()); + verify(recorder).addLongCounter(eq(metrics.recurringRetransmits), eq(4L), any(), any()); + verify(recorder).recordDoubleHistogram(eq(metrics.minRtt), eq(0.005), any(), any()); + } + + @Test + public void tracker_recordTcpInfo_reflectionFailure() { + MetricRecorder recorder = org.mockito.Mockito.mock(MetricRecorder.class); + TcpMetrics.Metrics metrics = new TcpMetrics.Metrics( + io.grpc.MetricInstrumentRegistry.getDefaultRegistry(), true); + + TcpMetrics.Tracker tracker = new TcpMetrics.Tracker(recorder, "target", metrics, + "non.existent.Class", "non.existent.Info"); + + Channel channel = org.mockito.Mockito.mock(Channel.class); + when(channel.isActive()).thenReturn(true); + + // Should catch exception and ignore + tracker.channelInactive(channel); + } + + @Test + public void registeredMetrics_haveCorrectOptionalLabels() { + java.util.List expectedOptionalLabels = Arrays.asList( + "network.local.address", + "network.local.port", + "network.peer.address", + "network.peer.port" + ); + + org.junit.Assert.assertEquals( + expectedOptionalLabels, + TcpMetrics.getDefaultMetrics().connectionsCreated.getOptionalLabelKeys()); + org.junit.Assert.assertEquals( + expectedOptionalLabels, + TcpMetrics.getDefaultMetrics().connectionCount.getOptionalLabelKeys()); + + if (TcpMetrics.getDefaultMetrics().packetsRetransmitted != null) { + org.junit.Assert.assertEquals( + expectedOptionalLabels, + TcpMetrics.getDefaultMetrics().packetsRetransmitted.getOptionalLabelKeys()); + org.junit.Assert.assertEquals( + expectedOptionalLabels, + TcpMetrics.getDefaultMetrics().recurringRetransmits.getOptionalLabelKeys()); + org.junit.Assert.assertEquals( + expectedOptionalLabels, TcpMetrics.getDefaultMetrics().minRtt.getOptionalLabelKeys()); + } + } + + @Test + public void channelActive_extractsLabels_ipv4() throws Exception { + + InetAddress localInet = InetAddress.getByAddress(new byte[] { 127, 0, 0, 1 }); + InetAddress remoteInet = InetAddress.getByAddress(new byte[] { 10, 0, 0, 1 }); + when(channel.localAddress()).thenReturn(new InetSocketAddress(localInet, 8080)); + when(channel.remoteAddress()).thenReturn(new InetSocketAddress(remoteInet, 443)); + + metrics.channelActive(channel); + + verify(metricRecorder).addLongCounter( + eq(TcpMetrics.getDefaultMetrics().connectionsCreated), eq(1L), + eq(Collections.singletonList("target1")), + eq(Arrays.asList( + localInet.getHostAddress(), "8080", remoteInet.getHostAddress(), "443"))); + verify(metricRecorder).addLongUpDownCounter( + eq(TcpMetrics.getDefaultMetrics().connectionCount), eq(1L), + eq(Collections.singletonList("target1")), + eq(Arrays.asList( + localInet.getHostAddress(), "8080", remoteInet.getHostAddress(), "443"))); + verifyNoMoreInteractions(metricRecorder); + } + + @Test + public void channelInactive_extractsLabels_ipv6() throws Exception { + + InetAddress localInet = InetAddress.getByAddress( + new byte[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1 }); + InetAddress remoteInet = InetAddress.getByAddress( + new byte[] { 32, 1, 13, -72, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1 }); + when(channel.localAddress()).thenReturn(new InetSocketAddress(localInet, 8080)); + when(channel.remoteAddress()).thenReturn(new InetSocketAddress(remoteInet, 443)); + + metrics.channelInactive(channel); + + verify(metricRecorder).addLongUpDownCounter( + eq(TcpMetrics.getDefaultMetrics().connectionCount), eq(-1L), + eq(Collections.singletonList("target1")), + eq(Arrays.asList( + localInet.getHostAddress(), "8080", remoteInet.getHostAddress(), "443"))); + verifyNoMoreInteractions(metricRecorder); + } + + @Test + public void channelActive_extractsLabels_nonInetAddress() throws Exception { + SocketAddress dummyAddress = new SocketAddress() {}; + when(channel.localAddress()).thenReturn(dummyAddress); + when(channel.remoteAddress()).thenReturn(dummyAddress); + + metrics.channelActive(channel); + + verify(metricRecorder).addLongCounter( + eq(TcpMetrics.getDefaultMetrics().connectionsCreated), eq(1L), + eq(Collections.singletonList("target1")), + eq(Arrays.asList("", "", "", ""))); + verify(metricRecorder).addLongUpDownCounter( + eq(TcpMetrics.getDefaultMetrics().connectionCount), eq(1L), + eq(Collections.singletonList("target1")), + eq(Arrays.asList("", "", "", ""))); + verifyNoMoreInteractions(metricRecorder); + } + + @Test + public void channelActive_incrementsCounts() { + metrics.channelActive(channel); + verify(metricRecorder).addLongCounter( + eq(TcpMetrics.getDefaultMetrics().connectionsCreated), eq(1L), + eq(Collections.singletonList("target1")), + eq(Arrays.asList("", "", "", ""))); + verify(metricRecorder).addLongUpDownCounter( + eq(TcpMetrics.getDefaultMetrics().connectionCount), eq(1L), + eq(Collections.singletonList("target1")), + eq(Arrays.asList("", "", "", ""))); + verifyNoMoreInteractions(metricRecorder); + } + + @Test + public void channelInactive_decrementsCount_noEpoll_noError() { + metrics.channelInactive(channel); + verify(metricRecorder).addLongUpDownCounter( + eq(TcpMetrics.getDefaultMetrics().connectionCount), eq(-1L), + eq(Collections.singletonList("target1")), + eq(Arrays.asList("", "", "", ""))); + verifyNoMoreInteractions(metricRecorder); + } + + @Test + public void channelActive_schedulesReportTimer() { + when(channel.isActive()).thenReturn(true); + metrics.channelActive(channel); + + ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class); + ArgumentCaptor delayCaptor = ArgumentCaptor.forClass(Long.class); + verify(eventLoop).schedule( + runnableCaptor.capture(), delayCaptor.capture(), eq(TimeUnit.MILLISECONDS)); + + Runnable task = runnableCaptor.getValue(); + long delay = delayCaptor.getValue(); + + // Default RECORD_INTERVAL_MILLIS is 5 minutes (300,000 ms) + // Jitter is 10% to 110%, so 30,000 ms to 330,000 ms + org.junit.Assert.assertTrue("Delay should be >= 30000 but was " + delay, delay >= 30_000); + org.junit.Assert.assertTrue("Delay should be <= 330000 but was " + delay, delay <= 330_000); + + // Run the task to verify rescheduling + task.run(); + + verify(eventLoop, org.mockito.Mockito.times(2)) + .schedule(any(Runnable.class), anyLong(), eq(TimeUnit.MILLISECONDS)); + } + + @Test + public void channelInactive_cancelsReportTimer() { + when(channel.isActive()).thenReturn(true); + metrics.channelActive(channel); + + metrics.channelInactive(channel); + + verify(scheduledFuture).cancel(false); + } +}