From b54b47ccf83c107af1923245ddd9142dd602bd95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Sun, 2 Mar 2025 10:30:20 +0100 Subject: [PATCH 01/19] Add a metrics wrapper --- .../Auditing/AuditIngestionMetrics.cs | 21 +++++++++++++++++++ .../Auditing/AuditIngestor.cs | 12 +++++------ .../HostApplicationBuilderExtensions.cs | 2 ++ 3 files changed, 29 insertions(+), 6 deletions(-) create mode 100644 src/ServiceControl.Audit/Auditing/AuditIngestionMetrics.cs diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestionMetrics.cs b/src/ServiceControl.Audit/Auditing/AuditIngestionMetrics.cs new file mode 100644 index 0000000000..fd0280fe4a --- /dev/null +++ b/src/ServiceControl.Audit/Auditing/AuditIngestionMetrics.cs @@ -0,0 +1,21 @@ +namespace ServiceControl.Audit.Auditing; + +using System.Diagnostics.Metrics; + +public class AuditIngestionMetrics +{ + public AuditIngestionMetrics(IMeterFactory meterFactory) + { + var meter = meterFactory.Create(MeterName, MeterVersion); + forwardedMessagesCounter = meter.CreateCounter(CreateInstrumentName("forwarded"), description: "Audit ingestion forwarded message count"); + } + + public void IncrementMessagesForwarded(int count) => forwardedMessagesCounter.Add(count); + + static string CreateInstrumentName(string instrumentName) => $"sc.audit.ingestion.{instrumentName}".ToLower(); + + readonly Counter forwardedMessagesCounter; + + const string MeterName = "Particular.ServiceControl.Audit"; + const string MeterVersion = "0.1.0"; +} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index a466f93be9..bbc244dd18 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -2,7 +2,6 @@ { using System; using System.Collections.Generic; - using System.Diagnostics.Metrics; using System.Linq; using System.Threading.Tasks; using Infrastructure.Settings; @@ -14,8 
+13,7 @@ using Persistence.UnitOfWork; using Recoverability; using SagaAudit; - using ServiceControl.Infrastructure; - using ServiceControl.Transports; + using Transports; public class AuditIngestor { @@ -26,11 +24,13 @@ public AuditIngestor( IEnumerable auditEnrichers, // allows extending message enrichers with custom enrichers registered in the DI container IMessageSession messageSession, Lazy messageDispatcher, - ITransportCustomization transportCustomization + ITransportCustomization transportCustomization, + AuditIngestionMetrics metrics ) { this.settings = settings; this.messageDispatcher = messageDispatcher; + this.metrics = metrics; var enrichers = new IEnrichImportedAuditMessages[] { new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher(), new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), new DetectSuccessfulRetriesEnricher(), new SagaRelationshipsEnricher() }.Concat(auditEnrichers).ToArray(); logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue); @@ -52,7 +52,7 @@ public async Task Ingest(List contexts) if (settings.ForwardAuditMessages) { await Forward(stored, logQueueAddress); - forwardedMessagesCounter.Add(stored.Count); + metrics.IncrementMessagesForwarded(stored.Count); } foreach (var context in contexts) @@ -131,8 +131,8 @@ public async Task VerifyCanReachForwardingAddress() readonly AuditPersister auditPersister; readonly Settings settings; readonly Lazy messageDispatcher; + readonly AuditIngestionMetrics metrics; readonly string logQueueAddress; - readonly Counter forwardedMessagesCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "forwarded"), description: "Audit ingestion forwarded message count"); static readonly ILog Log = LogManager.GetLogger(); } diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs index c38e023869..3e02deb93e 
100644 --- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs @@ -71,6 +71,8 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder, NServiceBusFactory.Configure(settings, transportCustomization, transportSettings, onCriticalError, configuration); builder.UseNServiceBus(configuration); + services.AddSingleton(); + if (!string.IsNullOrEmpty(settings.OtlpEndpointUrl)) { if (!Uri.TryCreate(settings.OtlpEndpointUrl, UriKind.Absolute, out var otelMetricsUri)) From 90a2811c415e3ec50a3a6015e0e4704db9400bce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 6 Mar 2025 13:18:06 +0100 Subject: [PATCH 02/19] Add more metrics --- .../Auditing/AuditIngestion.cs | 54 +++++++++---------- .../Auditing/AuditIngestionMetrics.cs | 21 -------- .../Auditing/AuditIngestor.cs | 1 + .../Auditing/Metrics/AuditIngestionMetrics.cs | 49 +++++++++++++++++ .../Auditing/Metrics/BatchMetrics.cs | 35 ++++++++++++ .../Metrics/MessageIngestionMetrics.cs | 45 ++++++++++++++++ .../HostApplicationBuilderExtensions.cs | 1 + 7 files changed, 158 insertions(+), 48 deletions(-) delete mode 100644 src/ServiceControl.Audit/Auditing/AuditIngestionMetrics.cs create mode 100644 src/ServiceControl.Audit/Auditing/Metrics/AuditIngestionMetrics.cs create mode 100644 src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs create mode 100644 src/ServiceControl.Audit/Auditing/Metrics/MessageIngestionMetrics.cs diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 8437d564cd..19a348b79a 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -2,11 +2,11 @@ { using System; using System.Collections.Generic; - using System.Diagnostics.Metrics; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using 
Infrastructure.Settings; + using Metrics; using Microsoft.Extensions.Hosting; using NServiceBus; using NServiceBus.Logging; @@ -26,7 +26,8 @@ public AuditIngestion( AuditIngestionCustomCheck.State ingestionState, AuditIngestor auditIngestor, IAuditIngestionUnitOfWorkFactory unitOfWorkFactory, - IHostApplicationLifetime applicationLifetime) + IHostApplicationLifetime applicationLifetime, + AuditIngestionMetrics metrics) { inputEndpoint = settings.AuditQueue; this.transportCustomization = transportCustomization; @@ -35,13 +36,16 @@ public AuditIngestion( this.unitOfWorkFactory = unitOfWorkFactory; this.settings = settings; this.applicationLifetime = applicationLifetime; + this.metrics = metrics; if (!transportSettings.MaxConcurrency.HasValue) { throw new ArgumentException("MaxConcurrency is not set in TransportSettings"); } - channel = Channel.CreateBounded(new BoundedChannelOptions(transportSettings.MaxConcurrency.Value) + MaxBatchSize = transportSettings.MaxConcurrency.Value; + + channel = Channel.CreateBounded(new BoundedChannelOptions(MaxBatchSize) { SingleReader = true, SingleWriter = false, @@ -190,22 +194,21 @@ async Task EnsureStopped(CancellationToken cancellationToken) async Task OnMessage(MessageContext messageContext, CancellationToken cancellationToken) { - var tags = Telemetry.GetIngestedMessageTags(messageContext.Headers, messageContext.Body); - using (new DurationRecorder(ingestionDuration, tags)) + using var messageIngestionMetrics = metrics.BeginIngestion(messageContext); + + if (settings.MessageFilter != null && settings.MessageFilter(messageContext)) { - if (settings.MessageFilter != null && settings.MessageFilter(messageContext)) - { - return; - } + messageIngestionMetrics.Skipped(); + return; + } - var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - messageContext.SetTaskCompletionSource(taskCompletionSource); + var taskCompletionSource = new 
TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + messageContext.SetTaskCompletionSource(taskCompletionSource); - await channel.Writer.WriteAsync(messageContext, cancellationToken); - await taskCompletionSource.Task; + await channel.Writer.WriteAsync(messageContext, cancellationToken); + _ = await taskCompletionSource.Task; - successfulMessagesCounter.Add(1, tags); - } + messageIngestionMetrics.Success(); } public override async Task StartAsync(CancellationToken cancellationToken) @@ -218,7 +221,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { try { - var contexts = new List(transportSettings.MaxConcurrency.Value); + var contexts = new List(MaxBatchSize); while (await channel.Reader.WaitToReadAsync(stoppingToken)) { @@ -226,19 +229,19 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) try { // as long as there is something to read this will fetch up to MaximumConcurrency items - using (var recorder = new DurationRecorder(batchDuration)) + using (var batchMetrics = metrics.BeginBatch(MaxBatchSize)) { while (channel.Reader.TryRead(out var context)) { contexts.Add(context); } - recorder.Tags.Add("ingestion.batch_size", contexts.Count); - await auditIngestor.Ingest(contexts); + + batchMetrics.Complete(contexts.Count); } - consecutiveBatchFailuresCounter.Record(0); + //metrics.ClearB .Record(0); } catch (Exception e) { @@ -257,7 +260,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) logger.Info("Ingesting messages failed", e); // no need to do interlocked increment since this is running sequential - consecutiveBatchFailuresCounter.Record(consecutiveBatchFailures++); + //consecutiveBatchFailuresCounter.Record(consecutiveBatchFailures++); } finally { @@ -297,9 +300,9 @@ public override async Task StopAsync(CancellationToken cancellationToken) } TransportInfrastructure transportInfrastructure; - IMessageReceiver messageReceiver; - long 
consecutiveBatchFailures = 0; + IMessageReceiver queueIngestor; + readonly int MaxBatchSize; readonly SemaphoreSlim startStopSemaphore = new(1); readonly string inputEndpoint; readonly ITransportCustomization transportCustomization; @@ -309,12 +312,9 @@ public override async Task StopAsync(CancellationToken cancellationToken) readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory; readonly Settings settings; readonly Channel channel; - readonly Histogram batchDuration = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "batch_duration"), unit: "ms", "Average audit message batch processing duration"); - readonly Counter successfulMessagesCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "success"), description: "Successful ingested audit message count"); - readonly Histogram consecutiveBatchFailuresCounter = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "consecutive_batch_failures"), unit: "count", description: "Consecutive audit ingestion batch failure"); - readonly Histogram ingestionDuration = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "duration"), unit: "ms", description: "Average incoming audit message processing duration"); readonly Watchdog watchdog; readonly IHostApplicationLifetime applicationLifetime; + readonly AuditIngestionMetrics metrics; static readonly ILog logger = LogManager.GetLogger(); diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestionMetrics.cs b/src/ServiceControl.Audit/Auditing/AuditIngestionMetrics.cs deleted file mode 100644 index fd0280fe4a..0000000000 --- a/src/ServiceControl.Audit/Auditing/AuditIngestionMetrics.cs +++ /dev/null @@ -1,21 +0,0 @@ -namespace ServiceControl.Audit.Auditing; - -using System.Diagnostics.Metrics; - -public class AuditIngestionMetrics -{ - public AuditIngestionMetrics(IMeterFactory meterFactory) - { - var meter = meterFactory.Create(MeterName, MeterVersion); - 
forwardedMessagesCounter = meter.CreateCounter(CreateInstrumentName("forwarded"), description: "Audit ingestion forwarded message count"); - } - - public void IncrementMessagesForwarded(int count) => forwardedMessagesCounter.Add(count); - - static string CreateInstrumentName(string instrumentName) => $"sc.audit.ingestion.{instrumentName}".ToLower(); - - readonly Counter forwardedMessagesCounter; - - const string MeterName = "Particular.ServiceControl.Audit"; - const string MeterVersion = "0.1.0"; -} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index bbc244dd18..c694b1a58f 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -5,6 +5,7 @@ using System.Linq; using System.Threading.Tasks; using Infrastructure.Settings; + using Metrics; using Monitoring; using NServiceBus; using NServiceBus.Logging; diff --git a/src/ServiceControl.Audit/Auditing/Metrics/AuditIngestionMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/AuditIngestionMetrics.cs new file mode 100644 index 0000000000..dc5892e422 --- /dev/null +++ b/src/ServiceControl.Audit/Auditing/Metrics/AuditIngestionMetrics.cs @@ -0,0 +1,49 @@ +namespace ServiceControl.Audit.Auditing.Metrics; + +using System.Diagnostics.Metrics; +using NServiceBus.Transport; + +public class AuditIngestionMetrics +{ + public AuditIngestionMetrics(IMeterFactory meterFactory) + { + var meter = meterFactory.Create(MeterName, MeterVersion); + + forwardedMessagesCounter = meter.CreateCounter(CreateInstrumentName("forwarded"), description: "Audit ingestion forwarded message count"); + batchDuration = meter.CreateHistogram(CreateInstrumentName("batch_duration"), unit: "ms", "Average audit message batch processing duration"); + consecutiveBatchFailureGauge = meter.CreateObservableGauge(CreateInstrumentName("consecutive_batch_failures"), () => consecutiveBatchFailures, unit: 
"count", description: "Consecutive audit ingestion batch failure"); + ingestionDuration = meter.CreateHistogram(CreateInstrumentName("duration"), unit: "ms", description: "Average incoming audit message processing duration"); + } + + public void IncrementMessagesForwarded(int count) => forwardedMessagesCounter.Add(count); + + public MessageIngestionMetrics BeginIngestion(MessageContext messageContext) => new(messageContext, ingestionDuration); + + public BatchMetrics BeginBatch(int maxBatchSize) => new(maxBatchSize, batchDuration, RecordBatchOutcome); + + void RecordBatchOutcome(bool success) + { + if (success) + { + consecutiveBatchFailures = 0; + } + else + { + consecutiveBatchFailures++; + } + } + + static string CreateInstrumentName(string instrumentName) => $"sc.audit.ingestion.{instrumentName}".ToLower(); + + long consecutiveBatchFailures; + + readonly Counter forwardedMessagesCounter; + readonly Histogram batchDuration; +#pragma warning disable IDE0052 + readonly ObservableGauge consecutiveBatchFailureGauge; +#pragma warning restore IDE0052 + readonly Histogram ingestionDuration; + + const string MeterName = "Particular.ServiceControl.Audit"; + const string MeterVersion = "0.1.0"; +} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs new file mode 100644 index 0000000000..9939a2f754 --- /dev/null +++ b/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs @@ -0,0 +1,35 @@ +namespace ServiceControl.Audit.Auditing.Metrics; + +using System; +using System.Diagnostics; +using System.Diagnostics.Metrics; + +public record BatchMetrics(int MaxBatchSize, Histogram BatchDuration, Action IsSuccess) : IDisposable +{ + public void Dispose() + { + var tags = new TagList(); + + string result; + + if (actualBatchSize <= 0) + { + result = "failed"; + IsSuccess(false); + } + else + { + result = actualBatchSize == MaxBatchSize ? 
"full" : "partial"; + + IsSuccess(true); + } + + tags.Add("result", result); + BatchDuration.Record(sw.ElapsedMilliseconds, tags); + } + + public void Complete(int size) => actualBatchSize = size; + + int actualBatchSize = -1; + readonly Stopwatch sw = Stopwatch.StartNew(); +} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/Metrics/MessageIngestionMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/MessageIngestionMetrics.cs new file mode 100644 index 0000000000..a1bd7339b5 --- /dev/null +++ b/src/ServiceControl.Audit/Auditing/Metrics/MessageIngestionMetrics.cs @@ -0,0 +1,45 @@ +namespace ServiceControl.Audit.Auditing.Metrics; + +using System; +using System.Diagnostics; +using System.Diagnostics.Metrics; +using EndpointPlugin.Messages.SagaState; +using NServiceBus; +using NServiceBus.Transport; + +public record MessageIngestionMetrics(MessageContext Message, Histogram Duration) : IDisposable +{ + public void Skipped() => result = "skipped"; + + public void Success() => result = "success"; + + public void Dispose() + { + var tags = GetTags(Message); + + tags.Add("result", result); + Duration.Record(sw.ElapsedMilliseconds, tags); + } + + static TagList GetTags(MessageContext messageContext) + { + var tags = new TagList(); + + if (messageContext.Headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageType)) + { + tags.Add("message.category", messageType == SagaUpdateMessageType ? 
"saga-update" : "audit-message"); + } + else + { + tags.Add("message.category", "control-message"); + } + + return tags; + } + + string result = "failed"; + + readonly Stopwatch sw = Stopwatch.StartNew(); + + static readonly string SagaUpdateMessageType = typeof(SagaUpdatedMessage).FullName; +} \ No newline at end of file diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs index 3e02deb93e..2b60d12833 100644 --- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs @@ -6,6 +6,7 @@ namespace ServiceControl.Audit; using System.Threading.Tasks; using Auditing; using Hosting; +using Auditing.Metrics; using Infrastructure; using Infrastructure.Settings; using Microsoft.AspNetCore.HttpLogging; From c78d24950254225f6551f800619c78cd76f81643 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 6 Mar 2025 18:13:09 +0100 Subject: [PATCH 03/19] Remove counter for forwarding --- src/ServiceControl.Audit/Auditing/AuditIngestor.cs | 7 ++----- .../Auditing/Metrics/AuditIngestionMetrics.cs | 4 ---- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index c694b1a58f..8009dd4f77 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -25,13 +25,12 @@ public AuditIngestor( IEnumerable auditEnrichers, // allows extending message enrichers with custom enrichers registered in the DI container IMessageSession messageSession, Lazy messageDispatcher, - ITransportCustomization transportCustomization, - AuditIngestionMetrics metrics + ITransportCustomization transportCustomization ) { this.settings = settings; this.messageDispatcher = messageDispatcher; - this.metrics = metrics; + var enrichers = new IEnrichImportedAuditMessages[] { 
new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher(), new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), new DetectSuccessfulRetriesEnricher(), new SagaRelationshipsEnricher() }.Concat(auditEnrichers).ToArray(); logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue); @@ -53,7 +52,6 @@ public async Task Ingest(List contexts) if (settings.ForwardAuditMessages) { await Forward(stored, logQueueAddress); - metrics.IncrementMessagesForwarded(stored.Count); } foreach (var context in contexts) @@ -132,7 +130,6 @@ public async Task VerifyCanReachForwardingAddress() readonly AuditPersister auditPersister; readonly Settings settings; readonly Lazy messageDispatcher; - readonly AuditIngestionMetrics metrics; readonly string logQueueAddress; static readonly ILog Log = LogManager.GetLogger(); diff --git a/src/ServiceControl.Audit/Auditing/Metrics/AuditIngestionMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/AuditIngestionMetrics.cs index dc5892e422..5830571c55 100644 --- a/src/ServiceControl.Audit/Auditing/Metrics/AuditIngestionMetrics.cs +++ b/src/ServiceControl.Audit/Auditing/Metrics/AuditIngestionMetrics.cs @@ -9,14 +9,11 @@ public AuditIngestionMetrics(IMeterFactory meterFactory) { var meter = meterFactory.Create(MeterName, MeterVersion); - forwardedMessagesCounter = meter.CreateCounter(CreateInstrumentName("forwarded"), description: "Audit ingestion forwarded message count"); batchDuration = meter.CreateHistogram(CreateInstrumentName("batch_duration"), unit: "ms", "Average audit message batch processing duration"); consecutiveBatchFailureGauge = meter.CreateObservableGauge(CreateInstrumentName("consecutive_batch_failures"), () => consecutiveBatchFailures, unit: "count", description: "Consecutive audit ingestion batch failure"); ingestionDuration = meter.CreateHistogram(CreateInstrumentName("duration"), unit: "ms", description: "Average incoming audit message 
processing duration"); } - public void IncrementMessagesForwarded(int count) => forwardedMessagesCounter.Add(count); - public MessageIngestionMetrics BeginIngestion(MessageContext messageContext) => new(messageContext, ingestionDuration); public BatchMetrics BeginBatch(int maxBatchSize) => new(maxBatchSize, batchDuration, RecordBatchOutcome); @@ -37,7 +34,6 @@ void RecordBatchOutcome(bool success) long consecutiveBatchFailures; - readonly Counter forwardedMessagesCounter; readonly Histogram batchDuration; #pragma warning disable IDE0052 readonly ObservableGauge consecutiveBatchFailureGauge; From c11900f2fc93788b3c7fa13890265487da32efb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 6 Mar 2025 18:34:40 +0100 Subject: [PATCH 04/19] Move failure counters --- .../Auditing/AuditIngestion.cs | 6 +- .../Auditing/AuditIngestionFaultPolicy.cs | 176 +++++++++--------- .../Auditing/AuditIngestor.cs | 1 - .../Auditing/Metrics/ErrorMetrics.cs | 21 +++ ...ngestionMetrics.cs => IngestionMetrics.cs} | 34 +++- .../Metrics/MessageIngestionMetrics.cs | 45 ----- .../Auditing/Metrics/MessageMetrics.cs | 25 +++ .../HostApplicationBuilderExtensions.cs | 2 +- 8 files changed, 167 insertions(+), 143 deletions(-) create mode 100644 src/ServiceControl.Audit/Auditing/Metrics/ErrorMetrics.cs rename src/ServiceControl.Audit/Auditing/Metrics/{AuditIngestionMetrics.cs => IngestionMetrics.cs} (54%) delete mode 100644 src/ServiceControl.Audit/Auditing/Metrics/MessageIngestionMetrics.cs create mode 100644 src/ServiceControl.Audit/Auditing/Metrics/MessageMetrics.cs diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 19a348b79a..875d5b0669 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -27,7 +27,7 @@ public AuditIngestion( AuditIngestor auditIngestor, IAuditIngestionUnitOfWorkFactory unitOfWorkFactory, 
IHostApplicationLifetime applicationLifetime, - AuditIngestionMetrics metrics) + IngestionMetrics metrics) { inputEndpoint = settings.AuditQueue; this.transportCustomization = transportCustomization; @@ -53,7 +53,7 @@ public AuditIngestion( FullMode = BoundedChannelFullMode.Wait }); - errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError); + errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError, metrics); watchdog = new Watchdog( "audit message ingestion", @@ -314,7 +314,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) readonly Channel channel; readonly Watchdog watchdog; readonly IHostApplicationLifetime applicationLifetime; - readonly AuditIngestionMetrics metrics; + readonly IngestionMetrics metrics; static readonly ILog logger = LogManager.GetLogger(); diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs b/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs index 6b8dd39a23..aa5d8c3aa5 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs @@ -1,115 +1,113 @@ -namespace ServiceControl.Audit.Auditing +namespace ServiceControl.Audit.Auditing; + +using System; +using System.Diagnostics; +using System.IO; +using System.Runtime.InteropServices; +using System.Runtime.Versioning; +using System.Threading; +using System.Threading.Tasks; +using Infrastructure; +using NServiceBus.Logging; +using NServiceBus.Transport; +using Persistence; +using Configuration; +using Metrics; +using ServiceControl.Infrastructure; + +class AuditIngestionFaultPolicy { - using System; - using System.Diagnostics; - using System.Diagnostics.Metrics; - using System.IO; - using System.Runtime.InteropServices; - using System.Runtime.Versioning; - using System.Threading; - using System.Threading.Tasks; - using Infrastructure; 
- using NServiceBus.Logging; - using NServiceBus.Transport; - using Persistence; - using Configuration; - using ServiceControl.Infrastructure; - - class AuditIngestionFaultPolicy + public AuditIngestionFaultPolicy(IFailedAuditStorage failedAuditStorage, LoggingSettings settings, Func onCriticalError, IngestionMetrics metrics) { - readonly IFailedAuditStorage failedAuditStorage; - readonly string logPath; - readonly ImportFailureCircuitBreaker failureCircuitBreaker; + failureCircuitBreaker = new ImportFailureCircuitBreaker(onCriticalError); + this.failedAuditStorage = failedAuditStorage; + this.metrics = metrics; - public AuditIngestionFaultPolicy(IFailedAuditStorage failedAuditStorage, LoggingSettings settings, Func onCriticalError) + if (!AppEnvironment.RunningInContainer) { - failureCircuitBreaker = new ImportFailureCircuitBreaker(onCriticalError); - this.failedAuditStorage = failedAuditStorage; - - if (!AppEnvironment.RunningInContainer) - { - logPath = Path.Combine(settings.LogPath, @"FailedImports\Audit"); - Directory.CreateDirectory(logPath); - } + logPath = Path.Combine(settings.LogPath, @"FailedImports\Audit"); + Directory.CreateDirectory(logPath); } + } - public async Task OnError(ErrorContext errorContext, CancellationToken cancellationToken = default) - { - var tags = Telemetry.GetIngestedMessageTags(errorContext.Message.Headers, errorContext.Message.Body); + public async Task OnError(ErrorContext errorContext, CancellationToken cancellationToken = default) + { + using var errorMetrics = metrics.BeginErrorHandling(errorContext); - //Same as recoverability policy in NServiceBusFactory - if (errorContext.ImmediateProcessingFailures < 3) - { - retryCounter.Add(1, tags); - return ErrorHandleResult.RetryRequired; - } + //Same as recoverability policy in NServiceBusFactory + if (errorContext.ImmediateProcessingFailures < 3) + { + errorMetrics.Retry(); + return ErrorHandleResult.RetryRequired; + } - await StoreFailedMessageDocument(errorContext, 
cancellationToken); + await StoreFailedMessageDocument(errorContext, cancellationToken); - failedCounter.Add(1, tags); + //failedCounter.Add(1, tags); - return ErrorHandleResult.Handled; - } + return ErrorHandleResult.Handled; + } - async Task StoreFailedMessageDocument(ErrorContext errorContext, CancellationToken cancellationToken) + async Task StoreFailedMessageDocument(ErrorContext errorContext, CancellationToken cancellationToken) + { + var failure = new FailedAuditImport { - var failure = new FailedAuditImport - { - Id = Guid.NewGuid().ToString(), - Message = new FailedTransportMessage - { - Id = errorContext.Message.MessageId, - Headers = errorContext.Message.Headers, - // At the moment we are taking a defensive copy of the body to avoid issues with the message body - // buffers being returned to the pool and potentially being overwritten. Once we know how RavenDB - // handles byte[] to ReadOnlyMemory conversion we might be able to remove this. - Body = errorContext.Message.Body.ToArray() - }, - ExceptionInfo = errorContext.Exception.ToFriendlyString() - }; - - try - { - await DoLogging(errorContext.Exception, failure, cancellationToken); - } - finally + Id = Guid.NewGuid().ToString(), + Message = new FailedTransportMessage { - failureCircuitBreaker.Increment(errorContext.Exception); - } + Id = errorContext.Message.MessageId, + Headers = errorContext.Message.Headers, + // At the moment we are taking a defensive copy of the body to avoid issues with the message body + // buffers being returned to the pool and potentially being overwritten. Once we know how RavenDB + // handles byte[] to ReadOnlyMemory conversion we might be able to remove this. 
+ Body = errorContext.Message.Body.ToArray() + }, + ExceptionInfo = errorContext.Exception.ToFriendlyString() + }; + + try + { + await DoLogging(errorContext.Exception, failure, cancellationToken); } - - async Task DoLogging(Exception exception, FailedAuditImport failure, CancellationToken cancellationToken) + finally { - log.Error("Failed importing error message", exception); + failureCircuitBreaker.Increment(errorContext.Exception); + } + } + + async Task DoLogging(Exception exception, FailedAuditImport failure, CancellationToken cancellationToken) + { + log.Error("Failed importing error message", exception); - // Write to storage - await failedAuditStorage.SaveFailedAuditImport(failure); + // Write to storage + await failedAuditStorage.SaveFailedAuditImport(failure); - if (!AppEnvironment.RunningInContainer) + if (!AppEnvironment.RunningInContainer) + { + // Write to Log Path + var filePath = Path.Combine(logPath, failure.Id + ".txt"); + await File.WriteAllTextAsync(filePath, failure.ExceptionInfo, cancellationToken); + + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { - // Write to Log Path - var filePath = Path.Combine(logPath, failure.Id + ".txt"); - await File.WriteAllTextAsync(filePath, failure.ExceptionInfo, cancellationToken); - - if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - { - WriteToEventLog("A message import has failed. A log file has been written to " + filePath); - } + WriteToEventLog("A message import has failed. 
A log file has been written to " + filePath); } } + } - [SupportedOSPlatform("windows")] - void WriteToEventLog(string message) - { + [SupportedOSPlatform("windows")] + void WriteToEventLog(string message) + { #if DEBUG - EventSourceCreator.Create(); + EventSourceCreator.Create(); #endif - EventLog.WriteEntry(EventSourceCreator.SourceName, message, EventLogEntryType.Error); - } + EventLog.WriteEntry(EventSourceCreator.SourceName, message, EventLogEntryType.Error); + } - readonly Counter retryCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "retry"), description: "Audit ingestion retries count"); - readonly Counter failedCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "failed"), description: "Audit ingestion failure count"); + readonly IFailedAuditStorage failedAuditStorage; + readonly IngestionMetrics metrics; + readonly string logPath; + readonly ImportFailureCircuitBreaker failureCircuitBreaker; - static readonly ILog log = LogManager.GetLogger(); - } + static readonly ILog log = LogManager.GetLogger(); } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index 8009dd4f77..4a333a4b3e 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -5,7 +5,6 @@ using System.Linq; using System.Threading.Tasks; using Infrastructure.Settings; - using Metrics; using Monitoring; using NServiceBus; using NServiceBus.Logging; diff --git a/src/ServiceControl.Audit/Auditing/Metrics/ErrorMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/ErrorMetrics.cs new file mode 100644 index 0000000000..f01e6a9293 --- /dev/null +++ b/src/ServiceControl.Audit/Auditing/Metrics/ErrorMetrics.cs @@ -0,0 +1,21 @@ +namespace ServiceControl.Audit.Auditing.Metrics; + +using System; +using System.Diagnostics.Metrics; +using NServiceBus.Transport; + +public record 
ErrorMetrics(ErrorContext Context, Counter Failures) : IDisposable +{ + public void Dispose() + { + var tags = IngestionMetrics.GetMessageTags(Context.Message.Headers); + + tags.Add("result", retry ? "retry" : "stored-poison"); + + Failures.Add(1, tags); + } + + public void Retry() => retry = true; + + bool retry; +} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/Metrics/AuditIngestionMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs similarity index 54% rename from src/ServiceControl.Audit/Auditing/Metrics/AuditIngestionMetrics.cs rename to src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs index 5830571c55..b11b3a099a 100644 --- a/src/ServiceControl.Audit/Auditing/Metrics/AuditIngestionMetrics.cs +++ b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs @@ -1,23 +1,46 @@ namespace ServiceControl.Audit.Auditing.Metrics; +using System.Collections.Generic; +using System.Diagnostics; using System.Diagnostics.Metrics; +using EndpointPlugin.Messages.SagaState; +using NServiceBus; using NServiceBus.Transport; -public class AuditIngestionMetrics +public class IngestionMetrics { - public AuditIngestionMetrics(IMeterFactory meterFactory) + public IngestionMetrics(IMeterFactory meterFactory) { var meter = meterFactory.Create(MeterName, MeterVersion); batchDuration = meter.CreateHistogram(CreateInstrumentName("batch_duration"), unit: "ms", "Average audit message batch processing duration"); - consecutiveBatchFailureGauge = meter.CreateObservableGauge(CreateInstrumentName("consecutive_batch_failures"), () => consecutiveBatchFailures, unit: "count", description: "Consecutive audit ingestion batch failure"); + consecutiveBatchFailureGauge = meter.CreateObservableGauge(CreateInstrumentName("consecutive_batch_failures"), () => consecutiveBatchFailures, description: "Consecutive audit ingestion batch failure"); ingestionDuration = meter.CreateHistogram(CreateInstrumentName("duration"), unit: "ms", 
description: "Average incoming audit message processing duration"); + failureCounter = meter.CreateCounter(CreateInstrumentName("failure_count"), description: "Audit ingestion failure count"); } - public MessageIngestionMetrics BeginIngestion(MessageContext messageContext) => new(messageContext, ingestionDuration); + public MessageMetrics BeginIngestion(MessageContext messageContext) => new(messageContext, ingestionDuration); + + public ErrorMetrics BeginErrorHandling(ErrorContext errorContext) => new(errorContext, failureCounter); public BatchMetrics BeginBatch(int maxBatchSize) => new(maxBatchSize, batchDuration, RecordBatchOutcome); + public static TagList GetMessageTags(Dictionary headers) + { + var tags = new TagList(); + + if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageType)) + { + tags.Add("message.category", messageType == SagaUpdateMessageType ? "saga-update" : "audit-message"); + } + else + { + tags.Add("message.category", "control-message"); + } + + return tags; + } + void RecordBatchOutcome(bool success) { if (success) @@ -39,7 +62,10 @@ void RecordBatchOutcome(bool success) readonly ObservableGauge consecutiveBatchFailureGauge; #pragma warning restore IDE0052 readonly Histogram ingestionDuration; + readonly Counter failureCounter; const string MeterName = "Particular.ServiceControl.Audit"; const string MeterVersion = "0.1.0"; + + static readonly string SagaUpdateMessageType = typeof(SagaUpdatedMessage).FullName; } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/Metrics/MessageIngestionMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/MessageIngestionMetrics.cs deleted file mode 100644 index a1bd7339b5..0000000000 --- a/src/ServiceControl.Audit/Auditing/Metrics/MessageIngestionMetrics.cs +++ /dev/null @@ -1,45 +0,0 @@ -namespace ServiceControl.Audit.Auditing.Metrics; - -using System; -using System.Diagnostics; -using System.Diagnostics.Metrics; -using EndpointPlugin.Messages.SagaState; -using 
NServiceBus; -using NServiceBus.Transport; - -public record MessageIngestionMetrics(MessageContext Message, Histogram Duration) : IDisposable -{ - public void Skipped() => result = "skipped"; - - public void Success() => result = "success"; - - public void Dispose() - { - var tags = GetTags(Message); - - tags.Add("result", result); - Duration.Record(sw.ElapsedMilliseconds, tags); - } - - static TagList GetTags(MessageContext messageContext) - { - var tags = new TagList(); - - if (messageContext.Headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageType)) - { - tags.Add("message.category", messageType == SagaUpdateMessageType ? "saga-update" : "audit-message"); - } - else - { - tags.Add("message.category", "control-message"); - } - - return tags; - } - - string result = "failed"; - - readonly Stopwatch sw = Stopwatch.StartNew(); - - static readonly string SagaUpdateMessageType = typeof(SagaUpdatedMessage).FullName; -} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/Metrics/MessageMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/MessageMetrics.cs new file mode 100644 index 0000000000..e0cf4e6b79 --- /dev/null +++ b/src/ServiceControl.Audit/Auditing/Metrics/MessageMetrics.cs @@ -0,0 +1,25 @@ +namespace ServiceControl.Audit.Auditing.Metrics; + +using System; +using System.Diagnostics; +using System.Diagnostics.Metrics; +using NServiceBus.Transport; + +public record MessageMetrics(MessageContext Context, Histogram Duration) : IDisposable +{ + public void Skipped() => result = "skipped"; + + public void Success() => result = "success"; + + public void Dispose() + { + var tags = IngestionMetrics.GetMessageTags(Context.Headers); + + tags.Add("result", result); + Duration.Record(sw.ElapsedMilliseconds, tags); + } + + string result = "failed"; + + readonly Stopwatch sw = Stopwatch.StartNew(); +} \ No newline at end of file diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs 
b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs index 2b60d12833..858bcbe729 100644 --- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs @@ -72,7 +72,7 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder, NServiceBusFactory.Configure(settings, transportCustomization, transportSettings, onCriticalError, configuration); builder.UseNServiceBus(configuration); - services.AddSingleton(); + services.AddSingleton(); if (!string.IsNullOrEmpty(settings.OtlpEndpointUrl)) { From 3ffdc4c923dd690f5c594483b44a6d277bb28688 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 6 Mar 2025 18:38:15 +0100 Subject: [PATCH 05/19] Cleanup --- .../Auditing/Metrics/IngestionMetrics.cs | 3 +- .../HostApplicationBuilderExtensions.cs | 2 +- .../Infrastructure/DurationRecorder.cs | 12 -------- .../Infrastructure/Telemetry.cs | 30 ------------------- 4 files changed, 3 insertions(+), 44 deletions(-) delete mode 100644 src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs delete mode 100644 src/ServiceControl.Audit/Infrastructure/Telemetry.cs diff --git a/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs index b11b3a099a..c08fc6ced0 100644 --- a/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs +++ b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs @@ -9,6 +9,8 @@ namespace ServiceControl.Audit.Auditing.Metrics; public class IngestionMetrics { + public const string MeterName = "Particular.ServiceControl.Audit"; + public IngestionMetrics(IMeterFactory meterFactory) { var meter = meterFactory.Create(MeterName, MeterVersion); @@ -64,7 +66,6 @@ void RecordBatchOutcome(bool success) readonly Histogram ingestionDuration; readonly Counter failureCounter; - const string MeterName = "Particular.ServiceControl.Audit"; const string MeterVersion = "0.1.0"; 
static readonly string SagaUpdateMessageType = typeof(SagaUpdatedMessage).FullName; diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs index 858bcbe729..149cc67d6a 100644 --- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs @@ -88,7 +88,7 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder, autoGenerateServiceInstanceId: true)) .WithMetrics(b => { - b.AddAuditIngestionMeters(); + b.AddMeter(IngestionMetrics.MeterName); b.AddOtlpExporter(e => { e.Endpoint = otelMetricsUri; diff --git a/src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs b/src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs deleted file mode 100644 index bd8bc3c9d7..0000000000 --- a/src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs +++ /dev/null @@ -1,12 +0,0 @@ -namespace ServiceControl.Infrastructure; - -using System; -using System.Diagnostics; -using System.Diagnostics.Metrics; - -record DurationRecorder(Histogram Histogram, TagList Tags = default) : IDisposable -{ - readonly Stopwatch sw = Stopwatch.StartNew(); - - public void Dispose() => Histogram.Record(sw.ElapsedMilliseconds, Tags); -} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Infrastructure/Telemetry.cs b/src/ServiceControl.Audit/Infrastructure/Telemetry.cs deleted file mode 100644 index 033590b91c..0000000000 --- a/src/ServiceControl.Audit/Infrastructure/Telemetry.cs +++ /dev/null @@ -1,30 +0,0 @@ -namespace ServiceControl.Infrastructure; - -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Diagnostics.Metrics; -using NServiceBus; -using OpenTelemetry.Metrics; - -static class Telemetry -{ - const string MeterName = "Particular.ServiceControl.Audit"; - public static readonly Meter Meter = new(MeterName, "0.1.0"); - - public static string 
CreateInstrumentName(string instrumentNamespace, string instrumentName) => $"sc.audit.{instrumentNamespace}.{instrumentName}".ToLower(); - - public static void AddAuditIngestionMeters(this MeterProviderBuilder builder) => builder.AddMeter(MeterName); - - public static TagList GetIngestedMessageTags(IDictionary headers, ReadOnlyMemory body) - { - var tags = new TagList { { "messaging.message.body.size", body.Length } }; - - if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageType)) - { - tags.Add("messaging.message.type", messageType); - } - - return tags; - } -} \ No newline at end of file From b7b1958bf561bf3b7a27c43b73e67f9a0a72b112 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 6 Mar 2025 18:42:39 +0100 Subject: [PATCH 06/19] Refactor --- .../HostApplicationBuilderExtensions.cs | 57 ++++++++++--------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs index 149cc67d6a..2cca4c36c1 100644 --- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs @@ -22,22 +22,22 @@ namespace ServiceControl.Audit; using NServiceBus.Transport; using Persistence; using Transports; -using ServiceControl.Infrastructure; using OpenTelemetry.Metrics; using OpenTelemetry.Resources; static class HostApplicationBuilderExtensions { + static string InstanceVersion = FileVersionInfo.GetVersionInfo(typeof(HostApplicationBuilderExtensions).Assembly.Location).ProductVersion; + public static void AddServiceControlAudit(this IHostApplicationBuilder builder, Func onCriticalError, Settings settings, EndpointConfiguration configuration) { - var version = FileVersionInfo.GetVersionInfo(typeof(HostApplicationBuilderExtensions).Assembly.Location).ProductVersion; var persistenceConfiguration = 
PersistenceConfigurationFactory.LoadPersistenceConfiguration(settings); var persistenceSettings = persistenceConfiguration.BuildPersistenceSettings(settings); - RecordStartup(version, settings, configuration, persistenceConfiguration); + RecordStartup(settings, configuration, persistenceConfiguration); builder.Logging.ClearProviders(); builder.Logging.AddNLog(); @@ -72,7 +72,31 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder, NServiceBusFactory.Configure(settings, transportCustomization, transportSettings, onCriticalError, configuration); builder.UseNServiceBus(configuration); - services.AddSingleton(); + builder.AddMetrics(settings); + + // Configure after the NServiceBus hosted service to ensure NServiceBus is already started + if (settings.IngestAuditMessages) + { + services.AddHostedService(); + } + + if (WindowsServiceHelpers.IsWindowsService()) + { + // The if is added for clarity, internally AddWindowsService has a similar logic + builder.AddWindowsServiceWithRequestTimeout(); + } + } + + public static void AddServiceControlAuditInstallers(this IHostApplicationBuilder builder, Settings settings) + { + var persistenceConfiguration = PersistenceConfigurationFactory.LoadPersistenceConfiguration(settings); + var persistenceSettings = persistenceConfiguration.BuildPersistenceSettings(settings); + builder.Services.AddInstaller(persistenceSettings, persistenceConfiguration); + } + + public static void AddMetrics(this IHostApplicationBuilder builder, Settings settings) + { + builder.Services.AddSingleton(); if (!string.IsNullOrEmpty(settings.OtlpEndpointUrl)) { @@ -84,7 +108,7 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder, builder.Services.AddOpenTelemetry() .ConfigureResource(b => b.AddService( serviceName: settings.InstanceName, - serviceVersion: version, + serviceVersion: InstanceVersion, autoGenerateServiceInstanceId: true)) .WithMetrics(b => { @@ -98,32 +122,13 @@ public static void 
AddServiceControlAudit(this IHostApplicationBuilder builder, var logger = LogManager.GetLogger(typeof(HostApplicationBuilderExtensions)); logger.InfoFormat("OpenTelemetry metrics exporter enabled: {0}", settings.OtlpEndpointUrl); } - - // Configure after the NServiceBus hosted service to ensure NServiceBus is already started - if (settings.IngestAuditMessages) - { - services.AddHostedService(); - } - - if (WindowsServiceHelpers.IsWindowsService()) - { - // The if is added for clarity, internally AddWindowsService has a similar logic - builder.AddWindowsServiceWithRequestTimeout(); - } - } - - public static void AddServiceControlAuditInstallers(this IHostApplicationBuilder builder, Settings settings) - { - var persistenceConfiguration = PersistenceConfigurationFactory.LoadPersistenceConfiguration(settings); - var persistenceSettings = persistenceConfiguration.BuildPersistenceSettings(settings); - builder.Services.AddInstaller(persistenceSettings, persistenceConfiguration); } - static void RecordStartup(string version, Settings settings, EndpointConfiguration endpointConfiguration, IPersistenceConfiguration persistenceConfiguration) + static void RecordStartup(Settings settings, EndpointConfiguration endpointConfiguration, IPersistenceConfiguration persistenceConfiguration) { var startupMessage = $@" ------------------------------------------------------------- -ServiceControl Audit Version: {version} +ServiceControl Audit Version: {InstanceVersion} Audit Retention Period: {settings.AuditRetentionPeriod} Forwarding Audit Messages: {settings.ForwardAuditMessages} ServiceControl Logging Level: {settings.LoggingSettings.LogLevel} From 83e4246de7ccabfcc3d7efcded438a5261d6e81e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 6 Mar 2025 18:45:04 +0100 Subject: [PATCH 07/19] Use seconds as unit for duration --- src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs | 2 +- src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs | 4 ++-- 
src/ServiceControl.Audit/Auditing/Metrics/MessageMetrics.cs | 2 +- src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs index 9939a2f754..cf55e163c5 100644 --- a/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs +++ b/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs @@ -25,7 +25,7 @@ public void Dispose() } tags.Add("result", result); - BatchDuration.Record(sw.ElapsedMilliseconds, tags); + BatchDuration.Record(sw.Elapsed.TotalSeconds, tags); } public void Complete(int size) => actualBatchSize = size; diff --git a/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs index c08fc6ced0..2293e06dec 100644 --- a/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs +++ b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs @@ -15,9 +15,9 @@ public IngestionMetrics(IMeterFactory meterFactory) { var meter = meterFactory.Create(MeterName, MeterVersion); - batchDuration = meter.CreateHistogram(CreateInstrumentName("batch_duration"), unit: "ms", "Average audit message batch processing duration"); + batchDuration = meter.CreateHistogram(CreateInstrumentName("batch_duration"), unit: "seconds", "Average audit message batch processing duration"); + ingestionDuration = meter.CreateHistogram(CreateInstrumentName("message_duration"), unit: "seconds", description: "Average incoming audit message processing duration"); consecutiveBatchFailureGauge = meter.CreateObservableGauge(CreateInstrumentName("consecutive_batch_failures"), () => consecutiveBatchFailures, description: "Consecutive audit ingestion batch failure"); - ingestionDuration = meter.CreateHistogram(CreateInstrumentName("duration"), unit: "ms", description: "Average incoming audit message processing duration"); failureCounter = 
meter.CreateCounter(CreateInstrumentName("failure_count"), description: "Audit ingestion failure count"); } diff --git a/src/ServiceControl.Audit/Auditing/Metrics/MessageMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/MessageMetrics.cs index e0cf4e6b79..df85310365 100644 --- a/src/ServiceControl.Audit/Auditing/Metrics/MessageMetrics.cs +++ b/src/ServiceControl.Audit/Auditing/Metrics/MessageMetrics.cs @@ -16,7 +16,7 @@ public void Dispose() var tags = IngestionMetrics.GetMessageTags(Context.Headers); tags.Add("result", result); - Duration.Record(sw.ElapsedMilliseconds, tags); + Duration.Record(sw.Elapsed.TotalSeconds, tags); } string result = "failed"; diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs index 2cca4c36c1..3c6514a548 100644 --- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs @@ -27,7 +27,7 @@ namespace ServiceControl.Audit; static class HostApplicationBuilderExtensions { - static string InstanceVersion = FileVersionInfo.GetVersionInfo(typeof(HostApplicationBuilderExtensions).Assembly.Location).ProductVersion; + static readonly string InstanceVersion = FileVersionInfo.GetVersionInfo(typeof(HostApplicationBuilderExtensions).Assembly.Location).ProductVersion; public static void AddServiceControlAudit(this IHostApplicationBuilder builder, Func onCriticalError, From f0e3752572ec55d17a45c133b2d0a7d721242707 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 6 Mar 2025 18:50:50 +0100 Subject: [PATCH 08/19] Cleanup --- src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs b/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs index aa5d8c3aa5..6ccfbedcce 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs +++ 
b/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs @@ -43,8 +43,6 @@ public async Task OnError(ErrorContext errorContext, Cancella await StoreFailedMessageDocument(errorContext, cancellationToken); - //failedCounter.Add(1, tags); - return ErrorHandleResult.Handled; } From 72075b9f9e04b15a60daf703d7861a47db83387c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Fri, 7 Mar 2025 18:03:25 +0100 Subject: [PATCH 09/19] Remove commented out code --- .../Auditing/AuditIngestion.cs | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 875d5b0669..6f2245d8d5 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -228,20 +228,17 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) // will only enter here if there is something to read. 
try { + using var batchMetrics = metrics.BeginBatch(MaxBatchSize); + // as long as there is something to read this will fetch up to MaximumConcurrency items - using (var batchMetrics = metrics.BeginBatch(MaxBatchSize)) + while (channel.Reader.TryRead(out var context)) { - while (channel.Reader.TryRead(out var context)) - { - contexts.Add(context); - } - - await auditIngestor.Ingest(contexts); - - batchMetrics.Complete(contexts.Count); + contexts.Add(context); } - //metrics.ClearB .Record(0); + await auditIngestor.Ingest(contexts); + + batchMetrics.Complete(contexts.Count); } catch (Exception e) { @@ -258,9 +255,6 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) } logger.Info("Ingesting messages failed", e); - - // no need to do interlocked increment since this is running sequential - //consecutiveBatchFailuresCounter.Record(consecutiveBatchFailures++); } finally { From a9ed09c35657810989f968d89bf76609e18483be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Fri, 7 Mar 2025 18:05:39 +0100 Subject: [PATCH 10/19] Apply suggestions from code review Co-authored-by: Ramon Smits --- .../Auditing/Metrics/BatchMetrics.cs | 18 +++++++----------- .../Auditing/Metrics/IngestionMetrics.cs | 2 +- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs index cf55e163c5..d6b2528269 100644 --- a/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs +++ b/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs @@ -12,19 +12,15 @@ public void Dispose() string result; - if (actualBatchSize <= 0) - { - result = "failed"; - IsSuccess(false); - } - else - { - result = actualBatchSize == MaxBatchSize ? "full" : "partial"; + var isSuccess = actualBatchSize > 0; + + IsSuccess(isSuccess); - IsSuccess(true); + if (isSuccess) + { + var result = actualBatchSize == MaxBatchSize ? 
"full" : "partial"; + tags.Add("result", result); } - - tags.Add("result", result); BatchDuration.Record(sw.Elapsed.TotalSeconds, tags); } diff --git a/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs index 2293e06dec..a472aac607 100644 --- a/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs +++ b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs @@ -55,7 +55,7 @@ void RecordBatchOutcome(bool success) } } - static string CreateInstrumentName(string instrumentName) => $"sc.audit.ingestion.{instrumentName}".ToLower(); + static string CreateInstrumentName(string instrumentName) => $"sc.audit.ingestion.{instrumentName.ToLower()}"; long consecutiveBatchFailures; From 589d1ca9c44bcfa2bc5d9201b7c91d7c1c848786 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Fri, 7 Mar 2025 18:07:38 +0100 Subject: [PATCH 11/19] Fixup --- src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs index d6b2528269..fb2f619c8a 100644 --- a/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs +++ b/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs @@ -10,10 +10,8 @@ public void Dispose() { var tags = new TagList(); - string result; + var isSuccess = actualBatchSize > 0; - var isSuccess = actualBatchSize > 0; - IsSuccess(isSuccess); if (isSuccess) @@ -21,6 +19,7 @@ public void Dispose() var result = actualBatchSize == MaxBatchSize ? 
"full" : "partial"; tags.Add("result", result); } + BatchDuration.Record(sw.Elapsed.TotalSeconds, tags); } From 06f1bc6cb03d5a0c500067f8aac55a175f70986d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Fri, 7 Mar 2025 18:26:12 +0100 Subject: [PATCH 12/19] Align naming with https://prometheus.io/docs/practices/naming/ --- docs/telemetry.md | 32 ++++++------------- .../Auditing/Metrics/IngestionMetrics.cs | 8 ++--- 2 files changed, 14 insertions(+), 26 deletions(-) diff --git a/docs/telemetry.md b/docs/telemetry.md index 4726563e20..f5dd138e9b 100644 --- a/docs/telemetry.md +++ b/docs/telemetry.md @@ -18,28 +18,16 @@ The following metrics are available: ### Ingestion -#### Success or failure - -- `sc.audit.ingestion.success` - Successful ingested audit message count (Counter) -- `sc.audit.ingestion.retry` - Retried audit message count (Counter) -- `sc.audit.ingestion.failed` - Failed audit message count (Counter) - -The above metrics also have the following attributes attached: - -- `messaging.message.body.size` - The size of the message body in bytes -- `messaging.message.type` - The logical message type of the message if present - -#### Details - -- `sc.audit.ingestion.duration` - Audit message processing duration in milliseconds (Histogram) -- `sc.audit.ingestion.forwarded` - Count of the number of forwarded audit messages if forwarding is enabled (Counter) - -### Batching - -- `sc.audit.ingestion.batch_duration` - Batch processing duration in milliseconds (Histogram) - - Attributes: - - `ingestion.batch_size` -- `sc.audit.ingestion.consecutive_batch_failures` - Consecutive batch failures (Counter) +The following ingestion metrics with their corresponding dimensions are available: + +- `sc.audit.ingestion.batch_duration_seconds` - Message batch processing duration in seconds + - `result` - Indicates if the full batch size was used (batch size == max concurrency of the transport): `full` or `partial` +- 
`sc.audit.ingestion.message_duration_seconds` - Audit message processing duration in seconds + - `message.category` - Indicates the category of the message ingested: `audit-message`, `saga-update` or `control-message` +- `sc.audit.ingestion.failures_total` - Failure counter + - `message.category` - Indicates the category of the message ingested: `audit-message`, `saga-update` or `control-message` + - `result` - Indicates how the failure was resolved: `retry` or `stored-poison` +- `sc.audit.ingestion.consecutive_batch_failure_total` - Consecutive batch failures ## Monitoring diff --git a/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs index a472aac607..348f34999a 100644 --- a/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs +++ b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs @@ -15,10 +15,10 @@ public IngestionMetrics(IMeterFactory meterFactory) { var meter = meterFactory.Create(MeterName, MeterVersion); - batchDuration = meter.CreateHistogram(CreateInstrumentName("batch_duration"), unit: "seconds", "Average audit message batch processing duration"); - ingestionDuration = meter.CreateHistogram(CreateInstrumentName("message_duration"), unit: "seconds", description: "Average incoming audit message processing duration"); - consecutiveBatchFailureGauge = meter.CreateObservableGauge(CreateInstrumentName("consecutive_batch_failures"), () => consecutiveBatchFailures, description: "Consecutive audit ingestion batch failure"); - failureCounter = meter.CreateCounter(CreateInstrumentName("failure_count"), description: "Audit ingestion failure count"); + batchDuration = meter.CreateHistogram(CreateInstrumentName("batch_duration_seconds"), unit: "seconds", "Message batch processing duration in seconds"); + ingestionDuration = meter.CreateHistogram(CreateInstrumentName("message_duration_seconds"), unit: "seconds", description: "Audit message processing duration in
seconds"); + consecutiveBatchFailureGauge = meter.CreateObservableGauge(CreateInstrumentName("consecutive_batch_failure_total"), () => consecutiveBatchFailures, description: "Consecutive audit ingestion batch failure"); + failureCounter = meter.CreateCounter(CreateInstrumentName("failures_total"), description: "Audit ingestion failure count"); } public MessageMetrics BeginIngestion(MessageContext messageContext) => new(messageContext, ingestionDuration); From 426aace317406d1811fd9a43f9e16b5355987003 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Fri, 7 Mar 2025 18:50:02 +0100 Subject: [PATCH 13/19] Use explicit System.Diagnostics.DiagnosticSource to get access to Gauge and instrumentation advice --- src/Directory.Packages.props | 3 ++- .../Auditing/Metrics/IngestionMetrics.cs | 14 +++++++------ .../ServiceControl.Audit.csproj | 21 +++++++++++++++++++ 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index d4ee800af0..64213f849b 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -63,6 +63,7 @@ + @@ -89,4 +90,4 @@ - + \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs index 348f34999a..f0a23c64f7 100644 --- a/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs +++ b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs @@ -15,9 +15,11 @@ public IngestionMetrics(IMeterFactory meterFactory) { var meter = meterFactory.Create(MeterName, MeterVersion); - batchDuration = meter.CreateHistogram(CreateInstrumentName("batch_duration_seconds"), unit: "seconds", "Message batch processing duration in seconds"); - ingestionDuration = meter.CreateHistogram(CreateInstrumentName("message_duration_seconds"), unit: "seconds", description: "Audit message processing duration in seconds"); - consecutiveBatchFailureGauge = 
meter.CreateObservableGauge(CreateInstrumentName("consecutive_batch_failure_total"), () => consecutiveBatchFailures, description: "Consecutive audit ingestion batch failure"); + var durationBucketsInSeconds = new InstrumentAdvice { HistogramBucketBoundaries = [0.01, 0.05, 0.1, 0.5, 1, 5] }; + + batchDuration = meter.CreateHistogram(CreateInstrumentName("batch_duration_seconds"), unit: "seconds", "Message batch processing duration in seconds", advice: durationBucketsInSeconds); + ingestionDuration = meter.CreateHistogram(CreateInstrumentName("message_duration_seconds"), unit: "seconds", description: "Audit message processing duration in seconds", advice: durationBucketsInSeconds); + consecutiveBatchFailureGauge = meter.CreateGauge(CreateInstrumentName("consecutive_batch_failure_total"), description: "Consecutive audit ingestion batch failure"); failureCounter = meter.CreateCounter(CreateInstrumentName("failures_total"), description: "Audit ingestion failure count"); } @@ -53,6 +55,8 @@ void RecordBatchOutcome(bool success) { consecutiveBatchFailures++; } + + consecutiveBatchFailureGauge.Record(consecutiveBatchFailures); } static string CreateInstrumentName(string instrumentName) => $"sc.audit.ingestion.{instrumentName.ToLower()}"; @@ -60,9 +64,7 @@ void RecordBatchOutcome(bool success) long consecutiveBatchFailures; readonly Histogram batchDuration; -#pragma warning disable IDE0052 - readonly ObservableGauge consecutiveBatchFailureGauge; -#pragma warning restore IDE0052 + readonly Gauge consecutiveBatchFailureGauge; readonly Histogram ingestionDuration; readonly Counter failureCounter; diff --git a/src/ServiceControl.Audit/ServiceControl.Audit.csproj b/src/ServiceControl.Audit/ServiceControl.Audit.csproj index 6e0a889e9f..9c6a99c8a6 100644 --- a/src/ServiceControl.Audit/ServiceControl.Audit.csproj +++ b/src/ServiceControl.Audit/ServiceControl.Audit.csproj @@ -31,6 +31,27 @@ + + + + + + + + + + + + + + + + + + + + + From 87815fb3023992d99cec73b372b7dd6f6784d30b Mon 
Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Fri, 7 Mar 2025 19:06:01 +0100 Subject: [PATCH 14/19] Fixup --- .../ServiceControl.Audit.csproj | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/src/ServiceControl.Audit/ServiceControl.Audit.csproj b/src/ServiceControl.Audit/ServiceControl.Audit.csproj index 9c6a99c8a6..af1ed44e10 100644 --- a/src/ServiceControl.Audit/ServiceControl.Audit.csproj +++ b/src/ServiceControl.Audit/ServiceControl.Audit.csproj @@ -31,26 +31,6 @@ - - - - - - - - - - - - - - - - - - - - From 42af34fd9f922dd92b37425e311f989c731cbf06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Fri, 7 Mar 2025 19:08:22 +0100 Subject: [PATCH 15/19] Add result dimension to docs --- docs/telemetry.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/telemetry.md b/docs/telemetry.md index f5dd138e9b..01aee7315a 100644 --- a/docs/telemetry.md +++ b/docs/telemetry.md @@ -24,6 +24,7 @@ The following ingestion metrics with their corresponding dimensions are availabl - `result` - Indicates if the full batch size was used (batch size == max concurrency of the transport): `full` or `partial` - `sc.audit.ingestion.message_duration_seconds` - Audit message processing duration in seconds - `message.category` - Indicates the category of the message ingested: `audit-message`, `saga-update` or `control-message` + - `result` - Indicates the outcome of the operation: `success`, `failed` or `skipped` (if the message was filtered out and skipped) - `sc.audit.ingestion.failures_total` - Failure counter - `message.category` - Indicates the category of the message ingested: `audit-message`, `saga-update` or `control-message` - `result` - Indicates how the failure was resolved: `retry` or `stored-poision` From 8a1ea9ad3a3d736f9b33cf89bf149713d82c4b4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Sat, 8 Mar 2025 08:10:03 +0100 Subject: [PATCH 16/19] Make sure histogram buckets work --- 
src/Directory.Packages.props | 1 + .../Auditing/Metrics/IngestionMetrics.cs | 16 ++++++++-------- .../Metrics/IngestionMetricsConfiguration.cs | 19 +++++++++++++++++++ .../HostApplicationBuilderExtensions.cs | 9 +++++---- .../ServiceControl.Audit.csproj | 1 + 5 files changed, 34 insertions(+), 12 deletions(-) create mode 100644 src/ServiceControl.Audit/Auditing/Metrics/IngestionMetricsConfiguration.cs diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index 64213f849b..b2b400a707 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -50,6 +50,7 @@ + diff --git a/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs index f0a23c64f7..2c79c586d5 100644 --- a/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs +++ b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs @@ -11,16 +11,17 @@ public class IngestionMetrics { public const string MeterName = "Particular.ServiceControl.Audit"; + public static readonly string BatchDurationInstrumentName = $"{InstrumentPrefix}.batch_duration_seconds"; + public static readonly string MessageDurationInstrumentName = $"{InstrumentPrefix}.message_duration_seconds"; + public IngestionMetrics(IMeterFactory meterFactory) { var meter = meterFactory.Create(MeterName, MeterVersion); - var durationBucketsInSeconds = new InstrumentAdvice { HistogramBucketBoundaries = [0.01, 0.05, 0.1, 0.5, 1, 5] }; - - batchDuration = meter.CreateHistogram(CreateInstrumentName("batch_duration_seconds"), unit: "seconds", "Message batch processing duration in seconds", advice: durationBucketsInSeconds); - ingestionDuration = meter.CreateHistogram(CreateInstrumentName("message_duration_seconds"), unit: "seconds", description: "Audit message processing duration in seconds", advice: durationBucketsInSeconds); - consecutiveBatchFailureGauge = meter.CreateGauge(CreateInstrumentName("consecutive_batch_failure_total"), 
description: "Consecutive audit ingestion batch failure"); - failureCounter = meter.CreateCounter(CreateInstrumentName("failures_total"), description: "Audit ingestion failure count"); + batchDuration = meter.CreateHistogram(BatchDurationInstrumentName, unit: "seconds", "Message batch processing duration in seconds"); + ingestionDuration = meter.CreateHistogram(MessageDurationInstrumentName, unit: "seconds", description: "Audit message processing duration in seconds"); + consecutiveBatchFailureGauge = meter.CreateGauge($"{InstrumentPrefix}.consecutive_batch_failure_total", description: "Consecutive audit ingestion batch failure"); + failureCounter = meter.CreateCounter($"{InstrumentPrefix}.failures_total", description: "Audit ingestion failure count"); } public MessageMetrics BeginIngestion(MessageContext messageContext) => new(messageContext, ingestionDuration); @@ -59,8 +60,6 @@ void RecordBatchOutcome(bool success) consecutiveBatchFailureGauge.Record(consecutiveBatchFailures); } - static string CreateInstrumentName(string instrumentName) => $"sc.audit.ingestion.{instrumentName.ToLower()}"; - long consecutiveBatchFailures; readonly Histogram batchDuration; @@ -69,6 +68,7 @@ void RecordBatchOutcome(bool success) readonly Counter failureCounter; const string MeterVersion = "0.1.0"; + const string InstrumentPrefix = "sc.audit.ingestion"; static readonly string SagaUpdateMessageType = typeof(SagaUpdatedMessage).FullName; } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetricsConfiguration.cs b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetricsConfiguration.cs new file mode 100644 index 0000000000..b78b6cd62a --- /dev/null +++ b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetricsConfiguration.cs @@ -0,0 +1,19 @@ +namespace ServiceControl.Audit.Auditing.Metrics; + +using OpenTelemetry.Metrics; + +public static class IngestionMetricsConfiguration +{ + public static void AddIngestionMetrics(this 
MeterProviderBuilder builder) + { + builder.AddMeter(IngestionMetrics.MeterName); + + // Note: Views can be replaced by new InstrumentAdvice { HistogramBucketBoundaries = [...] }; once we can update to the latest OpenTelemetry packages + builder.AddView( + instrumentName: IngestionMetrics.MessageDurationInstrumentName, + new ExplicitBucketHistogramConfiguration { Boundaries = [0.01, 0.05, 0.1, 0.5, 1, 5] }); + builder.AddView( + instrumentName: IngestionMetrics.BatchDurationInstrumentName, + new ExplicitBucketHistogramConfiguration { Boundaries = [0.01, 0.05, 0.1, 0.5, 1, 5] }); + } +} \ No newline at end of file diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs index 3c6514a548..ccacf5db1c 100644 --- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs @@ -112,11 +112,12 @@ public static void AddMetrics(this IHostApplicationBuilder builder, Settings set autoGenerateServiceInstanceId: true)) .WithMetrics(b => { - b.AddMeter(IngestionMetrics.MeterName); - b.AddOtlpExporter(e => + b.AddIngestionMetrics(); + b.AddOtlpExporter(e => e.Endpoint = otelMetricsUri); + if (Debugger.IsAttached) { - e.Endpoint = otelMetricsUri; - }); + b.AddConsoleExporter(); + } }); var logger = LogManager.GetLogger(typeof(HostApplicationBuilderExtensions)); diff --git a/src/ServiceControl.Audit/ServiceControl.Audit.csproj b/src/ServiceControl.Audit/ServiceControl.Audit.csproj index af1ed44e10..3d5245157b 100644 --- a/src/ServiceControl.Audit/ServiceControl.Audit.csproj +++ b/src/ServiceControl.Audit/ServiceControl.Audit.csproj @@ -29,6 +29,7 @@ + From f84cc18279320d40d7576147c7541c4b4e6502e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Sat, 8 Mar 2025 08:35:07 +0100 Subject: [PATCH 17/19] Fix batch result --- docs/telemetry.md | 2 +- .../Auditing/Metrics/BatchMetrics.cs | 13 ++++++++----- 2 files changed, 
9 insertions(+), 6 deletions(-) diff --git a/docs/telemetry.md b/docs/telemetry.md index 01aee7315a..36f03696a8 100644 --- a/docs/telemetry.md +++ b/docs/telemetry.md @@ -21,7 +21,7 @@ The following metrics are available: The following ingestion metrics with their corresponding dimensions are available: - `sc.audit.ingestion.batch_duration_seconds` - Message batch processing duration in seconds - - `result` - Indicates if the full batch size was used (batch size == max concurrency of the transport): `full` or `partial` + - `result` - Indicates if the full batch size was used (batch size == max concurrency of the transport): `full`, `partial` or `failed` - `sc.audit.ingestion.message_duration_seconds` - Audit message processing duration in seconds - `message.category` - Indicates the category of the message ingested: `audit-message`, `saga-update` or `control-message` - `result` - Indicates the outcome of the operation: `success`, `failed` or `skipped` (if the message was filtered out and skipped) diff --git a/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs index fb2f619c8a..41d2994fa9 100644 --- a/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs +++ b/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs @@ -8,19 +8,22 @@ public record BatchMetrics(int MaxBatchSize, Histogram BatchDuration, Ac { public void Dispose() { - var tags = new TagList(); - var isSuccess = actualBatchSize > 0; IsSuccess(isSuccess); + string result; + if (isSuccess) { - var result = actualBatchSize == MaxBatchSize ? "full" : "partial"; - tags.Add("result", result); + result = actualBatchSize == MaxBatchSize ? 
"full" : "partial"; + } + else + { + result = "failed"; } - BatchDuration.Record(sw.Elapsed.TotalSeconds, tags); + BatchDuration.Record(sw.Elapsed.TotalSeconds, new TagList { { "result", result } }); } public void Complete(int size) => actualBatchSize = size; From 0a405b5e0a4fe340904728d222c8522f3078f76e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Sun, 9 Mar 2025 13:00:16 +0100 Subject: [PATCH 18/19] Document queries --- docs/telemetry.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/telemetry.md b/docs/telemetry.md index 36f03696a8..f9af25a137 100644 --- a/docs/telemetry.md +++ b/docs/telemetry.md @@ -14,10 +14,6 @@ It's recommended to use a local [OTEL Collector](https://opentelemetry.io/docs/c Example configuration: https://github.com/andreasohlund/Docker/tree/main/otel-monitoring -The following metrics are available: - -### Ingestion - The following ingestion metrics with their corresponding dimensions are available: - `sc.audit.ingestion.batch_duration_seconds` - Message batch processing duration in seconds @@ -30,6 +26,12 @@ The following ingestion metrics with their corresponding dimensions are availabl - `result` - Indicates how the failure was resolved: `retry` or `stored-poision` - `sc.audit.ingestion.consecutive_batch_failure_total` - Consecutive batch failures +Example queries in PromQL for use in Grafana: + +- Ingestion rate: `sum (rate(sc_audit_ingestion_message_duration_seconds_count[$__rate_interval])) by (exported_job)` +- Failure rate: `sum(rate(sc_audit_ingestion_failures_total[$__rate_interval])) by (exported_job,result)` +- Message duration: `histogram_quantile(0.9,sum(rate(sc_audit_ingestion_message_duration_seconds_bucket[$__rate_interval])) by (le,exported_job))` + ## Monitoring No telemetry is currently available. 
From 1f972d1ccb162436ba5d33838238c7e6fc6163b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Sun, 9 Mar 2025 13:09:25 +0100 Subject: [PATCH 19/19] Fixup --- src/ServiceControl.Audit/Auditing/AuditIngestion.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 6f2245d8d5..2b76b152ea 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -294,7 +294,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) } TransportInfrastructure transportInfrastructure; - IMessageReceiver queueIngestor; + IMessageReceiver messageReceiver; readonly int MaxBatchSize; readonly SemaphoreSlim startStopSemaphore = new(1);