From 1abcae64b11545d1f48a1b188bd364ad13b42676 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Wed, 12 Feb 2025 15:36:39 +0100 Subject: [PATCH 1/4] Include reading from channel in batch duration --- .../Auditing/AuditIngestion.cs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 6a2a039cb9..58a1b4de08 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -210,15 +210,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) try { // as long as there is something to read this will fetch up to MaximumConcurrency items - while (channel.Reader.TryRead(out var context)) - { - contexts.Add(context); - } - - auditBatchSize.Record(contexts.Count); - using (new DurationRecorder(auditBatchDuration)) { + while (channel.Reader.TryRead(out var context)) + { + contexts.Add(context); + } + + auditBatchSize.Record(contexts.Count); + await auditIngestor.Ingest(contexts); } From cdc69eef3f3a553130a47695299c8469cb6c4080 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Wed, 12 Feb 2025 15:38:28 +0100 Subject: [PATCH 2/4] Better name for the ingested count --- src/ServiceControl.Audit/Auditing/AuditIngestion.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 58a1b4de08..2c8f62dd3f 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -187,7 +187,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati await channel.Writer.WriteAsync(messageContext, cancellationToken); await taskCompletionSource.Task; - ingestedMessagesCounter.Add(1); + successfulMessagesCounter.Add(1); 
messageSize.Record(messageContext.Body.Length / 1024.0); } } @@ -218,7 +218,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) } auditBatchSize.Record(contexts.Count); - + await auditIngestor.Ingest(contexts); } @@ -296,7 +296,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) readonly Histogram auditBatchSize = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "batch_size"), description: "Audit ingestion average batch size"); readonly Histogram auditBatchDuration = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "batch_duration"), unit: "ms", "Average audit message batch processing duration"); readonly Histogram messageSize = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "message_size"), unit: "kilobytes", description: "Average audit message body size"); - readonly Counter ingestedMessagesCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "count"), description: "Successful ingested audit message count"); + readonly Counter successfulMessagesCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "successful"), description: "Successful ingested audit message count"); readonly Histogram consecutiveBatchFailuresCounter = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "consecutive_batch_failures"), unit: "count", description: "Consecutive audit ingestion batch failure"); readonly Histogram ingestionDuration = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "duration"), unit: "ms", description: "Average incoming audit message processing duration"); readonly Watchdog watchdog; From 6932fb3d45d52061affe8055754373ce43def3cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Wed, 12 Feb 2025 15:51:50 +0100 Subject: [PATCH 3/4] Add message type tag --- docs/telemetry.md | 10 ++-------- 
.../Auditing/AuditIngestion.cs | 4 ++-- .../Auditing/AuditIngestionFaultPolicy.cs | 8 ++++++-- .../Auditing/AuditIngestor.cs | 2 +- .../Auditing/AuditPersister.cs | 16 ++-------------- .../Infrastructure/Telemetry.cs | 15 +++++++++++++++ 6 files changed, 28 insertions(+), 27 deletions(-) diff --git a/docs/telemetry.md b/docs/telemetry.md index 5ec4fe4b4f..9dc3946432 100644 --- a/docs/telemetry.md +++ b/docs/telemetry.md @@ -18,12 +18,12 @@ The following metrics are available: ### Ingestion -- `sc.audit.ingestion.count` - Successful ingested audit message count +- `sc.audit.ingestion.success` - Successful ingested audit message count - `sc.audit.ingestion.retry` - Retried audit message count - `sc.audit.ingestion.failed` - Failed audit message count - `sc.audit.ingestion.duration` - Audit message processing duration (in milliseconds) - `sc.audit.ingestion.message_size` - Audit message body size (in kilobytes) -- `sc.audit.ingestion.forwarded_count` - Forwarded audit messages count +- `sc.audit.ingestion.forwarded` - Forwarded audit messages count ### Batching @@ -31,12 +31,6 @@ The following metrics are available: - `sc.audit.ingestion.batch_size` - Batch size (number of messages) - `sc.audit.ingestion.consecutive_batch_failures` - Consecutive batch failures -### Storage - -- `sc.audit.ingestion.audits_count` - Stored audit message count -- `sc.audit.ingestion.sagas_count` - Stored sagas message count -- `sc.audit.ingestion.commit_duration` - Storage unit of work commit duration (in milliseconds) - ## Monitoring No telemetry is currently available. 
diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 2c8f62dd3f..1dbe167e83 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -187,7 +187,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati await channel.Writer.WriteAsync(messageContext, cancellationToken); await taskCompletionSource.Task; - successfulMessagesCounter.Add(1); + successfulMessagesCounter.Add(1, Telemetry.GetIngestedMessageTags(messageContext.Headers)); messageSize.Record(messageContext.Body.Length / 1024.0); } } @@ -296,7 +296,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) readonly Histogram auditBatchSize = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "batch_size"), description: "Audit ingestion average batch size"); readonly Histogram auditBatchDuration = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "batch_duration"), unit: "ms", "Average audit message batch processing duration"); readonly Histogram messageSize = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "message_size"), unit: "kilobytes", description: "Average audit message body size"); - readonly Counter successfulMessagesCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "successful"), description: "Successful ingested audit message count"); + readonly Counter successfulMessagesCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "success"), description: "Successful ingested audit message count"); readonly Histogram consecutiveBatchFailuresCounter = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "consecutive_batch_failures"), unit: "count", description: "Consecutive audit ingestion batch failure"); readonly Histogram ingestionDuration = 
Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "duration"), unit: "ms", description: "Average incoming audit message processing duration"); readonly Watchdog watchdog; diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs b/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs index d4d5d3b900..9ab6805633 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs @@ -9,6 +9,7 @@ using System.Threading; using System.Threading.Tasks; using Infrastructure; + using NServiceBus; using NServiceBus.Logging; using NServiceBus.Transport; using ServiceControl.Audit.Persistence; @@ -35,16 +36,19 @@ public AuditIngestionFaultPolicy(IFailedAuditStorage failedAuditStorage, Logging public async Task OnError(ErrorContext errorContext, CancellationToken cancellationToken = default) { + var tags = Telemetry.GetIngestedMessageTags(errorContext.Message.Headers); + //Same as recoverability policy in NServiceBusFactory if (errorContext.ImmediateProcessingFailures < 3) { - retryCounter.Add(1); + retryCounter.Add(1, tags); return ErrorHandleResult.RetryRequired; } await StoreFailedMessageDocument(errorContext, cancellationToken); - failedCounter.Add(1); + failedCounter.Add(1, tags); + return ErrorHandleResult.Handled; } diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index 9885d5ba5c..1aa7a93416 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -131,7 +131,7 @@ public async Task VerifyCanReachForwardingAddress() readonly Settings settings; readonly Lazy messageDispatcher; readonly string logQueueAddress; - readonly Counter forwardedMessagesCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "forwarded_count"), description: "Audit ingestion forwarded message count"); + 
readonly Counter forwardedMessagesCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "forwarded"), description: "Audit ingestion forwarded message count"); static readonly ILog Log = LogManager.GetLogger(); } diff --git a/src/ServiceControl.Audit/Auditing/AuditPersister.cs b/src/ServiceControl.Audit/Auditing/AuditPersister.cs index cca13fa470..008c86820a 100644 --- a/src/ServiceControl.Audit/Auditing/AuditPersister.cs +++ b/src/ServiceControl.Audit/Auditing/AuditPersister.cs @@ -2,7 +2,6 @@ { using System; using System.Collections.Generic; - using System.Diagnostics.Metrics; using System.Text.Json; using System.Threading.Tasks; using Infrastructure; @@ -61,14 +60,10 @@ public async Task> Persist(IReadOnlyList> Persist(IReadOnlyList storedAuditsCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "audits_count"), description: "Stored audit message count"); - readonly Counter storedSagasCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "sagas_count"), description: "Stored saga state count"); - readonly Histogram commitDuration = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "commit_duration"), unit: "ms", description: "Storage unit of work commit duration"); - static readonly ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Infrastructure/Telemetry.cs b/src/ServiceControl.Audit/Infrastructure/Telemetry.cs index 568bced7b2..9a03dd2e73 100644 --- a/src/ServiceControl.Audit/Infrastructure/Telemetry.cs +++ b/src/ServiceControl.Audit/Infrastructure/Telemetry.cs @@ -1,6 +1,9 @@ namespace ServiceControl.Audit; +using System.Collections.Generic; +using System.Diagnostics; using System.Diagnostics.Metrics; +using NServiceBus; using OpenTelemetry.Metrics; static class Telemetry @@ -14,4 +17,16 @@ public static void AddAuditIngestionMeters(this MeterProviderBuilder builder) { 
builder.AddMeter(MeterName); } + + public static TagList GetIngestedMessageTags(IDictionary headers) + { + var tags = new TagList(); + + if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageType)) + { + tags.Add("nservicebus.message_type", messageType); + } + + return tags; + } } \ No newline at end of file From 0e7962b2af7e88fe848f580c621eee7dad2ecea7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Sat, 15 Feb 2025 12:22:20 +0100 Subject: [PATCH 4/4] Move a few instruments to be attributes instead --- docs/telemetry.md | 28 +++++++++++++------ .../Auditing/AuditIngestion.cs | 14 ++++------ .../Auditing/AuditIngestionFaultPolicy.cs | 7 ++--- .../Auditing/AuditIngestor.cs | 1 + .../HostApplicationBuilderExtensions.cs | 1 + .../Infrastructure/DurationRecorder.cs | 6 ++-- .../Infrastructure/Telemetry.cs | 14 ++++------ 7 files changed, 39 insertions(+), 32 deletions(-) diff --git a/docs/telemetry.md b/docs/telemetry.md index 9dc3946432..4726563e20 100644 --- a/docs/telemetry.md +++ b/docs/telemetry.md @@ -18,18 +18,28 @@ The following metrics are available: ### Ingestion -- `sc.audit.ingestion.success` - Successful ingested audit message count -- `sc.audit.ingestion.retry` - Retried audit message count -- `sc.audit.ingestion.failed` - Failed audit message count -- `sc.audit.ingestion.duration` - Audit message processing duration (in milliseconds) -- `sc.audit.ingestion.message_size` - Audit message body size (in kilobytes) -- `sc.audit.ingestion.forwarded` - Forwarded audit messages count +#### Success or failure + +- `sc.audit.ingestion.success` - Successful ingested audit message count (Counter) +- `sc.audit.ingestion.retry` - Retried audit message count (Counter) +- `sc.audit.ingestion.failed` - Failed audit message count (Counter) + +The above metrics also have the following attributes attached: + +- `messaging.message.body.size` - The size of the message body in bytes +- `messaging.message.type` - The logical message type of 
the message if present + +#### Details + +- `sc.audit.ingestion.duration` - Audit message processing duration in milliseconds (Histogram) +- `sc.audit.ingestion.forwarded` - Number of forwarded audit messages if forwarding is enabled (Counter) ### Batching -- `sc.audit.ingestion.batch_duration` - Batch processing duration (in milliseconds) -- `sc.audit.ingestion.batch_size` - Batch size (number of messages) -- `sc.audit.ingestion.consecutive_batch_failures` - Consecutive batch failures +- `sc.audit.ingestion.batch_duration` - Batch processing duration in milliseconds (Histogram) + - Attributes: + - `ingestion.batch_size` +- `sc.audit.ingestion.consecutive_batch_failures` - Consecutive batch failures (Histogram) ## Monitoring diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 1dbe167e83..26ffb087aa 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -174,7 +174,8 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) async Task OnMessage(MessageContext messageContext, CancellationToken cancellationToken) { - using (new DurationRecorder(ingestionDuration)) + var tags = Telemetry.GetIngestedMessageTags(messageContext.Headers, messageContext.Body); + using (new DurationRecorder(ingestionDuration, tags)) { if (settings.MessageFilter != null && settings.MessageFilter(messageContext)) { @@ -187,8 +188,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati await channel.Writer.WriteAsync(messageContext, cancellationToken); await taskCompletionSource.Task; - successfulMessagesCounter.Add(1, Telemetry.GetIngestedMessageTags(messageContext.Headers)); - messageSize.Record(messageContext.Body.Length / 1024.0); + successfulMessagesCounter.Add(1, tags); } } @@ -210,14 +210,14 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) try { // as long as
there is something to read this will fetch up to MaximumConcurrency items - using (new DurationRecorder(auditBatchDuration)) + using (var recorder = new DurationRecorder(batchDuration)) { while (channel.Reader.TryRead(out var context)) { contexts.Add(context); } - auditBatchSize.Record(contexts.Count); + recorder.Tags.Add("ingestion.batch_size", contexts.Count); await auditIngestor.Ingest(contexts); } @@ -293,9 +293,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory; readonly Settings settings; readonly Channel channel; - readonly Histogram auditBatchSize = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "batch_size"), description: "Audit ingestion average batch size"); - readonly Histogram auditBatchDuration = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "batch_duration"), unit: "ms", "Average audit message batch processing duration"); - readonly Histogram messageSize = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "message_size"), unit: "kilobytes", description: "Average audit message body size"); + readonly Histogram batchDuration = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "batch_duration"), unit: "ms", "Average audit message batch processing duration"); readonly Counter successfulMessagesCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "success"), description: "Successful ingested audit message count"); readonly Histogram consecutiveBatchFailuresCounter = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "consecutive_batch_failures"), unit: "count", description: "Consecutive audit ingestion batch failure"); readonly Histogram ingestionDuration = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "duration"), unit: "ms", description: "Average incoming audit message 
processing duration"); diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs b/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs index 9ab6805633..6b8dd39a23 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs @@ -9,11 +9,10 @@ using System.Threading; using System.Threading.Tasks; using Infrastructure; - using NServiceBus; using NServiceBus.Logging; using NServiceBus.Transport; - using ServiceControl.Audit.Persistence; - using ServiceControl.Configuration; + using Persistence; + using Configuration; using ServiceControl.Infrastructure; class AuditIngestionFaultPolicy @@ -36,7 +35,7 @@ public AuditIngestionFaultPolicy(IFailedAuditStorage failedAuditStorage, Logging public async Task OnError(ErrorContext errorContext, CancellationToken cancellationToken = default) { - var tags = Telemetry.GetIngestedMessageTags(errorContext.Message.Headers); + var tags = Telemetry.GetIngestedMessageTags(errorContext.Message.Headers, errorContext.Message.Body); //Same as recoverability policy in NServiceBusFactory if (errorContext.ImmediateProcessingFailures < 3) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index 1aa7a93416..a466f93be9 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -14,6 +14,7 @@ using Persistence.UnitOfWork; using Recoverability; using SagaAudit; + using ServiceControl.Infrastructure; using ServiceControl.Transports; public class AuditIngestor diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs index 255aa9f2f3..13d630456e 100644 --- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs @@ -19,6 +19,7 @@ namespace ServiceControl.Audit; 
using NServiceBus.Transport; using Persistence; using Transports; +using ServiceControl.Infrastructure; using OpenTelemetry.Metrics; using OpenTelemetry.Resources; diff --git a/src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs b/src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs index ebb5531555..bd8bc3c9d7 100644 --- a/src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs +++ b/src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs @@ -1,12 +1,12 @@ -namespace ServiceControl.Audit; +namespace ServiceControl.Infrastructure; using System; using System.Diagnostics; using System.Diagnostics.Metrics; -record DurationRecorder(Histogram Histogram) : IDisposable +record DurationRecorder(Histogram Histogram, TagList Tags = default) : IDisposable { readonly Stopwatch sw = Stopwatch.StartNew(); - public void Dispose() => Histogram.Record(sw.ElapsedMilliseconds); + public void Dispose() => Histogram.Record(sw.ElapsedMilliseconds, Tags); } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Infrastructure/Telemetry.cs b/src/ServiceControl.Audit/Infrastructure/Telemetry.cs index 9a03dd2e73..033590b91c 100644 --- a/src/ServiceControl.Audit/Infrastructure/Telemetry.cs +++ b/src/ServiceControl.Audit/Infrastructure/Telemetry.cs @@ -1,5 +1,6 @@ -namespace ServiceControl.Audit; +namespace ServiceControl.Infrastructure; +using System; using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.Metrics; @@ -13,18 +14,15 @@ static class Telemetry public static string CreateInstrumentName(string instrumentNamespace, string instrumentName) => $"sc.audit.{instrumentNamespace}.{instrumentName}".ToLower(); - public static void AddAuditIngestionMeters(this MeterProviderBuilder builder) - { - builder.AddMeter(MeterName); - } + public static void AddAuditIngestionMeters(this MeterProviderBuilder builder) => builder.AddMeter(MeterName); - public static TagList GetIngestedMessageTags(IDictionary headers) + public static TagList 
GetIngestedMessageTags(IDictionary headers, ReadOnlyMemory body) { - var tags = new TagList(); + var tags = new TagList { { "messaging.message.body.size", body.Length } }; if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageType)) { - tags.Add("nservicebus.message_type", messageType); + tags.Add("messaging.message.type", messageType); } return tags;