diff --git a/docs/telemetry.md b/docs/telemetry.md index 5ec4fe4b4f..4726563e20 100644 --- a/docs/telemetry.md +++ b/docs/telemetry.md @@ -18,24 +18,28 @@ The following metrics are available: ### Ingestion -- `sc.audit.ingestion.count` - Successful ingested audit message count -- `sc.audit.ingestion.retry` - Retried audit message count -- `sc.audit.ingestion.failed` - Failed audit message count -- `sc.audit.ingestion.duration` - Audit message processing duration (in milliseconds) -- `sc.audit.ingestion.message_size` - Audit message body size (in kilobytes) -- `sc.audit.ingestion.forwarded_count` - Forwarded audit messages count +#### Success or failure -### Batching +- `sc.audit.ingestion.success` - Successfully ingested audit message count (Counter) +- `sc.audit.ingestion.retry` - Retried audit message count (Counter) +- `sc.audit.ingestion.failed` - Failed audit message count (Counter) + +The above metrics also have the following attributes attached: + +- `messaging.message.body.size` - The size of the message body in bytes +- `messaging.message.type` - The logical message type of the message, if present -- `sc.audit.ingestion.batch_duration` - Batch processing duration (in milliseconds) -- `sc.audit.ingestion.batch_size` - Batch size (number of messages) -- `sc.audit.ingestion.consecutive_batch_failures` - Consecutive batch failures +#### Details -### Storage +- `sc.audit.ingestion.duration` - Audit message processing duration in milliseconds (Histogram) +- `sc.audit.ingestion.forwarded` - Number of forwarded audit messages, if forwarding is enabled (Counter) + +### Batching -- `sc.audit.ingestion.audits_count` - Stored audit message count -- `sc.audit.ingestion.sagas_count` - Stored sagas message count -- `sc.audit.ingestion.commit_duration` - Storage unit of work commit duration (in milliseconds) +- `sc.audit.ingestion.batch_duration` - Batch processing duration in milliseconds (Histogram) + - Attributes: + - `ingestion.batch_size` - The number of messages in the batch +- 
`sc.audit.ingestion.consecutive_batch_failures` - Consecutive batch failures (Counter) ## Monitoring diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 6a2a039cb9..26ffb087aa 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -174,7 +174,8 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) async Task OnMessage(MessageContext messageContext, CancellationToken cancellationToken) { - using (new DurationRecorder(ingestionDuration)) + var tags = Telemetry.GetIngestedMessageTags(messageContext.Headers, messageContext.Body); + using (new DurationRecorder(ingestionDuration, tags)) { if (settings.MessageFilter != null && settings.MessageFilter(messageContext)) { @@ -187,8 +188,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati await channel.Writer.WriteAsync(messageContext, cancellationToken); await taskCompletionSource.Task; - ingestedMessagesCounter.Add(1); - messageSize.Record(messageContext.Body.Length / 1024.0); + successfulMessagesCounter.Add(1, tags); } } @@ -210,15 +210,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) try { // as long as there is something to read this will fetch up to MaximumConcurrency items - while (channel.Reader.TryRead(out var context)) + using (var recorder = new DurationRecorder(batchDuration)) { - contexts.Add(context); - } + while (channel.Reader.TryRead(out var context)) + { + contexts.Add(context); + } - auditBatchSize.Record(contexts.Count); + recorder.Tags.Add("ingestion.batch_size", contexts.Count); - using (new DurationRecorder(auditBatchDuration)) - { await auditIngestor.Ingest(contexts); } @@ -293,10 +293,8 @@ public override async Task StopAsync(CancellationToken cancellationToken) readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory; readonly Settings settings; readonly Channel channel; 
- readonly Histogram auditBatchSize = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "batch_size"), description: "Audit ingestion average batch size"); - readonly Histogram auditBatchDuration = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "batch_duration"), unit: "ms", "Average audit message batch processing duration"); - readonly Histogram messageSize = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "message_size"), unit: "kilobytes", description: "Average audit message body size"); - readonly Counter ingestedMessagesCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "count"), description: "Successful ingested audit message count"); + readonly Histogram batchDuration = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "batch_duration"), unit: "ms", "Average audit message batch processing duration"); + readonly Counter successfulMessagesCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "success"), description: "Successful ingested audit message count"); readonly Histogram consecutiveBatchFailuresCounter = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "consecutive_batch_failures"), unit: "count", description: "Consecutive audit ingestion batch failure"); readonly Histogram ingestionDuration = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "duration"), unit: "ms", description: "Average incoming audit message processing duration"); readonly Watchdog watchdog; diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs b/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs index d4d5d3b900..6b8dd39a23 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs @@ -11,8 +11,8 @@ using Infrastructure; using 
NServiceBus.Logging; using NServiceBus.Transport; - using ServiceControl.Audit.Persistence; - using ServiceControl.Configuration; + using Persistence; + using Configuration; using ServiceControl.Infrastructure; class AuditIngestionFaultPolicy @@ -35,16 +35,19 @@ public AuditIngestionFaultPolicy(IFailedAuditStorage failedAuditStorage, Logging public async Task OnError(ErrorContext errorContext, CancellationToken cancellationToken = default) { + var tags = Telemetry.GetIngestedMessageTags(errorContext.Message.Headers, errorContext.Message.Body); + //Same as recoverability policy in NServiceBusFactory if (errorContext.ImmediateProcessingFailures < 3) { - retryCounter.Add(1); + retryCounter.Add(1, tags); return ErrorHandleResult.RetryRequired; } await StoreFailedMessageDocument(errorContext, cancellationToken); - failedCounter.Add(1); + failedCounter.Add(1, tags); + return ErrorHandleResult.Handled; } diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index 9885d5ba5c..a466f93be9 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -14,6 +14,7 @@ using Persistence.UnitOfWork; using Recoverability; using SagaAudit; + using ServiceControl.Infrastructure; using ServiceControl.Transports; public class AuditIngestor @@ -131,7 +132,7 @@ public async Task VerifyCanReachForwardingAddress() readonly Settings settings; readonly Lazy messageDispatcher; readonly string logQueueAddress; - readonly Counter forwardedMessagesCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "forwarded_count"), description: "Audit ingestion forwarded message count"); + readonly Counter forwardedMessagesCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "forwarded"), description: "Audit ingestion forwarded message count"); static readonly ILog Log = LogManager.GetLogger(); } diff --git 
a/src/ServiceControl.Audit/Auditing/AuditPersister.cs b/src/ServiceControl.Audit/Auditing/AuditPersister.cs index cca13fa470..008c86820a 100644 --- a/src/ServiceControl.Audit/Auditing/AuditPersister.cs +++ b/src/ServiceControl.Audit/Auditing/AuditPersister.cs @@ -2,7 +2,6 @@ { using System; using System.Collections.Generic; - using System.Diagnostics.Metrics; using System.Text.Json; using System.Threading.Tasks; using Infrastructure; @@ -61,14 +60,10 @@ public async Task> Persist(IReadOnlyList> Persist(IReadOnlyList storedAuditsCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "audits_count"), description: "Stored audit message count"); - readonly Counter storedSagasCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "sagas_count"), description: "Stored saga state count"); - readonly Histogram commitDuration = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "commit_duration"), unit: "ms", description: "Storage unit of work commit duration"); - static readonly ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs index 255aa9f2f3..13d630456e 100644 --- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs @@ -19,6 +19,7 @@ namespace ServiceControl.Audit; using NServiceBus.Transport; using Persistence; using Transports; +using ServiceControl.Infrastructure; using OpenTelemetry.Metrics; using OpenTelemetry.Resources; diff --git a/src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs b/src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs index ebb5531555..bd8bc3c9d7 100644 --- a/src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs +++ b/src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs @@ -1,12 +1,12 @@ -namespace 
ServiceControl.Audit; +namespace ServiceControl.Infrastructure; using System; using System.Diagnostics; using System.Diagnostics.Metrics; -record DurationRecorder(Histogram Histogram) : IDisposable +record DurationRecorder(Histogram Histogram, TagList Tags = default) : IDisposable { readonly Stopwatch sw = Stopwatch.StartNew(); - public void Dispose() => Histogram.Record(sw.ElapsedMilliseconds); + public void Dispose() => Histogram.Record(sw.ElapsedMilliseconds, Tags); } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Infrastructure/Telemetry.cs b/src/ServiceControl.Audit/Infrastructure/Telemetry.cs index 568bced7b2..033590b91c 100644 --- a/src/ServiceControl.Audit/Infrastructure/Telemetry.cs +++ b/src/ServiceControl.Audit/Infrastructure/Telemetry.cs @@ -1,6 +1,10 @@ -namespace ServiceControl.Audit; +namespace ServiceControl.Infrastructure; +using System; +using System.Collections.Generic; +using System.Diagnostics; using System.Diagnostics.Metrics; +using NServiceBus; using OpenTelemetry.Metrics; static class Telemetry @@ -10,8 +14,17 @@ static class Telemetry public static string CreateInstrumentName(string instrumentNamespace, string instrumentName) => $"sc.audit.{instrumentNamespace}.{instrumentName}".ToLower(); - public static void AddAuditIngestionMeters(this MeterProviderBuilder builder) + public static void AddAuditIngestionMeters(this MeterProviderBuilder builder) => builder.AddMeter(MeterName); + + public static TagList GetIngestedMessageTags(IDictionary headers, ReadOnlyMemory body) { - builder.AddMeter(MeterName); + var tags = new TagList { { "messaging.message.body.size", body.Length } }; + + if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageType)) + { + tags.Add("messaging.message.type", messageType); + } + + return tags; } } \ No newline at end of file