Skip to content

Commit 025bd48

Browse files
committed
fix span lifecycle in the launcher listener
1 parent 6434394 commit 025bd48

File tree

2 files changed

+58
-44
lines changed

2 files changed

+58
-44
lines changed

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@
1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
1313

14+
/**
15+
* Listener for SparkLauncher spans. Tracks the lifecycle of a Spark application submitted via
16+
* SparkLauncher.startApplication(). Only a single launcher span can be active at a time. Subsequent
17+
* calls to startApplication() from the same or different launcher instances will not create spans;
18+
* only the first launch in the JVM is traced.
19+
*/
1420
public class SparkLauncherListener implements SparkAppHandle.Listener {
1521

1622
private static final Logger log = LoggerFactory.getLogger(SparkLauncherListener.class);
@@ -81,46 +87,51 @@ public static synchronized void finishSpanWithThrowable(Throwable throwable) {
8187

8288
@Override
8389
public void stateChanged(SparkAppHandle handle) {
84-
SparkAppHandle.State state = handle.getState();
85-
AgentSpan span = launcherSpan;
86-
if (span != null) {
87-
span.setTag("spark.launcher.app_state", state.toString());
88-
89-
String appId = handle.getAppId();
90-
if (appId != null) {
91-
span.setTag("spark.app_id", appId);
92-
span.setTag("app_id", appId);
93-
}
90+
synchronized (SparkLauncherListener.class) {
91+
SparkAppHandle.State state = handle.getState();
92+
AgentSpan span = launcherSpan;
93+
if (span != null) {
94+
span.setTag("spark.launcher.app_state", state.toString());
95+
96+
String appId = handle.getAppId();
97+
if (appId != null) {
98+
span.setTag("spark.app_id", appId);
99+
span.setTag("app_id", appId);
100+
}
94101

95-
if (state.isFinal()) {
96-
if (state == SparkAppHandle.State.FAILED
97-
|| state == SparkAppHandle.State.KILLED
98-
|| state == SparkAppHandle.State.LOST) {
99-
// Set error tags but don't finish yet — RunMainAdvice may add the throwable
100-
// with the full stack trace. The span will be finished by RunMainAdvice or
101-
// the shutdown hook.
102-
span.setError(true);
103-
span.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed");
104-
span.setTag(DDTags.ERROR_MSG, "Application " + state);
105-
} else {
106-
finishSpan(false, null);
102+
if (state.isFinal()) {
103+
if (state == SparkAppHandle.State.FAILED
104+
|| state == SparkAppHandle.State.KILLED
105+
|| state == SparkAppHandle.State.LOST) {
106+
finishSpan(true, "Application " + state);
107+
} else {
108+
finishSpan(false, null);
109+
}
107110
}
108111
}
109112
}
110113
}
111114

112115
@Override
113116
public void infoChanged(SparkAppHandle handle) {
114-
AgentSpan span = launcherSpan;
115-
if (span != null) {
116-
String appId = handle.getAppId();
117-
if (appId != null) {
118-
span.setTag("spark.app_id", appId);
119-
span.setTag("app_id", appId);
117+
synchronized (SparkLauncherListener.class) {
118+
AgentSpan span = launcherSpan;
119+
if (span != null) {
120+
String appId = handle.getAppId();
121+
if (appId != null) {
122+
span.setTag("spark.app_id", appId);
123+
span.setTag("app_id", appId);
124+
}
120125
}
121126
}
122127
}
123128

129+
/**
130+
* Extract launcher configuration via reflection and set as span tags. Secret redaction uses the
131+
* default pattern only (not spark.redaction.regex) because the SparkLauncher conf map is a plain
132+
* Map, not a SparkConf, so there is no way to read the user's custom redaction regex at this
133+
* point.
134+
*/
124135
private static void setLauncherConfigTags(AgentSpan span, Object launcher) {
125136
try {
126137
Field builderField = launcher.getClass().getSuperclass().getDeclaredField("builder");

dd-java-agent/instrumentation/spark/spark-common/src/test/groovy/datadog/trace/instrumentation/spark/SparkLauncherTest.groovy

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ class SparkLauncherTest extends InstrumentationSpecification {
130130
}
131131
}
132132

133-
def "SparkLauncherListener sets error tags on FAILED state but does not finish span"() {
133+
def "SparkLauncherListener finishes span with error on FAILED state"() {
134134
setup:
135135
SparkLauncherListener.launcherSpan = null
136136
def tracer = AgentTracer.get()
@@ -151,18 +151,22 @@ class SparkLauncherTest extends InstrumentationSpecification {
151151
listener.stateChanged(handle)
152152

153153
then:
154-
// Span stays open so RunMainAdvice can add the throwable
155-
SparkLauncherListener.launcherSpan != null
156-
SparkLauncherListener.launcherSpan.isError()
157-
SparkLauncherListener.launcherSpan.getTags()["error.type"] == "Spark Launcher Failed"
158-
SparkLauncherListener.launcherSpan.getTags()["error.message"] == "Application FAILED"
159-
SparkLauncherListener.launcherSpan.getTags()["spark.app_id"] == "app-456"
160-
161-
cleanup:
162-
SparkLauncherListener.finishSpan(false, null)
154+
SparkLauncherListener.launcherSpan == null
155+
assertTraces(1) {
156+
trace(1) {
157+
span {
158+
operationName "spark.launcher.launch"
159+
spanType "spark"
160+
errored true
161+
assert span.tags["error.type"] == "Spark Launcher Failed"
162+
assert span.tags["error.message"] == "Application FAILED"
163+
assert span.tags["spark.app_id"] == "app-456"
164+
}
165+
}
166+
}
163167
}
164168

165-
def "finishSpanWithThrowable adds stack trace after FAILED state"() {
169+
def "finishSpanWithThrowable is no-op after span already finished"() {
166170
setup:
167171
SparkLauncherListener.launcherSpan = null
168172
def tracer = AgentTracer.get()
@@ -178,10 +182,11 @@ class SparkLauncherTest extends InstrumentationSpecification {
178182
def handle = Mock(SparkAppHandle)
179183

180184
when:
181-
// Simulate: listener sets error tags, then RunMainAdvice finishes with throwable
185+
// Listener finishes the span on FAILED, then RunMainAdvice tries to finish again
182186
handle.getState() >> SparkAppHandle.State.FAILED
183187
handle.getAppId() >> "app-456"
184188
listener.stateChanged(handle)
189+
// This should be a no-op since the span is already finished
185190
SparkLauncherListener.finishSpanWithThrowable(new RuntimeException("job crashed"))
186191

187192
then:
@@ -192,10 +197,8 @@ class SparkLauncherTest extends InstrumentationSpecification {
192197
operationName "spark.launcher.launch"
193198
spanType "spark"
194199
errored true
195-
assert span.tags["error.type"] == "java.lang.RuntimeException"
196-
assert span.tags["error.message"] == "job crashed"
197-
assert span.tags["error.stack"] != null
198-
assert span.tags["spark.app_id"] == "app-456"
200+
assert span.tags["error.type"] == "Spark Launcher Failed"
201+
assert span.tags["error.message"] == "Application FAILED"
199202
}
200203
}
201204
}

0 commit comments

Comments
 (0)