initial spark launcher instrumentation #10629
Merged: 21 commits merged into master from adrien.boitreaud/spark-launcher-instrumentation (via gh-worker-dd-mergequeue-cf854d) on Feb 23, 2026.
Commits (21, all by aboitreaud):

45057c5 initial spark launcher instrumentation
ae18996 use ddtags
edbea75 Fix tess
9794da8 move test to the right /test dir
bf99260 advice should be public
74326e0 finish launcher span with error via RunMainAdvice
83bdee0 sportLess
b6406c2 synchronize shutdown hook
5c57154 Capture more spark relevant attrs
f7d45ac Update tests with new attrs
3f0d8a0 fix sportBugsMain and muzzle
559ce6c remove SparkLauncher.launch() instrumentation
725bbf0 share common config key redaction method
f57bf18 make public to avoid IllegalAccessError
c02607d error type and error message
95c6c74 Add appId and stack trace
9c973ac extract span building in SparkLaunchListener
4b9b225 wait for throwable and let the span be finished by shutdown hook
6434394 spotless apply
7f4844a fix span lifecycle in the launcher listener
7c0d7ee Merge branch 'master' into adrien.boitreaud/spark-launcher-instrument…
...ommon/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherInstrumentation.java (66 additions, 0 deletions)
```java
package datadog.trace.instrumentation.spark;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.InstrumenterConfig;
import net.bytebuddy.asm.Advice;
import org.apache.spark.launcher.SparkAppHandle;

@AutoService(InstrumenterModule.class)
public class SparkLauncherInstrumentation extends InstrumenterModule.Tracing
    implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

  public SparkLauncherInstrumentation() {
    super("spark-launcher");
  }

  @Override
  protected boolean defaultEnabled() {
    return InstrumenterConfig.get().isDataJobsEnabled();
  }

  @Override
  public String instrumentedType() {
    return "org.apache.spark.launcher.SparkLauncher";
  }

  @Override
  public String[] helperClassNames() {
    return new String[] {
      packageName + ".SparkConfAllowList", packageName + ".SparkLauncherListener",
    };
  }

  @Override
  public void methodAdvice(MethodTransformer transformer) {
    transformer.applyAdvice(
        isMethod()
            .and(named("startApplication"))
            .and(isDeclaredBy(named("org.apache.spark.launcher.SparkLauncher"))),
        SparkLauncherInstrumentation.class.getName() + "$StartApplicationAdvice");
  }

  public static class StartApplicationAdvice {
    @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
    public static void exit(
        @Advice.This Object launcher,
        @Advice.Return SparkAppHandle handle,
        @Advice.Thrown Throwable throwable) {
      SparkLauncherListener.createLauncherSpan(launcher);

      if (throwable != null) {
        SparkLauncherListener.finishSpanWithThrowable(throwable);
        return;
      }

      if (handle != null) {
        handle.addListener(new SparkLauncherListener());
      }
    }
  }
}
```
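The advice above creates at most one launcher span per JVM and either finishes it immediately on a thrown exception or hands it to a listener (with a shutdown hook as a last-resort finisher). A minimal JDK-only sketch of that single-shot create / idempotent finish pattern, using a hypothetical `Span` stand-in rather than the real `AgentSpan` API:

```java
// Hypothetical stand-in for the AgentSpan holder in SparkLauncherListener:
// demonstrates the single-shot create / idempotent finish pattern the advice relies on.
public class LauncherSpanHolderDemo {
  static final class Span {
    boolean finished;
    boolean error;
  }

  private static Span launcherSpan;

  static synchronized Span create() {
    if (launcherSpan != null) {
      return launcherSpan; // only the first launch in the JVM is traced
    }
    launcherSpan = new Span();
    return launcherSpan;
  }

  static synchronized void finish(boolean isError) {
    Span s = launcherSpan;
    if (s == null) {
      return; // already finished elsewhere (e.g. by the shutdown hook)
    }
    s.error = isError;
    s.finished = true;
    launcherSpan = null; // clearing the holder makes later finish() calls no-ops
  }

  public static void main(String[] args) {
    Span first = create();
    Span second = create(); // second call returns the same span, creates nothing
    if (first != second) throw new AssertionError("expected a single span");
    finish(false);
    finish(true); // no-op: the span is already finished, error flag must not flip
    if (!first.finished) throw new AssertionError("span should be finished");
    if (first.error) throw new AssertionError("second finish must be a no-op");
    System.out.println("ok");
  }
}
```

Synchronizing both methods on the class is what lets the advice, the state-change listener, and the shutdown hook race safely without double-finishing the span.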
...spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java (192 additions, 0 deletions)
```java
package datadog.trace.instrumentation.spark;

import datadog.trace.api.DDTags;
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.api.sampling.SamplingMechanism;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import java.lang.reflect.Field;
import java.util.Map;
import org.apache.spark.launcher.SparkAppHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Listener for SparkLauncher spans. Tracks the lifecycle of a Spark application submitted via
 * SparkLauncher.startApplication(). Only a single launcher span can be active at a time. Subsequent
 * calls to startApplication() from the same or different launcher instances will not create spans;
 * only the first launch in the JVM is traced.
 */
public class SparkLauncherListener implements SparkAppHandle.Listener {

  private static final Logger log = LoggerFactory.getLogger(SparkLauncherListener.class);

  static volatile AgentSpan launcherSpan;

  private static volatile boolean shutdownHookRegistered = false;

  public static synchronized void createLauncherSpan(Object launcher) {
    if (launcherSpan != null) {
      return;
    }

    AgentTracer.TracerAPI tracer = AgentTracer.get();
    AgentSpan span =
        tracer
            .buildSpan("spark.launcher.launch")
            .withSpanType("spark")
            .withResourceName("SparkLauncher.startApplication")
            .start();
    span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS);
    setLauncherConfigTags(span, launcher);
    launcherSpan = span;

    if (!shutdownHookRegistered) {
      shutdownHookRegistered = true;
      Runtime.getRuntime()
          .addShutdownHook(
              new Thread(
                  () -> {
                    synchronized (SparkLauncherListener.class) {
                      AgentSpan s = launcherSpan;
                      if (s != null) {
                        log.info("Finishing spark.launcher span from shutdown hook");
                        s.finish();
                        launcherSpan = null;
                      }
                    }
                  }));
    }
  }

  public static synchronized void finishSpan(boolean isError, String errorMessage) {
    AgentSpan span = launcherSpan;
    if (span == null) {
      return;
    }
    if (isError) {
      span.setError(true);
      span.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed");
      span.setTag(DDTags.ERROR_MSG, errorMessage);
    }
    span.finish();
    launcherSpan = null;
  }

  public static synchronized void finishSpanWithThrowable(Throwable throwable) {
    AgentSpan span = launcherSpan;
    if (span == null) {
      return;
    }
    if (throwable != null) {
      span.addThrowable(throwable);
    }
    span.finish();
    launcherSpan = null;
  }

  @Override
  public void stateChanged(SparkAppHandle handle) {
    synchronized (SparkLauncherListener.class) {
      SparkAppHandle.State state = handle.getState();
      AgentSpan span = launcherSpan;
      if (span != null) {
        span.setTag("spark.launcher.app_state", state.toString());

        String appId = handle.getAppId();
        if (appId != null) {
          span.setTag("spark.app_id", appId);
          span.setTag("app_id", appId);
        }

        if (state.isFinal()) {
          if (state == SparkAppHandle.State.FAILED
              || state == SparkAppHandle.State.KILLED
              || state == SparkAppHandle.State.LOST) {
            // Set error tags but don't finish yet — RunMainAdvice may add the throwable
            // with the full stack trace. The span will be finished by RunMainAdvice or
            // the shutdown hook.
            span.setError(true);
            span.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed");
            span.setTag(DDTags.ERROR_MSG, "Application " + state);
          } else {
            finishSpan(false, null);
          }
        }
      }
    }
  }

  @Override
  public void infoChanged(SparkAppHandle handle) {
    synchronized (SparkLauncherListener.class) {
      AgentSpan span = launcherSpan;
      if (span != null) {
        String appId = handle.getAppId();
        if (appId != null) {
          span.setTag("spark.app_id", appId);
          span.setTag("app_id", appId);
        }
      }
    }
  }

  /**
   * Extract launcher configuration via reflection and set as span tags. Secret redaction uses the
   * default pattern only (not spark.redaction.regex) because the SparkLauncher conf map is a plain
   * Map, not a SparkConf, so there is no way to read the user's custom redaction regex at this
   * point.
   */
  private static void setLauncherConfigTags(AgentSpan span, Object launcher) {
    try {
      Field builderField = launcher.getClass().getSuperclass().getDeclaredField("builder");
      builderField.setAccessible(true);
      Object builder = builderField.get(launcher);
      if (builder == null) {
        return;
      }

      Class<?> builderClass = builder.getClass();
      Class<?> abstractBuilderClass = builderClass.getSuperclass();

      setStringFieldAsTag(span, builder, abstractBuilderClass, "master", "master");
      setStringFieldAsTag(span, builder, abstractBuilderClass, "deployMode", "deploy_mode");
      setStringFieldAsTag(span, builder, abstractBuilderClass, "appName", "application_name");
      setStringFieldAsTag(span, builder, abstractBuilderClass, "mainClass", "main_class");
      setStringFieldAsTag(span, builder, abstractBuilderClass, "appResource", "app_resource");

      try {
        Field confField = abstractBuilderClass.getDeclaredField("conf");
        confField.setAccessible(true);
        @SuppressWarnings("unchecked")
        Map<String, String> conf = (Map<String, String>) confField.get(builder);
        if (conf != null) {
          for (Map.Entry<String, String> entry : conf.entrySet()) {
            if (SparkConfAllowList.canCaptureJobParameter(entry.getKey())) {
              String value = SparkConfAllowList.redactValue(entry.getKey(), entry.getValue());
              span.setTag("config." + entry.getKey().replace('.', '_'), value);
            }
          }
        }
      } catch (NoSuchFieldException e) {
        log.debug("Could not find conf field on builder", e);
      }
    } catch (Exception e) {
      log.debug("Failed to extract SparkLauncher configuration", e);
    }
  }

  private static void setStringFieldAsTag(
      AgentSpan span, Object obj, Class<?> clazz, String fieldName, String tagName) {
    try {
      Field field = clazz.getDeclaredField(fieldName);
      field.setAccessible(true);
      Object value = field.get(obj);
      if (value != null) {
        span.setTag(tagName, value.toString());
      }
    } catch (Exception e) {
      log.debug("Could not read field {} from builder", fieldName, e);
    }
  }
}
```
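The `setLauncherConfigTags` method reads SparkLauncher's package-private builder via reflection and maps each allowed config key to a `config.`-prefixed tag with dots replaced by underscores. A self-contained sketch of that mechanism, using a hypothetical `FakeBuilder` in place of Spark's real builder and omitting the `SparkConfAllowList` allow-list and redaction steps:

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for SparkLauncher's package-private builder: shows the
// reflective private-field read and the "config." tag-name mapping used above.
public class LauncherReflectionDemo {
  static class FakeBuilder {
    private final Map<String, String> conf = new LinkedHashMap<>();

    FakeBuilder() {
      conf.put("spark.executor.memory", "4g");
    }
  }

  static Map<String, String> extractConfTags(Object builder) throws Exception {
    Field confField = builder.getClass().getDeclaredField("conf");
    confField.setAccessible(true); // conf is private, like on Spark's real builder
    @SuppressWarnings("unchecked")
    Map<String, String> conf = (Map<String, String>) confField.get(builder);
    Map<String, String> tags = new LinkedHashMap<>();
    for (Map.Entry<String, String> e : conf.entrySet()) {
      // dots in the key become underscores so only the "config." prefix carries a dot
      tags.put("config." + e.getKey().replace('.', '_'), e.getValue());
    }
    return tags;
  }

  public static void main(String[] args) throws Exception {
    Map<String, String> tags = extractConfTags(new FakeBuilder());
    if (!"4g".equals(tags.get("config.spark_executor_memory"))) {
      throw new AssertionError("unexpected tags: " + tags);
    }
    System.out.println("ok");
  }
}
```

In the real listener every entry additionally passes through `SparkConfAllowList.canCaptureJobParameter` and `SparkConfAllowList.redactValue` before being tagged, so secrets in non-allow-listed keys never reach the span.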
Review comment: Leaving this static reference here rather than relying on InstanceStore, because InstanceStore lives in the bootstrap classloader and the values stored in it must also be visible from the bootstrap classloader, while SparkLauncherListener is an agent instrumentation class.
In the rest of the Spark instrumentation, InstanceStore is used for SparkConf and SparkListenerInterface. Those two are Spark classes loaded by the application classloader, not agent instrumentation classes.