Skip to content

Commit 3e43b41

Browse files
authored
Fix flaky test (#2469)
1 parent ff38692 commit 3e43b41

File tree

7 files changed

+113
-41
lines changed

7 files changed

+113
-41
lines changed

inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/InferredSpansProcessor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,12 @@ public class InferredSpansProcessor implements SpanProcessor {
4949
SpanAnchoredClock clock,
5050
boolean startScheduledProfiling,
5151
@Nullable File activationEventsFile,
52-
@Nullable File jfrFile) {
52+
@Nullable File jfrFile,
53+
@Nullable File tempDir) {
5354
this.config = config;
5455
profiler =
55-
new SamplingProfiler(config, clock, this::getTracer, activationEventsFile, jfrFile, null);
56+
new SamplingProfiler(
57+
config, clock, this::getTracer, activationEventsFile, jfrFile, tempDir);
5658
if (startScheduledProfiling) {
5759
profiler.start();
5860
}

inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/InferredSpansProcessorBuilder.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class InferredSpansProcessorBuilder {
5252
private boolean startScheduledProfiling = true;
5353
@Nullable private File activationEventsFile = null;
5454
@Nullable private File jfrFile = null;
55+
@Nullable private File tempDir = null;
5556
private BiConsumer<SpanBuilder, SpanContext> parentOverrideHandler =
5657
CallTree.DEFAULT_PARENT_OVERRIDE;
5758

@@ -75,7 +76,7 @@ public InferredSpansProcessor build() {
7576
parentOverrideHandler);
7677
InferredSpansProcessor processor =
7778
new InferredSpansProcessor(
78-
config, clock, startScheduledProfiling, activationEventsFile, jfrFile);
79+
config, clock, startScheduledProfiling, activationEventsFile, jfrFile, tempDir);
7980
InferredSpans.setInstance(processor);
8081
return processor;
8182
}
@@ -205,6 +206,12 @@ InferredSpansProcessorBuilder jfrFile(@Nullable File jfrFile) {
205206
return this;
206207
}
207208

209+
/** For testing only. */
210+
public InferredSpansProcessorBuilder tempDir(@Nullable File tempDir) {
211+
this.tempDir = tempDir;
212+
return this;
213+
}
214+
208215
/**
209216
* Defines the action to perform when an inferred span is discovered to actually be the parent of a
210217
* normal span. The first argument of the handler is the modifiable inferred span, the second

inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/internal/SamplingProfiler.java

Lines changed: 69 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.concurrent.ScheduledExecutorService;
4444
import java.util.concurrent.TimeUnit;
4545
import java.util.concurrent.locks.LockSupport;
46+
import java.util.concurrent.locks.ReentrantLock;
4647
import java.util.function.Supplier;
4748
import java.util.logging.Level;
4849
import java.util.logging.Logger;
@@ -153,6 +154,7 @@ public class SamplingProfiler implements Runnable {
153154
@Nullable private final File tempDir;
154155

155156
private final AsyncProfiler profiler;
157+
private final ReentrantLock profilerLock = new ReentrantLock();
156158
@Nullable private volatile Future<?> profilingTask;
157159

158160
/**
@@ -317,6 +319,9 @@ public boolean onActivation(Span activeSpan, @Nullable Span previouslyActive) {
317319
if (previouslyActive == null) {
318320
profiler.addThread(Thread.currentThread());
319321
}
322+
if (!config.isPostProcessingEnabled()) {
323+
return true;
324+
}
320325
boolean success =
321326
eventBuffer.tryPublishEvent(activationEventTranslator, activeSpan, previouslyActive);
322327
if (!success) {
@@ -343,6 +348,9 @@ public boolean onDeactivation(Span deactivatedSpan, @Nullable Span previouslyAct
343348
if (previouslyActive == null) {
344349
profiler.removeThread(Thread.currentThread());
345350
}
351+
if (!config.isPostProcessingEnabled()) {
352+
return true;
353+
}
346354
boolean success =
347355
eventBuffer.tryPublishEvent(
348356
deactivationEventTranslator, deactivatedSpan, previouslyActive);
@@ -373,7 +381,9 @@ public void run() {
373381
Duration profilingDuration = config.getProfilingDuration();
374382
boolean postProcessingEnabled = config.isPostProcessingEnabled();
375383

376-
setProfilingSessionOngoing(postProcessingEnabled);
384+
// We need to enable the session so that onActivation is called and threads are added to the
385+
// profiler (profiler.addThread). Otherwise, with the "filter" option, nothing is profiled.
386+
setProfilingSessionOngoing(true);
377387

378388
if (postProcessingEnabled) {
379389
logger.fine("Start full profiling session (async-profiler and agent processing)");
@@ -394,37 +404,62 @@ public void run() {
394404
config.isNonStopProfiling() && !interrupted && postProcessingEnabled;
395405
setProfilingSessionOngoing(continueProfilingSession);
396406

397-
if (!interrupted && !scheduler.isShutdown()) {
398-
long delay = config.getProfilingInterval().toMillis() - profilingDuration.toMillis();
399-
profilingTask = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
407+
profilerLock.lock();
408+
try {
409+
// It's possible for an interruption to occur just before the lock is acquired. This is
410+
// handled by re-reading Thread.currentThread().isInterrupted() to ensure no task is scheduled
411+
// if an interruption occurred just before acquiring the lock
412+
if (!Thread.currentThread().isInterrupted() && !scheduler.isShutdown()) {
413+
long delay = config.getProfilingInterval().toMillis() - profilingDuration.toMillis();
414+
profilingTask = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
415+
}
416+
} finally {
417+
profilerLock.unlock();
400418
}
401419
}
402420

403421
@SuppressWarnings({"NonAtomicVolatileUpdate", "EmptyCatch"})
404422
private void profile(Duration profilingDuration) throws Exception {
405423
try {
406424
String startCommand = createStartCommand();
407-
String startMessage = profiler.execute(startCommand);
425+
String startMessage;
426+
try {
427+
startMessage = profiler.execute(startCommand);
428+
} catch (IllegalStateException e) {
429+
if (e.getMessage() != null && e.getMessage().contains("already started")) {
430+
logger.fine("Profiler already started. Stopping and restarting.");
431+
try {
432+
profiler.stop();
433+
} catch (RuntimeException ignore) {
434+
logger.log(Level.FINE, "Ignored error on stopping profiler", ignore);
435+
}
436+
startMessage = profiler.execute(startCommand);
437+
} else {
438+
throw e;
439+
}
440+
}
408441
logger.fine(startMessage);
409-
if (!profiledThreads.isEmpty()) {
410-
restoreFilterState(profiler);
442+
try {
443+
// try-finally because if the code is interrupted we want to ensure the
444+
// profiler.execute("stop") is called
445+
if (!profiledThreads.isEmpty()) {
446+
restoreFilterState(profiler);
447+
}
448+
// Doesn't need to be atomic as this field is being updated only by a single thread
449+
profilingSessions++;
450+
451+
// When post-processing is disabled activation events are ignored, but we still need to
452+
// invoke this method as it is the one enforcing the sampling session duration. As a side
453+
// effect it will also consume residual activation events if post-processing is disabled
454+
// dynamically
455+
consumeActivationEventsFromRingBufferAndWriteToFile(profilingDuration);
456+
} finally {
457+
String stopMessage = profiler.execute("stop");
458+
logger.fine(stopMessage);
411459
}
412-
// Doesn't need to be atomic as this field is being updated only by a single thread
413-
profilingSessions++;
414-
415-
// When post-processing is disabled activation events are ignored, but we still need to invoke
416-
// this method
417-
// as it is the one enforcing the sampling session duration. As a side effect it will also
418-
// consume
419-
// residual activation events if post-processing is disabled dynamically
420-
consumeActivationEventsFromRingBufferAndWriteToFile(profilingDuration);
421-
422-
String stopMessage = profiler.execute("stop");
423-
logger.fine(stopMessage);
424460

425461
// When post-processing is disabled, jfr file will not be parsed and the heavy processing will
426-
// not occur
427-
// as this method aborts when no activation events are buffered
462+
// not occur as this method aborts when no activation events are buffered
428463
processTraces();
429464
} catch (InterruptedException | ClosedByInterruptException e) {
430465
try {
@@ -505,6 +540,9 @@ EventPoller.PollState consumeActivationEventsFromRingBufferAndWriteToFile() thro
505540
}
506541

507542
public void processTraces() throws IOException {
543+
if (!config.isPostProcessingEnabled()) {
544+
return;
545+
}
508546
if (jfrParser == null) {
509547
jfrParser = new JfrParser();
510548
}
@@ -739,18 +777,25 @@ public void start() {
739777

740778
@SuppressWarnings({"FutureReturnValueIgnored", "Interruption"})
741779
public void reschedule() {
742-
Future<?> future = this.profilingTask;
743-
if (future != null) {
744-
if (future.cancel(true)) {
780+
profilerLock.lock();
781+
try {
782+
Future<?> future = this.profilingTask;
783+
if (future != null && future.cancel(true)) {
745784
Duration profilingDuration = config.getProfilingDuration();
746785
long delay = config.getProfilingInterval().toMillis() - profilingDuration.toMillis();
747786
profilingTask = scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
748787
}
788+
} finally {
789+
profilerLock.unlock();
749790
}
750791
}
751792

793+
@SuppressWarnings({"FutureReturnValueIgnored", "Interruption"})
752794
public void stop() throws InterruptedException, IOException {
753795
// cancels/interrupts the profiling thread
796+
if (profilingTask != null) {
797+
profilingTask.cancel(true);
798+
}
754799
// implicitly clears profiled threads
755800
scheduler.shutdown();
756801
scheduler.awaitTermination(10, TimeUnit.SECONDS);

inferred-spans/src/main/java/io/opentelemetry/contrib/inferredspans/internal/asyncprofiler/JfrParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ public void parse(
9393
this.includedClasses = includedClasses;
9494
bufferedFile.setFile(file);
9595
long fileSize = bufferedFile.size();
96+
if (fileSize == 0) {
97+
return;
98+
}
9699

97100
int chunkSize = readChunk(0);
98101
if (chunkSize < fileSize) {

inferred-spans/src/test/java/io/opentelemetry/contrib/inferredspans/InferredSpansTest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,20 @@
1010

1111
import io.opentelemetry.contrib.inferredspans.internal.SamplingProfiler;
1212
import io.opentelemetry.contrib.inferredspans.internal.util.DisabledOnOpenJ9;
13+
import java.nio.file.Path;
1314
import java.time.Duration;
1415
import org.junit.jupiter.api.AfterEach;
1516
import org.junit.jupiter.api.BeforeEach;
1617
import org.junit.jupiter.api.Test;
1718
import org.junit.jupiter.api.condition.DisabledOnOs;
1819
import org.junit.jupiter.api.condition.OS;
20+
import org.junit.jupiter.api.io.TempDir;
1921

2022
@DisabledOnOs(OS.WINDOWS)
2123
@DisabledOnOpenJ9
2224
class InferredSpansTest {
2325

26+
@TempDir Path tempDir;
2427
private ProfilerTestSetup setup;
2528

2629
@BeforeEach
@@ -40,7 +43,7 @@ void tearDown() {
4043
void testIsEnabled() {
4144
assertThat(InferredSpans.isEnabled()).isFalse();
4245

43-
setup = ProfilerTestSetup.create(c -> {});
46+
setup = ProfilerTestSetup.create(c -> c.tempDir(tempDir.toFile()));
4447

4548
assertThat(InferredSpans.isEnabled()).isTrue();
4649

@@ -61,7 +64,8 @@ void testSetProfilerIntervalWhenDisabled() {
6164
ProfilerTestSetup.create(
6265
c ->
6366
c.profilerInterval(Duration.ofSeconds(10))
64-
.profilingDuration(Duration.ofMillis(500)));
67+
.profilingDuration(Duration.ofMillis(500))
68+
.tempDir(tempDir.toFile()));
6569

6670
// assert that the interval set before the profiler was initialized is ignored
6771
assertThat(setup.profiler.getConfig().getProfilingInterval()).isEqualTo(Duration.ofSeconds(10));
@@ -73,7 +77,8 @@ void testSetProfilerInterval() {
7377
ProfilerTestSetup.create(
7478
c ->
7579
c.profilerInterval(Duration.ofSeconds(10))
76-
.profilingDuration(Duration.ofMillis(500)));
80+
.profilingDuration(Duration.ofMillis(500))
81+
.tempDir(tempDir.toFile()));
7782

7883
SamplingProfiler profiler = setup.profiler;
7984
await()

inferred-spans/src/test/java/io/opentelemetry/contrib/inferredspans/internal/SamplingProfilerTest.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.lang.reflect.Method;
2323
import java.nio.file.Files;
2424
import java.nio.file.Path;
25-
import java.nio.file.Paths;
2625
import java.time.Duration;
2726
import java.util.List;
2827
import java.util.Optional;
@@ -44,6 +43,11 @@
4443
@DisabledOnOpenJ9
4544
class SamplingProfilerTest {
4645

46+
static {
47+
// Needed to ensure ordering because tests run things out of order
48+
ProfilingActivationListener.ensureInitialized();
49+
}
50+
4751
private ProfilerTestSetup setup;
4852

4953
@TempDir private Path tempDir;
@@ -58,12 +62,6 @@ void tearDown() {
5862

5963
@Test
6064
void shouldLazilyCreateTempFilesAndCleanThem() {
61-
for (Path file : getProfilerTempFiles()) {
62-
if (!file.toFile().delete()) {
63-
throw new IllegalStateException("Could not delete temp file: " + file);
64-
}
65-
}
66-
6765
// temporary files should be created on-demand, and properly deleted afterwards
6866
setupProfiler(false);
6967

@@ -91,9 +89,8 @@ void shouldLazilyCreateTempFilesAndCleanThem() {
9189
.isEmpty();
9290
}
9391

94-
private static List<Path> getProfilerTempFiles() {
95-
Path tempFolder = Paths.get(System.getProperty("java.io.tmpdir"));
96-
try (Stream<Path> files = Files.list(tempFolder)) {
92+
private List<Path> getProfilerTempFiles() {
93+
try (Stream<Path> files = Files.list(tempDir)) {
9794
return files
9895
.filter(f -> f.getFileName().toString().startsWith("otel-inferred-"))
9996
.sorted()
@@ -327,7 +324,8 @@ private void setupProfiler(Consumer<InferredSpansProcessorBuilder> configCustomi
327324
config
328325
.profilingDuration(Duration.ofMillis(500))
329326
.profilerInterval(Duration.ofMillis(500))
330-
.samplingInterval(Duration.ofMillis(5));
327+
.samplingInterval(Duration.ofMillis(5))
328+
.tempDir(tempDir.toFile());
331329
configCustomizer.accept(config);
332330
});
333331
}

inferred-spans/src/test/java/io/opentelemetry/contrib/inferredspans/internal/asyncprofiler/JfrParserTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,16 @@ void name() throws Exception {
6060
});
6161
assertThat(stackTraces.get()).isEqualTo(92);
6262
}
63+
64+
@Test
65+
void testParseEmptyFile() throws Exception {
66+
File file = File.createTempFile("empty", ".jfr");
67+
try {
68+
JfrParser jfrParser = new JfrParser();
69+
jfrParser.parse(file, Collections.emptyList(), Collections.emptyList());
70+
jfrParser.consumeStackTraces((threadId, stackTraceId, nanoTime) -> {});
71+
} finally {
72+
file.delete();
73+
}
74+
}
6375
}

0 commit comments

Comments
 (0)