From a04c75d7b566a453ca860cc2e3c4f761d821d525 Mon Sep 17 00:00:00 2001 From: DJ Gregor Date: Fri, 24 Oct 2025 15:44:38 -0700 Subject: [PATCH 1/3] Trace dump refactor in preparation for adding long running traces --- .../common/writer/TraceDumpJsonExporter.java | 22 ++++---- .../trace/core/PendingTraceBuffer.java | 53 +++++++++++-------- 2 files changed, 40 insertions(+), 35 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceDumpJsonExporter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceDumpJsonExporter.java index e2b3c0fc976..ec830c55cd5 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceDumpJsonExporter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceDumpJsonExporter.java @@ -3,12 +3,9 @@ import com.squareup.moshi.JsonAdapter; import com.squareup.moshi.Moshi; import com.squareup.moshi.Types; -import datadog.trace.api.flare.TracerFlare; import datadog.trace.core.DDSpan; -import java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.zip.ZipOutputStream; public class TraceDumpJsonExporter implements Writer { @@ -17,11 +14,9 @@ public class TraceDumpJsonExporter implements Writer { .add(DDSpanJsonAdapter.buildFactory(false)) .build() .adapter(Types.newParameterizedType(Collection.class, DDSpan.class)); - private StringBuilder dumpText; - private ZipOutputStream zip; + private final StringBuilder dumpText; - public TraceDumpJsonExporter(ZipOutputStream zip) { - this.zip = zip; + public TraceDumpJsonExporter() { dumpText = new StringBuilder(); } @@ -32,7 +27,8 @@ public void write(final Collection trace) { @Override public void write(List trace) { - // Do nothing + Collection collectionTrace = trace; + write(collectionTrace); } @Override @@ -42,14 +38,14 @@ public void start() { @Override public boolean flush() { - try { - TracerFlare.addText(zip, "pending_traces.txt", dumpText.toString()); - } catch (IOException e) { - // do 
nothing - } + // do nothing return true; } + public String getDumpJson() { + return dumpText.toString(); + } + @Override public void close() { // do nothing diff --git a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java index 0057eb2ce7d..851a11efdab 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java @@ -71,6 +71,23 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer { private final LongRunningTracesTracker runningTracesTracker; + private void dumpTraces() { + if (worker.isAlive()) { + int count = dumpCounter.get(); + int loop = 1; + boolean signaled = queue.offer(DUMP_ELEMENT); + while (!closed && !signaled) { + yieldOrSleep(loop++); + signaled = queue.offer(DUMP_ELEMENT); + } + int newCount = dumpCounter.get(); + while (!closed && count >= newCount) { + yieldOrSleep(loop++); + newCount = dumpCounter.get(); + } + } + } + public boolean longRunningSpansEnabled() { return runningTracesTracker != null; } @@ -136,6 +153,18 @@ public void flush() { } } + private String getDumpJson() { + try (TraceDumpJsonExporter writer = new TraceDumpJsonExporter()) { + for (Element e : DumpDrain.DUMP_DRAIN.collectTraces()) { + if (e instanceof PendingTrace) { + PendingTrace trace = (PendingTrace) e; + writer.write(trace.getSpans()); + } + } + return writer.getDumpJson(); + } + } + private static final class WriteDrain implements MessagePassingQueue.Consumer { private static final WriteDrain WRITE_DRAIN = new WriteDrain(); @@ -354,32 +383,12 @@ private TracerDump(DelayingPendingTraceBuffer buffer) { @Override public void prepareForFlare() { - if (buffer.worker.isAlive()) { - int count = buffer.dumpCounter.get(); - int loop = 1; - boolean signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT); - while (!buffer.closed && !signaled) { - 
buffer.yieldOrSleep(loop++); - signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT); - } - int newCount = buffer.dumpCounter.get(); - while (!buffer.closed && count >= newCount) { - buffer.yieldOrSleep(loop++); - newCount = buffer.dumpCounter.get(); - } - } + buffer.dumpTraces(); } @Override public void addReportToFlare(ZipOutputStream zip) throws IOException { - TraceDumpJsonExporter writer = new TraceDumpJsonExporter(zip); - for (Element e : DelayingPendingTraceBuffer.DumpDrain.DUMP_DRAIN.collectTraces()) { - if (e instanceof PendingTrace) { - PendingTrace trace = (PendingTrace) e; - writer.write(trace.getSpans()); - } - } - writer.flush(); + TracerFlare.addText(zip, "pending_traces.txt", buffer.getDumpJson()); } } } From 248c7de863d0d3842f99aebeaf0686910089ddfd Mon Sep 17 00:00:00 2001 From: DJ Gregor Date: Fri, 24 Oct 2025 15:46:47 -0700 Subject: [PATCH 2/3] Add long_running_traces.txt to flare report Synchronized accesses to traceArray in LongRunningTracesTracker since the flare reporter can now access the array. This shouldn't be a concern for blocking because addTrace and flushAndCompact are the existing calls from PendingTraceBuffer's run() loop and getTracesAsJson is called by the reporter thread and will complete fairly quickly. 
--- .../trace/core/LongRunningTracesTracker.java | 43 +++++++++++++++++-- .../core/LongRunningTracesTrackerTest.groovy | 39 +++++++++++++++++ .../trace/core/PendingTraceBufferTest.groovy | 2 +- 3 files changed, 80 insertions(+), 4 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java b/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java index 5d2e32fecf6..c53c64347eb 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java @@ -1,14 +1,28 @@ package datadog.trace.core; +import static java.util.Comparator.comparingLong; + import datadog.communication.ddagent.DDAgentFeaturesDiscovery; import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; +import datadog.trace.api.flare.TracerFlare; +import datadog.trace.common.writer.TraceDumpJsonExporter; import datadog.trace.core.monitor.HealthMetrics; +import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.zip.ZipOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LongRunningTracesTracker implements TracerFlare.Reporter { + private static final Logger LOGGER = LoggerFactory.getLogger(LongRunningTracesTracker.class); + private static final int MAX_DUMPED_TRACES = 50; + private static final Comparator TRACE_BY_START_TIME = + comparingLong(PendingTrace::getRunningTraceStartTime); -public class LongRunningTracesTracker { private final DDAgentFeaturesDiscovery features; private final HealthMetrics healthMetrics; private long lastFlushMilli = 0; @@ -41,6 +55,8 @@ public LongRunningTracesTracker( (int) TimeUnit.SECONDS.toMillis(config.getLongRunningTraceFlushInterval()); this.features = sharedCommunicationObjects.featuresDiscovery(config); this.healthMetrics = 
healthMetrics; + + TracerFlare.addReporter(this); } public boolean add(PendingTraceBuffer.Element element) { @@ -56,7 +72,7 @@ public boolean add(PendingTraceBuffer.Element element) { return true; } - private void addTrace(PendingTrace trace) { + private synchronized void addTrace(PendingTrace trace) { if (trace.empty()) { return; } @@ -67,7 +83,7 @@ private void addTrace(PendingTrace trace) { traceArray.add(trace); } - public void flushAndCompact(long nowMilli) { + public synchronized void flushAndCompact(long nowMilli) { if (nowMilli < lastFlushMilli + TimeUnit.SECONDS.toMillis(1)) { return; } @@ -139,4 +155,25 @@ private void flushStats() { write = 0; expired = 0; } + + public String getTracesAsJson() { + try (TraceDumpJsonExporter writer = new TraceDumpJsonExporter()) { + List traces; + synchronized (this) { + traces = new ArrayList<>(traceArray); + } + traces.sort(TRACE_BY_START_TIME); + + int limit = Math.min(traces.size(), MAX_DUMPED_TRACES); + for (int i = 0; i < limit; i++) { + writer.write(traces.get(i).getSpans()); + } + return writer.getDumpJson(); + } + } + + @Override + public void addReportToFlare(ZipOutputStream zip) throws IOException { + TracerFlare.addText(zip, "long_running_traces.txt", getTracesAsJson()); + } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy index 07313ee4282..987844a3db2 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy @@ -201,4 +201,43 @@ class LongRunningTracesTrackerTest extends DDSpecification { PrioritySampling.USER_KEEP | 1 | LongRunningTracesTracker.WRITE_RUNNING_SPANS PrioritySampling.SAMPLER_KEEP | 1 | LongRunningTracesTracker.WRITE_RUNNING_SPANS } + + def "getTracesAsJson with no traces"() { + when: + def json = tracker.getTracesAsJson() + + 
then: + json == "" + } + + def "getTracesAsJson with traces"() { + given: + def trace = newTraceToTrack() + tracker.add(trace) + + when: + def json = tracker.getTracesAsJson() + + then: + json != null + !json.isEmpty() + json.contains('"service"') + json.contains('"name"') + } + + def "testing tracer flare dump with trace"() { + given: + def trace = newTraceToTrack() + tracker.add(trace) + + when: + def entries = PendingTraceBufferTest.buildAndExtractZip() + + then: + entries.containsKey("long_running_traces.txt") + + def jsonContent = entries["long_running_traces.txt"] as String + jsonContent.contains('"service"') + jsonContent.contains('"name"') + } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy index b6ca943fa6c..ff46f42cf3d 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy @@ -605,7 +605,7 @@ class PendingTraceBufferTest extends DDSpecification { return DDSpan.create("test", 0, context, null) } - def buildAndExtractZip() { + static buildAndExtractZip() { TracerFlare.prepareForFlare() def out = new ByteArrayOutputStream() try (ZipOutputStream zip = new ZipOutputStream(out)) { From 743dd899228434ff0c01d19282d6d05b2eed6783 Mon Sep 17 00:00:00 2001 From: DJ Gregor Date: Tue, 28 Oct 2025 09:33:13 -0700 Subject: [PATCH 3/3] Keep track of how often we write around the pending trace buffer Adds a new tracer metric, pending.write_around --- .../java/datadog/trace/core/PendingTraceBuffer.java | 3 +++ .../datadog/trace/core/monitor/HealthMetrics.java | 2 ++ .../trace/core/monitor/TracerHealthMetrics.java | 12 ++++++++++++ 3 files changed, 17 insertions(+) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java index 
851a11efdab..4d2954bc93e 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java @@ -64,6 +64,7 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer { private final MpscBlockingConsumerArrayQueue queue; private final Thread worker; private final TimeSource timeSource; + private final HealthMetrics healthMetrics; private volatile boolean closed = false; private final AtomicInteger flushCounter = new AtomicInteger(0); @@ -102,6 +103,7 @@ public void enqueue(Element pendingTrace) { if (!pendingTrace.writeOnBufferFull()) { return; } + healthMetrics.onPendingWriteAround(); pendingTrace.write(); } } @@ -324,6 +326,7 @@ public DelayingPendingTraceBuffer( this.queue = new MpscBlockingConsumerArrayQueue<>(bufferSize); this.worker = newAgentThread(TRACE_MONITOR, new Worker()); this.timeSource = timeSource; + this.healthMetrics = healthMetrics; boolean runningSpansEnabled = config.isLongRunningTraceEnabled(); this.runningTracesTracker = runningSpansEnabled diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java index d0531a330cf..7b4229f46f0 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java @@ -71,6 +71,8 @@ public void onSend( public void onFailedSend( final int traceCount, final int sizeInBytes, final RemoteApi.Response response) {} + public void onPendingWriteAround() {} + public void onLongRunningUpdate(final int dropped, final int write, final int expired) {} /** diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java index 54024e85721..878c431607b 100644 --- 
a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java @@ -85,6 +85,8 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable private final LongAdder scopeCloseErrors = new LongAdder(); private final LongAdder userScopeCloseErrors = new LongAdder(); + private final LongAdder pendingWriteAround = new LongAdder(); + private final LongAdder longRunningTracesWrite = new LongAdder(); private final LongAdder longRunningTracesDropped = new LongAdder(); private final LongAdder longRunningTracesExpired = new LongAdder(); @@ -295,6 +297,11 @@ public void onFailedSend( onSendAttempt(traceCount, sizeInBytes, response); } + @Override + public void onPendingWriteAround() { + pendingWriteAround.increment(); + } + @Override public void onLongRunningUpdate(final int dropped, final int write, final int expired) { longRunningTracesWrite.add(write); @@ -473,6 +480,8 @@ public void run(TracerHealthMetrics target) { reportIfChanged( target.statsd, "scope.user.close.error", target.userScopeCloseErrors, NO_TAGS); + reportIfChanged(target.statsd, "pending.write_around", target.pendingWriteAround, NO_TAGS); + reportIfChanged( target.statsd, "long-running.write", target.longRunningTracesWrite, NO_TAGS); reportIfChanged( @@ -602,6 +611,9 @@ public String summary() { + "\nuserScopeCloseErrors=" + userScopeCloseErrors.sum() + "\n" + + "\npendingWriteAround=" + + pendingWriteAround.sum() + + "\n" + "\nlongRunningTracesWrite=" + longRunningTracesWrite.sum() + "\nlongRunningTracesDropped="