
Commit 4e5bdc7

Track external accumulators in tracer instead of using SparkInfo values
1 parent d10055d commit 4e5bdc7

File tree

5 files changed: +130 -15 lines changed

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 16 additions & 4 deletions
@@ -40,6 +40,7 @@
 import org.apache.spark.ExceptionFailure;
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskFailedReason;
+import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.scheduler.AccumulableInfo;
 import org.apache.spark.scheduler.JobFailed;
 import org.apache.spark.scheduler.SparkListener;
@@ -64,6 +65,7 @@
 import org.apache.spark.sql.streaming.StateOperatorProgress;
 import org.apache.spark.sql.streaming.StreamingQueryListener;
 import org.apache.spark.sql.streaming.StreamingQueryProgress;
+import org.apache.spark.util.AccumulatorV2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Tuple2;
@@ -127,8 +129,10 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
   private final HashMap<String, SparkListenerExecutorAdded> liveExecutors = new HashMap<>();

   // There is no easy way to know if an accumulator is not useful anymore (meaning it is not part of
-  // an active SQL query)
-  // so capping the size of the collection storing them
+  // an active SQL query) so capping the size of the collection storing them
+  // TODO (CY): Is this potentially the reason why some Spark Plans aren't showing up consistently?
+  // If we know we don't need the accumulator values, can we drop all associated data and just map
+  // stage ID -> accumulator ID? Put this behind some FF
   private final Map<Long, SparkSQLUtils.AccumulatorWithStage> accumulators =
       new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE);

@@ -151,6 +155,8 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
   public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sparkVersion) {
     tracer = AgentTracer.get();

+    log.error("[CHARLES] HELLO WORLD");
+
     this.sparkConf = sparkConf;
     this.appId = appId;
     this.sparkVersion = sparkVersion;
@@ -229,6 +235,9 @@ public void setupOpenLineage(DDTraceId traceId) {
   /** Parent Ids of a Stage. Provide an implementation based on a specific scala version */
   protected abstract int[] getStageParentIds(StageInfo info);

+  /** All external accumulators associated with a given task. Provide an implementation based on a specific scala version */
+  protected abstract List<AccumulatorV2> getExternalAccumulators(TaskMetrics metrics);
+
   @Override
   public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
     this.applicationStart = applicationStart;
@@ -670,7 +679,8 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl

     SparkPlanInfo sqlPlan = sqlPlans.get(sqlExecutionId);
     if (sqlPlan != null) {
-      SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulators, stageId);
+      SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulators, stageMetric, stageId);
+      log.info("[CHARLES] {}", span.getTag("_dd.spark.sql_plan"));
     }

     span.finish(completionTimeMs * 1000);
@@ -684,7 +694,9 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {

     SparkAggregatedTaskMetrics stageMetric = stageMetrics.get(stageSpanKey);
     if (stageMetric != null) {
-      stageMetric.addTaskMetrics(taskEnd);
+      // Not happy that we have to extract external accumulators here, but needed as we're dealing
+      // with a Seq whose concrete type varies across Scala versions
+      stageMetric.addTaskMetrics(taskEnd, getExternalAccumulators(taskEnd.taskMetrics()));
     }

     if (taskEnd.taskMetrics() != null) {
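Background, not part of this diff: Spark SQL plan metrics are `AccumulatorV2` instances created per physical plan node, and their per-task updates travel back to the driver attached to each task's `TaskMetrics` as "external" accumulators — which is why the new hook pulls them from `TaskMetrics` rather than relying on stage-level `AccumulableInfo` snapshots alone. A rough illustration of where such an accumulator comes from, using Spark's standard `SQLMetrics` factory (a Scala object, callable from Java through its static forwarders):

import org.apache.spark.SparkContext;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.execution.metric.SQLMetrics;

class SqlMetricExample {
  // Each physical plan node registers its metrics roughly like this; the returned
  // SQLMetric is an AccumulatorV2 subclass, so its updates come back to the driver
  // with TaskMetrics as external accumulators.
  static SQLMetric outputRowsMetric(SparkContext sc) {
    return SQLMetrics.createMetric(sc, "number of output rows");
  }
}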

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java

Lines changed: 42 additions & 1 deletion
@@ -1,13 +1,20 @@
 package datadog.trace.instrumentation.spark;

+import com.fasterxml.jackson.core.JsonGenerator;
 import datadog.metrics.api.Histogram;
 import datadog.trace.api.Config;
 import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.spark.TaskFailedReason;
 import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.sql.execution.metric.SQLMetricInfo;
+import org.apache.spark.util.AccumulatorV2;

 class SparkAggregatedTaskMetrics {
   private static final double HISTOGRAM_RELATIVE_ACCURACY = 1 / 32.0;
@@ -59,13 +66,16 @@ class SparkAggregatedTaskMetrics {
   private Histogram shuffleWriteBytesHistogram;
   private Histogram diskBytesSpilledHistogram;

+  // Used for Spark SQL plan metrics ONLY, don't put in the regular span for now
+  private Map<Long, Histogram> externalAccumulableHistograms;
+
   public SparkAggregatedTaskMetrics() {}

   public SparkAggregatedTaskMetrics(long availableExecutorTime) {
     this.previousAvailableExecutorTime = availableExecutorTime;
   }

-  public void addTaskMetrics(SparkListenerTaskEnd taskEnd) {
+  public void addTaskMetrics(SparkListenerTaskEnd taskEnd, List<AccumulatorV2> externalAccumulators) {
     taskCompletedCount += 1;

     if (taskEnd.taskInfo().attemptNumber() > 0) {
@@ -127,6 +137,24 @@ public void addTaskMetrics(SparkListenerTaskEnd taskEnd) {
           shuffleWriteBytesHistogram, taskMetrics.shuffleWriteMetrics().bytesWritten());
       diskBytesSpilledHistogram =
           lazyHistogramAccept(diskBytesSpilledHistogram, taskMetrics.diskBytesSpilled());
+
+      // TODO (CY): Should we also look at TaskInfo accumulable update values as a backup? Is that
+      // only needed for SHS?
+      if (externalAccumulators != null && !externalAccumulators.isEmpty()) {
+        if (externalAccumulableHistograms == null) {
+          externalAccumulableHistograms = new HashMap<>(externalAccumulators.size());
+        }
+
+        externalAccumulators.forEach(
+            acc -> {
+              Histogram hist = externalAccumulableHistograms.get(acc.id());
+              try {
+                // As of Spark 3.5, all SQL metrics are Long; safeguard in case it changes in
+                // newer versions
+                externalAccumulableHistograms.put(
+                    acc.id(), lazyHistogramAccept(hist, (Long) acc.value()));
+              } catch (ClassCastException ignored) {
+              }
+            });
+      }
     }
   }
 }
@@ -276,6 +304,19 @@ private Histogram lazyHistogramAccept(Histogram hist, double value) {
     return hist;
   }

+  // Used to write external accumulator metrics as JSON for Spark SQL plans
+  public void externalAccumToJson(JsonGenerator generator, SQLMetricInfo info) throws IOException {
+    Histogram hist = externalAccumulableHistograms == null ? null : externalAccumulableHistograms.get(info.accumulatorId());
+    String name = info.name();
+
+    if (name != null && hist != null) {
+      generator.writeStartObject();
+      generator.writeStringField(name, histogramToBase64(hist));
+      generator.writeStringField("type", info.metricType());
+      generator.writeEndObject();
+    }
+  }
+
   public static long computeTaskRunTime(TaskMetrics metrics) {
     return metrics.executorDeserializeTime()
         + metrics.executorRunTime()
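To make the data flow concrete, a schematic sketch only (`completedTasks` and `listener` are invented names; in the real code this loop is the stream of `onTaskEnd` callbacks in the first file):

// Each finished task contributes the final value of each of its external
// accumulators, so externalAccumulableHistograms ends up holding one
// distribution per accumulator id, with one sample per task.
SparkAggregatedTaskMetrics stageMetric = new SparkAggregatedTaskMetrics();
for (SparkListenerTaskEnd taskEnd : completedTasks) { // hypothetical collection
  stageMetric.addTaskMetrics(taskEnd, listener.getExternalAccumulators(taskEnd.taskMetrics()));
}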

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java

Lines changed: 7 additions & 10 deletions
@@ -24,6 +24,7 @@ public static void addSQLPlanToStageSpan(
       AgentSpan span,
       SparkPlanInfo plan,
       Map<Long, AccumulatorWithStage> accumulators,
+      SparkAggregatedTaskMetrics stageMetric,
       int stageId) {
     Set<Integer> parentStageIds = new HashSet<>();
     SparkPlanInfoForStage planForStage =
@@ -32,7 +33,7 @@ public static void addSQLPlanToStageSpan(
     span.setTag("_dd.spark.sql_parent_stage_ids", parentStageIds.toString());

     if (planForStage != null) {
-      String json = planForStage.toJson(accumulators);
+      String json = planForStage.toJson(stageMetric);
       span.setTag("_dd.spark.sql_plan", json);
     }
   }
@@ -143,15 +144,15 @@ public SparkPlanInfoForStage(SparkPlanInfo plan, List<SparkPlanInfoForStage> chi
     this.children = children;
   }

-  public String toJson(Map<Long, AccumulatorWithStage> accumulators) {
+  public String toJson(SparkAggregatedTaskMetrics stageMetric) {
     // Using the jackson JSON lib used by spark
     // https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12/3.5.0
     ObjectMapper mapper =
         new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try (JsonGenerator generator = mapper.getFactory().createGenerator(baos)) {
-      this.toJson(generator, accumulators, mapper);
+      this.toJson(generator, mapper, stageMetric);
     } catch (IOException e) {
       return null;
     }
@@ -160,7 +161,7 @@ public String toJson(Map<Long, AccumulatorWithStage> accumulators) {
   }

   private void toJson(
-      JsonGenerator generator, Map<Long, AccumulatorWithStage> accumulators, ObjectMapper mapper)
+      JsonGenerator generator, ObjectMapper mapper, SparkAggregatedTaskMetrics stageMetric)
       throws IOException {
     generator.writeStartObject();
     generator.writeStringField("node", plan.nodeName());
@@ -199,11 +200,7 @@ private void toJson(
       generator.writeFieldName("metrics");
       generator.writeStartArray();
       for (SQLMetricInfo metric : metrics) {
-        long accumulatorId = metric.accumulatorId();
-        AccumulatorWithStage acc = accumulators.get(accumulatorId);
-        if (acc != null) {
-          acc.toJson(generator, metric);
-        }
+        stageMetric.externalAccumToJson(generator, metric);
       }
       generator.writeEndArray();
     }
@@ -213,7 +210,7 @@ private void toJson(
       generator.writeFieldName("children");
       generator.writeStartArray();
       for (SparkPlanInfoForStage child : children) {
-        child.toJson(generator, accumulators, mapper);
+        child.toJson(generator, mapper, stageMetric);
       }
       generator.writeEndArray();
     }
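Putting the pieces together: based on the writer calls visible in this diff (`node`, the `metrics` array filled by `externalAccumToJson`, and recursive `children`), the `_dd.spark.sql_plan` tag now carries a nested document shaped roughly like the sketch below. Node and metric names are invented, the histogram string is the base64 payload from the previous file, and fields written by parts of `toJson` not shown here may also appear:

{
  "node": "HashAggregate",
  "metrics": [
    {"number of output rows": "<base64-encoded histogram>", "type": "sum"}
  ],
  "children": [
    {"node": "Exchange", "metrics": [], "children": []}
  ]
}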

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java

Lines changed: 33 additions & 0 deletions
@@ -1,14 +1,22 @@
 package datadog.trace.instrumentation.spark;

+import java.lang.invoke.MethodHandle;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import datadog.trace.util.MethodHandles;
 import org.apache.spark.SparkConf;
+import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.scheduler.SparkListenerJobStart;
 import org.apache.spark.scheduler.StageInfo;
 import org.apache.spark.sql.execution.SparkPlanInfo;
 import org.apache.spark.sql.execution.metric.SQLMetricInfo;
+import org.apache.spark.util.AccumulatorV2;
+import scala.Function;
+import scala.Function1;
 import scala.collection.JavaConverters;
+import scala.collection.mutable.ArrayBuffer;

 /**
  * DatadogSparkListener compiled for Scala 2.12
@@ -17,6 +25,10 @@
  * compiled with the specific scala version
  */
 public class DatadogSpark212Listener extends AbstractDatadogSparkListener {
+  private static final MethodHandles methodLoader = new MethodHandles(ClassLoader.getSystemClassLoader());
+  private static final MethodHandle externalAccums = methodLoader.method(TaskMetrics.class, "externalAccums");
+  private static final MethodHandle withExternalAccums = methodLoader.method(TaskMetrics.class, "withExternalAccums", Function1.class);
+
   public DatadogSpark212Listener(SparkConf sparkConf, String appId, String sparkVersion) {
     super(sparkConf, appId, sparkVersion);
   }
@@ -62,4 +74,25 @@ protected int[] getStageParentIds(StageInfo info) {

     return parentIds;
   }
+
+  @Override
+  protected List<AccumulatorV2> getExternalAccumulators(TaskMetrics metrics) {
+    if (metrics == null) {
+      return null;
+    }
+
+    Function1<ArrayBuffer<AccumulatorV2>, List<AccumulatorV2>> lambda = accumulators -> JavaConverters.seqAsJavaList(accumulators);
+    List<AccumulatorV2> res = methodLoader.invoke(withExternalAccums, metrics, lambda);
+    if (res != null) {
+      return res;
+    }
+
+    // withExternalAccums didn't work, try the legacy method
+    ArrayBuffer<AccumulatorV2> accumulators = methodLoader.invoke(externalAccums, metrics);
+    if (accumulators != null && !accumulators.isEmpty()) {
+      return JavaConverters.seqAsJavaList(accumulators);
+    }
+
+    return null;
+  }
 }
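Why two handles: older Spark versions expose `TaskMetrics.externalAccums` directly as an `ArrayBuffer`, while newer releases route access through a `withExternalAccums` callback and restrict direct access (the exact cutover version isn't stated in this commit), so the listener probes for the new method first and falls back to the legacy accessor. The `datadog.trace.util.MethodHandles` helper yields null rather than throwing when a lookup or invocation fails; a simplified sketch of that probe using the plain JDK API (ignoring the non-public-method handling the real helper does):

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

final class Probe {
  // Returns the handle if this Spark version has the method, else null.
  static MethodHandle findOrNull(Class<?> owner, String name, MethodType type) {
    try {
      return MethodHandles.lookup().findVirtual(owner, name, type);
    } catch (NoSuchMethodException | IllegalAccessException e) {
      return null; // method absent in this Spark version; caller falls back
    }
  }
}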

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java

Lines changed: 32 additions & 0 deletions
@@ -1,13 +1,20 @@
 package datadog.trace.instrumentation.spark;

+import java.lang.invoke.MethodHandle;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import datadog.trace.util.MethodHandles;
 import org.apache.spark.SparkConf;
+import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.scheduler.SparkListenerJobStart;
 import org.apache.spark.scheduler.StageInfo;
 import org.apache.spark.sql.execution.SparkPlanInfo;
 import org.apache.spark.sql.execution.metric.SQLMetricInfo;
+import org.apache.spark.util.AccumulatorV2;
+import scala.Function1;
+import scala.collection.JavaConverters;
+import scala.collection.mutable.ArrayBuffer;
 import scala.jdk.javaapi.CollectionConverters;

 /**
@@ -17,6 +24,10 @@
  * compiled with the specific scala version
  */
 public class DatadogSpark213Listener extends AbstractDatadogSparkListener {
+  private static final MethodHandles methodLoader = new MethodHandles(ClassLoader.getSystemClassLoader());
+  private static final MethodHandle externalAccums = methodLoader.method(TaskMetrics.class, "externalAccums");
+  private static final MethodHandle withExternalAccums = methodLoader.method(TaskMetrics.class, "withExternalAccums", Function1.class);
+
   public DatadogSpark213Listener(SparkConf sparkConf, String appId, String sparkVersion) {
     super(sparkConf, appId, sparkVersion);
   }
@@ -62,4 +73,25 @@ protected int[] getStageParentIds(StageInfo info) {

     return parentIds;
   }
+
+  @Override
+  protected List<AccumulatorV2> getExternalAccumulators(TaskMetrics metrics) {
+    if (metrics == null) {
+      return null;
+    }
+
+    Function1<ArrayBuffer<AccumulatorV2>, List<AccumulatorV2>> lambda = accumulators -> CollectionConverters.asJava(accumulators);
+    List<AccumulatorV2> res = methodLoader.invoke(withExternalAccums, metrics, lambda);
+    if (res != null) {
+      return res;
+    }
+
+    // withExternalAccums didn't work, try the legacy method
+    ArrayBuffer<AccumulatorV2> accumulators = methodLoader.invoke(externalAccums, metrics);
+    if (accumulators != null && !accumulators.isEmpty()) {
+      return CollectionConverters.asJava(accumulators);
+    }
+
+    return null;
+  }
 }
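The only substantive difference from the 2.12 listener is the Scala-to-Java collection bridge: `scala.collection.JavaConverters` was deprecated in Scala 2.13 in favor of `scala.jdk.javaapi.CollectionConverters`, which is why each Scala variant ships its own copy of `getExternalAccumulators`. Side by side (`buffer` stands in for the `ArrayBuffer<AccumulatorV2>` returned by Spark):

// Scala 2.12 build:
List<AccumulatorV2> fromScala212 = JavaConverters.seqAsJavaList(buffer);
// Scala 2.13 build:
List<AccumulatorV2> fromScala213 = CollectionConverters.asJava(buffer);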
