diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt index e44a11098d8..ab607b7bb6d 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-trace.txt @@ -1,2 +1,11 @@ Comparing source compatibility of opentelemetry-sdk-trace-1.58.0-SNAPSHOT.jar against opentelemetry-sdk-trace-1.57.0.jar -No changes. \ No newline at end of file +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder setInternalTelemetryVersion(io.opentelemetry.sdk.common.InternalTelemetryVersion) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder setMeterProvider(java.util.function.Supplier) +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.trace.export.SimpleSpanProcessorBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.trace.export.SimpleSpanProcessorBuilder setMeterProvider(java.util.function.Supplier) +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.trace.SdkTracerProviderBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.trace.SdkTracerProviderBuilder setMeterProvider(java.util.function.Supplier) diff --git a/sdk-extensions/autoconfigure/src/main/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfiguration.java b/sdk-extensions/autoconfigure/src/main/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfiguration.java index ee6d75d4c70..7ecc048ff3d 100644 --- a/sdk-extensions/autoconfigure/src/main/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfiguration.java +++ b/sdk-extensions/autoconfigure/src/main/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfiguration.java @@ -49,6 +49,7 @@ static void configureTracerProvider( List closeables) { tracerProviderBuilder.setSpanLimits(configureSpanLimits(config)); + tracerProviderBuilder.setMeterProvider(() -> meterProvider); String sampler = config.getString("otel.traces.sampler", PARENTBASED_ALWAYS_ON); tracerProviderBuilder.setSampler( @@ -80,7 +81,8 @@ static List configureSpanProcessors( for (String simpleProcessorExporterNames : simpleProcessorExporterNames) { SpanExporter exporter = exportersByNameCopy.remove(simpleProcessorExporterNames); if (exporter != null) { - SpanProcessor spanProcessor = SimpleSpanProcessor.create(exporter); + SpanProcessor spanProcessor = + SimpleSpanProcessor.builder(exporter).setMeterProvider(() -> meterProvider).build(); closeables.add(spanProcessor); spanProcessors.add(spanProcessor); } @@ -101,7 +103,7 @@ static List configureSpanProcessors( static BatchSpanProcessor configureBatchSpanProcessor( ConfigProperties config, SpanExporter exporter, MeterProvider meterProvider) { BatchSpanProcessorBuilder builder = - BatchSpanProcessor.builder(exporter).setMeterProvider(meterProvider); + BatchSpanProcessor.builder(exporter).setMeterProvider(() -> meterProvider); Duration scheduleDelay = config.getDuration("otel.bsp.schedule.delay"); if (scheduleDelay != null) { diff --git a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/SpanProcessorFactory.java b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/SpanProcessorFactory.java index d5b2c3970f6..1f760cfe20c 100644 --- a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/SpanProcessorFactory.java +++ b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/SpanProcessorFactory.java @@ -14,6 +14,7 @@ import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; import io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessorBuilder; import io.opentelemetry.sdk.trace.export.SpanExporter; import java.time.Duration; import java.util.Map; @@ -50,7 +51,7 @@ public SpanProcessor create(SpanProcessorModel model, DeclarativeConfigContext c } MeterProvider meterProvider = context.getMeterProvider(); if (meterProvider != null) { - builder.setMeterProvider(meterProvider); + builder.setMeterProvider(() -> meterProvider); } return context.addCloseable(builder.build()); @@ -62,7 +63,12 @@ public SpanProcessor create(SpanProcessorModel model, DeclarativeConfigContext c FileConfigUtil.requireNonNull( simpleModel.getExporter(), "simple span processor exporter"); SpanExporter spanExporter = SpanExporterFactory.getInstance().create(exporterModel, context); - return context.addCloseable(SimpleSpanProcessor.create(spanExporter)); + SimpleSpanProcessorBuilder builder = SimpleSpanProcessor.builder(spanExporter); + MeterProvider meterProvider = context.getMeterProvider(); + if (meterProvider != null) { + builder.setMeterProvider(() -> meterProvider); + } + return context.addCloseable(builder.build()); } Map.Entry keyValue = diff --git a/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/OpenTelemetryConfigurationFactoryTest.java b/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/OpenTelemetryConfigurationFactoryTest.java index d06e9164356..b71eafc1702 100644 --- a/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/OpenTelemetryConfigurationFactoryTest.java +++ b/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/OpenTelemetryConfigurationFactoryTest.java @@ -337,15 +337,15 @@ void create_Configured() throws NoSuchFieldException, IllegalAccessException { .extracting("meterProviderSharedState") .isEqualTo(sharedState); + // Lazily initialized assertThat(sdk) .extracting("tracerProvider") .extracting("delegate") .extracting("sharedState") .extracting("activeSpanProcessor") .extracting("worker") - .extracting("processedSpansCounter") - .extracting("sdkMeter") - .extracting("meterProviderSharedState") - .isEqualTo(sharedState); + .extracting("spanProcessorInstrumentation") + .extracting("processedSpans") + .isNull(); } } diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/internal/SemConvAttributes.java b/sdk/common/src/main/java/io/opentelemetry/sdk/internal/SemConvAttributes.java index 1075d537604..cad85a0c341 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/internal/SemConvAttributes.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/internal/SemConvAttributes.java @@ -33,4 +33,9 @@ private SemConvAttributes() {} AttributeKey.longKey("rpc.grpc.status_code"); public static final AttributeKey HTTP_RESPONSE_STATUS_CODE = AttributeKey.longKey("http.response.status_code"); + + public static final AttributeKey OTEL_SPAN_PARENT_ORIGIN = + AttributeKey.stringKey("otel.span.parent.origin"); + public static final AttributeKey OTEL_SPAN_SAMPLING_RESULT = + AttributeKey.stringKey("otel.span.sampling_result"); } diff --git a/sdk/common/src/test/java/io/opentelemetry/sdk/internal/SemConvAttributesTest.java b/sdk/common/src/test/java/io/opentelemetry/sdk/internal/SemConvAttributesTest.java index d4a5d5de1f5..d1d10007d3a 100644 --- a/sdk/common/src/test/java/io/opentelemetry/sdk/internal/SemConvAttributesTest.java +++ b/sdk/common/src/test/java/io/opentelemetry/sdk/internal/SemConvAttributesTest.java @@ -31,5 +31,10 @@ void testAttributeKeys() { .isEqualTo(RpcIncubatingAttributes.RPC_GRPC_STATUS_CODE); assertThat(SemConvAttributes.HTTP_RESPONSE_STATUS_CODE) .isEqualTo(HttpAttributes.HTTP_RESPONSE_STATUS_CODE); + + assertThat(SemConvAttributes.OTEL_SPAN_PARENT_ORIGIN) + .isEqualTo(OtelIncubatingAttributes.OTEL_SPAN_PARENT_ORIGIN); + assertThat(SemConvAttributes.OTEL_SPAN_SAMPLING_RESULT) + .isEqualTo(OtelIncubatingAttributes.OTEL_SPAN_SAMPLING_RESULT); } } diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java index 37deab7ffc8..5c7acd36c6f 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java @@ -61,6 +61,9 @@ final class SdkSpan implements ReadWriteSpan { private final InstrumentationScopeInfo instrumentationScopeInfo; // The start time of the span. private final long startEpochNanos; + // Callback to run when span ends to record metrics. + private final Runnable recordEndMetrics; + // Lock used to internally guard the mutable state of this instance private final Object lock = new Object(); @@ -132,7 +135,8 @@ private SdkSpan( @Nullable AttributesMap attributes, @Nullable List links, int totalRecordedLinks, - long startEpochNanos) { + long startEpochNanos, + Runnable recordEndMetrics) { this.context = context; this.instrumentationScopeInfo = instrumentationScopeInfo; this.parentSpanContext = parentSpanContext; @@ -148,6 +152,7 @@ private SdkSpan( this.startEpochNanos = startEpochNanos; this.attributes = attributes; this.spanLimits = spanLimits; + this.recordEndMetrics = recordEndMetrics; } /** @@ -163,6 +168,7 @@ private SdkSpan( * @param resource the resource associated with this span. * @param attributes the attributes set during span creation. * @param links the links set during span creation, may be truncated. The list MUST be immutable. + * @param recordEndMetrics a {@link Runnable} to run when the span is ended to record metrics. * @return a new and started span. */ static SdkSpan startSpan( @@ -180,7 +186,8 @@ static SdkSpan startSpan( @Nullable AttributesMap attributes, @Nullable List links, int totalRecordedLinks, - long userStartEpochNanos) { + long userStartEpochNanos, + Runnable recordEndMetrics) { boolean createdAnchoredClock; AnchoredClock clock; if (parentSpan instanceof SdkSpan) { @@ -219,7 +226,8 @@ static SdkSpan startSpan( attributes, links, totalRecordedLinks, - startEpochNanos); + startEpochNanos, + recordEndMetrics); // Call onStart here instead of calling in the constructor to make sure the span is completely // initialized. if (spanProcessor.isStartRequired()) { @@ -557,6 +565,7 @@ private void endInternal(long endEpochNanos) { spanEndingThread = Thread.currentThread(); hasEnded = EndState.ENDING; } + recordEndMetrics.run(); if (spanProcessor instanceof ExtendedSpanProcessor) { ExtendedSpanProcessor extendedSpanProcessor = (ExtendedSpanProcessor) spanProcessor; if (extendedSpanProcessor.isOnEndingRequired()) { diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpanBuilder.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpanBuilder.java index c0f872265ec..6edbfc24db4 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpanBuilder.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpanBuilder.java @@ -204,6 +204,9 @@ public Span startSpan() { /* remote= */ false, tracerSharedState.isIdGeneratorSafeToSkipIdValidation()); + Runnable recordEndSpanMetrics = + tracerSharedState.getTracerMetrics().startSpan(parentSpanContext, samplingDecision); + if (!isRecording(samplingDecision)) { return Span.wrap(spanContext); } @@ -232,7 +235,8 @@ public Span startSpan() { recordedAttributes, currentLinks, totalNumberOfLinksAdded, - startEpochNanos); + startEpochNanos, + recordEndSpanMetrics); } private AttributesMap attributes() { diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkTracerMetrics.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkTracerMetrics.java new file mode 100644 index 00000000000..1f4ce69a1c0 --- /dev/null +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkTracerMetrics.java @@ -0,0 +1,208 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace; + +import static io.opentelemetry.sdk.internal.SemConvAttributes.OTEL_SPAN_PARENT_ORIGIN; +import static io.opentelemetry.sdk.internal.SemConvAttributes.OTEL_SPAN_SAMPLING_RESULT; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongUpDownCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.sdk.trace.samplers.SamplingDecision; +import java.util.function.Supplier; +import javax.annotation.Nullable; + +/** + * SDK metrics exported for started and ended spans as defined in the semantic + * conventions. + */ +final class SdkTracerMetrics { + + private static final Attributes noParentDrop = + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, "none", OTEL_SPAN_SAMPLING_RESULT, SamplingDecision.DROP.name()); + private static final Attributes noParentRecordOnly = + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + SamplingDecision.RECORD_ONLY.name()); + private static final Attributes noParentRecordAndSample = + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + SamplingDecision.RECORD_AND_SAMPLE.name()); + + private static final Attributes remoteParentDrop = + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "remote", + OTEL_SPAN_SAMPLING_RESULT, + SamplingDecision.DROP.name()); + private static final Attributes remoteParentRecordOnly = + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "remote", + OTEL_SPAN_SAMPLING_RESULT, + SamplingDecision.RECORD_ONLY.name()); + private static final Attributes remoteParentRecordAndSample = + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "remote", + OTEL_SPAN_SAMPLING_RESULT, + SamplingDecision.RECORD_AND_SAMPLE.name()); + + private static final Attributes localParentDrop = + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "local", + OTEL_SPAN_SAMPLING_RESULT, + SamplingDecision.DROP.name()); + private static final Attributes localParentRecordOnly = + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "local", + OTEL_SPAN_SAMPLING_RESULT, + SamplingDecision.RECORD_ONLY.name()); + private static final Attributes localParentRecordAndSample = + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "local", + OTEL_SPAN_SAMPLING_RESULT, + SamplingDecision.RECORD_AND_SAMPLE.name()); + + private static final Attributes recordOnly = + Attributes.of(OTEL_SPAN_SAMPLING_RESULT, SamplingDecision.RECORD_ONLY.name()); + private static final Attributes recordAndSample = + Attributes.of(OTEL_SPAN_SAMPLING_RESULT, SamplingDecision.RECORD_AND_SAMPLE.name()); + + private final Object lock = new Object(); + + private final Supplier meterProvider; + + @Nullable private Meter meter; + @Nullable private volatile LongCounter startedSpans; + @Nullable private volatile LongUpDownCounter liveSpans; + + SdkTracerMetrics(Supplier meterProvider) { + this.meterProvider = meterProvider; + } + + /** + * Records metrics for when a span starts and returns a {@link Runnable} to execute when ending + * the span. + */ + Runnable startSpan(SpanContext parentSpanContext, SamplingDecision samplingDecision) { + if (!parentSpanContext.isValid()) { + switch (samplingDecision) { + case DROP: + startedSpans().add(1, noParentDrop); + return SdkTracerMetrics::noop; + case RECORD_ONLY: + startedSpans().add(1, noParentRecordOnly); + liveSpans().add(1, recordOnly); + return this::decrementRecordOnly; + case RECORD_AND_SAMPLE: + startedSpans().add(1, noParentRecordAndSample); + liveSpans().add(1, recordAndSample); + return this::decrementRecordAndSample; + } + throw new IllegalArgumentException("Unrecognized sampling decision: " + samplingDecision); + } else if (parentSpanContext.isRemote()) { + switch (samplingDecision) { + case DROP: + startedSpans().add(1, remoteParentDrop); + return SdkTracerMetrics::noop; + case RECORD_ONLY: + startedSpans().add(1, remoteParentRecordOnly); + liveSpans().add(1, recordOnly); + return this::decrementRecordOnly; + case RECORD_AND_SAMPLE: + startedSpans().add(1, remoteParentRecordAndSample); + liveSpans().add(1, recordAndSample); + return this::decrementRecordAndSample; + } + throw new IllegalArgumentException("Unrecognized sampling decision: " + samplingDecision); + } + // local parent + switch (samplingDecision) { + case DROP: + startedSpans().add(1, localParentDrop); + return SdkTracerMetrics::noop; + case RECORD_ONLY: + startedSpans().add(1, localParentRecordOnly); + liveSpans().add(1, recordOnly); + return this::decrementRecordOnly; + case RECORD_AND_SAMPLE: + startedSpans().add(1, localParentRecordAndSample); + liveSpans().add(1, recordAndSample); + return this::decrementRecordAndSample; + } + throw new IllegalArgumentException("Unrecognized sampling decision: " + samplingDecision); + } + + private static void noop() {} + + private void decrementRecordOnly() { + liveSpans().add(-1, recordOnly); + } + + private void decrementRecordAndSample() { + liveSpans().add(-1, recordAndSample); + } + + private LongCounter startedSpans() { + LongCounter startedSpans = this.startedSpans; + if (startedSpans == null) { + synchronized (lock) { + startedSpans = this.startedSpans; + if (startedSpans == null) { + startedSpans = + meter() + .counterBuilder("otel.sdk.span.started") + .setUnit("{span}") + .setDescription("The number of created spans.") + .build(); + this.startedSpans = startedSpans; + } + } + } + return startedSpans; + } + + private LongUpDownCounter liveSpans() { + LongUpDownCounter liveSpans = this.liveSpans; + if (liveSpans == null) { + synchronized (lock) { + liveSpans = this.liveSpans; + if (liveSpans == null) { + liveSpans = + meter() + .upDownCounterBuilder("otel.sdk.span.live") + .setUnit("{span}") + .setDescription( + "The number of created spans with recording=true for which the end operation has not been called yet.") + .build(); + this.liveSpans = liveSpans; + } + } + } + return liveSpans; + } + + private Meter meter() { + if (meter == null) { + // Safe to call from multiple threads. + meter = meterProvider.get().get("io.opentelemetry.sdk.trace"); + } + return meter; + } +} diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkTracerProvider.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkTracerProvider.java index f39ce565731..ad4af9a8b06 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkTracerProvider.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkTracerProvider.java @@ -5,6 +5,7 @@ package io.opentelemetry.sdk.trace; +import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.api.trace.TracerBuilder; import io.opentelemetry.api.trace.TracerProvider; @@ -54,7 +55,8 @@ public static SdkTracerProviderBuilder builder() { Sampler sampler, List spanProcessors, ScopeConfigurator tracerConfigurator, - ExceptionAttributeResolver exceptionAttributeResolver) { + ExceptionAttributeResolver exceptionAttributeResolver, + Supplier meterProvider) { this.sharedState = new TracerSharedState( clock, @@ -63,7 +65,8 @@ public static SdkTracerProviderBuilder builder() { spanLimitsSupplier, sampler, spanProcessors, - exceptionAttributeResolver); + exceptionAttributeResolver, + new SdkTracerMetrics(meterProvider)); this.tracerSdkComponentRegistry = new ComponentRegistry<>( instrumentationScopeInfo -> diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkTracerProviderBuilder.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkTracerProviderBuilder.java index f480fe37272..54b9a3cb864 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkTracerProviderBuilder.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkTracerProviderBuilder.java @@ -7,6 +7,7 @@ import static java.util.Objects.requireNonNull; +import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.api.trace.Span; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; @@ -38,6 +39,7 @@ public final class SdkTracerProviderBuilder { TracerConfig.configuratorBuilder(); private ExceptionAttributeResolver exceptionAttributeResolver = ExceptionAttributeResolver.getDefault(); + private Supplier meterProvider = MeterProvider::noop; /** * Assign a {@link Clock}. {@link Clock} will be used each time a {@link Span} is started, ended @@ -232,6 +234,17 @@ SdkTracerProviderBuilder setExceptionAttributeResolver( return this; } + /** + * Sets the {@link MeterProvider} to use to generate SDK Span + * Metrics. + */ + public SdkTracerProviderBuilder setMeterProvider(Supplier meterProvider) { + requireNonNull(meterProvider, "meterProvider"); + this.meterProvider = meterProvider; + return this; + } + /** * Create a new {@link SdkTracerProvider} instance with the configuration. * @@ -246,7 +259,8 @@ public SdkTracerProvider build() { sampler, spanProcessors, tracerConfiguratorBuilder.build(), - exceptionAttributeResolver); + exceptionAttributeResolver, + meterProvider); } SdkTracerProviderBuilder() {} diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/TracerSharedState.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/TracerSharedState.java index 74d43076b97..b425664aaba 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/TracerSharedState.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/TracerSharedState.java @@ -28,6 +28,7 @@ final class TracerSharedState { private final Sampler sampler; private final SpanProcessor activeSpanProcessor; private final ExceptionAttributeResolver exceptionAttributeResolver; + private final SdkTracerMetrics tracerMetrics; @Nullable private volatile CompletableResultCode shutdownResult = null; @@ -38,7 +39,8 @@ final class TracerSharedState { Supplier spanLimitsSupplier, Sampler sampler, List spanProcessors, - ExceptionAttributeResolver exceptionAttributeResolver) { + ExceptionAttributeResolver exceptionAttributeResolver, + SdkTracerMetrics tracerMetrics) { this.clock = clock; this.idGenerator = idGenerator; this.idGeneratorSafeToSkipIdValidation = idGenerator instanceof RandomIdGenerator; @@ -47,6 +49,7 @@ final class TracerSharedState { this.sampler = sampler; this.activeSpanProcessor = SpanProcessor.composite(spanProcessors); this.exceptionAttributeResolver = exceptionAttributeResolver; + this.tracerMetrics = tracerMetrics; } Clock getClock() { @@ -98,6 +101,10 @@ ExceptionAttributeResolver getExceptionAttributesResolver() { return exceptionAttributeResolver; } + SdkTracerMetrics getTracerMetrics() { + return tracerMetrics; + } + /** * Stops tracing, including shutting down processors and set to {@code true} {@link * #hasBeenShutdown()}. diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index cdedb5abb1c..63d951a2eb5 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -5,13 +5,11 @@ package io.opentelemetry.sdk.trace.export; -import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.LongCounter; -import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.InternalTelemetryVersion; +import io.opentelemetry.sdk.internal.ComponentId; import io.opentelemetry.sdk.internal.DaemonThreadFactory; import io.opentelemetry.sdk.internal.ThrowableUtil; import io.opentelemetry.sdk.trace.ReadWriteSpan; @@ -29,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -43,15 +42,13 @@ */ public final class BatchSpanProcessor implements SpanProcessor { + private static final ComponentId COMPONENT_ID = + ComponentId.generateLazy("batching_span_processor"); + private static final Logger logger = Logger.getLogger(BatchSpanProcessor.class.getName()); private static final String WORKER_THREAD_NAME = BatchSpanProcessor.class.getSimpleName() + "_WorkerThread"; - private static final AttributeKey SPAN_PROCESSOR_TYPE_LABEL = - AttributeKey.stringKey("processorType"); - private static final AttributeKey SPAN_PROCESSOR_DROPPED_LABEL = - AttributeKey.booleanKey("dropped"); - private static final String SPAN_PROCESSOR_TYPE_VALUE = BatchSpanProcessor.class.getSimpleName(); private final boolean exportUnsampledSpans; private final Worker worker; @@ -71,7 +68,8 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) { BatchSpanProcessor( SpanExporter spanExporter, boolean exportUnsampledSpans, - MeterProvider meterProvider, + Supplier meterProvider, + InternalTelemetryVersion telemetryVersion, long scheduleDelayNanos, int maxQueueSize, int maxExportBatchSize, @@ -81,10 +79,12 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) { new Worker( spanExporter, meterProvider, + telemetryVersion, scheduleDelayNanos, maxExportBatchSize, exporterTimeoutNanos, - JcTools.newFixedSizeQueue(maxQueueSize)); + JcTools.newFixedSizeQueue(maxQueueSize), + maxQueueSize); Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); workerThread.start(); } @@ -161,9 +161,7 @@ public String toString() { // the data. private static final class Worker implements Runnable { - private final LongCounter processedSpansCounter; - private final Attributes droppedAttrs; - private final Attributes exportedAttrs; + private final SpanProcessorInstrumentation spanProcessorInstrumentation; private final SpanExporter spanExporter; private final long scheduleDelayNanos; @@ -185,58 +183,35 @@ private static final class Worker implements Runnable { private final AtomicReference flushRequested = new AtomicReference<>(); private volatile boolean continueWork = true; private final ArrayList batch; + private final long maxQueueSize; private Worker( SpanExporter spanExporter, - MeterProvider meterProvider, + Supplier meterProvider, + InternalTelemetryVersion telemetryVersion, long scheduleDelayNanos, int maxExportBatchSize, long exporterTimeoutNanos, - Queue queue) { + Queue queue, + long maxQueueSize) { this.spanExporter = spanExporter; this.scheduleDelayNanos = scheduleDelayNanos; this.maxExportBatchSize = maxExportBatchSize; this.exporterTimeoutNanos = exporterTimeoutNanos; this.queue = queue; this.signal = new ArrayBlockingQueue<>(1); - Meter meter = meterProvider.meterBuilder("io.opentelemetry.sdk.trace").build(); - meter - .gaugeBuilder("queueSize") - .ofLongs() - .setDescription("The number of items queued") - .setUnit("1") - .buildWithCallback( - result -> - result.record( - queue.size(), - Attributes.of(SPAN_PROCESSOR_TYPE_LABEL, SPAN_PROCESSOR_TYPE_VALUE))); - processedSpansCounter = - meter - .counterBuilder("processedSpans") - .setUnit("1") - .setDescription( - "The number of spans processed by the BatchSpanProcessor. " - + "[dropped=true if they were dropped due to high throughput]") - .build(); - droppedAttrs = - Attributes.of( - SPAN_PROCESSOR_TYPE_LABEL, - SPAN_PROCESSOR_TYPE_VALUE, - SPAN_PROCESSOR_DROPPED_LABEL, - true); - exportedAttrs = - Attributes.of( - SPAN_PROCESSOR_TYPE_LABEL, - SPAN_PROCESSOR_TYPE_VALUE, - SPAN_PROCESSOR_DROPPED_LABEL, - false); + + spanProcessorInstrumentation = + SpanProcessorInstrumentation.get(telemetryVersion, COMPONENT_ID, meterProvider); + this.maxQueueSize = maxQueueSize; this.batch = new ArrayList<>(this.maxExportBatchSize); } private void addSpan(ReadableSpan span) { + spanProcessorInstrumentation.buildQueueMetricsOnce(maxQueueSize, queue::size); if (!queue.offer(span)) { - processedSpansCounter.add(1, droppedAttrs); + spanProcessorInstrumentation.dropSpans(1); } else { if (queueSize.incrementAndGet() >= spansNeeded.get()) { signal.offer(true); @@ -340,18 +315,24 @@ private void exportCurrentBatch() { return; } + String error = null; try { CompletableResultCode result = spanExporter.export(Collections.unmodifiableList(batch)); result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS); - if (result.isSuccess()) { - processedSpansCounter.add(batch.size(), exportedAttrs); - } else { + if (!result.isSuccess()) { logger.log(Level.FINE, "Exporter failed"); + if (result.getFailureThrowable() != null) { + error = result.getFailureThrowable().getClass().getName(); + } else { + error = "export_failed"; + } } } catch (Throwable t) { ThrowableUtil.propagateIfFatal(t); logger.log(Level.WARNING, "Exporter threw an Exception", t); + error = t.getClass().getName(); } finally { + spanProcessorInstrumentation.finishSpans(batch.size(), error); batch.clear(); } } diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBuilder.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBuilder.java index d89083ded06..e584d727153 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBuilder.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBuilder.java @@ -9,8 +9,10 @@ import static java.util.Objects.requireNonNull; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.sdk.common.InternalTelemetryVersion; import java.time.Duration; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -33,7 +35,8 @@ public final class BatchSpanProcessorBuilder { private int maxQueueSize = DEFAULT_MAX_QUEUE_SIZE; private int maxExportBatchSize = DEFAULT_MAX_EXPORT_BATCH_SIZE; private long exporterTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_EXPORT_TIMEOUT_MILLIS); - private MeterProvider meterProvider = MeterProvider.noop(); + private Supplier meterProvider = MeterProvider::noop; + private InternalTelemetryVersion telemetryVersion = InternalTelemetryVersion.LEGACY; BatchSpanProcessorBuilder(SpanExporter spanExporter) { this.spanExporter = requireNonNull(spanExporter, "spanExporter"); @@ -144,11 +147,29 @@ public BatchSpanProcessorBuilder setMaxExportBatchSize(int maxExportBatchSize) { * metrics will not be collected. */ public BatchSpanProcessorBuilder setMeterProvider(MeterProvider meterProvider) { + requireNonNull(meterProvider, "meterProvider"); + this.meterProvider = () -> meterProvider; + return this; + } + + /** + * Sets the {@link MeterProvider} to use to collect metrics related to batch export. If not set, + * metrics will not be collected. + */ + public BatchSpanProcessorBuilder setMeterProvider(Supplier meterProvider) { requireNonNull(meterProvider, "meterProvider"); this.meterProvider = meterProvider; return this; } + /** Sets the {@link InternalTelemetryVersion} defining which metrics this processor records. */ + public BatchSpanProcessorBuilder setInternalTelemetryVersion( + InternalTelemetryVersion telemetryVersion) { + requireNonNull(telemetryVersion, "telemetryVersion"); + this.telemetryVersion = telemetryVersion; + return this; + } + // Visible for testing int getMaxExportBatchSize() { return maxExportBatchSize; @@ -172,6 +193,7 @@ public BatchSpanProcessor build() { spanExporter, exportUnsampledSpans, meterProvider, + telemetryVersion, scheduleDelayNanos, maxQueueSize, maxExportBatchSize, diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/LegacySpanProcessorInstrumentation.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/LegacySpanProcessorInstrumentation.java new file mode 100644 index 00000000000..bdee2dedcaf --- /dev/null +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/LegacySpanProcessorInstrumentation.java @@ -0,0 +1,112 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.export; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.MeterProvider; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; +import javax.annotation.Nullable; + +/** Span processor metrics defined before they were standardized in semconv. */ +final class LegacySpanProcessorInstrumentation implements SpanProcessorInstrumentation { + private static final AttributeKey SPAN_PROCESSOR_TYPE_LABEL = + AttributeKey.stringKey("processorType"); + private static final AttributeKey SPAN_PROCESSOR_DROPPED_LABEL = + AttributeKey.booleanKey("dropped"); + // Legacy metrics are only created for batch span processor. + private static final String SPAN_PROCESSOR_TYPE_VALUE = BatchSpanProcessor.class.getSimpleName(); + + private final Object lock = new Object(); + private final AtomicBoolean builtQueueMetrics = new AtomicBoolean(false); + + private final Supplier meterProvider; + private final Attributes standardAttrs; + private final Attributes droppedAttrs; + + @Nullable private Meter meter; + @Nullable private volatile LongCounter processedSpans; + + LegacySpanProcessorInstrumentation(Supplier meterProvider) { + this.meterProvider = meterProvider; + + standardAttrs = + Attributes.of( + SPAN_PROCESSOR_TYPE_LABEL, + SPAN_PROCESSOR_TYPE_VALUE, + SPAN_PROCESSOR_DROPPED_LABEL, + false); + droppedAttrs = + Attributes.of( + SPAN_PROCESSOR_TYPE_LABEL, + SPAN_PROCESSOR_TYPE_VALUE, + SPAN_PROCESSOR_DROPPED_LABEL, + true); + } + + @Override + public void dropSpans(int count) { + processedSpans().add(count, droppedAttrs); + } + + @Override + public void finishSpans(int count, @Nullable String error) { + // Legacy metrics only record when no error. + if (error != null) { + processedSpans().add(count, standardAttrs); + } + } + + @Override + public void buildQueueMetricsOnce(long unusedCapacity, LongCallable getSize) { + if (!builtQueueMetrics.compareAndSet(false, true)) { + return; + } + meter() + .gaugeBuilder("queueSize") + .ofLongs() + .setDescription("The number of items queued") + .setUnit("1") + .buildWithCallback( + result -> + result.record( + getSize.get(), + Attributes.of(SPAN_PROCESSOR_TYPE_LABEL, SPAN_PROCESSOR_TYPE_VALUE))); + // No capacity metric when legacy. + } + + private LongCounter processedSpans() { + LongCounter processedSpans = this.processedSpans; + if (processedSpans == null) { + synchronized (lock) { + processedSpans = this.processedSpans; + if (processedSpans == null) { + processedSpans = + meter() + .counterBuilder("processedSpans") + .setUnit("1") + .setDescription( + "The number of spans processed by the BatchSpanProcessor. " + + "[dropped=true if they were dropped due to high throughput]") + .build(); + this.processedSpans = processedSpans; + } + } + } + return processedSpans; + } + + private Meter meter() { + if (meter == null) { + // Safe to call from multiple threads. + meter = meterProvider.get().get("io.opentelemetry.sdk.trace"); + } + return meter; + } +} diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/LongCallable.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/LongCallable.java new file mode 100644 index 00000000000..2f7e03d40b4 --- /dev/null +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/LongCallable.java @@ -0,0 +1,13 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.export; + +/** A Callable returning a primitive long value. */ +@FunctionalInterface +interface LongCallable { + /** Returns the value. */ + long get(); +} diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SemConvSpanProcessorInstrumentation.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SemConvSpanProcessorInstrumentation.java new file mode 100644 index 00000000000..a59ca5ff381 --- /dev/null +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SemConvSpanProcessorInstrumentation.java @@ -0,0 +1,118 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.export; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.sdk.internal.ComponentId; +import io.opentelemetry.sdk.internal.SemConvAttributes; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; +import javax.annotation.Nullable; + +/** + * SDK metrics exported for span processors as defined in the semantic + * conventions. + */ +final class SemConvSpanProcessorInstrumentation implements SpanProcessorInstrumentation { + + private final Object lock = new Object(); + private final AtomicBoolean builtQueueMetrics = new AtomicBoolean(false); + + private final Supplier meterProvider; + private final Attributes standardAttrs; + private final Attributes droppedAttrs; + + @Nullable private Meter meter; + @Nullable private volatile LongCounter processedSpans; + + SemConvSpanProcessorInstrumentation( + ComponentId componentId, Supplier meterProvider) { + this.meterProvider = meterProvider; + + standardAttrs = + Attributes.of( + SemConvAttributes.OTEL_COMPONENT_TYPE, + componentId.getTypeName(), + SemConvAttributes.OTEL_COMPONENT_NAME, + componentId.getComponentName()); + droppedAttrs = + Attributes.of( + SemConvAttributes.OTEL_COMPONENT_TYPE, + componentId.getTypeName(), + SemConvAttributes.OTEL_COMPONENT_NAME, + componentId.getComponentName(), + SemConvAttributes.ERROR_TYPE, + "queue_full"); + } + + @Override + public void dropSpans(int count) { + processedSpans().add(count, droppedAttrs); + } + + @Override + public void finishSpans(int count, @Nullable String error) { + if (error == null) { + processedSpans().add(count, standardAttrs); + return; + } + + Attributes attributes = + standardAttrs.toBuilder().put(SemConvAttributes.ERROR_TYPE, error).build(); + processedSpans().add(count, attributes); + } + + @Override + public void buildQueueMetricsOnce(long capacity, LongCallable getSize) { + if (!builtQueueMetrics.compareAndSet(false, true)) { + return; + } + meter() + .upDownCounterBuilder("otel.sdk.processor.span.queue.capacity") + .setUnit("span") + .setDescription( + "The maximum number of spans the queue of a given instance of an SDK span processor can hold. ") + .buildWithCallback(m -> m.record(capacity, standardAttrs)); + meter() + .upDownCounterBuilder("otel.sdk.processor.span.queue.size") + .setUnit("span") + .setDescription( + "The number of spans in the queue of a given instance of an SDK span processor.") + .buildWithCallback(m -> m.record(getSize.get(), standardAttrs)); + } + + private LongCounter processedSpans() { + LongCounter processedSpans = this.processedSpans; + if (processedSpans == null) { + synchronized (lock) { + processedSpans = this.processedSpans; + if (processedSpans == null) { + processedSpans = + meter() + .counterBuilder("otel.sdk.processor.span.processed") + .setUnit("span") + .setDescription( + "The number of spans for which the processing has finished, either successful or failed.") + .build(); + this.processedSpans = processedSpans; + } + } + } + return processedSpans; + } + + private Meter meter() { + if (meter == null) { + // Safe to call from multiple threads. + meter = meterProvider.get().get("io.opentelemetry.sdk.trace"); + } + return meter; + } +} diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java index f543e25353e..43f48b731c6 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java @@ -7,8 +7,11 @@ import static java.util.Objects.requireNonNull; +import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.InternalTelemetryVersion; +import io.opentelemetry.sdk.internal.ComponentId; import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; @@ -18,6 +21,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -33,6 +37,8 @@ */ public final class SimpleSpanProcessor implements SpanProcessor { + private static final ComponentId COMPONENT_ID = ComponentId.generateLazy("simple_span_processor"); + private static final Logger logger = Logger.getLogger(SimpleSpanProcessor.class.getName()); private final SpanExporter spanExporter; @@ -40,6 +46,7 @@ public final class SimpleSpanProcessor implements SpanProcessor { private final Set pendingExports = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final AtomicBoolean isShutdown = new AtomicBoolean(false); + private final SpanProcessorInstrumentation spanProcessorInstrumentation; private final Object exporterLock = new Object(); @@ -68,9 +75,15 @@ public static SimpleSpanProcessorBuilder builder(SpanExporter exporter) { return new SimpleSpanProcessorBuilder(exporter); } - SimpleSpanProcessor(SpanExporter spanExporter, boolean exportUnsampledSpans) { + SimpleSpanProcessor( + SpanExporter spanExporter, + boolean exportUnsampledSpans, + Supplier meterProvider) { this.spanExporter = requireNonNull(spanExporter, "spanExporter"); this.exportUnsampledSpans = exportUnsampledSpans; + spanProcessorInstrumentation = + SpanProcessorInstrumentation.get( + InternalTelemetryVersion.LATEST, COMPONENT_ID, meterProvider); } @Override @@ -98,9 +111,16 @@ public void onEnd(ReadableSpan span) { result.whenComplete( () -> { pendingExports.remove(result); + String error = null; if (!result.isSuccess()) { logger.log(Level.FINE, "Exporter failed"); + if (result.getFailureThrowable() != null) { + error = result.getFailureThrowable().getClass().getName(); + } else { + error = "export_failed"; + } } + spanProcessorInstrumentation.finishSpans(1, error); }); } catch (RuntimeException e) { logger.log(Level.WARNING, "Exporter threw an Exception", e); diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorBuilder.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorBuilder.java index de9f3f9152a..f98e3b032ba 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorBuilder.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorBuilder.java @@ -7,6 +7,9 @@ import static java.util.Objects.requireNonNull; +import io.opentelemetry.api.metrics.MeterProvider; +import java.util.function.Supplier; + /** * Builder class for {@link SimpleSpanProcessor}. * @@ -14,6 +17,7 @@ */ public final class SimpleSpanProcessorBuilder { private final SpanExporter spanExporter; + private Supplier meterProvider = MeterProvider::noop; private boolean exportUnsampledSpans = false; SimpleSpanProcessorBuilder(SpanExporter spanExporter) { @@ -29,12 +33,25 @@ public SimpleSpanProcessorBuilder setExportUnsampledSpans(boolean exportUnsample return this; } + /** + * Sets the {@link MeterProvider} to use to generate SDK Span + * Metrics. + */ + public SimpleSpanProcessorBuilder setMeterProvider(Supplier meterProvider) { + requireNonNull(meterProvider, "meterProvider"); + this.meterProvider = meterProvider; + return this; + } + + // TODO: add `setInternalTelemetryVersion when we support more than one version + /** * Returns a new {@link SimpleSpanProcessor} with the configuration of this builder. * * @return a new {@link SimpleSpanProcessor}. */ public SimpleSpanProcessor build() { - return new SimpleSpanProcessor(spanExporter, exportUnsampledSpans); + return new SimpleSpanProcessor(spanExporter, exportUnsampledSpans, meterProvider); } } diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SpanProcessorInstrumentation.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SpanProcessorInstrumentation.java new file mode 100644 index 00000000000..5b9af2cc92a --- /dev/null +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SpanProcessorInstrumentation.java @@ -0,0 +1,37 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.export; + +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.sdk.common.InternalTelemetryVersion; +import io.opentelemetry.sdk.internal.ComponentId; +import java.util.function.Supplier; +import javax.annotation.Nullable; + +/** Metrics exported by span processors. */ +interface SpanProcessorInstrumentation { + + static SpanProcessorInstrumentation get( + InternalTelemetryVersion telemetryVersion, + ComponentId componentId, + Supplier meterProvider) { + switch (telemetryVersion) { + case LEGACY: + return new LegacySpanProcessorInstrumentation(meterProvider); + default: + return new SemConvSpanProcessorInstrumentation(componentId, meterProvider); + } + } + + /** Records metrics for spans dropped because a queue is full. */ + void dropSpans(int count); + + /** Record metrics for spans processed, possibly with an error. */ + void finishSpans(int count, @Nullable String error); + + /** Registers metrics for processor queue capacity and size. */ + void buildQueueMetricsOnce(long capacity, LongCallable getSize); +} diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java index c9d546f5486..262bd10a63c 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkSpanTest.java @@ -1040,7 +1040,8 @@ void addLink_FaultIn() { null, null, // exercises the fault-in path 0, - 0); + 0, + () -> {}); SdkSpan linkedSpan = createTestSpan(SpanKind.INTERNAL); span.addLink(linkedSpan.getSpanContext()); @@ -1386,7 +1387,8 @@ void onStartOnEndNotRequired() { spanLimits.getMaxNumberOfAttributes(), spanLimits.getMaxAttributeValueLength()), Collections.emptyList(), 1, - 0); + 0, + () -> {}); verify(spanProcessor, never()).onStart(any(), any()); span.end(); @@ -1524,7 +1526,8 @@ private SdkSpan createTestSpan( attributes, linksCopy, linksCopy.size(), - 0); + 0, + () -> {}); Mockito.verify(spanProcessor, Mockito.times(1)).onStart(Context.root(), span); return span; } @@ -1612,7 +1615,8 @@ void testAsSpanData() { attributesWithCapacity, singletonList(link1), 1, - 0); + 0, + () -> {}); long startEpochNanos = clock.now(); clock.advance(Duration.ofMillis(4)); long firstEventEpochNanos = clock.now(); diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkTracerProviderMetricsTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkTracerProviderMetricsTest.java new file mode 100644 index 00000000000..db7175e5b34 --- /dev/null +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/SdkTracerProviderMetricsTest.java @@ -0,0 +1,986 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace; + +import static io.opentelemetry.sdk.internal.SemConvAttributes.ERROR_TYPE; +import static io.opentelemetry.sdk.internal.SemConvAttributes.OTEL_COMPONENT_NAME; +import static io.opentelemetry.sdk.internal.SemConvAttributes.OTEL_COMPONENT_TYPE; +import static io.opentelemetry.sdk.internal.SemConvAttributes.OTEL_SPAN_PARENT_ORIGIN; +import static io.opentelemetry.sdk.internal.SemConvAttributes.OTEL_SPAN_SAMPLING_RESULT; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.when; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanId; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceId; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.TracerProvider; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.InternalTelemetryVersion; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.sdk.trace.samplers.SamplingDecision; +import io.opentelemetry.sdk.trace.samplers.SamplingResult; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class SdkTracerProviderMetricsTest { + + @Mock private Sampler sampler; + @Mock private SpanExporter mockExporter; + + @Test + void simple() { + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + MeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + + InMemorySpanExporter exporter = InMemorySpanExporter.create(); + TracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor( + SimpleSpanProcessor.builder(exporter).setMeterProvider(() -> meterProvider).build()) + .setMeterProvider(() -> meterProvider) + .setSampler(sampler) + .build(); + + Tracer tracer = tracerProvider.get("test"); + + setSamplingDecision(SamplingDecision.RECORD_AND_SAMPLE); + Span span = tracer.spanBuilder("span").startSpan(); + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + m -> + assertThat(m) + .hasName("otel.sdk.span.started") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_AND_SAMPLE")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.live") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_AND_SAMPLE"))))); + span.end(); + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + m -> + assertThat(m) + .hasName("otel.sdk.processor.span.processed") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "simple_span_processor/0", + OTEL_COMPONENT_TYPE, + "simple_span_processor")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.started") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_AND_SAMPLE")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.live") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_AND_SAMPLE"))))); + + setSamplingDecision(SamplingDecision.RECORD_ONLY); + span = tracer.spanBuilder("span").startSpan(); + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + m -> + assertThat(m) + .hasName("otel.sdk.processor.span.processed") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "simple_span_processor/0", + OTEL_COMPONENT_TYPE, + "simple_span_processor")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.started") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_AND_SAMPLE")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_ONLY")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.live") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_AND_SAMPLE")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_ONLY"))))); + span.end(); + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + m -> + assertThat(m) + .hasName("otel.sdk.processor.span.processed") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "simple_span_processor/0", + OTEL_COMPONENT_TYPE, + "simple_span_processor")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.started") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_AND_SAMPLE")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_ONLY")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.live") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_AND_SAMPLE")), + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_ONLY"))))); + + setSamplingDecision(SamplingDecision.DROP); + span = tracer.spanBuilder("span").startSpan(); + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + m -> + assertThat(m) + .hasName("otel.sdk.processor.span.processed") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "simple_span_processor/0", + OTEL_COMPONENT_TYPE, + "simple_span_processor")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.started") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_AND_SAMPLE")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_ONLY")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "DROP")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.live") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_AND_SAMPLE")), + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_ONLY"))))); + span.end(); + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + m -> + assertThat(m) + .hasName("otel.sdk.processor.span.processed") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "simple_span_processor/0", + OTEL_COMPONENT_TYPE, + "simple_span_processor")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.started") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_AND_SAMPLE")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_ONLY")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "DROP")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.live") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_AND_SAMPLE")), + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_ONLY"))))); + + span = + tracer + .spanBuilder("span") + .setParent( + Context.root() + .with( + Span.wrap( + SpanContext.create( + TraceId.fromLongs(1, 2), + SpanId.fromLong(3), + TraceFlags.getDefault(), + TraceState.getDefault())))) + .startSpan(); + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + m -> + assertThat(m) + .hasName("otel.sdk.processor.span.processed") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "simple_span_processor/0", + OTEL_COMPONENT_TYPE, + "simple_span_processor")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.started") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_AND_SAMPLE")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_ONLY")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "DROP")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "local", + OTEL_SPAN_SAMPLING_RESULT, + "DROP")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.live") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_AND_SAMPLE")), + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_ONLY"))))); + span.end(); + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + m -> + assertThat(m) + .hasName("otel.sdk.processor.span.processed") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "simple_span_processor/0", + OTEL_COMPONENT_TYPE, + "simple_span_processor")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.started") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_AND_SAMPLE")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_ONLY")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "DROP")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "local", + OTEL_SPAN_SAMPLING_RESULT, + "DROP")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.live") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_AND_SAMPLE")), + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_ONLY"))))); + + setSamplingDecision(SamplingDecision.RECORD_AND_SAMPLE); + span = + tracer + .spanBuilder("span") + .setParent( + Context.root() + .with( + Span.wrap( + SpanContext.createFromRemoteParent( + TraceId.fromLongs(1, 2), + SpanId.fromLong(3), + TraceFlags.getDefault(), + TraceState.getDefault())))) + .startSpan(); + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + m -> + assertThat(m) + .hasName("otel.sdk.processor.span.processed") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "simple_span_processor/0", + OTEL_COMPONENT_TYPE, + "simple_span_processor")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.started") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_AND_SAMPLE")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "remote", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_AND_SAMPLE")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_ONLY")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "DROP")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "local", + OTEL_SPAN_SAMPLING_RESULT, + "DROP")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.live") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_AND_SAMPLE")), + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_ONLY"))))); + span.end(); + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + m -> + assertThat(m) + .hasName("otel.sdk.processor.span.processed") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(2) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "simple_span_processor/0", + OTEL_COMPONENT_TYPE, + "simple_span_processor")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.started") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_AND_SAMPLE")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "remote", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_AND_SAMPLE")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_ONLY")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "DROP")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "local", + OTEL_SPAN_SAMPLING_RESULT, + "DROP")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.live") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_AND_SAMPLE")), + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_ONLY"))))); + } + + @Test + void batch() throws Exception { + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + MeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + + BatchSpanProcessor processor = + BatchSpanProcessor.builder(mockExporter) + .setMaxQueueSize(1) + // Manually flush + .setScheduleDelay(Duration.ofDays(1)) + .setInternalTelemetryVersion(InternalTelemetryVersion.LATEST) + .setMeterProvider(() -> meterProvider) + .build(); + TracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor(processor) + .setMeterProvider(() -> meterProvider) + .setSampler(Sampler.alwaysOn()) + .build(); + + Tracer tracer = tracerProvider.get("test"); + + CompletableResultCode result1 = new CompletableResultCode(); + CompletableResultCode result2 = new CompletableResultCode(); + when(mockExporter.export(any())).thenReturn(result1).thenReturn(result2); + + // Will immediately be processed. + tracer.spanBuilder("span").startSpan().end(); + Thread.sleep(500); // give time to start processing a batch of size 1 + // We haven't completed the export so this span is queued. + tracer.spanBuilder("span").startSpan().end(); + // Queue is full, this span is dropped. + tracer.spanBuilder("span").startSpan().end(); + + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + m -> + assertThat(m) + .hasName("otel.sdk.processor.span.queue.capacity") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "batching_span_processor/0", + OTEL_COMPONENT_TYPE, + "batching_span_processor")))), + m -> + assertThat(m) + .hasName("otel.sdk.processor.span.queue.size") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "batching_span_processor/0", + OTEL_COMPONENT_TYPE, + "batching_span_processor")))), + m -> + assertThat(m) + .hasName("otel.sdk.processor.span.processed") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "batching_span_processor/0", + OTEL_COMPONENT_TYPE, + "batching_span_processor", + ERROR_TYPE, + "queue_full")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.started") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(3) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_AND_SAMPLE")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.live") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_AND_SAMPLE"))))); + + result1.succeed(); + result2.fail(); + processor.forceFlush().join(1, TimeUnit.SECONDS); + + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + m -> + assertThat(m) + .hasName("otel.sdk.processor.span.queue.capacity") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "batching_span_processor/0", + OTEL_COMPONENT_TYPE, + "batching_span_processor")))), + m -> + assertThat(m) + .hasName("otel.sdk.processor.span.queue.size") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "batching_span_processor/0", + OTEL_COMPONENT_TYPE, + "batching_span_processor")))), + m -> + assertThat(m) + .hasName("otel.sdk.processor.span.processed") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "batching_span_processor/0", + OTEL_COMPONENT_TYPE, + "batching_span_processor")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "batching_span_processor/0", + OTEL_COMPONENT_TYPE, + "batching_span_processor", + ERROR_TYPE, + "export_failed")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "batching_span_processor/0", + OTEL_COMPONENT_TYPE, + "batching_span_processor", + ERROR_TYPE, + "queue_full")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.started") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(3) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_AND_SAMPLE")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.live") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_AND_SAMPLE"))))); + + lenient().when(mockExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + processor.shutdown(); + } + + @Test + void simpleExportError() { + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + MeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + + TracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor( + SimpleSpanProcessor.builder(mockExporter) + .setMeterProvider(() -> meterProvider) + .build()) + .setMeterProvider(() -> meterProvider) + .setSampler(Sampler.alwaysOn()) + .build(); + + Tracer tracer = tracerProvider.get("test"); + + when(mockExporter.export(any())).thenReturn(CompletableResultCode.ofFailure()); + + tracer.spanBuilder("span").startSpan().end(); + + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + m -> + assertThat(m) + .hasName("otel.sdk.processor.span.processed") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "simple_span_processor/0", + OTEL_COMPONENT_TYPE, + "simple_span_processor", + ERROR_TYPE, + "export_failed")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.started") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_SPAN_PARENT_ORIGIN, + "none", + OTEL_SPAN_SAMPLING_RESULT, + "RECORD_AND_SAMPLE")))), + m -> + assertThat(m) + .hasName("otel.sdk.span.live") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_SPAN_SAMPLING_RESULT, "RECORD_AND_SAMPLE"))))); + } + + private void setSamplingDecision(SamplingDecision decision) { + when(sampler.shouldSample(any(), any(), any(), any(), any(), any())) + .thenReturn(SamplingResult.create(decision)); + } +}