Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@
import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import com.squareup.moshi.Types;
import datadog.trace.api.flare.TracerFlare;
import datadog.trace.core.DDSpan;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.zip.ZipOutputStream;

public class TraceDumpJsonExporter implements Writer {

Expand All @@ -17,11 +14,9 @@ public class TraceDumpJsonExporter implements Writer {
.add(DDSpanJsonAdapter.buildFactory(false))
.build()
.adapter(Types.newParameterizedType(Collection.class, DDSpan.class));
private StringBuilder dumpText;
private ZipOutputStream zip;
private final StringBuilder dumpText;

public TraceDumpJsonExporter(ZipOutputStream zip) {
this.zip = zip;
public TraceDumpJsonExporter() {
dumpText = new StringBuilder();
}

Expand All @@ -32,7 +27,8 @@ public void write(final Collection<DDSpan> trace) {

@Override
public void write(List<DDSpan> trace) {
// Do nothing
Collection<DDSpan> collectionTrace = trace;
write(collectionTrace);
}

@Override
Expand All @@ -42,14 +38,14 @@ public void start() {

@Override
public boolean flush() {
try {
TracerFlare.addText(zip, "pending_traces.txt", dumpText.toString());
} catch (IOException e) {
// do nothing
}
// do nothing
return true;
}

public String getDumpJson() {
return dumpText.toString();
}

@Override
public void close() {
// do nothing
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
package datadog.trace.core;

import static java.util.Comparator.comparingLong;

import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
import datadog.trace.api.flare.TracerFlare;
import datadog.trace.common.writer.TraceDumpJsonExporter;
import datadog.trace.core.monitor.HealthMetrics;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LongRunningTracesTracker implements TracerFlare.Reporter {
private static final Logger LOGGER = LoggerFactory.getLogger(LongRunningTracesTracker.class);
private static final int MAX_DUMPED_TRACES = 50;
private static final Comparator<PendingTrace> TRACE_BY_START_TIME =
comparingLong(PendingTrace::getRunningTraceStartTime);

public class LongRunningTracesTracker {
private final DDAgentFeaturesDiscovery features;
private final HealthMetrics healthMetrics;
private long lastFlushMilli = 0;
Expand Down Expand Up @@ -41,6 +55,8 @@ public LongRunningTracesTracker(
(int) TimeUnit.SECONDS.toMillis(config.getLongRunningTraceFlushInterval());
this.features = sharedCommunicationObjects.featuresDiscovery(config);
this.healthMetrics = healthMetrics;

TracerFlare.addReporter(this);
}

public boolean add(PendingTraceBuffer.Element element) {
Expand All @@ -56,7 +72,7 @@ public boolean add(PendingTraceBuffer.Element element) {
return true;
}

private void addTrace(PendingTrace trace) {
private synchronized void addTrace(PendingTrace trace) {
if (trace.empty()) {
return;
}
Expand All @@ -67,7 +83,7 @@ private void addTrace(PendingTrace trace) {
traceArray.add(trace);
}

public void flushAndCompact(long nowMilli) {
public synchronized void flushAndCompact(long nowMilli) {
if (nowMilli < lastFlushMilli + TimeUnit.SECONDS.toMillis(1)) {
return;
}
Expand Down Expand Up @@ -139,4 +155,25 @@ private void flushStats() {
write = 0;
expired = 0;
}

public String getTracesAsJson() {
try (TraceDumpJsonExporter writer = new TraceDumpJsonExporter()) {
List<PendingTrace> traces;
synchronized (this) {
traces = new ArrayList<>(traceArray);
}
traces.sort(TRACE_BY_START_TIME);

int limit = Math.min(traces.size(), MAX_DUMPED_TRACES);
for (int i = 0; i < limit; i++) {
writer.write(traces.get(i).getSpans());
}
return writer.getDumpJson();
}
}

@Override
public void addReportToFlare(ZipOutputStream zip) throws IOException {
TracerFlare.addText(zip, "long_running_traces.txt", getTracesAsJson());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,31 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {
private final MpscBlockingConsumerArrayQueue<Element> queue;
private final Thread worker;
private final TimeSource timeSource;
private final HealthMetrics healthMetrics;

private volatile boolean closed = false;
private final AtomicInteger flushCounter = new AtomicInteger(0);
private final AtomicInteger dumpCounter = new AtomicInteger(0);

private final LongRunningTracesTracker runningTracesTracker;

private void dumpTraces() {
if (worker.isAlive()) {
int count = dumpCounter.get();
int loop = 1;
boolean signaled = queue.offer(DUMP_ELEMENT);
while (!closed && !signaled) {
yieldOrSleep(loop++);
signaled = queue.offer(DUMP_ELEMENT);
}
int newCount = dumpCounter.get();
while (!closed && count >= newCount) {
yieldOrSleep(loop++);
newCount = dumpCounter.get();
}
}
}

public boolean longRunningSpansEnabled() {
return runningTracesTracker != null;
}
Expand All @@ -85,6 +103,7 @@ public void enqueue(Element pendingTrace) {
if (!pendingTrace.writeOnBufferFull()) {
return;
}
healthMetrics.onPendingWriteAround();
pendingTrace.write();
}
}
Expand Down Expand Up @@ -136,6 +155,18 @@ public void flush() {
}
}

private String getDumpJson() {
try (TraceDumpJsonExporter writer = new TraceDumpJsonExporter()) {
for (Element e : DumpDrain.DUMP_DRAIN.collectTraces()) {
if (e instanceof PendingTrace) {
PendingTrace trace = (PendingTrace) e;
writer.write(trace.getSpans());
}
}
return writer.getDumpJson();
}
}

private static final class WriteDrain implements MessagePassingQueue.Consumer<Element> {
private static final WriteDrain WRITE_DRAIN = new WriteDrain();

Expand Down Expand Up @@ -295,6 +326,7 @@ public DelayingPendingTraceBuffer(
this.queue = new MpscBlockingConsumerArrayQueue<>(bufferSize);
this.worker = newAgentThread(TRACE_MONITOR, new Worker());
this.timeSource = timeSource;
this.healthMetrics = healthMetrics;
boolean runningSpansEnabled = config.isLongRunningTraceEnabled();
this.runningTracesTracker =
runningSpansEnabled
Expand Down Expand Up @@ -354,32 +386,12 @@ private TracerDump(DelayingPendingTraceBuffer buffer) {

@Override
public void prepareForFlare() {
if (buffer.worker.isAlive()) {
int count = buffer.dumpCounter.get();
int loop = 1;
boolean signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT);
while (!buffer.closed && !signaled) {
buffer.yieldOrSleep(loop++);
signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT);
}
int newCount = buffer.dumpCounter.get();
while (!buffer.closed && count >= newCount) {
buffer.yieldOrSleep(loop++);
newCount = buffer.dumpCounter.get();
}
}
buffer.dumpTraces();
}

@Override
public void addReportToFlare(ZipOutputStream zip) throws IOException {
TraceDumpJsonExporter writer = new TraceDumpJsonExporter(zip);
for (Element e : DelayingPendingTraceBuffer.DumpDrain.DUMP_DRAIN.collectTraces()) {
if (e instanceof PendingTrace) {
PendingTrace trace = (PendingTrace) e;
writer.write(trace.getSpans());
}
}
writer.flush();
TracerFlare.addText(zip, "pending_traces.txt", buffer.getDumpJson());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public void onSend(
public void onFailedSend(
final int traceCount, final int sizeInBytes, final RemoteApi.Response response) {}

public void onPendingWriteAround() {}

public void onLongRunningUpdate(final int dropped, final int write, final int expired) {}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable
private final LongAdder scopeCloseErrors = new LongAdder();
private final LongAdder userScopeCloseErrors = new LongAdder();

private final LongAdder pendingWriteAround = new LongAdder();

private final LongAdder longRunningTracesWrite = new LongAdder();
private final LongAdder longRunningTracesDropped = new LongAdder();
private final LongAdder longRunningTracesExpired = new LongAdder();
Expand Down Expand Up @@ -295,6 +297,11 @@ public void onFailedSend(
onSendAttempt(traceCount, sizeInBytes, response);
}

@Override
public void onPendingWriteAround() {
pendingWriteAround.increment();
}

@Override
public void onLongRunningUpdate(final int dropped, final int write, final int expired) {
longRunningTracesWrite.add(write);
Expand Down Expand Up @@ -473,6 +480,8 @@ public void run(TracerHealthMetrics target) {
reportIfChanged(
target.statsd, "scope.user.close.error", target.userScopeCloseErrors, NO_TAGS);

reportIfChanged(target.statsd, "pending.write_around", target.pendingWriteAround, NO_TAGS);

reportIfChanged(
target.statsd, "long-running.write", target.longRunningTracesWrite, NO_TAGS);
reportIfChanged(
Expand Down Expand Up @@ -602,6 +611,9 @@ public String summary() {
+ "\nuserScopeCloseErrors="
+ userScopeCloseErrors.sum()
+ "\n"
+ "\npendingWriteAround="
+ pendingWriteAround.sum()
+ "\n"
+ "\nlongRunningTracesWrite="
+ longRunningTracesWrite.sum()
+ "\nlongRunningTracesDropped="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,43 @@ class LongRunningTracesTrackerTest extends DDSpecification {
PrioritySampling.USER_KEEP | 1 | LongRunningTracesTracker.WRITE_RUNNING_SPANS
PrioritySampling.SAMPLER_KEEP | 1 | LongRunningTracesTracker.WRITE_RUNNING_SPANS
}

def "getTracesAsJson with no traces"() {
when:
def json = tracker.getTracesAsJson()

then:
json == ""
}

def "getTracesAsJson with traces"() {
given:
def trace = newTraceToTrack()
tracker.add(trace)

when:
def json = tracker.getTracesAsJson()

then:
json != null
!json.isEmpty()
json.contains('"service"')
json.contains('"name"')
}

def "testing tracer flare dump with trace"() {
given:
def trace = newTraceToTrack()
tracker.add(trace)

when:
def entries = PendingTraceBufferTest.buildAndExtractZip()

then:
entries.containsKey("long_running_traces.txt")

def jsonContent = entries["long_running_traces.txt"] as String
jsonContent.contains('"service"')
jsonContent.contains('"name"')
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ class PendingTraceBufferTest extends DDSpecification {
return DDSpan.create("test", 0, context, null)
}

def buildAndExtractZip() {
static buildAndExtractZip() {
TracerFlare.prepareForFlare()
def out = new ByteArrayOutputStream()
try (ZipOutputStream zip = new ZipOutputStream(out)) {
Expand Down