Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge.spanFromContext;
import static datadog.trace.instrumentation.netty41.AttributeKeys.CONTEXT_ATTRIBUTE_KEY;
import static datadog.trace.instrumentation.netty41.AttributeKeys.STREAMING_CONTEXT_KEY;
import static datadog.trace.instrumentation.netty41.AttributeKeys.WEBSOCKET_SENDER_HANDLER_CONTEXT;
import static datadog.trace.instrumentation.netty41.server.NettyHttpServerDecorator.DECORATE;

Expand All @@ -13,9 +14,11 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;

@ChannelHandler.Sharable
public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdapter {
Expand All @@ -26,36 +29,161 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
final Context storedContext = ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).get();
final AgentSpan span = spanFromContext(storedContext);

if (span == null || !(msg instanceof HttpResponse)) {
ctx.write(msg, prm);
// FullHttpResponse must be checked BEFORE LastHttpContent and HttpResponse,
// because FullHttpResponse extends both LastHttpContent and HttpResponse.
if (msg instanceof FullHttpResponse) {
handleFullHttpResponse(ctx, storedContext, span, (FullHttpResponse) msg, prm);
return;
}

try (final ContextScope scope = storedContext.attach()) {
final HttpResponse response = (HttpResponse) msg;
// Handle HttpResponse (headers only — start of chunked/streaming response).
// Must be checked BEFORE LastHttpContent/HttpContent.
if (msg instanceof HttpResponse) {
handleHttpResponse(ctx, storedContext, span, (HttpResponse) msg, prm);
return;
}

// Handle LastHttpContent (end of chunked/streaming response).
// Must be checked BEFORE HttpContent (LastHttpContent extends HttpContent).
// IMPORTANT: Use STREAMING_CONTEXT_KEY to avoid keep-alive race condition where
// channelRead for the next request may overwrite CONTEXT_ATTRIBUTE_KEY before
// this LastHttpContent write task runs on the EventLoop.
if (msg instanceof LastHttpContent) {
Context streamingContext = ctx.channel().attr(STREAMING_CONTEXT_KEY).getAndRemove();
Context contextForLastContent = streamingContext != null ? streamingContext : storedContext;
AgentSpan spanForLastContent =
streamingContext != null ? spanFromContext(streamingContext) : span;
handleLastHttpContent(
ctx, contextForLastContent, spanForLastContent, (LastHttpContent) msg, prm);
return;
}

// Intermediate HttpContent chunks — pass through without touching the span.
ctx.write(msg, prm);
}

/** Complete response in a single message (non-streaming). Finish span immediately. */
private void handleFullHttpResponse(
final ChannelHandlerContext ctx,
final Context storedContext,
final AgentSpan span,
final FullHttpResponse response,
final ChannelPromise prm) {

if (span == null) {
ctx.write(response, prm);
return;
}

try (final ContextScope scope = storedContext.attach()) {
try {
ctx.write(msg, prm);
ctx.write(response, prm);
} catch (final Throwable throwable) {
DECORATE.onError(span, throwable);
span.setHttpStatusCode(500);
span.finish(); // Finish the span manually since finishSpanOnClose was false
span.finish();
ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove();
throw throwable;
}

final boolean isWebsocketUpgrade =
response.status() == HttpResponseStatus.SWITCHING_PROTOCOLS
&& "websocket".equals(response.headers().get(HttpHeaderNames.UPGRADE));

if (isWebsocketUpgrade) {
ctx.channel()
.attr(WEBSOCKET_SENDER_HANDLER_CONTEXT)
.set(new HandlerContext.Sender(span, ctx.channel().id().asShortText()));
}

if (response.status() != HttpResponseStatus.CONTINUE
&& (response.status() != HttpResponseStatus.SWITCHING_PROTOCOLS || isWebsocketUpgrade)) {
DECORATE.onResponse(span, response);
DECORATE.beforeFinish(scope.context());
span.finish(); // Finish the span manually since finishSpanOnClose was false
span.finish();
ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove();
}
}
}

/**
* Chunked response headers — record status but do NOT finish the span yet. The span will be
* finished when the corresponding LastHttpContent is written. Context is saved to
* STREAMING_CONTEXT_KEY so that a keep-alive channelRead for the next request cannot overwrite it
* before LastHttpContent arrives.
*/
private void handleHttpResponse(
final ChannelHandlerContext ctx,
final Context storedContext,
final AgentSpan span,
final HttpResponse response,
final ChannelPromise prm) {

if (span == null) {
ctx.write(response, prm);
return;
}

try (final ContextScope scope = storedContext.attach()) {
try {
ctx.write(response, prm);
} catch (final Throwable throwable) {
DECORATE.onError(span, throwable);
span.setHttpStatusCode(500);
span.finish();
ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove();
throw throwable;
}

final boolean isWebsocketUpgrade =
response.status() == HttpResponseStatus.SWITCHING_PROTOCOLS
&& "websocket".equals(response.headers().get(HttpHeaderNames.UPGRADE));

if (isWebsocketUpgrade) {
ctx.channel()
.attr(WEBSOCKET_SENDER_HANDLER_CONTEXT)
.set(new HandlerContext.Sender(span, ctx.channel().id().asShortText()));
}

if (response.status() != HttpResponseStatus.CONTINUE
&& (response.status() != HttpResponseStatus.SWITCHING_PROTOCOLS || isWebsocketUpgrade)) {
DECORATE.onResponse(span, response);
ctx.channel().attr(STREAMING_CONTEXT_KEY).set(storedContext);
// Span finish is deferred to handleLastHttpContent.
}
}
}

/** End of chunked/streaming response — finish the span now that the full duration is known. */
private void handleLastHttpContent(
final ChannelHandlerContext ctx,
final Context storedContext,
final AgentSpan span,
final LastHttpContent msg,
final ChannelPromise prm) {

if (span == null) {
ctx.write(msg, prm);
return;
}

try (final ContextScope scope = storedContext.attach()) {
try {
ctx.write(msg, prm);
} catch (final Throwable throwable) {
DECORATE.onError(span, throwable);
span.setHttpStatusCode(500);
span.finish();
ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove();
throw throwable;
}

DECORATE.beforeFinish(scope.context());
span.finish();
// Only remove CONTEXT_ATTRIBUTE_KEY if it still holds our context.
// Under keep-alive a new request's channelRead may have already replaced it.
// All channel ops run on the same EventLoop thread so this check is race-free.
if (ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).get() == storedContext) {
ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ public final class AttributeKeys {
public static final AttributeKey<Context> CONTEXT_ATTRIBUTE_KEY =
attributeKey(DD_CONTEXT_ATTRIBUTE);

/**
* Stores the context of the currently-streaming (chunked) response. Set when the HTTP response
* headers are sent, cleared when LastHttpContent is processed. Using a separate key (instead of
* CONTEXT_ATTRIBUTE_KEY) avoids a keep-alive race: Netty can process the next request's
* channelRead before the current response's LastHttpContent write task runs, overwriting
* CONTEXT_ATTRIBUTE_KEY with the new request's span.
*/
public static final AttributeKey<Context> STREAMING_CONTEXT_KEY =
attributeKey("datadog.server.streaming.context");

public static final AttributeKey<AgentSpan> CLIENT_PARENT_ATTRIBUTE_KEY =
attributeKey("datadog.client.parent.span");

Expand Down