From 4d1369dec6c37dad3c1964dcdef47559a6bf3e0c Mon Sep 17 00:00:00 2001 From: DJ Gregor Date: Fri, 24 Oct 2025 15:44:38 -0700 Subject: [PATCH 1/6] Trace dump refactor in preparation for adding long running traces --- .../common/writer/TraceDumpJsonExporter.java | 22 ++++---- .../trace/core/PendingTraceBuffer.java | 53 +++++++++++-------- 2 files changed, 40 insertions(+), 35 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceDumpJsonExporter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceDumpJsonExporter.java index e2b3c0fc976..ec830c55cd5 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceDumpJsonExporter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceDumpJsonExporter.java @@ -3,12 +3,9 @@ import com.squareup.moshi.JsonAdapter; import com.squareup.moshi.Moshi; import com.squareup.moshi.Types; -import datadog.trace.api.flare.TracerFlare; import datadog.trace.core.DDSpan; -import java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.zip.ZipOutputStream; public class TraceDumpJsonExporter implements Writer { @@ -17,11 +14,9 @@ public class TraceDumpJsonExporter implements Writer { .add(DDSpanJsonAdapter.buildFactory(false)) .build() .adapter(Types.newParameterizedType(Collection.class, DDSpan.class)); - private StringBuilder dumpText; - private ZipOutputStream zip; + private final StringBuilder dumpText; - public TraceDumpJsonExporter(ZipOutputStream zip) { - this.zip = zip; + public TraceDumpJsonExporter() { dumpText = new StringBuilder(); } @@ -32,7 +27,8 @@ public void write(final Collection trace) { @Override public void write(List trace) { - // Do nothing + Collection collectionTrace = trace; + write(collectionTrace); } @Override @@ -42,14 +38,14 @@ public void start() { @Override public boolean flush() { - try { - TracerFlare.addText(zip, "pending_traces.txt", dumpText.toString()); - } catch (IOException e) { - // do 
nothing - } + // do nothing return true; } + public String getDumpJson() { + return dumpText.toString(); + } + @Override public void close() { // do nothing diff --git a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java index 0057eb2ce7d..851a11efdab 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java @@ -71,6 +71,23 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer { private final LongRunningTracesTracker runningTracesTracker; + private void dumpTraces() { + if (worker.isAlive()) { + int count = dumpCounter.get(); + int loop = 1; + boolean signaled = queue.offer(DUMP_ELEMENT); + while (!closed && !signaled) { + yieldOrSleep(loop++); + signaled = queue.offer(DUMP_ELEMENT); + } + int newCount = dumpCounter.get(); + while (!closed && count >= newCount) { + yieldOrSleep(loop++); + newCount = dumpCounter.get(); + } + } + } + public boolean longRunningSpansEnabled() { return runningTracesTracker != null; } @@ -136,6 +153,18 @@ public void flush() { } } + private String getDumpJson() { + try (TraceDumpJsonExporter writer = new TraceDumpJsonExporter()) { + for (Element e : DumpDrain.DUMP_DRAIN.collectTraces()) { + if (e instanceof PendingTrace) { + PendingTrace trace = (PendingTrace) e; + writer.write(trace.getSpans()); + } + } + return writer.getDumpJson(); + } + } + private static final class WriteDrain implements MessagePassingQueue.Consumer { private static final WriteDrain WRITE_DRAIN = new WriteDrain(); @@ -354,32 +383,12 @@ private TracerDump(DelayingPendingTraceBuffer buffer) { @Override public void prepareForFlare() { - if (buffer.worker.isAlive()) { - int count = buffer.dumpCounter.get(); - int loop = 1; - boolean signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT); - while (!buffer.closed && !signaled) { - 
buffer.yieldOrSleep(loop++); - signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT); - } - int newCount = buffer.dumpCounter.get(); - while (!buffer.closed && count >= newCount) { - buffer.yieldOrSleep(loop++); - newCount = buffer.dumpCounter.get(); - } - } + buffer.dumpTraces(); } @Override public void addReportToFlare(ZipOutputStream zip) throws IOException { - TraceDumpJsonExporter writer = new TraceDumpJsonExporter(zip); - for (Element e : DelayingPendingTraceBuffer.DumpDrain.DUMP_DRAIN.collectTraces()) { - if (e instanceof PendingTrace) { - PendingTrace trace = (PendingTrace) e; - writer.write(trace.getSpans()); - } - } - writer.flush(); + TracerFlare.addText(zip, "pending_traces.txt", buffer.getDumpJson()); } } } From 4efe1180cc9414d1891bc07db754675114d578ba Mon Sep 17 00:00:00 2001 From: DJ Gregor Date: Fri, 24 Oct 2025 15:46:47 -0700 Subject: [PATCH 2/6] Add long_running_traces.txt to flare report Synchronized accesses to traceArray in LongRunningTracesTracker since the flare reporter can now access the array. This shouldn't be a concern for blocking because addTrace and flushAndCompact are the existing calls from PendingTraceBuffer's run() loop and getTracesAsJson is called by the reporter thread and will complete fairly quickly. 
--- .../trace/core/LongRunningTracesTracker.java | 43 +++++++++++++++++-- .../core/LongRunningTracesTrackerTest.groovy | 39 +++++++++++++++++ .../trace/core/PendingTraceBufferTest.groovy | 2 +- 3 files changed, 80 insertions(+), 4 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java b/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java index 5d2e32fecf6..c53c64347eb 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java @@ -1,14 +1,28 @@ package datadog.trace.core; +import static java.util.Comparator.comparingLong; + import datadog.communication.ddagent.DDAgentFeaturesDiscovery; import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; +import datadog.trace.api.flare.TracerFlare; +import datadog.trace.common.writer.TraceDumpJsonExporter; import datadog.trace.core.monitor.HealthMetrics; +import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.zip.ZipOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LongRunningTracesTracker implements TracerFlare.Reporter { + private static final Logger LOGGER = LoggerFactory.getLogger(LongRunningTracesTracker.class); + private static final int MAX_DUMPED_TRACES = 50; + private static final Comparator TRACE_BY_START_TIME = + comparingLong(PendingTrace::getRunningTraceStartTime); -public class LongRunningTracesTracker { private final DDAgentFeaturesDiscovery features; private final HealthMetrics healthMetrics; private long lastFlushMilli = 0; @@ -41,6 +55,8 @@ public LongRunningTracesTracker( (int) TimeUnit.SECONDS.toMillis(config.getLongRunningTraceFlushInterval()); this.features = sharedCommunicationObjects.featuresDiscovery(config); this.healthMetrics = 
healthMetrics; + + TracerFlare.addReporter(this); } public boolean add(PendingTraceBuffer.Element element) { @@ -56,7 +72,7 @@ public boolean add(PendingTraceBuffer.Element element) { return true; } - private void addTrace(PendingTrace trace) { + private synchronized void addTrace(PendingTrace trace) { if (trace.empty()) { return; } @@ -67,7 +83,7 @@ private void addTrace(PendingTrace trace) { traceArray.add(trace); } - public void flushAndCompact(long nowMilli) { + public synchronized void flushAndCompact(long nowMilli) { if (nowMilli < lastFlushMilli + TimeUnit.SECONDS.toMillis(1)) { return; } @@ -139,4 +155,25 @@ private void flushStats() { write = 0; expired = 0; } + + public String getTracesAsJson() { + try (TraceDumpJsonExporter writer = new TraceDumpJsonExporter()) { + List traces; + synchronized (this) { + traces = new ArrayList<>(traceArray); + } + traces.sort(TRACE_BY_START_TIME); + + int limit = Math.min(traces.size(), MAX_DUMPED_TRACES); + for (int i = 0; i < limit; i++) { + writer.write(traces.get(i).getSpans()); + } + return writer.getDumpJson(); + } + } + + @Override + public void addReportToFlare(ZipOutputStream zip) throws IOException { + TracerFlare.addText(zip, "long_running_traces.txt", getTracesAsJson()); + } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy index 7edd12cf2ed..493fc1db5c5 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy @@ -192,4 +192,43 @@ class LongRunningTracesTrackerTest extends DDSpecification { PrioritySampling.USER_KEEP | 1 | LongRunningTracesTracker.WRITE_RUNNING_SPANS PrioritySampling.SAMPLER_KEEP | 1 | LongRunningTracesTracker.WRITE_RUNNING_SPANS } + + def "getTracesAsJson with no traces"() { + when: + def json = tracker.getTracesAsJson() + + 
then: + json == "" + } + + def "getTracesAsJson with traces"() { + given: + def trace = newTraceToTrack() + tracker.add(trace) + + when: + def json = tracker.getTracesAsJson() + + then: + json != null + !json.isEmpty() + json.contains('"service"') + json.contains('"name"') + } + + def "testing tracer flare dump with trace"() { + given: + def trace = newTraceToTrack() + tracker.add(trace) + + when: + def entries = PendingTraceBufferTest.buildAndExtractZip() + + then: + entries.containsKey("long_running_traces.txt") + + def jsonContent = entries["long_running_traces.txt"] as String + jsonContent.contains('"service"') + jsonContent.contains('"name"') + } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy index c9bf72beee7..189e3e95d13 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy @@ -582,7 +582,7 @@ class PendingTraceBufferTest extends DDSpecification { return DDSpan.create("test", 0, context, null) } - def buildAndExtractZip() { + static buildAndExtractZip() { TracerFlare.prepareForFlare() def out = new ByteArrayOutputStream() try (ZipOutputStream zip = new ZipOutputStream(out)) { From 488dc52638d02f62edbf755959a69390debf431b Mon Sep 17 00:00:00 2001 From: DJ Gregor Date: Fri, 24 Oct 2025 15:00:35 -0700 Subject: [PATCH 3/6] Track long running traces when agent does not support long running feature This allows dumping long running traces when not connected to a Datadog Agent using the new JMX flare feature. A warning message will be logged in this case to indicate that long running traces will not be sent upstream but are available in a flare. Previously the long running traces buffer would always be empty, even though the feature was enabled with dd.trace.experimental.long-running.enabled=true. 
This led to a good amount of confusion when I was initially developing a feature to dump long running traces without a local Datadog Agent running. --- .../trace/core/LongRunningTracesTracker.java | 21 +++++++++++++++---- .../core/LongRunningTracesTrackerTest.groovy | 6 ++++-- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java b/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java index c53c64347eb..c6586d1cfdc 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java @@ -5,6 +5,7 @@ import datadog.communication.ddagent.DDAgentFeaturesDiscovery; import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; +import datadog.trace.api.config.TracerConfig; import datadog.trace.api.flare.TracerFlare; import datadog.trace.common.writer.TraceDumpJsonExporter; import datadog.trace.core.monitor.HealthMetrics; @@ -56,6 +57,16 @@ public LongRunningTracesTracker( this.features = sharedCommunicationObjects.featuresDiscovery(config); this.healthMetrics = healthMetrics; + if (!features.supportsLongRunning()) { + LOGGER.warn( + "Long running trace tracking is enabled via {}, however the Datadog Agent version {} does not support receiving long running traces. " + + "Long running traces will be tracked locally in memory (up to {} traces) but will NOT be automatically reported to the agent. " + + "Long running traces are included in tracer flares.", + "dd." + TracerConfig.TRACE_LONG_RUNNING_ENABLED, + features.getVersion() != null ? 
features.getVersion() : "unknown", + maxTrackedTraces); + } + TracerFlare.addReporter(this); } @@ -94,7 +105,7 @@ public synchronized void flushAndCompact(long nowMilli) { cleanSlot(i); continue; } - if (trace.empty() || !features.supportsLongRunning()) { + if (trace.empty()) { trace.compareAndSetLongRunningState(WRITE_RUNNING_SPANS, NOT_TRACKED); cleanSlot(i); continue; @@ -111,9 +122,11 @@ public synchronized void flushAndCompact(long nowMilli) { cleanSlot(i); continue; } - trace.compareAndSetLongRunningState(TRACKED, WRITE_RUNNING_SPANS); - write++; - trace.write(); + if (features.supportsLongRunning()) { + trace.compareAndSetLongRunningState(TRACKED, WRITE_RUNNING_SPANS); + write++; + trace.write(); + } } i++; } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy index 493fc1db5c5..67ddabd1fb3 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy @@ -114,7 +114,7 @@ class LongRunningTracesTrackerTest extends DDSpecification { trace.longRunningTrackedState == LongRunningTracesTracker.EXPIRED } - def "agent disabled feature"() { + def "trace remains tracked but not written when agent long running feature not available"() { given: def trace = newTraceToTrack() tracker.add(trace) @@ -124,7 +124,9 @@ class LongRunningTracesTrackerTest extends DDSpecification { then: 1 * features.supportsLongRunning() >> false - tracker.traceArray.size() == 0 + tracker.traceArray.size() == 1 + tracker.traceArray[0].longRunningTrackedState == LongRunningTracesTracker.TRACKED + tracker.traceArray[0].getLastWriteTime() == 0 } def flushAt(long timeMilli) { From c4850cf561dae3644b2ad15ae1493a662a73eb51 Mon Sep 17 00:00:00 2001 From: DJ Gregor Date: Fri, 24 Oct 2025 15:40:51 -0700 Subject: [PATCH 4/6] Add JMX MBean for getting 
tracer flare files The JMX telemetry feature is controlled by dd.telemetry.jmx.enabled and is disabled by default. It enables JMXFetch telemetry (if JMXFetch is enabled, which it is by default) and also enables a new tracer flare MBean at datadog.flare:type=TracerFlare. This new MBean exposes three operations: java.lang.String listFlareFiles() - Returns a list of sources and files available from each source. java.lang.String getFlareFile(java.lang.String p1,java.lang.String p2) - Returns a single file from a specific reporter (or flare source). - If the file ends in ".txt", it is returned as-is, otherwise it is base64 encoded. java.lang.String generateFullFlareZip() - Returns a full flare dump, base64 encoded. An easy way to enable this for testing is to add these arguments: -Ddd.telemetry.jmx.enabled=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.host=127.0.0.1 -Dcom.sun.management.jmxremote.port=9010 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false To test, you can use jmxterm (https://github.com/jiaqi/jmxterm) like this: echo "run -b datadog.flare:type=TracerFlare listFlareFiles" | \ java --add-exports jdk.jconsole/sun.tools.jconsole=ALL-UNNAMED \ -jar jmxterm-1.0.4-uber.jar -l localhost:9010 -n -v silent echo "run -b datadog.flare:type=TracerFlare getFlareFile datadog.trace.agent.core.LongRunningTracesTracker long_running_traces.txt" | \ java --add-exports jdk.jconsole/sun.tools.jconsole=ALL-UNNAMED \ -jar jmxterm-1.0.4-uber.jar -l localhost:9010 -n -v silent | \ jq . 
echo "run -b datadog.flare:type=TracerFlare generateFullFlareZip" | \ java --add-exports jdk.jconsole/sun.tools.jconsole=ALL-UNNAMED \ -jar jmxterm-1.0.4-uber.jar -l localhost:9010 -n -v silent | \ base64 -d > /tmp/flare.zip && \ unzip -v /tmp/flare.zip --- .../trace/agent/jmxfetch/JMXFetch.java | 1 + .../trace/api/config/GeneralConfig.java | 1 + .../main/java/datadog/trace/api/Config.java | 8 + .../datadog/trace/api/flare/TracerFlare.java | 15 ++ .../trace/api/flare/TracerFlareTest.groovy | 28 +++ metadata/supported-configurations.json | 1 + utils/flare-utils/build.gradle.kts | 3 + .../datadog/flare/TracerFlareManager.java | 200 ++++++++++++++++++ .../flare/TracerFlareManagerMBean.java | 53 +++++ .../java/datadog/flare/TracerFlarePoller.java | 3 + .../datadog/flare/TracerFlareService.java | 61 +++++- .../datadog/flare/TracerFlareJmxTest.groovy | 73 +++++++ 12 files changed, 446 insertions(+), 1 deletion(-) create mode 100644 utils/flare-utils/src/main/java/datadog/flare/TracerFlareManager.java create mode 100644 utils/flare-utils/src/main/java/datadog/flare/TracerFlareManagerMBean.java create mode 100644 utils/flare-utils/src/test/groovy/datadog/flare/TracerFlareJmxTest.groovy diff --git a/dd-java-agent/agent-jmxfetch/src/main/java/datadog/trace/agent/jmxfetch/JMXFetch.java b/dd-java-agent/agent-jmxfetch/src/main/java/datadog/trace/agent/jmxfetch/JMXFetch.java index ea2a79f49c2..6d5b7f21fa4 100644 --- a/dd-java-agent/agent-jmxfetch/src/main/java/datadog/trace/agent/jmxfetch/JMXFetch.java +++ b/dd-java-agent/agent-jmxfetch/src/main/java/datadog/trace/agent/jmxfetch/JMXFetch.java @@ -116,6 +116,7 @@ private static void run(final StatsDClientManager statsDClientManager, final Con .refreshBeansPeriod(refreshBeansPeriod) .globalTags(globalTags) .reporter(reporter) + .jmxfetchTelemetry(config.isTelemetryJmxEnabled()) .connectionFactory(new AgentConnectionFactory()); if (config.isJmxFetchMultipleRuntimeServicesEnabled()) { diff --git 
a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java index 2e441e2677b..4eabdc3fdc6 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java @@ -101,6 +101,7 @@ public final class GeneralConfig { public static final String TELEMETRY_DEPENDENCY_RESOLUTION_QUEUE_SIZE = "telemetry.dependency-resolution.queue.size"; public static final String TELEMETRY_DEBUG_REQUESTS_ENABLED = "telemetry.debug.requests.enabled"; + public static final String TELEMETRY_JMX_ENABLED = "telemetry.jmx.enabled"; public static final String AGENTLESS_LOG_SUBMISSION_ENABLED = "agentless.log.submission.enabled"; public static final String AGENTLESS_LOG_SUBMISSION_QUEUE_SIZE = "agentless.log.submission.queue.size"; diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index 6d6bd4d1d46..f65db87361f 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -395,6 +395,7 @@ import static datadog.trace.api.config.GeneralConfig.TELEMETRY_DEPENDENCY_RESOLUTION_QUEUE_SIZE; import static datadog.trace.api.config.GeneralConfig.TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL; import static datadog.trace.api.config.GeneralConfig.TELEMETRY_HEARTBEAT_INTERVAL; +import static datadog.trace.api.config.GeneralConfig.TELEMETRY_JMX_ENABLED; import static datadog.trace.api.config.GeneralConfig.TELEMETRY_LOG_COLLECTION_ENABLED; import static datadog.trace.api.config.GeneralConfig.TELEMETRY_METRICS_ENABLED; import static datadog.trace.api.config.GeneralConfig.TELEMETRY_METRICS_INTERVAL; @@ -1242,6 +1243,7 @@ public static String getHostName() { private final boolean isTelemetryDependencyServiceEnabled; private final boolean telemetryMetricsEnabled; private final boolean 
isTelemetryLogCollectionEnabled; + private final boolean isTelemetryJmxEnabled; private final int telemetryDependencyResolutionQueueSize; private final boolean azureAppServices; @@ -2154,6 +2156,8 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment()) && configProvider.getBoolean( TELEMETRY_LOG_COLLECTION_ENABLED, DEFAULT_TELEMETRY_LOG_COLLECTION_ENABLED); + isTelemetryJmxEnabled = configProvider.getBoolean(TELEMETRY_JMX_ENABLED, false); + isTelemetryDependencyServiceEnabled = configProvider.getBoolean( TELEMETRY_DEPENDENCY_COLLECTION_ENABLED, @@ -3717,6 +3721,10 @@ public boolean isTelemetryLogCollectionEnabled() { return isTelemetryLogCollectionEnabled; } + public boolean isTelemetryJmxEnabled() { + return isTelemetryJmxEnabled; + } + public int getTelemetryDependencyResolutionQueueSize() { return telemetryDependencyResolutionQueueSize; } diff --git a/internal-api/src/main/java/datadog/trace/api/flare/TracerFlare.java b/internal-api/src/main/java/datadog/trace/api/flare/TracerFlare.java index f2ba39041a4..907024f2cd9 100644 --- a/internal-api/src/main/java/datadog/trace/api/flare/TracerFlare.java +++ b/internal-api/src/main/java/datadog/trace/api/flare/TracerFlare.java @@ -4,6 +4,8 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -67,6 +69,19 @@ public static void addReporter(Reporter reporter) { reporters.put(reporter.getClass(), reporter); } + public static Collection getReporters() { + return Collections.unmodifiableCollection(reporters.values()); + } + + public static Reporter getReporter(String className) { + for (Reporter reporter : getReporters()) { + if (reporter.getClass().getName().equals(className)) { + return reporter; + } + } + return null; + } + public static void addText(ZipOutputStream zip, String section, String text) throws IOException { 
zip.putNextEntry(new ZipEntry(section)); if (null != text) { diff --git a/internal-api/src/test/groovy/datadog/trace/api/flare/TracerFlareTest.groovy b/internal-api/src/test/groovy/datadog/trace/api/flare/TracerFlareTest.groovy index 4a6b916b80a..a9dfc44161f 100644 --- a/internal-api/src/test/groovy/datadog/trace/api/flare/TracerFlareTest.groovy +++ b/internal-api/src/test/groovy/datadog/trace/api/flare/TracerFlareTest.groovy @@ -51,6 +51,34 @@ class TracerFlareTest extends DDSpecification { /^(java.lang.IllegalStateException: (bin|txt) \(expected\)\n){2}$/ } + def "test getReporter finds reporter by class name"() { + setup: + def reporter1 = Mock(Reporter1) + def reporter2 = Mock(Reporter2) + TracerFlare.addReporter(reporter1) + TracerFlare.addReporter(reporter2) + + when: + def found1 = TracerFlare.getReporter(reporter1.getClass().getName()) + def found2 = TracerFlare.getReporter(reporter2.getClass().getName()) + + then: + found1 == reporter1 + found2 == reporter2 + } + + def "test getReporter returns null for non-existent reporter"() { + setup: + def reporter = Mock(Reporter1) + TracerFlare.addReporter(reporter) + + when: + def found = TracerFlare.getReporter("com.example.NonExistentReporter") + + then: + found == null + } + def buildAndExtractZip() { TracerFlare.prepareForFlare() def out = new ByteArrayOutputStream() diff --git a/metadata/supported-configurations.json b/metadata/supported-configurations.json index 65a52186da0..c6dac5d9506 100644 --- a/metadata/supported-configurations.json +++ b/metadata/supported-configurations.json @@ -478,6 +478,7 @@ "DD_TELEMETRY_FORWARDER_MAX_TAGS": ["A"], "DD_TELEMETRY_FORWARDER_PATH": ["A"], "DD_TELEMETRY_HEARTBEAT_INTERVAL": ["A"], + "DD_TELEMETRY_JMX_ENABLED": ["A"], "DD_TELEMETRY_LOG_COLLECTION_ENABLED": ["A"], "DD_TELEMETRY_METRICS_ENABLED": ["A"], "DD_TELEMETRY_METRICS_INTERVAL": ["A"], diff --git a/utils/flare-utils/build.gradle.kts b/utils/flare-utils/build.gradle.kts index f718826c226..1952fea5cea 100644 --- 
a/utils/flare-utils/build.gradle.kts +++ b/utils/flare-utils/build.gradle.kts @@ -12,4 +12,7 @@ dependencies { implementation(project(":utils:version-utils")) implementation(project(":internal-api")) implementation(libs.slf4j) + + testImplementation(project(":utils:test-utils")) + testImplementation(project(":dd-trace-api")) } diff --git a/utils/flare-utils/src/main/java/datadog/flare/TracerFlareManager.java b/utils/flare-utils/src/main/java/datadog/flare/TracerFlareManager.java new file mode 100644 index 00000000000..b2928d0cdc2 --- /dev/null +++ b/utils/flare-utils/src/main/java/datadog/flare/TracerFlareManager.java @@ -0,0 +1,200 @@ +package datadog.flare; + +import datadog.trace.api.Config; +import datadog.trace.api.flare.TracerFlare; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Map; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; +import java.util.zip.ZipOutputStream; +import javax.management.InstanceAlreadyExistsException; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MBean implementation for managing and accessing tracer flare data. + * + *

This class provides JMX operations to list flare data sources and retrieve flare data either + * for individual sources or a complete flare archive. See {@link TracerFlareManagerMBean} for + * documentation on the exposed operations. + */ +public class TracerFlareManager implements TracerFlareManagerMBean { + private static final Logger LOGGER = LoggerFactory.getLogger(TracerFlareManager.class); + + private final TracerFlareService flareService; + protected ObjectName mbeanName; + + public TracerFlareManager(TracerFlareService flareService) { + this.flareService = flareService; + } + + @Override + public String generateFullFlareZip() throws IOException { + TracerFlare.prepareForFlare(); + + long currentMillis = System.currentTimeMillis(); + boolean dumpThreads = Config.get().isTriageEnabled() || LOGGER.isDebugEnabled(); + byte[] zipBytes = flareService.buildFlareZip(currentMillis, currentMillis, dumpThreads); + return Base64.getEncoder().encodeToString(zipBytes); + } + + @Override + public String listFlareFiles() throws IOException { + TracerFlare.prepareForFlare(); + + StringBuilder result = new StringBuilder(); + + for (Map.Entry entry : TracerFlareService.BUILT_IN_SOURCES.entrySet()) { + String sourceName = entry.getKey(); + String[] files = entry.getValue(); + + for (String filename : files) { + result.append(sourceName).append(" ").append(filename).append("\n"); + } + } + + for (TracerFlare.Reporter reporter : TracerFlare.getReporters()) { + try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + ZipOutputStream zip = new ZipOutputStream(bytes)) { + reporter.addReportToFlare(zip); + zip.finish(); + + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes.toByteArray()); + ZipInputStream zis = new ZipInputStream(bais)) { + ZipEntry entry; + while ((entry = zis.getNextEntry()) != null) { + result + .append(reporter.getClass().getName()) + .append(" ") + .append(entry.getName()) + .append("\n"); + zis.closeEntry(); + } + } + } catch 
(IOException e) { + LOGGER.debug("Failed to inspect reporter {}", reporter.getClass().getName(), e); + } + } + + return result.toString(); + } + + @Override + public String getFlareFile(String sourceName, String filename) throws IOException { + final byte[] zipBytes; + if (isBuiltInSource(sourceName)) { + zipBytes = flareService.getBuiltInSourceZip(sourceName); + } else { + zipBytes = getReporterFile(sourceName); + } + return extractFileFromZip(zipBytes, filename); + } + + private boolean isBuiltInSource(String sourceName) { + return TracerFlareService.BUILT_IN_SOURCES.containsKey(sourceName); + } + + /** + * Generates flare data for a specific reporter. + * + *

The reporter's data is generated as an in-memory ZIP archive and returned as raw bytes. No + * file is extracted or encoded here; the caller pulls individual entries out of the archive. + * + * @param reporterClassName the fully qualified class name of the reporter + * @return the zip file containing the reporter's content + * @throws IOException if an error occurs while generating the flare + */ + private byte[] getReporterFile(String reporterClassName) throws IOException { + TracerFlare.Reporter reporter = TracerFlare.getReporter(reporterClassName); + if (reporter == null) { + throw new IOException("Error: Reporter not found: " + reporterClassName); + } + + reporter.prepareForFlare(); + + try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + ZipOutputStream zip = new ZipOutputStream(bytes)) { + reporter.addReportToFlare(zip); + zip.finish(); + return bytes.toByteArray(); + } + } + + /** + * Extracts a specific file from a ZIP archive. + * + *

Searches through the ZIP entries for the specified filename and returns its content. If the + * file name ends in ".txt", it is returned as plain text; if binary, it is returned + * base64-encoded. + * + * @param zipBytes the ZIP file bytes + * @param filename the name of the file to extract + * @return the file content (plain text or base64-encoded binary) + * @throws IOException if an error occurs while reading the ZIP + */ + private String extractFileFromZip(byte[] zipBytes, String filename) throws IOException { + try (ByteArrayInputStream bais = new ByteArrayInputStream(zipBytes); + ZipInputStream zis = new ZipInputStream(bais)) { + ZipEntry entry; + while ((entry = zis.getNextEntry()) != null) { + if (entry.getName().equals(filename)) { + ByteArrayOutputStream content = new ByteArrayOutputStream(); + byte[] buffer = new byte[8192]; + int bytesRead; + while ((bytesRead = zis.read(buffer)) != -1) { + content.write(buffer, 0, bytesRead); + } + zis.closeEntry(); + + byte[] contentBytes = content.toByteArray(); + if (entry.getName().endsWith(".txt")) { + return new String(contentBytes, StandardCharsets.UTF_8); + } else { + return Base64.getEncoder().encodeToString(contentBytes); + } + } + zis.closeEntry(); + } + + throw new IOException("Failed to extract file: " + filename); + } + } + + void registerMBean() { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + + try { + mbeanName = new ObjectName("datadog.flare:type=TracerFlare"); + mbs.registerMBean(this, mbeanName); + LOGGER.info("Registered TracerFlare MBean at {}", mbeanName); + } catch (MalformedObjectNameException + | InstanceAlreadyExistsException + | MBeanRegistrationException + | NotCompliantMBeanException e) { + LOGGER.warn("Failed to register TracerFlare MBean", e); + mbeanName = null; + } + } + + void unregisterMBean() { + if (mbeanName != null) { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + try { + mbs.unregisterMBean(mbeanName); + LOGGER.debug("Unregistered 
TracerFlare MBean"); + } catch (Exception e) { + LOGGER.warn("Failed to unregister TracerFlare MBean", e); + } + } + } +} diff --git a/utils/flare-utils/src/main/java/datadog/flare/TracerFlareManagerMBean.java b/utils/flare-utils/src/main/java/datadog/flare/TracerFlareManagerMBean.java new file mode 100644 index 00000000000..010bb7df04b --- /dev/null +++ b/utils/flare-utils/src/main/java/datadog/flare/TracerFlareManagerMBean.java @@ -0,0 +1,53 @@ +package datadog.flare; + +import java.io.IOException; + +/** + * MBean interface for managing and accessing tracer flare data. + * + *

<p>This interface provides JMX operations to inspect flare data sources and generate flare data + * either for individual sources or as a complete ZIP archive. Sources include both registered + * reporters and built-in data (config, runtime, flare prelude, etc.). + */ +public interface TracerFlareManagerMBean { + /** + * Lists all available flare files from all sources. + * + *

<p>Returns a newline-separated string where each line is formatted as "&lt;source&gt; + * &lt;file&gt;". This format makes it easy to pass the source and filename to {@link + * #getFlareFile(String, String)}. + * + *

<p>Example output: + * + *
<pre>
+   * config initial_config.txt
+   * ...
+   * datadog.trace.agent.core.CoreTracer tracer_health.txt
+   * ...
+   * </pre>
+ * + * @return newline-separated string listing all available files and their source name + * @throws IOException if an error occurs + */ + String listFlareFiles() throws IOException; + + /** + * Returns a specific flare file by source name and filename. + * + *
<p>
If the file is text, it is returned as plain text; if binary, it is returned base64-encoded. + * + * @param sourceName the name of the source (reporter class name or built-in source name) + * @param filename the name of the file to retrieve + * @return the file content (plain text or base64-encoded binary) + * @throws IOException if an error occurs while generating or extracting the data + */ + String getFlareFile(String sourceName, String filename) throws IOException; + + /** + * Generates a complete tracer flare as a ZIP file. + * + * @return base64-encoded ZIP file containing the complete flare data + * @throws IOException if an error occurs while generating the flare ZIP + */ + String generateFullFlareZip() throws IOException; +} diff --git a/utils/flare-utils/src/main/java/datadog/flare/TracerFlarePoller.java b/utils/flare-utils/src/main/java/datadog/flare/TracerFlarePoller.java index 49755cf24d0..b695f0112c5 100644 --- a/utils/flare-utils/src/main/java/datadog/flare/TracerFlarePoller.java +++ b/utils/flare-utils/src/main/java/datadog/flare/TracerFlarePoller.java @@ -54,6 +54,9 @@ private void doStop() { if (null != stopSubmitter) { stopSubmitter.run(); } + if (null != tracerFlareService) { + tracerFlareService.close(); + } } final class Preparer implements ProductListener { diff --git a/utils/flare-utils/src/main/java/datadog/flare/TracerFlareService.java b/utils/flare-utils/src/main/java/datadog/flare/TracerFlareService.java index dcb097287a3..e8b65f513d7 100644 --- a/utils/flare-utils/src/main/java/datadog/flare/TracerFlareService.java +++ b/utils/flare-utils/src/main/java/datadog/flare/TracerFlareService.java @@ -24,6 +24,8 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.zip.ZipOutputStream; import okhttp3.HttpUrl; @@ -49,11 +51,23 @@ final class TracerFlareService { private static final int 
MAX_LOGFILE_SIZE_BYTES = MAX_LOGFILE_SIZE_MB << 20; + static final Map BUILT_IN_SOURCES = new HashMap<>(); + + static { + BUILT_IN_SOURCES.put("prelude", new String[] {"flare_info.txt", "tracer_version.txt"}); + BUILT_IN_SOURCES.put("config", new String[] {"initial_config.txt"}); + BUILT_IN_SOURCES.put( + "runtime", + new String[] {"jvm_args.txt", "classpath.txt", "library_path.txt", "boot_classpath.txt"}); + BUILT_IN_SOURCES.put("threads", new String[] {"threads.txt"}); + } + private final AgentTaskScheduler scheduler = new AgentTaskScheduler(TRACER_FLARE); private final Config config; private final OkHttpClient okHttpClient; private final HttpUrl flareUrl; + private final TracerFlareManager jmxManager; private boolean logLevelOverridden; private volatile long flareStartMillis; @@ -65,9 +79,22 @@ final class TracerFlareService { this.okHttpClient = okHttpClient; this.flareUrl = agentUrl.newBuilder().addPathSegments(FLARE_ENDPOINT).build(); + if (config.isTelemetryJmxEnabled()) { + jmxManager = new TracerFlareManager(this); + jmxManager.registerMBean(); + } else { + jmxManager = null; + } + applyTriageReportTrigger(config.getTriageReportTrigger()); } + public void close() { + if (jmxManager != null) { + jmxManager.unregisterMBean(); + } + } + private void applyTriageReportTrigger(String triageTrigger) { if (null != triageTrigger && !triageTrigger.isEmpty()) { long delay = TimeUtils.parseSimpleDelay(triageTrigger); @@ -194,11 +221,13 @@ private String getFlareName(long endMillis) { return REPORT_PREFIX + config.getRuntimeId() + "-" + endMillis + ".zip"; } - private byte[] buildFlareZip(long startMillis, long endMillis, boolean dumpThreads) + public byte[] buildFlareZip(long startMillis, long endMillis, boolean dumpThreads) throws IOException { try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(); ZipOutputStream zip = new ZipOutputStream(bytes)) { + // Make sure to update BUILT_IN_SOURCES and getBuiltInSourceZip if this list of functions + // changes 
addPrelude(zip, startMillis, endMillis); addConfig(zip); addRuntime(zip); @@ -212,17 +241,47 @@ private byte[] buildFlareZip(long startMillis, long endMillis, boolean dumpThrea } } + /** Generates a ZIP file for JMX fetch for a built-in source. */ + byte[] getBuiltInSourceZip(String sourceName) throws IOException { + try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + ZipOutputStream zip = new ZipOutputStream(bytes)) { + + switch (sourceName) { + case "prelude": + addPrelude(zip, flareStartMillis, System.currentTimeMillis()); + break; + case "config": + addConfig(zip); + break; + case "runtime": + addRuntime(zip); + break; + case "threads": + addThreadDump(zip); + break; + default: + throw new IOException("Unknown source name: " + sourceName); + } + + zip.finish(); + return bytes.toByteArray(); + } + } + private void addPrelude(ZipOutputStream zip, long startMillis, long endMillis) throws IOException { + // Make sure to update BUILT_IN_SOURCES if the files change here. TracerFlare.addText(zip, "flare_info.txt", flareInfo(startMillis, endMillis)); TracerFlare.addText(zip, "tracer_version.txt", VERSION); } private void addConfig(ZipOutputStream zip) throws IOException { + // Make sure to update BUILT_IN_SOURCES if the files change here. TracerFlare.addText(zip, "initial_config.txt", config.toString()); } private void addRuntime(ZipOutputStream zip) throws IOException { + // Make sure to update BUILT_IN_SOURCES if the files change here. 
try { RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); TracerFlare.addText(zip, "jvm_args.txt", String.join(" ", runtimeMXBean.getInputArguments())); diff --git a/utils/flare-utils/src/test/groovy/datadog/flare/TracerFlareJmxTest.groovy b/utils/flare-utils/src/test/groovy/datadog/flare/TracerFlareJmxTest.groovy new file mode 100644 index 00000000000..030aade9a1c --- /dev/null +++ b/utils/flare-utils/src/test/groovy/datadog/flare/TracerFlareJmxTest.groovy @@ -0,0 +1,73 @@ +package datadog.flare + +import static datadog.trace.api.config.GeneralConfig.TELEMETRY_JMX_ENABLED + +import datadog.trace.api.Config +import datadog.trace.test.util.DDSpecification +import okhttp3.HttpUrl +import spock.lang.Timeout + +import javax.management.MBeanServer +import javax.management.ObjectName +import java.lang.management.ManagementFactory + +@Timeout(1) +class TracerFlareJmxTest extends DDSpecification { + static final ObjectName MBEAN_NAME = new ObjectName("datadog.flare:type=TracerFlare") + + final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer() + + TracerFlareService tracerFlareService + + def cleanup() { + if (tracerFlareService != null) { + tracerFlareService.close() + } + } + + private void createTracerFlareService() { + tracerFlareService = new TracerFlareService( + Config.get(), + null, // okHttpClient - not needed for JMX test + HttpUrl.get("http://localhost:8126") + ) + } + + def "TracerFlare MBean is registered when telemetry JMX is enabled"() { + given: + injectSysConfig(TELEMETRY_JMX_ENABLED, "true") + + when: + createTracerFlareService() + + then: + mbs.isRegistered(MBEAN_NAME) + } + + def "TracerFlare MBean is not registered when telemetry JMX is disabled"() { + given: + injectSysConfig(TELEMETRY_JMX_ENABLED, "false") + + when: + createTracerFlareService() + + then: + !mbs.isRegistered(MBEAN_NAME) + } + + def "TracerFlare MBean operations work when JMX is enabled"() { + given: + injectSysConfig(TELEMETRY_JMX_ENABLED, "true") + 
createTracerFlareService() + + when: + def fileList = mbs.invoke(MBEAN_NAME, "listFlareFiles", null, null) as String + + then: + mbs.isRegistered(MBEAN_NAME) + fileList != null + fileList.contains("flare_info.txt") + fileList.contains("tracer_version.txt") + fileList.contains("initial_config.txt") + } +} \ No newline at end of file From f10c14621e9a330ecdf71a9848ebe2101a2712f8 Mon Sep 17 00:00:00 2001 From: DJ Gregor Date: Tue, 28 Oct 2025 09:32:57 -0700 Subject: [PATCH 5/6] LongRunningTracesTracker: add metric for traces dropped due to sampling priority This likely isn't an important metric to track, but I noticed these traces were the only ones not reflected in existing LongRunningTraces metrics, so I thought it might be good to add for completeness. --- .../datadog/trace/core/LongRunningTracesTracker.java | 5 ++++- .../datadog/trace/core/monitor/HealthMetrics.java | 3 ++- .../trace/core/monitor/TracerHealthMetrics.java | 12 +++++++++++- .../datadog/trace/core/PendingTraceBufferTest.groovy | 2 +- .../trace/core/monitor/HealthMetricsTest.groovy | 3 ++- 5 files changed, 20 insertions(+), 5 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java b/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java index c6586d1cfdc..429ea905d2a 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java @@ -36,6 +36,7 @@ public class LongRunningTracesTracker implements TracerFlare.Reporter { private int dropped = 0; private int write = 0; private int expired = 0; + private int droppedSampling = 0; public static final int NOT_TRACKED = -1; public static final int UNDEFINED = 0; @@ -119,6 +120,7 @@ public synchronized void flushAndCompact(long nowMilli) { if (shouldFlush(nowMilli, trace)) { if (negativeOrNullPriority(trace)) { trace.compareAndSetLongRunningState(TRACKED, NOT_TRACKED); + 
droppedSampling++; cleanSlot(i); continue; } @@ -163,10 +165,11 @@ private boolean negativeOrNullPriority(PendingTrace trace) { } private void flushStats() { - healthMetrics.onLongRunningUpdate(dropped, write, expired); + healthMetrics.onLongRunningUpdate(dropped, write, expired, droppedSampling); dropped = 0; write = 0; expired = 0; + droppedSampling = 0; } public String getTracesAsJson() { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java index ea6ffdebf0b..1988700ad67 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java @@ -71,7 +71,8 @@ public void onSend( public void onFailedSend( final int traceCount, final int sizeInBytes, final RemoteApi.Response response) {} - public void onLongRunningUpdate(final int dropped, final int write, final int expired) {} + public void onLongRunningUpdate( + final int dropped, final int write, final int expired, final int droppedSampling) {} /** * Report that a trace has been used to compute client stats. 
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java index 6d00ac646a5..3caa6faac27 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java @@ -87,6 +87,7 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable private final LongAdder longRunningTracesWrite = new LongAdder(); private final LongAdder longRunningTracesDropped = new LongAdder(); private final LongAdder longRunningTracesExpired = new LongAdder(); + private final LongAdder longRunningTracesDroppedSampling = new LongAdder(); private final LongAdder clientStatsProcessedSpans = new LongAdder(); private final LongAdder clientStatsProcessedTraces = new LongAdder(); @@ -295,10 +296,12 @@ public void onFailedSend( } @Override - public void onLongRunningUpdate(final int dropped, final int write, final int expired) { + public void onLongRunningUpdate( + final int dropped, final int write, final int expired, final int droppedSampling) { longRunningTracesWrite.add(write); longRunningTracesDropped.add(dropped); longRunningTracesExpired.add(expired); + longRunningTracesDroppedSampling.add(droppedSampling); } private void onSendAttempt( @@ -476,6 +479,11 @@ public void run(TracerHealthMetrics target) { target.statsd, "long-running.dropped", target.longRunningTracesDropped, NO_TAGS); reportIfChanged( target.statsd, "long-running.expired", target.longRunningTracesExpired, NO_TAGS); + reportIfChanged( + target.statsd, + "long-running.dropped_sampling", + target.longRunningTracesDroppedSampling, + NO_TAGS); reportIfChanged( target.statsd, "stats.traces_in", target.clientStatsProcessedTraces, NO_TAGS); @@ -605,6 +613,8 @@ public String summary() { + longRunningTracesDropped.sum() + "\nlongRunningTracesExpired=" + longRunningTracesExpired.sum() + + 
"\nlongRunningTracesDroppedSampling=" + + longRunningTracesDroppedSampling.sum() + "\n" + "\nclientStatsRequests=" + clientStatsRequests.sum() diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy index 189e3e95d13..8cbdcc37432 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy @@ -31,7 +31,7 @@ import static java.nio.charset.StandardCharsets.UTF_8 @Timeout(5) class PendingTraceBufferTest extends DDSpecification { @Subject - def buffer = PendingTraceBuffer.delaying(SystemTimeSource.INSTANCE, Mock(Config), null, null) + def buffer = PendingTraceBuffer.delaying(SystemTimeSource.INSTANCE, Mock(Config), null, HealthMetrics.NO_OP) def bufferSpy = Spy(buffer) def tracer = Mock(CoreTracer) diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/monitor/HealthMetricsTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/monitor/HealthMetricsTest.groovy index 7c6154876b0..406d839a28e 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/monitor/HealthMetricsTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/monitor/HealthMetricsTest.groovy @@ -400,12 +400,13 @@ class HealthMetricsTest extends Specification { def healthMetrics = new TracerHealthMetrics(new Latched(statsD, latch), 100, TimeUnit.MILLISECONDS) healthMetrics.start() when: - healthMetrics.onLongRunningUpdate(3,10,1) + healthMetrics.onLongRunningUpdate(3,10,1,5) latch.await(10, TimeUnit.SECONDS) then: 1 * statsD.count("long-running.write", 10, _) 1 * statsD.count("long-running.dropped", 3, _) 1 * statsD.count("long-running.expired", 1, _) + 1 * statsD.count("long-running.dropped_sampling", 5, _) cleanup: healthMetrics.close() } From a95fbb018f5fab2f766affdff05d1b8352db4bbc Mon Sep 17 00:00:00 2001 From: DJ Gregor Date: Tue, 28 Oct 2025 
09:33:13 -0700 Subject: [PATCH 6/6] PendingTraceBuffer: Keep track of how often we write around the buffer --- .../java/datadog/trace/core/PendingTraceBuffer.java | 3 +++ .../datadog/trace/core/monitor/HealthMetrics.java | 2 ++ .../trace/core/monitor/TracerHealthMetrics.java | 12 ++++++++++++ 3 files changed, 17 insertions(+) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java index 851a11efdab..4d2954bc93e 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java @@ -64,6 +64,7 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer { private final MpscBlockingConsumerArrayQueue queue; private final Thread worker; private final TimeSource timeSource; + private final HealthMetrics healthMetrics; private volatile boolean closed = false; private final AtomicInteger flushCounter = new AtomicInteger(0); @@ -102,6 +103,7 @@ public void enqueue(Element pendingTrace) { if (!pendingTrace.writeOnBufferFull()) { return; } + healthMetrics.onPendingWriteAround(); pendingTrace.write(); } } @@ -324,6 +326,7 @@ public DelayingPendingTraceBuffer( this.queue = new MpscBlockingConsumerArrayQueue<>(bufferSize); this.worker = newAgentThread(TRACE_MONITOR, new Worker()); this.timeSource = timeSource; + this.healthMetrics = healthMetrics; boolean runningSpansEnabled = config.isLongRunningTraceEnabled(); this.runningTracesTracker = runningSpansEnabled diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java index 1988700ad67..d43bfc9c107 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java @@ -71,6 +71,8 @@ public void onSend( public void 
onFailedSend( final int traceCount, final int sizeInBytes, final RemoteApi.Response response) {} + public void onPendingWriteAround() {} + public void onLongRunningUpdate( final int dropped, final int write, final int expired, final int droppedSampling) {} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java index 3caa6faac27..47d3de5a122 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java @@ -84,6 +84,8 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable private final LongAdder scopeCloseErrors = new LongAdder(); private final LongAdder userScopeCloseErrors = new LongAdder(); + private final LongAdder pendingWriteAround = new LongAdder(); + private final LongAdder longRunningTracesWrite = new LongAdder(); private final LongAdder longRunningTracesDropped = new LongAdder(); private final LongAdder longRunningTracesExpired = new LongAdder(); @@ -295,6 +297,11 @@ public void onFailedSend( onSendAttempt(traceCount, sizeInBytes, response); } + @Override + public void onPendingWriteAround() { + pendingWriteAround.increment(); + } + @Override public void onLongRunningUpdate( final int dropped, final int write, final int expired, final int droppedSampling) { @@ -473,6 +480,8 @@ public void run(TracerHealthMetrics target) { reportIfChanged( target.statsd, "scope.user.close.error", target.userScopeCloseErrors, NO_TAGS); + reportIfChanged(target.statsd, "pending.write_around", target.pendingWriteAround, NO_TAGS); + reportIfChanged( target.statsd, "long-running.write", target.longRunningTracesWrite, NO_TAGS); reportIfChanged( @@ -607,6 +616,9 @@ public String summary() { + "\nuserScopeCloseErrors=" + userScopeCloseErrors.sum() + "\n" + + "\npendingWriteAround=" + + pendingWriteAround.sum() + + "\n" + 
"\nlongRunningTracesWrite=" + longRunningTracesWrite.sum() + "\nlongRunningTracesDropped="