From 16fa0d179983b297522fb4739175ee9b5871e66b Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 19 Dec 2025 16:03:24 +0900 Subject: [PATCH 1/2] Implement SDK metrics for logs --- .../opentelemetry-sdk-logs.txt | 16 +- .../sdk/logs/LoggerSharedState.java | 9 +- .../sdk/logs/SdkLogRecordBuilder.java | 2 + .../sdk/logs/SdkLoggerInstrumentation.java | 61 ++++ .../sdk/logs/SdkLoggerProvider.java | 11 +- .../sdk/logs/SdkLoggerProviderBuilder.java | 16 +- .../logs/export/BatchLogRecordProcessor.java | 84 ++--- .../BatchLogRecordProcessorBuilder.java | 24 +- ...gacyLogRecordProcessorInstrumentation.java | 103 ++++++ .../LogRecordProcessorInstrumentation.java | 37 +++ .../sdk/logs/export/LongCallable.java | 13 + ...ConvLogRecordProcessorInstrumentation.java | 118 +++++++ .../logs/export/SimpleLogRecordProcessor.java | 28 +- .../SimpleLogRecordProcessorBuilder.java | 42 +++ .../sdk/logs/LoggerSharedStateTest.java | 4 +- .../sdk/logs/SdkLogRecordBuilderTest.java | 3 + .../logs/SdkLoggerProviderMetricsTest.java | 314 ++++++++++++++++++ .../opentelemetry/sdk/logs/SdkLoggerTest.java | 3 + 18 files changed, 826 insertions(+), 62 deletions(-) create mode 100644 sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLoggerInstrumentation.java create mode 100644 sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/LegacyLogRecordProcessorInstrumentation.java create mode 100644 sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/LogRecordProcessorInstrumentation.java create mode 100644 sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/LongCallable.java create mode 100644 sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SemConvLogRecordProcessorInstrumentation.java create mode 100644 sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessorBuilder.java create mode 100644 sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLoggerProviderMetricsTest.java diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt index 0d1293b45ae..e63195521f2 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt @@ -1,2 +1,16 @@ Comparing source compatibility of opentelemetry-sdk-logs-1.58.0-SNAPSHOT.jar against opentelemetry-sdk-logs-1.57.0.jar -No changes. \ No newline at end of file +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.logs.export.BatchLogRecordProcessorBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.logs.export.BatchLogRecordProcessorBuilder setInternalTelemetryVersion(io.opentelemetry.sdk.common.InternalTelemetryVersion) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.logs.export.BatchLogRecordProcessorBuilder setMeterProvider(java.util.function.Supplier) +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessorBuilder builder(io.opentelemetry.sdk.logs.export.LogRecordExporter) ++++ NEW CLASS: PUBLIC(+) FINAL(+) io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessorBuilder (not serializable) + +++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. + +++ NEW SUPERCLASS: java.lang.Object + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor build() + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessorBuilder setMeterProvider(java.util.function.Supplier) +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.logs.SdkLoggerProviderBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.logs.SdkLoggerProviderBuilder setMeterProvider(java.util.function.Supplier) diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LoggerSharedState.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LoggerSharedState.java index 5b40f897a32..7528c745f10 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LoggerSharedState.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LoggerSharedState.java @@ -23,6 +23,7 @@ final class LoggerSharedState { private final LogRecordProcessor logRecordProcessor; private final Clock clock; private final ExceptionAttributeResolver exceptionAttributeResolver; + private final SdkLoggerInstrumentation loggerInstrumentation; @Nullable private volatile CompletableResultCode shutdownResult = null; LoggerSharedState( @@ -30,12 +31,14 @@ final class LoggerSharedState { Supplier logLimitsSupplier, LogRecordProcessor logRecordProcessor, Clock clock, - ExceptionAttributeResolver exceptionAttributeResolver) { + ExceptionAttributeResolver exceptionAttributeResolver, + SdkLoggerInstrumentation loggerInstrumentation) { this.resource = resource; this.logLimitsSupplier = logLimitsSupplier; this.logRecordProcessor = logRecordProcessor; this.clock = clock; this.exceptionAttributeResolver = exceptionAttributeResolver; + this.loggerInstrumentation = loggerInstrumentation; } Resource getResource() { @@ -58,6 +61,10 @@ ExceptionAttributeResolver getExceptionAttributeResolver() { return exceptionAttributeResolver; } + SdkLoggerInstrumentation getLoggerInstrumentation() { + return loggerInstrumentation; + } + boolean hasBeenShutdown() { return shutdownResult != null; } diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilder.java index 35c29a20025..d4a496c1088 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilder.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilder.java @@ -132,6 +132,8 @@ public void emit() { this.observedTimestampEpochNanos == 0 ? this.loggerSharedState.getClock().now() : this.observedTimestampEpochNanos; + + loggerSharedState.getLoggerInstrumentation().emitLog(); loggerSharedState .getLogRecordProcessor() .onEmit(context, createLogRecord(context, observedTimestampEpochNanos)); diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLoggerInstrumentation.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLoggerInstrumentation.java new file mode 100644 index 00000000000..f3fd42e53a1 --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLoggerInstrumentation.java @@ -0,0 +1,61 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs; + +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.MeterProvider; +import java.util.function.Supplier; +import javax.annotation.Nullable; + +/** + * SDK metrics exported for emitted logs as defined in the semantic + * conventions. + */ +final class SdkLoggerInstrumentation { + private final Object lock = new Object(); + + private final Supplier meterProvider; + + @Nullable private Meter meter; + @Nullable private volatile LongCounter createdLogs; + + SdkLoggerInstrumentation(Supplier meterProvider) { + this.meterProvider = meterProvider; + } + + void emitLog() { + createdLogs().add(1); + } + + private LongCounter createdLogs() { + LongCounter createdLogs = this.createdLogs; + if (createdLogs == null) { + synchronized (lock) { + createdLogs = this.createdLogs; + if (createdLogs == null) { + createdLogs = + meter() + .counterBuilder("otel.sdk.log.created") + .setUnit("{log_record}") + .setDescription("The number of logs submitted to enabled SDK Loggers.") + .build(); + this.createdLogs = createdLogs; + } + } + } + return createdLogs; + } + + private Meter meter() { + if (meter == null) { + // Safe to call from multiple threads. + meter = meterProvider.get().get("io.opentelemetry.sdk.logs"); + } + return meter; + } +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLoggerProvider.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLoggerProvider.java index 54825caf21e..d23073916bc 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLoggerProvider.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLoggerProvider.java @@ -9,6 +9,7 @@ import io.opentelemetry.api.logs.Logger; import io.opentelemetry.api.logs.LoggerBuilder; import io.opentelemetry.api.logs.LoggerProvider; +import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; @@ -58,11 +59,17 @@ public static SdkLoggerProviderBuilder builder() { List processors, Clock clock, ScopeConfigurator loggerConfigurator, - ExceptionAttributeResolver exceptionAttributeResolver) { + ExceptionAttributeResolver exceptionAttributeResolver, + Supplier meterProvider) { LogRecordProcessor logRecordProcessor = LogRecordProcessor.composite(processors); this.sharedState = new LoggerSharedState( - resource, logLimitsSupplier, logRecordProcessor, clock, exceptionAttributeResolver); + resource, + logLimitsSupplier, + logRecordProcessor, + clock, + exceptionAttributeResolver, + new SdkLoggerInstrumentation(meterProvider)); this.loggerComponentRegistry = new ComponentRegistry<>( instrumentationScopeInfo -> diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLoggerProviderBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLoggerProviderBuilder.java index 41ce7d44980..948017cb2b9 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLoggerProviderBuilder.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLoggerProviderBuilder.java @@ -9,6 +9,7 @@ import io.opentelemetry.api.logs.LogRecordBuilder; import io.opentelemetry.api.logs.Logger; +import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; @@ -40,6 +41,7 @@ public final class SdkLoggerProviderBuilder { LoggerConfig.configuratorBuilder(); private ExceptionAttributeResolver exceptionAttributeResolver = ExceptionAttributeResolver.getDefault(); + private Supplier meterProvider = MeterProvider::noop; SdkLoggerProviderBuilder() {} @@ -186,6 +188,17 @@ SdkLoggerProviderBuilder setExceptionAttributeResolver( return this; } + /** + * Sets the {@link MeterProvider} to use to generate SDK Span + * Metrics. + */ + public SdkLoggerProviderBuilder setMeterProvider(Supplier meterProvider) { + requireNonNull(meterProvider, "meterProvider"); + this.meterProvider = meterProvider; + return this; + } + /** * Create a {@link SdkLoggerProvider} instance. * @@ -198,6 +211,7 @@ public SdkLoggerProvider build() { logRecordProcessors, clock, loggerConfiguratorBuilder.build(), - exceptionAttributeResolver); + exceptionAttributeResolver, + meterProvider); } } diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java index 601bfdfa208..b2f233c8415 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java @@ -5,13 +5,11 @@ package io.opentelemetry.sdk.logs.export; -import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.LongCounter; -import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.InternalTelemetryVersion; +import io.opentelemetry.sdk.internal.ComponentId; import io.opentelemetry.sdk.internal.DaemonThreadFactory; import io.opentelemetry.sdk.logs.LogRecordProcessor; import io.opentelemetry.sdk.logs.ReadWriteLogRecord; @@ -26,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -42,14 +41,11 @@ */ public final class BatchLogRecordProcessor implements LogRecordProcessor { + private static final ComponentId COMPONENT_ID = + ComponentId.generateLazy("batching_log_processor"); + private static final String WORKER_THREAD_NAME = BatchLogRecordProcessor.class.getSimpleName() + "_WorkerThread"; - private static final AttributeKey LOG_RECORD_PROCESSOR_TYPE_LABEL = - AttributeKey.stringKey("processorType"); - private static final AttributeKey LOG_RECORD_PROCESSOR_DROPPED_LABEL = - AttributeKey.booleanKey("dropped"); - private static final String LOG_RECORD_PROCESSOR_TYPE_VALUE = - BatchLogRecordProcessor.class.getSimpleName(); private final Worker worker; private final AtomicBoolean isShutdown = new AtomicBoolean(false); @@ -67,7 +63,8 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord BatchLogRecordProcessor( LogRecordExporter logRecordExporter, - MeterProvider meterProvider, + Supplier meterProvider, + InternalTelemetryVersion telemetryVersion, long scheduleDelayNanos, int maxQueueSize, int maxExportBatchSize, @@ -76,10 +73,12 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord new Worker( logRecordExporter, meterProvider, + telemetryVersion, scheduleDelayNanos, maxExportBatchSize, exporterTimeoutNanos, - new ArrayBlockingQueue<>(maxQueueSize)); // TODO: use JcTools.newFixedSizeQueue(..) + new ArrayBlockingQueue<>(maxQueueSize), + maxQueueSize); // TODO: use JcTools.newFixedSizeQueue(..) Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); workerThread.start(); } @@ -140,9 +139,7 @@ private static final class Worker implements Runnable { private static final Logger logger = Logger.getLogger(Worker.class.getName()); - private final LongCounter processedLogsCounter; - private final Attributes droppedAttrs; - private final Attributes exportedAttrs; + private final LogRecordProcessorInstrumentation logProcessorInstrumentation; private final LogRecordExporter logRecordExporter; private final long scheduleDelayNanos; @@ -163,59 +160,34 @@ private static final class Worker implements Runnable { private final AtomicReference flushRequested = new AtomicReference<>(); private volatile boolean continueWork = true; private final ArrayList batch; + private final long maxQueueSize; private Worker( LogRecordExporter logRecordExporter, - MeterProvider meterProvider, + Supplier meterProvider, + InternalTelemetryVersion telemetryVersion, long scheduleDelayNanos, int maxExportBatchSize, long exporterTimeoutNanos, - Queue queue) { + Queue queue, + long maxQueueSize) { this.logRecordExporter = logRecordExporter; this.scheduleDelayNanos = scheduleDelayNanos; this.maxExportBatchSize = maxExportBatchSize; this.exporterTimeoutNanos = exporterTimeoutNanos; this.queue = queue; this.signal = new ArrayBlockingQueue<>(1); - Meter meter = meterProvider.meterBuilder("io.opentelemetry.sdk.logs").build(); - meter - .gaugeBuilder("queueSize") - .ofLongs() - .setDescription("The number of items queued") - .setUnit("1") - .buildWithCallback( - result -> - result.record( - queue.size(), - Attributes.of( - LOG_RECORD_PROCESSOR_TYPE_LABEL, LOG_RECORD_PROCESSOR_TYPE_VALUE))); - processedLogsCounter = - meter - .counterBuilder("processedLogs") - .setUnit("1") - .setDescription( - "The number of logs processed by the BatchLogRecordProcessor. " - + "[dropped=true if they were dropped due to high throughput]") - .build(); - droppedAttrs = - Attributes.of( - LOG_RECORD_PROCESSOR_TYPE_LABEL, - LOG_RECORD_PROCESSOR_TYPE_VALUE, - LOG_RECORD_PROCESSOR_DROPPED_LABEL, - true); - exportedAttrs = - Attributes.of( - LOG_RECORD_PROCESSOR_TYPE_LABEL, - LOG_RECORD_PROCESSOR_TYPE_VALUE, - LOG_RECORD_PROCESSOR_DROPPED_LABEL, - false); + logProcessorInstrumentation = + LogRecordProcessorInstrumentation.get(telemetryVersion, COMPONENT_ID, meterProvider); + this.maxQueueSize = maxQueueSize; this.batch = new ArrayList<>(this.maxExportBatchSize); } private void addLog(ReadWriteLogRecord logData) { + logProcessorInstrumentation.buildQueueMetricsOnce(maxQueueSize, queue::size); if (!queue.offer(logData)) { - processedLogsCounter.add(1, droppedAttrs); + logProcessorInstrumentation.dropLogs(1); } else { if (queue.size() >= logsNeeded.get()) { signal.offer(true); @@ -316,18 +288,24 @@ private void exportCurrentBatch() { return; } + String error = null; try { CompletableResultCode result = logRecordExporter.export(Collections.unmodifiableList(batch)); result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS); - if (result.isSuccess()) { - processedLogsCounter.add(batch.size(), exportedAttrs); - } else { + if (!result.isSuccess()) { logger.log(Level.FINE, "Exporter failed"); + if (result.getFailureThrowable() != null) { + error = result.getFailureThrowable().getClass().getName(); + } else { + error = "export_failed"; + } } } catch (RuntimeException e) { logger.log(Level.WARNING, "Exporter threw an Exception", e); + error = e.getClass().getName(); } finally { + logProcessorInstrumentation.finishLogs(batch.size(), error); batch.clear(); } } diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorBuilder.java index 5e17848c774..f9e60be4032 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorBuilder.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorBuilder.java @@ -9,8 +9,10 @@ import static java.util.Objects.requireNonNull; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.sdk.common.InternalTelemetryVersion; import java.time.Duration; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -37,7 +39,8 @@ public final class BatchLogRecordProcessorBuilder { private int maxQueueSize = DEFAULT_MAX_QUEUE_SIZE; private int maxExportBatchSize = DEFAULT_MAX_EXPORT_BATCH_SIZE; private long exporterTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_EXPORT_TIMEOUT_MILLIS); - private MeterProvider meterProvider = MeterProvider.noop(); + private Supplier meterProvider = MeterProvider::noop; + private InternalTelemetryVersion telemetryVersion = InternalTelemetryVersion.LEGACY; BatchLogRecordProcessorBuilder(LogRecordExporter logRecordExporter) { this.logRecordExporter = requireNonNull(logRecordExporter, "logRecordExporter"); @@ -137,11 +140,29 @@ public BatchLogRecordProcessorBuilder setMaxExportBatchSize(int maxExportBatchSi * metrics will not be collected. */ public BatchLogRecordProcessorBuilder setMeterProvider(MeterProvider meterProvider) { + requireNonNull(meterProvider, "meterProvider"); + this.meterProvider = () -> meterProvider; + return this; + } + + /** + * Sets the {@link MeterProvider} to use to collect metrics related to batch export. If not set, + * metrics will not be collected. + */ + public BatchLogRecordProcessorBuilder setMeterProvider(Supplier meterProvider) { requireNonNull(meterProvider, "meterProvider"); this.meterProvider = meterProvider; return this; } + /** Sets the {@link InternalTelemetryVersion} defining which metrics this processor records. */ + public BatchLogRecordProcessorBuilder setInternalTelemetryVersion( + InternalTelemetryVersion telemetryVersion) { + requireNonNull(telemetryVersion, "telemetryVersion"); + this.telemetryVersion = telemetryVersion; + return this; + } + // Visible for testing int getMaxExportBatchSize() { return maxExportBatchSize; @@ -164,6 +185,7 @@ public BatchLogRecordProcessor build() { return new BatchLogRecordProcessor( logRecordExporter, meterProvider, + telemetryVersion, scheduleDelayNanos, maxQueueSize, maxExportBatchSize, diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/LegacyLogRecordProcessorInstrumentation.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/LegacyLogRecordProcessorInstrumentation.java new file mode 100644 index 00000000000..ecff1a62137 --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/LegacyLogRecordProcessorInstrumentation.java @@ -0,0 +1,103 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs.export; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.MeterProvider; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; +import javax.annotation.Nullable; + +/** Log processor metrics defined before they were standardized in semconv. */ +final class LegacyLogRecordProcessorInstrumentation implements LogRecordProcessorInstrumentation { + private static final AttributeKey PROCESSOR_TYPE_LABEL = + AttributeKey.stringKey("processorType"); + private static final AttributeKey PROCESSOR_DROPPED_LABEL = + AttributeKey.booleanKey("dropped"); + // Legacy metrics are only created for batch log processor. + private static final String PROCESSOR_TYPE_VALUE = BatchLogRecordProcessor.class.getSimpleName(); + + private final Object lock = new Object(); + private final AtomicBoolean builtQueueMetrics = new AtomicBoolean(false); + + private final Supplier meterProvider; + private final Attributes standardAttrs; + private final Attributes droppedAttrs; + + @Nullable private Meter meter; + @Nullable private volatile LongCounter processedLogs; + + LegacyLogRecordProcessorInstrumentation(Supplier meterProvider) { + this.meterProvider = meterProvider; + + standardAttrs = + Attributes.of(PROCESSOR_TYPE_LABEL, PROCESSOR_TYPE_VALUE, PROCESSOR_DROPPED_LABEL, false); + droppedAttrs = + Attributes.of(PROCESSOR_TYPE_LABEL, PROCESSOR_TYPE_VALUE, PROCESSOR_DROPPED_LABEL, true); + } + + @Override + public void dropLogs(int count) { + processedLogs().add(count, droppedAttrs); + } + + @Override + public void finishLogs(int count, @Nullable String error) { + // Legacy metrics only record when no error. + if (error != null) { + processedLogs().add(count, standardAttrs); + } + } + + @Override + public void buildQueueMetricsOnce(long unusedCapacity, LongCallable getSize) { + if (!builtQueueMetrics.compareAndSet(false, true)) { + return; + } + meter() + .gaugeBuilder("queueSize") + .ofLongs() + .setDescription("The number of items queued") + .setUnit("1") + .buildWithCallback( + result -> + result.record( + getSize.get(), Attributes.of(PROCESSOR_TYPE_LABEL, PROCESSOR_TYPE_VALUE))); + // No capacity metric when legacy. + } + + private LongCounter processedLogs() { + LongCounter processedLogs = this.processedLogs; + if (processedLogs == null) { + synchronized (lock) { + processedLogs = this.processedLogs; + if (processedLogs == null) { + processedLogs = + meter() + .counterBuilder("processedLogs") + .setUnit("1") + .setDescription( + "The number of logs processed by the BatchLogRecordProcessor. " + + "[dropped=true if they were dropped due to high throughput]") + .build(); + this.processedLogs = processedLogs; + } + } + } + return processedLogs; + } + + private Meter meter() { + if (meter == null) { + // Safe to call from multiple threads. + meter = meterProvider.get().get("io.opentelemetry.sdk.logs"); + } + return meter; + } +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/LogRecordProcessorInstrumentation.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/LogRecordProcessorInstrumentation.java new file mode 100644 index 00000000000..66ac7b7dff5 --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/LogRecordProcessorInstrumentation.java @@ -0,0 +1,37 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs.export; + +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.sdk.common.InternalTelemetryVersion; +import io.opentelemetry.sdk.internal.ComponentId; +import java.util.function.Supplier; +import javax.annotation.Nullable; + +/** Metrics exported by span processors. */ +interface LogRecordProcessorInstrumentation { + + static LogRecordProcessorInstrumentation get( + InternalTelemetryVersion telemetryVersion, + ComponentId componentId, + Supplier meterProvider) { + switch (telemetryVersion) { + case LEGACY: + return new LegacyLogRecordProcessorInstrumentation(meterProvider); + default: + return new SemConvLogRecordProcessorInstrumentation(componentId, meterProvider); + } + } + + /** Records metrics for logs dropped because a queue is full. */ + void dropLogs(int count); + + /** Record metrics for logs processed, possibly with an error. */ + void finishLogs(int count, @Nullable String error); + + /** Registers metrics for processor queue capacity and size. */ + void buildQueueMetricsOnce(long capacity, LongCallable getSize); +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/LongCallable.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/LongCallable.java new file mode 100644 index 00000000000..b5c85f2ced5 --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/LongCallable.java @@ -0,0 +1,13 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs.export; + +/** A Callable returning a primitive long value. */ +@FunctionalInterface +interface LongCallable { + /** Returns the value. */ + long get(); +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SemConvLogRecordProcessorInstrumentation.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SemConvLogRecordProcessorInstrumentation.java new file mode 100644 index 00000000000..4ba3d546b54 --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SemConvLogRecordProcessorInstrumentation.java @@ -0,0 +1,118 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs.export; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.sdk.internal.ComponentId; +import io.opentelemetry.sdk.internal.SemConvAttributes; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; +import javax.annotation.Nullable; + +/** + * SDK metrics exported for log processors as defined in the semantic + * conventions. + */ +final class SemConvLogRecordProcessorInstrumentation implements LogRecordProcessorInstrumentation { + + private final Object lock = new Object(); + private final AtomicBoolean builtQueueMetrics = new AtomicBoolean(false); + + private final Supplier meterProvider; + private final Attributes standardAttrs; + private final Attributes droppedAttrs; + + @Nullable private Meter meter; + @Nullable private volatile LongCounter processedLogs; + + SemConvLogRecordProcessorInstrumentation( + ComponentId componentId, Supplier meterProvider) { + this.meterProvider = meterProvider; + + standardAttrs = + Attributes.of( + SemConvAttributes.OTEL_COMPONENT_TYPE, + componentId.getTypeName(), + SemConvAttributes.OTEL_COMPONENT_NAME, + componentId.getComponentName()); + droppedAttrs = + Attributes.of( + SemConvAttributes.OTEL_COMPONENT_TYPE, + componentId.getTypeName(), + SemConvAttributes.OTEL_COMPONENT_NAME, + componentId.getComponentName(), + SemConvAttributes.ERROR_TYPE, + "queue_full"); + } + + @Override + public void dropLogs(int count) { + processedLogs().add(count, droppedAttrs); + } + + @Override + public void finishLogs(int count, @Nullable String error) { + if (error == null) { + processedLogs().add(count, standardAttrs); + return; + } + + Attributes attributes = + standardAttrs.toBuilder().put(SemConvAttributes.ERROR_TYPE, error).build(); + processedLogs().add(count, attributes); + } + + @Override + public void buildQueueMetricsOnce(long capacity, LongCallable getSize) { + if (!builtQueueMetrics.compareAndSet(false, true)) { + return; + } + meter() + .upDownCounterBuilder("otel.sdk.processor.log.queue.capacity") + .setUnit("{log_record}") + .setDescription( + "The maximum number of log records the queue of a given instance of an SDK Log Record processor can hold. ") + .buildWithCallback(m -> m.record(capacity, standardAttrs)); + meter() + .upDownCounterBuilder("otel.sdk.processor.log.queue.size") + .setUnit("{log_record}") + .setDescription( + "The number of log records in the queue of a given instance of an SDK log processor.") + .buildWithCallback(m -> m.record(getSize.get(), standardAttrs)); + } + + private LongCounter processedLogs() { + LongCounter processedLogs = this.processedLogs; + if (processedLogs == null) { + synchronized (lock) { + processedLogs = this.processedLogs; + if (processedLogs == null) { + processedLogs = + meter() + .counterBuilder("otel.sdk.processor.log.processed") + .setUnit("{log_record}") + .setDescription( + "The number of log records for which the processing has finished, either successful or failed.") + .build(); + this.processedLogs = processedLogs; + } + } + } + return processedLogs; + } + + private Meter meter() { + if (meter == null) { + // Safe to call from multiple threads. + meter = meterProvider.get().get("io.opentelemetry.sdk.logs"); + } + return meter; + } +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java index cc75b50ae1f..8b3f3f65d16 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java @@ -7,8 +7,11 @@ import static java.util.Objects.requireNonNull; +import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.InternalTelemetryVersion; +import io.opentelemetry.sdk.internal.ComponentId; import io.opentelemetry.sdk.logs.LogRecordProcessor; import io.opentelemetry.sdk.logs.ReadWriteLogRecord; import io.opentelemetry.sdk.logs.data.LogRecordData; @@ -17,6 +20,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -34,12 +38,15 @@ */ public final class SimpleLogRecordProcessor implements LogRecordProcessor { + private static final ComponentId COMPONENT_ID = ComponentId.generateLazy("simple_log_processor"); + private static final Logger logger = Logger.getLogger(SimpleLogRecordProcessor.class.getName()); private final LogRecordExporter logRecordExporter; private final Set pendingExports = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final AtomicBoolean isShutdown = new AtomicBoolean(false); + private final LogRecordProcessorInstrumentation logProcessorInstrumentation; private final Object exporterLock = new Object(); @@ -55,11 +62,21 @@ public final class SimpleLogRecordProcessor implements LogRecordProcessor { */ public static LogRecordProcessor create(LogRecordExporter exporter) { requireNonNull(exporter, "exporter"); - return new SimpleLogRecordProcessor(exporter); + return builder(exporter).build(); + } + + /** Returns a new Builder for {@link SimpleLogRecordProcessor}. */ + public static SimpleLogRecordProcessorBuilder builder(LogRecordExporter exporter) { + requireNonNull(exporter, "exporter"); + return new SimpleLogRecordProcessorBuilder(exporter); } - private SimpleLogRecordProcessor(LogRecordExporter logRecordExporter) { + SimpleLogRecordProcessor( + LogRecordExporter logRecordExporter, Supplier meterProvider) { this.logRecordExporter = requireNonNull(logRecordExporter, "logRecordExporter"); + logProcessorInstrumentation = + LogRecordProcessorInstrumentation.get( + InternalTelemetryVersion.LATEST, COMPONENT_ID, meterProvider); } @Override @@ -76,9 +93,16 @@ public void onEmit(Context context, ReadWriteLogRecord logRecord) { result.whenComplete( () -> { pendingExports.remove(result); + String error = null; if (!result.isSuccess()) { logger.log(Level.FINE, "Exporter failed"); + if (result.getFailureThrowable() != null) { + error = result.getFailureThrowable().getClass().getName(); + } else { + error = "export_failed"; + } } + logProcessorInstrumentation.finishLogs(1, error); }); } catch (RuntimeException e) { logger.log(Level.WARNING, "Exporter threw an Exception", e); diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessorBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessorBuilder.java new file mode 100644 index 00000000000..737f9b5e019 --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessorBuilder.java @@ -0,0 +1,42 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs.export; + +import static java.util.Objects.requireNonNull; + +import io.opentelemetry.api.metrics.MeterProvider; +import java.util.function.Supplier; + +public final class SimpleLogRecordProcessorBuilder { + private final LogRecordExporter exporter; + private Supplier meterProvider = MeterProvider::noop; + + SimpleLogRecordProcessorBuilder(LogRecordExporter exporter) { + this.exporter = requireNonNull(exporter, "exporter"); + } + + /** + * Sets the {@link MeterProvider} to use to generate SDK Log + * Metrics. + */ + public SimpleLogRecordProcessorBuilder setMeterProvider(Supplier meterProvider) { + requireNonNull(meterProvider, "meterProvider"); + this.meterProvider = meterProvider; + return this; + } + + // TODO: add `setInternalTelemetryVersion when we support more than one version + + /** + * Returns a new {@link SimpleLogRecordProcessor} with the configuration of this builder. + * + * @return a new {@link SimpleLogRecordProcessor}. + */ + public SimpleLogRecordProcessor build() { + return new SimpleLogRecordProcessor(exporter, meterProvider); + } +} diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/LoggerSharedStateTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/LoggerSharedStateTest.java index 7aa4f02aa0a..a8ca122ae2b 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/LoggerSharedStateTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/LoggerSharedStateTest.java @@ -10,6 +10,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.internal.ExceptionAttributeResolver; @@ -29,7 +30,8 @@ void shutdown() { LogLimits::getDefault, logRecordProcessor, Clock.getDefault(), - ExceptionAttributeResolver.getDefault()); + ExceptionAttributeResolver.getDefault(), + new SdkLoggerInstrumentation(MeterProvider::noop)); state.shutdown(); state.shutdown(); verify(logRecordProcessor, times(1)).shutdown(); diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilderTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilderTest.java index 0d54efdec42..bbe9305b0dc 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilderTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilderTest.java @@ -20,6 +20,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.Value; import io.opentelemetry.api.logs.Severity; +import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.TraceFlags; @@ -60,6 +61,8 @@ void setup() { .thenReturn((context, logRecord) -> emittedLog.set(logRecord)); when(loggerSharedState.getResource()).thenReturn(RESOURCE); when(loggerSharedState.getClock()).thenReturn(clock); + when(loggerSharedState.getLoggerInstrumentation()) + .thenReturn(new SdkLoggerInstrumentation(MeterProvider::noop)); SdkLogger logger = new SdkLogger(loggerSharedState, SCOPE_INFO, LoggerConfig.enabled()); builder = new SdkLogRecordBuilder(loggerSharedState, SCOPE_INFO, logger); diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLoggerProviderMetricsTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLoggerProviderMetricsTest.java new file mode 100644 index 00000000000..2326a52d5da --- /dev/null +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLoggerProviderMetricsTest.java @@ -0,0 +1,314 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs; + +import static io.opentelemetry.sdk.internal.SemConvAttributes.ERROR_TYPE; +import static io.opentelemetry.sdk.internal.SemConvAttributes.OTEL_COMPONENT_NAME; +import static io.opentelemetry.sdk.internal.SemConvAttributes.OTEL_COMPONENT_TYPE; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.when; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.logs.Logger; +import io.opentelemetry.api.logs.LoggerProvider; +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.InternalTelemetryVersion; +import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessor; +import io.opentelemetry.sdk.logs.export.LogRecordExporter; +import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class SdkLoggerProviderMetricsTest { + + @Mock private LogRecordExporter mockExporter; + + @Test + void simple() { + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + MeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + + InMemoryLogRecordExporter exporter = InMemoryLogRecordExporter.create(); + LoggerProvider loggerProvider = + SdkLoggerProvider.builder() + .addLogRecordProcessor( + SimpleLogRecordProcessor.builder(exporter) + .setMeterProvider(() -> meterProvider) + .build()) + .setMeterProvider(() -> meterProvider) + .build(); + + Logger logger = loggerProvider.get("test"); + + logger.logRecordBuilder().emit(); + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + m -> + assertThat(m) + .hasName("otel.sdk.log.created") + .hasLongSumSatisfying( + s -> s.hasPointsSatisfying(p -> p.hasValue(1).hasAttributes())), + m -> + assertThat(m) + .hasName("otel.sdk.processor.log.processed") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "simple_log_processor/0", + OTEL_COMPONENT_TYPE, + "simple_log_processor"))))); + + logger.logRecordBuilder().emit(); + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + m -> + assertThat(m) + .hasName("otel.sdk.log.created") + .hasLongSumSatisfying( + s -> s.hasPointsSatisfying(p -> p.hasValue(2).hasAttributes())), + m -> + assertThat(m) + .hasName("otel.sdk.processor.log.processed") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(2) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "simple_log_processor/0", + OTEL_COMPONENT_TYPE, + "simple_log_processor"))))); + } + + @Test + void batch() throws Exception { + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + MeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + + BatchLogRecordProcessor processor = + BatchLogRecordProcessor.builder(mockExporter) + .setMaxQueueSize(1) + // Manually flush + .setScheduleDelay(Duration.ofDays(1)) + .setInternalTelemetryVersion(InternalTelemetryVersion.LATEST) + .setMeterProvider(() -> meterProvider) + .build(); + LoggerProvider loggerProvider = + SdkLoggerProvider.builder() + .addLogRecordProcessor(processor) + .setMeterProvider(() -> meterProvider) + .build(); + + Logger logger = loggerProvider.get("test"); + + CompletableResultCode result1 = new CompletableResultCode(); + CompletableResultCode result2 = new CompletableResultCode(); + when(mockExporter.export(any())).thenReturn(result1).thenReturn(result2); + + // Will immediately be processed. + logger.logRecordBuilder().emit(); + Thread.sleep(500); // give time to start processing a batch of size 1 + // We haven't completed the export so this span is queued. + logger.logRecordBuilder().emit(); + // Queue is full, this span is dropped. + logger.logRecordBuilder().emit(); + + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + m -> + assertThat(m) + .hasName("otel.sdk.processor.log.queue.capacity") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "batching_log_processor/0", + OTEL_COMPONENT_TYPE, + "batching_log_processor")))), + m -> + assertThat(m) + .hasName("otel.sdk.processor.log.queue.size") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "batching_log_processor/0", + OTEL_COMPONENT_TYPE, + "batching_log_processor")))), + m -> + assertThat(m) + .hasName("otel.sdk.processor.log.processed") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "batching_log_processor/0", + OTEL_COMPONENT_TYPE, + "batching_log_processor", + ERROR_TYPE, + "queue_full")))), + m -> + assertThat(m) + .hasName("otel.sdk.log.created") + .hasLongSumSatisfying( + s -> s.hasPointsSatisfying(p -> p.hasValue(3).hasAttributes()))); + + result1.succeed(); + result2.fail(); + processor.forceFlush().join(1, TimeUnit.SECONDS); + + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + m -> + assertThat(m) + .hasName("otel.sdk.processor.log.queue.capacity") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "batching_log_processor/0", + OTEL_COMPONENT_TYPE, + "batching_log_processor")))), + m -> + assertThat(m) + .hasName("otel.sdk.processor.log.queue.size") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(0) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "batching_log_processor/0", + OTEL_COMPONENT_TYPE, + "batching_log_processor")))), + m -> + assertThat(m) + .hasName("otel.sdk.processor.log.processed") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "batching_log_processor/0", + OTEL_COMPONENT_TYPE, + "batching_log_processor")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "batching_log_processor/0", + OTEL_COMPONENT_TYPE, + "batching_log_processor", + ERROR_TYPE, + "export_failed")), + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "batching_log_processor/0", + OTEL_COMPONENT_TYPE, + "batching_log_processor", + ERROR_TYPE, + "queue_full")))), + m -> + assertThat(m) + .hasName("otel.sdk.log.created") + .hasLongSumSatisfying( + s -> s.hasPointsSatisfying(p -> p.hasValue(3).hasAttributes()))); + + lenient().when(mockExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + processor.shutdown(); + } + + @Test + void simpleExportError() { + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + MeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + + LoggerProvider loggerProvider = + SdkLoggerProvider.builder() + .addLogRecordProcessor( + SimpleLogRecordProcessor.builder(mockExporter) + .setMeterProvider(() -> meterProvider) + .build()) + .setMeterProvider(() -> meterProvider) + .build(); + + Logger logger = loggerProvider.get("test"); + + when(mockExporter.export(any())).thenReturn(CompletableResultCode.ofFailure()); + + logger.logRecordBuilder().emit(); + + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + m -> + assertThat(m) + .hasName("otel.sdk.processor.log.processed") + .hasLongSumSatisfying( + s -> + s.hasPointsSatisfying( + p -> + p.hasValue(1) + .hasAttributes( + Attributes.of( + OTEL_COMPONENT_NAME, + "simple_log_processor/0", + OTEL_COMPONENT_TYPE, + "simple_log_processor", + ERROR_TYPE, + "export_failed")))), + m -> + assertThat(m) + .hasName("otel.sdk.log.created") + .hasLongSumSatisfying( + s -> s.hasPointsSatisfying(p -> p.hasValue(1).hasAttributes()))); + } +} diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLoggerTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLoggerTest.java index 4abed011635..76b15234b3d 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLoggerTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLoggerTest.java @@ -22,6 +22,7 @@ import io.opentelemetry.api.internal.StringUtils; import io.opentelemetry.api.logs.LogRecordBuilder; import io.opentelemetry.api.logs.Severity; +import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.TraceFlags; @@ -50,6 +51,8 @@ void logRecordBuilder() { when(state.getResource()).thenReturn(Resource.getDefault()); when(state.getLogRecordProcessor()).thenReturn(logRecordProcessor); when(state.getClock()).thenReturn(clock); + when(state.getLoggerInstrumentation()) + .thenReturn(new SdkLoggerInstrumentation(MeterProvider::noop)); SdkLogger logger = new SdkLogger(state, info, LoggerConfig.defaultConfig()); LogRecordBuilder logRecordBuilder = logger.logRecordBuilder(); From ef7126239901dc8ba3b427fa6a5e845ea50aae2e Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 19 Dec 2025 17:05:25 +0900 Subject: [PATCH 2/2] Fix --- .../sdk/autoconfigure/LoggerProviderConfiguration.java | 6 +++++- .../fileconfig/LogRecordProcessorFactory.java | 6 +++++- .../OpenTelemetryConfigurationFactoryTest.java | 10 +++++----- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/sdk-extensions/autoconfigure/src/main/java/io/opentelemetry/sdk/autoconfigure/LoggerProviderConfiguration.java b/sdk-extensions/autoconfigure/src/main/java/io/opentelemetry/sdk/autoconfigure/LoggerProviderConfiguration.java index f762915dc41..0c080e1333d 100644 --- a/sdk-extensions/autoconfigure/src/main/java/io/opentelemetry/sdk/autoconfigure/LoggerProviderConfiguration.java +++ b/sdk-extensions/autoconfigure/src/main/java/io/opentelemetry/sdk/autoconfigure/LoggerProviderConfiguration.java @@ -44,6 +44,7 @@ static void configureLoggerProvider( List closeables) { loggerProviderBuilder.setLogLimits(() -> configureLogLimits(config)); + loggerProviderBuilder.setMeterProvider(() -> meterProvider); Map exportersByName = configureLogRecordExporters(config, spiHelper, logRecordExporterCustomizer, closeables); @@ -71,7 +72,10 @@ static List configureLogRecordProcessors( for (String simpleProcessorExporterName : simpleProcessorExporterNames) { LogRecordExporter exporter = exportersByNameCopy.remove(simpleProcessorExporterName); if (exporter != null) { - LogRecordProcessor logRecordProcessor = SimpleLogRecordProcessor.create(exporter); + LogRecordProcessor logRecordProcessor = + SimpleLogRecordProcessor.builder(exporter) + .setMeterProvider(() -> meterProvider) + .build(); closeables.add(logRecordProcessor); logRecordProcessors.add(logRecordProcessor); } diff --git a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/LogRecordProcessorFactory.java b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/LogRecordProcessorFactory.java index e898fe6efe9..bbe54339844 100644 --- a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/LogRecordProcessorFactory.java +++ b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/LogRecordProcessorFactory.java @@ -68,7 +68,11 @@ public LogRecordProcessor create( simpleModel.getExporter(), "simple log record processor exporter"); LogRecordExporter logRecordExporter = LogRecordExporterFactory.getInstance().create(exporterModel, context); - return context.addCloseable(SimpleLogRecordProcessor.create(logRecordExporter)); + MeterProvider meterProvider = context.getMeterProvider(); + return context.addCloseable( + SimpleLogRecordProcessor.builder(logRecordExporter) + .setMeterProvider(() -> meterProvider) + .build()); } Map.Entry keyValue = diff --git a/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/OpenTelemetryConfigurationFactoryTest.java b/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/OpenTelemetryConfigurationFactoryTest.java index b71eafc1702..c52eb2ff16b 100644 --- a/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/OpenTelemetryConfigurationFactoryTest.java +++ b/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/OpenTelemetryConfigurationFactoryTest.java @@ -325,17 +325,17 @@ void create_Configured() throws NoSuchFieldException, IllegalAccessException { // test that the meter provider is wired through to the tracer and logger providers Field field = SdkMeterProvider.class.getDeclaredField("sharedState"); field.setAccessible(true); - Object sharedState = field.get(sdk.getSdkMeterProvider()); + + // Lazily initialized assertThat(sdk) .extracting("loggerProvider") .extracting("delegate") .extracting("sharedState") .extracting("logRecordProcessor") .extracting("worker") - .extracting("processedLogsCounter") - .extracting("sdkMeter") - .extracting("meterProviderSharedState") - .isEqualTo(sharedState); + .extracting("logProcessorInstrumentation") + .extracting("processedLogs") + .isNull(); // Lazily initialized assertThat(sdk)