diff --git a/docs/telemetry.md b/docs/telemetry.md
index 4726563e20..f9af25a137 100644
--- a/docs/telemetry.md
+++ b/docs/telemetry.md
@@ -14,32 +14,23 @@ It's recommended to use a local [OTEL Collector](https://opentelemetry.io/docs/c
Example configuration: https://github.com/andreasohlund/Docker/tree/main/otel-monitoring
-The following metrics are available:
-
-### Ingestion
-
-#### Success or failure
-
-- `sc.audit.ingestion.success` - Successful ingested audit message count (Counter)
-- `sc.audit.ingestion.retry` - Retried audit message count (Counter)
-- `sc.audit.ingestion.failed` - Failed audit message count (Counter)
-
-The above metrics also have the following attributes attached:
-
-- `messaging.message.body.size` - The size of the message body in bytes
-- `messaging.message.type` - The logical message type of the message if present
-
-#### Details
-
-- `sc.audit.ingestion.duration` - Audit message processing duration in milliseconds (Histogram)
-- `sc.audit.ingestion.forwarded` - Count of the number of forwarded audit messages if forwarding is enabled (Counter)
-
-### Batching
-
-- `sc.audit.ingestion.batch_duration` - Batch processing duration in milliseconds (Histogram)
- - Attributes:
- - `ingestion.batch_size`
-- `sc.audit.ingestion.consecutive_batch_failures` - Consecutive batch failures (Counter)
+The following ingestion metrics and their dimensions are available:
+
+- `sc.audit.ingestion.batch_duration_seconds` - Message batch processing duration in seconds
+ - `result` - Indicates whether the full batch size was used (the batch size equals the transport's max concurrency): `full`, `partial` or `failed`
+- `sc.audit.ingestion.message_duration_seconds` - Audit message processing duration in seconds
+ - `message.category` - Indicates the category of the message ingested: `audit-message`, `saga-update` or `control-message`
+ - `result` - Indicates the outcome of the operation: `success`, `failed` or `skipped` (the message was filtered out)
+- `sc.audit.ingestion.failures_total` - Failure counter
+ - `message.category` - Indicates the category of the message ingested: `audit-message`, `saga-update` or `control-message`
+ - `result` - Indicates how the failure was resolved: `retry` or `stored-poison`
+- `sc.audit.ingestion.consecutive_batch_failure_total` - Consecutive batch failures (reset to `0` after a successful batch)
+
+Example PromQL queries for use in Grafana (the exporter exposes metric names with dots replaced by underscores, e.g. `sc.audit.ingestion.failures_total` becomes `sc_audit_ingestion_failures_total`):
+
+- Ingestion rate: `sum(rate(sc_audit_ingestion_message_duration_seconds_count[$__rate_interval])) by (exported_job)`
+- Failure rate: `sum(rate(sc_audit_ingestion_failures_total[$__rate_interval])) by (exported_job, result)`
+- Message duration (90th percentile): `histogram_quantile(0.9, sum(rate(sc_audit_ingestion_message_duration_seconds_bucket[$__rate_interval])) by (le, exported_job))`
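+- Consecutive batch failures (a sketch for alerting on the gauge, assuming the same name mapping): `max by (exported_job) (sc_audit_ingestion_consecutive_batch_failure_total)`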
## Monitoring
diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index d4ee800af0..b2b400a707 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -50,6 +50,7 @@
+
@@ -63,6 +64,7 @@
+
@@ -89,4 +91,4 @@
-
+
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs
index 8437d564cd..2b76b152ea 100644
--- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs
+++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs
@@ -2,11 +2,11 @@
{
using System;
using System.Collections.Generic;
- using System.Diagnostics.Metrics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Infrastructure.Settings;
+ using Metrics;
using Microsoft.Extensions.Hosting;
using NServiceBus;
using NServiceBus.Logging;
@@ -26,7 +26,8 @@ public AuditIngestion(
AuditIngestionCustomCheck.State ingestionState,
AuditIngestor auditIngestor,
IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
- IHostApplicationLifetime applicationLifetime)
+ IHostApplicationLifetime applicationLifetime,
+ IngestionMetrics metrics)
{
inputEndpoint = settings.AuditQueue;
this.transportCustomization = transportCustomization;
@@ -35,13 +36,16 @@ public AuditIngestion(
this.unitOfWorkFactory = unitOfWorkFactory;
this.settings = settings;
this.applicationLifetime = applicationLifetime;
+ this.metrics = metrics;
if (!transportSettings.MaxConcurrency.HasValue)
{
throw new ArgumentException("MaxConcurrency is not set in TransportSettings");
}
- channel = Channel.CreateBounded<MessageContext>(new BoundedChannelOptions(transportSettings.MaxConcurrency.Value)
+ MaxBatchSize = transportSettings.MaxConcurrency.Value;
+
+ channel = Channel.CreateBounded<MessageContext>(new BoundedChannelOptions(MaxBatchSize)
{
SingleReader = true,
SingleWriter = false,
@@ -49,7 +53,7 @@ public AuditIngestion(
FullMode = BoundedChannelFullMode.Wait
});
- errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError);
+ errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError, metrics);
watchdog = new Watchdog(
"audit message ingestion",
@@ -190,22 +194,21 @@ async Task EnsureStopped(CancellationToken cancellationToken)
async Task OnMessage(MessageContext messageContext, CancellationToken cancellationToken)
{
- var tags = Telemetry.GetIngestedMessageTags(messageContext.Headers, messageContext.Body);
- using (new DurationRecorder(ingestionDuration, tags))
+ using var messageIngestionMetrics = metrics.BeginIngestion(messageContext);
+
+ if (settings.MessageFilter != null && settings.MessageFilter(messageContext))
{
- if (settings.MessageFilter != null && settings.MessageFilter(messageContext))
- {
- return;
- }
+ messageIngestionMetrics.Skipped();
+ return;
+ }
- var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
- messageContext.SetTaskCompletionSource(taskCompletionSource);
+ var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
+ messageContext.SetTaskCompletionSource(taskCompletionSource);
- await channel.Writer.WriteAsync(messageContext, cancellationToken);
- await taskCompletionSource.Task;
+ await channel.Writer.WriteAsync(messageContext, cancellationToken);
+ _ = await taskCompletionSource.Task;
- successfulMessagesCounter.Add(1, tags);
- }
+ messageIngestionMetrics.Success();
}
public override async Task StartAsync(CancellationToken cancellationToken)
@@ -218,27 +221,24 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
- var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);
+ var contexts = new List<MessageContext>(MaxBatchSize);
while (await channel.Reader.WaitToReadAsync(stoppingToken))
{
// will only enter here if there is something to read.
try
{
+ using var batchMetrics = metrics.BeginBatch(MaxBatchSize);
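+ // If Ingest throws below, batchMetrics is disposed without Complete() and records the batch as failed.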
+
// as long as there is something to read this will fetch up to MaximumConcurrency items
- using (var recorder = new DurationRecorder(batchDuration))
+ while (channel.Reader.TryRead(out var context))
{
- while (channel.Reader.TryRead(out var context))
- {
- contexts.Add(context);
- }
-
- recorder.Tags.Add("ingestion.batch_size", contexts.Count);
-
- await auditIngestor.Ingest(contexts);
+ contexts.Add(context);
}
- consecutiveBatchFailuresCounter.Record(0);
+ await auditIngestor.Ingest(contexts);
+
+ batchMetrics.Complete(contexts.Count);
}
catch (Exception e)
{
@@ -255,9 +255,6 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
}
logger.Info("Ingesting messages failed", e);
-
- // no need to do interlocked increment since this is running sequential
- consecutiveBatchFailuresCounter.Record(consecutiveBatchFailures++);
}
finally
{
@@ -298,8 +295,8 @@ public override async Task StopAsync(CancellationToken cancellationToken)
TransportInfrastructure transportInfrastructure;
IMessageReceiver messageReceiver;
- long consecutiveBatchFailures = 0;
+ readonly int MaxBatchSize;
readonly SemaphoreSlim startStopSemaphore = new(1);
readonly string inputEndpoint;
readonly ITransportCustomization transportCustomization;
@@ -309,12 +306,9 @@ public override async Task StopAsync(CancellationToken cancellationToken)
readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory;
readonly Settings settings;
readonly Channel<MessageContext> channel;
- readonly Histogram<double> batchDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "batch_duration"), unit: "ms", "Average audit message batch processing duration");
- readonly Counter<long> successfulMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "success"), description: "Successful ingested audit message count");
- readonly Histogram<long> consecutiveBatchFailuresCounter = Telemetry.Meter.CreateHistogram<long>(Telemetry.CreateInstrumentName("ingestion", "consecutive_batch_failures"), unit: "count", description: "Consecutive audit ingestion batch failure");
- readonly Histogram<double> ingestionDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "duration"), unit: "ms", description: "Average incoming audit message processing duration");
readonly Watchdog watchdog;
readonly IHostApplicationLifetime applicationLifetime;
+ readonly IngestionMetrics metrics;
static readonly ILog logger = LogManager.GetLogger<AuditIngestion>();
diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs b/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs
index 6b8dd39a23..6ccfbedcce 100644
--- a/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs
+++ b/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs
@@ -1,115 +1,111 @@
-namespace ServiceControl.Audit.Auditing
+namespace ServiceControl.Audit.Auditing;
+
+using System;
+using System.Diagnostics;
+using System.IO;
+using System.Runtime.InteropServices;
+using System.Runtime.Versioning;
+using System.Threading;
+using System.Threading.Tasks;
+using Infrastructure;
+using NServiceBus.Logging;
+using NServiceBus.Transport;
+using Persistence;
+using Configuration;
+using Metrics;
+using ServiceControl.Infrastructure;
+
+class AuditIngestionFaultPolicy
{
- using System;
- using System.Diagnostics;
- using System.Diagnostics.Metrics;
- using System.IO;
- using System.Runtime.InteropServices;
- using System.Runtime.Versioning;
- using System.Threading;
- using System.Threading.Tasks;
- using Infrastructure;
- using NServiceBus.Logging;
- using NServiceBus.Transport;
- using Persistence;
- using Configuration;
- using ServiceControl.Infrastructure;
-
- class AuditIngestionFaultPolicy
+ public AuditIngestionFaultPolicy(IFailedAuditStorage failedAuditStorage, LoggingSettings settings, Func<string, Exception, Task> onCriticalError, IngestionMetrics metrics)
{
- readonly IFailedAuditStorage failedAuditStorage;
- readonly string logPath;
- readonly ImportFailureCircuitBreaker failureCircuitBreaker;
+ failureCircuitBreaker = new ImportFailureCircuitBreaker(onCriticalError);
+ this.failedAuditStorage = failedAuditStorage;
+ this.metrics = metrics;
- public AuditIngestionFaultPolicy(IFailedAuditStorage failedAuditStorage, LoggingSettings settings, Func<string, Exception, Task> onCriticalError)
+ if (!AppEnvironment.RunningInContainer)
{
- failureCircuitBreaker = new ImportFailureCircuitBreaker(onCriticalError);
- this.failedAuditStorage = failedAuditStorage;
-
- if (!AppEnvironment.RunningInContainer)
- {
- logPath = Path.Combine(settings.LogPath, @"FailedImports\Audit");
- Directory.CreateDirectory(logPath);
- }
+ logPath = Path.Combine(settings.LogPath, @"FailedImports\Audit");
+ Directory.CreateDirectory(logPath);
}
+ }
- public async Task<ErrorHandleResult> OnError(ErrorContext errorContext, CancellationToken cancellationToken = default)
- {
- var tags = Telemetry.GetIngestedMessageTags(errorContext.Message.Headers, errorContext.Message.Body);
-
- //Same as recoverability policy in NServiceBusFactory
- if (errorContext.ImmediateProcessingFailures < 3)
- {
- retryCounter.Add(1, tags);
- return ErrorHandleResult.RetryRequired;
- }
+ public async Task<ErrorHandleResult> OnError(ErrorContext errorContext, CancellationToken cancellationToken = default)
+ {
+ using var errorMetrics = metrics.BeginErrorHandling(errorContext);
- await StoreFailedMessageDocument(errorContext, cancellationToken);
+ //Same as recoverability policy in NServiceBusFactory
+ if (errorContext.ImmediateProcessingFailures < 3)
+ {
+ errorMetrics.Retry();
+ return ErrorHandleResult.RetryRequired;
+ }
- failedCounter.Add(1, tags);
+ await StoreFailedMessageDocument(errorContext, cancellationToken);
- return ErrorHandleResult.Handled;
- }
+ return ErrorHandleResult.Handled;
+ }
- async Task StoreFailedMessageDocument(ErrorContext errorContext, CancellationToken cancellationToken)
+ async Task StoreFailedMessageDocument(ErrorContext errorContext, CancellationToken cancellationToken)
+ {
+ var failure = new FailedAuditImport
{
- var failure = new FailedAuditImport
- {
- Id = Guid.NewGuid().ToString(),
- Message = new FailedTransportMessage
- {
- Id = errorContext.Message.MessageId,
- Headers = errorContext.Message.Headers,
- // At the moment we are taking a defensive copy of the body to avoid issues with the message body
- // buffers being returned to the pool and potentially being overwritten. Once we know how RavenDB
- // handles byte[] to ReadOnlyMemory conversion we might be able to remove this.
- Body = errorContext.Message.Body.ToArray()
- },
- ExceptionInfo = errorContext.Exception.ToFriendlyString()
- };
-
- try
- {
- await DoLogging(errorContext.Exception, failure, cancellationToken);
- }
- finally
+ Id = Guid.NewGuid().ToString(),
+ Message = new FailedTransportMessage
{
- failureCircuitBreaker.Increment(errorContext.Exception);
- }
+ Id = errorContext.Message.MessageId,
+ Headers = errorContext.Message.Headers,
+ // At the moment we are taking a defensive copy of the body to avoid issues with the message body
+ // buffers being returned to the pool and potentially being overwritten. Once we know how RavenDB
+ // handles byte[] to ReadOnlyMemory conversion we might be able to remove this.
+ Body = errorContext.Message.Body.ToArray()
+ },
+ ExceptionInfo = errorContext.Exception.ToFriendlyString()
+ };
+
+ try
+ {
+ await DoLogging(errorContext.Exception, failure, cancellationToken);
}
-
- async Task DoLogging(Exception exception, FailedAuditImport failure, CancellationToken cancellationToken)
+ finally
{
- log.Error("Failed importing error message", exception);
+ failureCircuitBreaker.Increment(errorContext.Exception);
+ }
+ }
+
+ async Task DoLogging(Exception exception, FailedAuditImport failure, CancellationToken cancellationToken)
+ {
+ log.Error("Failed importing error message", exception);
- // Write to storage
- await failedAuditStorage.SaveFailedAuditImport(failure);
+ // Write to storage
+ await failedAuditStorage.SaveFailedAuditImport(failure);
- if (!AppEnvironment.RunningInContainer)
+ if (!AppEnvironment.RunningInContainer)
+ {
+ // Write to Log Path
+ var filePath = Path.Combine(logPath, failure.Id + ".txt");
+ await File.WriteAllTextAsync(filePath, failure.ExceptionInfo, cancellationToken);
+
+ if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
- // Write to Log Path
- var filePath = Path.Combine(logPath, failure.Id + ".txt");
- await File.WriteAllTextAsync(filePath, failure.ExceptionInfo, cancellationToken);
-
- if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
- {
- WriteToEventLog("A message import has failed. A log file has been written to " + filePath);
- }
+ WriteToEventLog("A message import has failed. A log file has been written to " + filePath);
}
}
+ }
- [SupportedOSPlatform("windows")]
- void WriteToEventLog(string message)
- {
+ [SupportedOSPlatform("windows")]
+ void WriteToEventLog(string message)
+ {
#if DEBUG
- EventSourceCreator.Create();
+ EventSourceCreator.Create();
#endif
- EventLog.WriteEntry(EventSourceCreator.SourceName, message, EventLogEntryType.Error);
- }
+ EventLog.WriteEntry(EventSourceCreator.SourceName, message, EventLogEntryType.Error);
+ }
- readonly Counter<long> retryCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "retry"), description: "Audit ingestion retries count");
- readonly Counter<long> failedCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "failed"), description: "Audit ingestion failure count");
+ readonly IFailedAuditStorage failedAuditStorage;
+ readonly IngestionMetrics metrics;
+ readonly string logPath;
+ readonly ImportFailureCircuitBreaker failureCircuitBreaker;
- static readonly ILog log = LogManager.GetLogger<AuditIngestionFaultPolicy>();
- }
+ static readonly ILog log = LogManager.GetLogger<AuditIngestionFaultPolicy>();
}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs
index a466f93be9..4a333a4b3e 100644
--- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs
+++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs
@@ -2,7 +2,6 @@
{
using System;
using System.Collections.Generic;
- using System.Diagnostics.Metrics;
using System.Linq;
using System.Threading.Tasks;
using Infrastructure.Settings;
@@ -14,8 +13,7 @@
using Persistence.UnitOfWork;
using Recoverability;
using SagaAudit;
- using ServiceControl.Infrastructure;
- using ServiceControl.Transports;
+ using Transports;
public class AuditIngestor
{
@@ -31,6 +29,7 @@ ITransportCustomization transportCustomization
{
this.settings = settings;
this.messageDispatcher = messageDispatcher;
+
var enrichers = new IEnrichImportedAuditMessages[] { new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher(), new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), new DetectSuccessfulRetriesEnricher(), new SagaRelationshipsEnricher() }.Concat(auditEnrichers).ToArray();
logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue);
@@ -52,7 +51,6 @@ public async Task Ingest(List<MessageContext> contexts)
if (settings.ForwardAuditMessages)
{
await Forward(stored, logQueueAddress);
- forwardedMessagesCounter.Add(stored.Count);
}
foreach (var context in contexts)
@@ -132,7 +130,6 @@ public async Task VerifyCanReachForwardingAddress()
readonly Settings settings;
readonly Lazy<IMessageDispatcher> messageDispatcher;
readonly string logQueueAddress;
- readonly Counter<long> forwardedMessagesCounter = Telemetry.Meter.CreateCounter<long>(Telemetry.CreateInstrumentName("ingestion", "forwarded"), description: "Audit ingestion forwarded message count");
static readonly ILog Log = LogManager.GetLogger<AuditIngestor>();
}
diff --git a/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs
new file mode 100644
index 0000000000..41d2994fa9
--- /dev/null
+++ b/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs
@@ -0,0 +1,33 @@
+namespace ServiceControl.Audit.Auditing.Metrics;
+
+using System;
+using System.Diagnostics;
+using System.Diagnostics.Metrics;
+
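+// Tracks one ingestion batch: Dispose records the duration tagged "full", "partial",
+// or "failed" depending on whether Complete was called and how full the batch was.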
+public record BatchMetrics(int MaxBatchSize, Histogram<double> BatchDuration, Action<bool> IsSuccess) : IDisposable
+{
+ public void Dispose()
+ {
+ var isSuccess = actualBatchSize > 0;
+
+ IsSuccess(isSuccess);
+
+ string result;
+
+ if (isSuccess)
+ {
+ result = actualBatchSize == MaxBatchSize ? "full" : "partial";
+ }
+ else
+ {
+ result = "failed";
+ }
+
+ BatchDuration.Record(sw.Elapsed.TotalSeconds, new TagList { { "result", result } });
+ }
+
+ public void Complete(int size) => actualBatchSize = size;
+
+ int actualBatchSize = -1;
+ readonly Stopwatch sw = Stopwatch.StartNew();
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/Auditing/Metrics/ErrorMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/ErrorMetrics.cs
new file mode 100644
index 0000000000..f01e6a9293
--- /dev/null
+++ b/src/ServiceControl.Audit/Auditing/Metrics/ErrorMetrics.cs
@@ -0,0 +1,21 @@
+namespace ServiceControl.Audit.Auditing.Metrics;
+
+using System;
+using System.Diagnostics.Metrics;
+using NServiceBus.Transport;
+
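+// Records how an ingestion failure was handled: Retry() marks the message as retried,
+// otherwise Dispose tags it as "stored-poison" (written to failed-import storage).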
+public record ErrorMetrics(ErrorContext Context, Counter<long> Failures) : IDisposable
+{
+ public void Dispose()
+ {
+ var tags = IngestionMetrics.GetMessageTags(Context.Message.Headers);
+
+ tags.Add("result", retry ? "retry" : "stored-poison");
+
+ Failures.Add(1, tags);
+ }
+
+ public void Retry() => retry = true;
+
+ bool retry;
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs
new file mode 100644
index 0000000000..2c79c586d5
--- /dev/null
+++ b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs
@@ -0,0 +1,74 @@
+namespace ServiceControl.Audit.Auditing.Metrics;
+
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Diagnostics.Metrics;
+using EndpointPlugin.Messages.SagaState;
+using NServiceBus;
+using NServiceBus.Transport;
+
+public class IngestionMetrics
+{
+ public const string MeterName = "Particular.ServiceControl.Audit";
+
+ public static readonly string BatchDurationInstrumentName = $"{InstrumentPrefix}.batch_duration_seconds";
+ public static readonly string MessageDurationInstrumentName = $"{InstrumentPrefix}.message_duration_seconds";
+
+ public IngestionMetrics(IMeterFactory meterFactory)
+ {
+ var meter = meterFactory.Create(MeterName, MeterVersion);
+
+ batchDuration = meter.CreateHistogram<double>(BatchDurationInstrumentName, unit: "seconds", description: "Message batch processing duration in seconds");
+ ingestionDuration = meter.CreateHistogram<double>(MessageDurationInstrumentName, unit: "seconds", description: "Audit message processing duration in seconds");
+ consecutiveBatchFailureGauge = meter.CreateGauge<long>($"{InstrumentPrefix}.consecutive_batch_failure_total", description: "Consecutive audit ingestion batch failures");
+ failureCounter = meter.CreateCounter<long>($"{InstrumentPrefix}.failures_total", description: "Audit ingestion failure count");
+ }
+
+ public MessageMetrics BeginIngestion(MessageContext messageContext) => new(messageContext, ingestionDuration);
+
+ public ErrorMetrics BeginErrorHandling(ErrorContext errorContext) => new(errorContext, failureCounter);
+
+ public BatchMetrics BeginBatch(int maxBatchSize) => new(maxBatchSize, batchDuration, RecordBatchOutcome);
+
+ public static TagList GetMessageTags(Dictionary<string, string> headers)
+ {
+ var tags = new TagList();
+
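+ // Control messages are identified by the absence of the EnclosedMessageTypes header.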
+ if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageType))
+ {
+ tags.Add("message.category", messageType == SagaUpdateMessageType ? "saga-update" : "audit-message");
+ }
+ else
+ {
+ tags.Add("message.category", "control-message");
+ }
+
+ return tags;
+ }
+
+ void RecordBatchOutcome(bool success)
+ {
+ if (success)
+ {
+ consecutiveBatchFailures = 0;
+ }
+ else
+ {
+ consecutiveBatchFailures++;
+ }
+
+ consecutiveBatchFailureGauge.Record(consecutiveBatchFailures);
+ }
+
+ long consecutiveBatchFailures;
+
+ readonly Histogram<double> batchDuration;
+ readonly Gauge<long> consecutiveBatchFailureGauge;
+ readonly Histogram<double> ingestionDuration;
+ readonly Counter<long> failureCounter;
+
+ const string MeterVersion = "0.1.0";
+ const string InstrumentPrefix = "sc.audit.ingestion";
+
+ static readonly string SagaUpdateMessageType = typeof(SagaUpdatedMessage).FullName;
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetricsConfiguration.cs b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetricsConfiguration.cs
new file mode 100644
index 0000000000..b78b6cd62a
--- /dev/null
+++ b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetricsConfiguration.cs
@@ -0,0 +1,19 @@
+namespace ServiceControl.Audit.Auditing.Metrics;
+
+using OpenTelemetry.Metrics;
+
+public static class IngestionMetricsConfiguration
+{
+ public static void AddIngestionMetrics(this MeterProviderBuilder builder)
+ {
+ builder.AddMeter(IngestionMetrics.MeterName);
+
+ // Note: Views can be replaced by new InstrumentAdvice { HistogramBucketBoundaries = [...] }; once we can update to the latest OpenTelemetry packages
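+ // A sketch of that replacement, assuming the .NET 9 System.Diagnostics.DiagnosticSource API:
+ //   meter.CreateHistogram<double>(name, unit: "seconds", description: "...",
+ //       advice: new InstrumentAdvice<double> { HistogramBucketBoundaries = [0.01, 0.05, 0.1, 0.5, 1, 5] });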
+ builder.AddView(
+ instrumentName: IngestionMetrics.MessageDurationInstrumentName,
+ new ExplicitBucketHistogramConfiguration { Boundaries = [0.01, 0.05, 0.1, 0.5, 1, 5] });
+ builder.AddView(
+ instrumentName: IngestionMetrics.BatchDurationInstrumentName,
+ new ExplicitBucketHistogramConfiguration { Boundaries = [0.01, 0.05, 0.1, 0.5, 1, 5] });
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/Auditing/Metrics/MessageMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/MessageMetrics.cs
new file mode 100644
index 0000000000..df85310365
--- /dev/null
+++ b/src/ServiceControl.Audit/Auditing/Metrics/MessageMetrics.cs
@@ -0,0 +1,25 @@
+namespace ServiceControl.Audit.Auditing.Metrics;
+
+using System;
+using System.Diagnostics;
+using System.Diagnostics.Metrics;
+using NServiceBus.Transport;
+
+public record MessageMetrics(MessageContext Context, Histogram<double> Duration) : IDisposable
+{
+ public void Skipped() => result = "skipped";
+
+ public void Success() => result = "success";
+
+ public void Dispose()
+ {
+ var tags = IngestionMetrics.GetMessageTags(Context.Headers);
+
+ tags.Add("result", result);
+ Duration.Record(sw.Elapsed.TotalSeconds, tags);
+ }
+
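+ // Starts as "failed" so that exceptions thrown before Success() or Skipped() are recorded as failures on Dispose.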
+ string result = "failed";
+
+ readonly Stopwatch sw = Stopwatch.StartNew();
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs
index c38e023869..ccacf5db1c 100644
--- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs
+++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs
@@ -6,6 +6,7 @@ namespace ServiceControl.Audit;
using System.Threading.Tasks;
using Auditing;
using Hosting;
+using Auditing.Metrics;
using Infrastructure;
using Infrastructure.Settings;
using Microsoft.AspNetCore.HttpLogging;
@@ -21,22 +22,22 @@ namespace ServiceControl.Audit;
using NServiceBus.Transport;
using Persistence;
using Transports;
-using ServiceControl.Infrastructure;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
static class HostApplicationBuilderExtensions
{
+ static readonly string InstanceVersion = FileVersionInfo.GetVersionInfo(typeof(HostApplicationBuilderExtensions).Assembly.Location).ProductVersion;
+
public static void AddServiceControlAudit(this IHostApplicationBuilder builder,
Func<ICriticalErrorContext, CancellationToken, Task> onCriticalError,
Settings settings,
EndpointConfiguration configuration)
{
- var version = FileVersionInfo.GetVersionInfo(typeof(HostApplicationBuilderExtensions).Assembly.Location).ProductVersion;
var persistenceConfiguration = PersistenceConfigurationFactory.LoadPersistenceConfiguration(settings);
var persistenceSettings = persistenceConfiguration.BuildPersistenceSettings(settings);
- RecordStartup(version, settings, configuration, persistenceConfiguration);
+ RecordStartup(settings, configuration, persistenceConfiguration);
builder.Logging.ClearProviders();
builder.Logging.AddNLog();
@@ -71,30 +72,7 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder,
NServiceBusFactory.Configure(settings, transportCustomization, transportSettings, onCriticalError, configuration);
builder.UseNServiceBus(configuration);
- if (!string.IsNullOrEmpty(settings.OtlpEndpointUrl))
- {
- if (!Uri.TryCreate(settings.OtlpEndpointUrl, UriKind.Absolute, out var otelMetricsUri))
- {
- throw new UriFormatException($"Invalid OtlpEndpointUrl: {settings.OtlpEndpointUrl}");
- }
-
- builder.Services.AddOpenTelemetry()
- .ConfigureResource(b => b.AddService(
- serviceName: settings.InstanceName,
- serviceVersion: version,
- autoGenerateServiceInstanceId: true))
- .WithMetrics(b =>
- {
- b.AddAuditIngestionMeters();
- b.AddOtlpExporter(e =>
- {
- e.Endpoint = otelMetricsUri;
- });
- });
-
- var logger = LogManager.GetLogger(typeof(HostApplicationBuilderExtensions));
- logger.InfoFormat("OpenTelemetry metrics exporter enabled: {0}", settings.OtlpEndpointUrl);
- }
+ builder.AddMetrics(settings);
// Configure after the NServiceBus hosted service to ensure NServiceBus is already started
if (settings.IngestAuditMessages)
@@ -116,11 +94,42 @@ public static void AddServiceControlAuditInstallers(this IHostApplicationBuilder
builder.Services.AddInstaller(persistenceSettings, persistenceConfiguration);
}
- static void RecordStartup(string version, Settings settings, EndpointConfiguration endpointConfiguration, IPersistenceConfiguration persistenceConfiguration)
+ public static void AddMetrics(this IHostApplicationBuilder builder, Settings settings)
+ {
+ builder.Services.AddSingleton<IngestionMetrics>();
+
+ if (!string.IsNullOrEmpty(settings.OtlpEndpointUrl))
+ {
+ if (!Uri.TryCreate(settings.OtlpEndpointUrl, UriKind.Absolute, out var otelMetricsUri))
+ {
+ throw new UriFormatException($"Invalid OtlpEndpointUrl: {settings.OtlpEndpointUrl}");
+ }
+
+ builder.Services.AddOpenTelemetry()
+ .ConfigureResource(b => b.AddService(
+ serviceName: settings.InstanceName,
+ serviceVersion: InstanceVersion,
+ autoGenerateServiceInstanceId: true))
+ .WithMetrics(b =>
+ {
+ b.AddIngestionMetrics();
+ b.AddOtlpExporter(e => e.Endpoint = otelMetricsUri);
+ if (Debugger.IsAttached)
+ {
+ b.AddConsoleExporter();
+ }
+ });
+
+ var logger = LogManager.GetLogger(typeof(HostApplicationBuilderExtensions));
+ logger.InfoFormat("OpenTelemetry metrics exporter enabled: {0}", settings.OtlpEndpointUrl);
+ }
+ }
+
+ static void RecordStartup(Settings settings, EndpointConfiguration endpointConfiguration, IPersistenceConfiguration persistenceConfiguration)
{
var startupMessage = $@"
-------------------------------------------------------------
-ServiceControl Audit Version: {version}
+ServiceControl Audit Version: {InstanceVersion}
Audit Retention Period: {settings.AuditRetentionPeriod}
Forwarding Audit Messages: {settings.ForwardAuditMessages}
ServiceControl Logging Level: {settings.LoggingSettings.LogLevel}
diff --git a/src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs b/src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs
deleted file mode 100644
index bd8bc3c9d7..0000000000
--- a/src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs
+++ /dev/null
@@ -1,12 +0,0 @@
-namespace ServiceControl.Infrastructure;
-
-using System;
-using System.Diagnostics;
-using System.Diagnostics.Metrics;
-
-record DurationRecorder(Histogram<double> Histogram, TagList Tags = default) : IDisposable
-{
- readonly Stopwatch sw = Stopwatch.StartNew();
-
- public void Dispose() => Histogram.Record(sw.ElapsedMilliseconds, Tags);
-}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/Infrastructure/Telemetry.cs b/src/ServiceControl.Audit/Infrastructure/Telemetry.cs
deleted file mode 100644
index 033590b91c..0000000000
--- a/src/ServiceControl.Audit/Infrastructure/Telemetry.cs
+++ /dev/null
@@ -1,30 +0,0 @@
-namespace ServiceControl.Infrastructure;
-
-using System;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Diagnostics.Metrics;
-using NServiceBus;
-using OpenTelemetry.Metrics;
-
-static class Telemetry
-{
- const string MeterName = "Particular.ServiceControl.Audit";
- public static readonly Meter Meter = new(MeterName, "0.1.0");
-
- public static string CreateInstrumentName(string instrumentNamespace, string instrumentName) => $"sc.audit.{instrumentNamespace}.{instrumentName}".ToLower();
-
- public static void AddAuditIngestionMeters(this MeterProviderBuilder builder) => builder.AddMeter(MeterName);
-
- public static TagList GetIngestedMessageTags(IDictionary<string, string> headers, ReadOnlyMemory<byte> body)
- {
- var tags = new TagList { { "messaging.message.body.size", body.Length } };
-
- if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageType))
- {
- tags.Add("messaging.message.type", messageType);
- }
-
- return tags;
- }
-}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit/ServiceControl.Audit.csproj b/src/ServiceControl.Audit/ServiceControl.Audit.csproj
index 6e0a889e9f..3d5245157b 100644
--- a/src/ServiceControl.Audit/ServiceControl.Audit.csproj
+++ b/src/ServiceControl.Audit/ServiceControl.Audit.csproj
@@ -29,8 +29,10 @@
+
+