diff --git a/docs/telemetry.md b/docs/telemetry.md index 4726563e20..f9af25a137 100644 --- a/docs/telemetry.md +++ b/docs/telemetry.md @@ -14,32 +14,23 @@ It's recommended to use a local [OTEL Collector](https://opentelemetry.io/docs/c Example configuration: https://github.com/andreasohlund/Docker/tree/main/otel-monitoring -The following metrics are available: - -### Ingestion - -#### Success or failure - -- `sc.audit.ingestion.success` - Successful ingested audit message count (Counter) -- `sc.audit.ingestion.retry` - Retried audit message count (Counter) -- `sc.audit.ingestion.failed` - Failed audit message count (Counter) - -The above metrics also have the following attributes attached: - -- `messaging.message.body.size` - The size of the message body in bytes -- `messaging.message.type` - The logical message type of the message if present - -#### Details - -- `sc.audit.ingestion.duration` - Audit message processing duration in milliseconds (Histogram) -- `sc.audit.ingestion.forwarded` - Count of the number of forwarded audit messages if forwarding is enabled (Counter) - -### Batching - -- `sc.audit.ingestion.batch_duration` - Batch processing duration in milliseconds (Histogram) - - Attributes: - - `ingestion.batch_size` -- `sc.audit.ingestion.consecutive_batch_failures` - Consecutive batch failures (Counter) +The following ingestion metrics with their corresponding dimensions are available: + +- `sc.audit.ingestion.batch_duration_seconds` - Message batch processing duration in seconds + - `result` - Indicates if the full batch size was used (batch size == max concurrency of the transport): `full`, `partial` or `failed` +- `sc.audit.ingestion.message_duration_seconds` - Audit message processing duration in seconds + - `message.category` - Indicates the category of the message ingested: `audit-message`, `saga-update` or `control-message` + - `result` - Indicates the outcome of the operation: `success`, `failed` or `skipped` (if the message was filtered out and 
skipped) +- `sc.audit.ingestion.failures_total` - Failure counter + - `message.category` - Indicates the category of the message ingested: `audit-message`, `saga-update` or `control-message` + - `result` - Indicates how the failure was resolved: `retry` or `stored-poison` +- `sc.audit.ingestion.consecutive_batch_failure_total` - Consecutive batch failures + +Example queries in PromQL for use in Grafana: + +- Ingestion rate: `sum (rate(sc_audit_ingestion_message_duration_seconds_count[$__rate_interval])) by (exported_job)` +- Failure rate: `sum(rate(sc_audit_ingestion_failures_total[$__rate_interval])) by (exported_job,result)` +- Message duration: `histogram_quantile(0.9,sum(rate(sc_audit_ingestion_message_duration_seconds_bucket[$__rate_interval])) by (le,exported_job))` ## Monitoring diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index d4ee800af0..b2b400a707 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -50,6 +50,7 @@ + @@ -63,6 +64,7 @@ + @@ -89,4 +91,4 @@ - + \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 8437d564cd..2b76b152ea 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -2,11 +2,11 @@ { using System; using System.Collections.Generic; - using System.Diagnostics.Metrics; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using Infrastructure.Settings; + using Metrics; using Microsoft.Extensions.Hosting; using NServiceBus; using NServiceBus.Logging; @@ -26,7 +26,8 @@ public AuditIngestion( AuditIngestionCustomCheck.State ingestionState, AuditIngestor auditIngestor, IAuditIngestionUnitOfWorkFactory unitOfWorkFactory, - IHostApplicationLifetime applicationLifetime) + IHostApplicationLifetime applicationLifetime, + IngestionMetrics metrics) { inputEndpoint = settings.AuditQueue; 
this.transportCustomization = transportCustomization; @@ -35,13 +36,16 @@ public AuditIngestion( this.unitOfWorkFactory = unitOfWorkFactory; this.settings = settings; this.applicationLifetime = applicationLifetime; + this.metrics = metrics; if (!transportSettings.MaxConcurrency.HasValue) { throw new ArgumentException("MaxConcurrency is not set in TransportSettings"); } - channel = Channel.CreateBounded(new BoundedChannelOptions(transportSettings.MaxConcurrency.Value) + MaxBatchSize = transportSettings.MaxConcurrency.Value; + + channel = Channel.CreateBounded(new BoundedChannelOptions(MaxBatchSize) { SingleReader = true, SingleWriter = false, @@ -49,7 +53,7 @@ public AuditIngestion( FullMode = BoundedChannelFullMode.Wait }); - errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError); + errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError, metrics); watchdog = new Watchdog( "audit message ingestion", @@ -190,22 +194,21 @@ async Task EnsureStopped(CancellationToken cancellationToken) async Task OnMessage(MessageContext messageContext, CancellationToken cancellationToken) { - var tags = Telemetry.GetIngestedMessageTags(messageContext.Headers, messageContext.Body); - using (new DurationRecorder(ingestionDuration, tags)) + using var messageIngestionMetrics = metrics.BeginIngestion(messageContext); + + if (settings.MessageFilter != null && settings.MessageFilter(messageContext)) { - if (settings.MessageFilter != null && settings.MessageFilter(messageContext)) - { - return; - } + messageIngestionMetrics.Skipped(); + return; + } - var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - messageContext.SetTaskCompletionSource(taskCompletionSource); + var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + 
messageContext.SetTaskCompletionSource(taskCompletionSource); - await channel.Writer.WriteAsync(messageContext, cancellationToken); - await taskCompletionSource.Task; + await channel.Writer.WriteAsync(messageContext, cancellationToken); + _ = await taskCompletionSource.Task; - successfulMessagesCounter.Add(1, tags); - } + messageIngestionMetrics.Success(); } public override async Task StartAsync(CancellationToken cancellationToken) @@ -218,27 +221,24 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { try { - var contexts = new List(transportSettings.MaxConcurrency.Value); + var contexts = new List(MaxBatchSize); while (await channel.Reader.WaitToReadAsync(stoppingToken)) { // will only enter here if there is something to read. try { + using var batchMetrics = metrics.BeginBatch(MaxBatchSize); + // as long as there is something to read this will fetch up to MaximumConcurrency items - using (var recorder = new DurationRecorder(batchDuration)) + while (channel.Reader.TryRead(out var context)) { - while (channel.Reader.TryRead(out var context)) - { - contexts.Add(context); - } - - recorder.Tags.Add("ingestion.batch_size", contexts.Count); - - await auditIngestor.Ingest(contexts); + contexts.Add(context); } - consecutiveBatchFailuresCounter.Record(0); + await auditIngestor.Ingest(contexts); + + batchMetrics.Complete(contexts.Count); } catch (Exception e) { @@ -255,9 +255,6 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) } logger.Info("Ingesting messages failed", e); - - // no need to do interlocked increment since this is running sequential - consecutiveBatchFailuresCounter.Record(consecutiveBatchFailures++); } finally { @@ -298,8 +295,8 @@ public override async Task StopAsync(CancellationToken cancellationToken) TransportInfrastructure transportInfrastructure; IMessageReceiver messageReceiver; - long consecutiveBatchFailures = 0; + readonly int MaxBatchSize; readonly SemaphoreSlim startStopSemaphore = new(1); 
readonly string inputEndpoint; readonly ITransportCustomization transportCustomization; @@ -309,12 +306,9 @@ public override async Task StopAsync(CancellationToken cancellationToken) readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory; readonly Settings settings; readonly Channel channel; - readonly Histogram batchDuration = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "batch_duration"), unit: "ms", "Average audit message batch processing duration"); - readonly Counter successfulMessagesCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "success"), description: "Successful ingested audit message count"); - readonly Histogram consecutiveBatchFailuresCounter = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "consecutive_batch_failures"), unit: "count", description: "Consecutive audit ingestion batch failure"); - readonly Histogram ingestionDuration = Telemetry.Meter.CreateHistogram(Telemetry.CreateInstrumentName("ingestion", "duration"), unit: "ms", description: "Average incoming audit message processing duration"); readonly Watchdog watchdog; readonly IHostApplicationLifetime applicationLifetime; + readonly IngestionMetrics metrics; static readonly ILog logger = LogManager.GetLogger(); diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs b/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs index 6b8dd39a23..6ccfbedcce 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs @@ -1,115 +1,111 @@ -namespace ServiceControl.Audit.Auditing +namespace ServiceControl.Audit.Auditing; + +using System; +using System.Diagnostics; +using System.IO; +using System.Runtime.InteropServices; +using System.Runtime.Versioning; +using System.Threading; +using System.Threading.Tasks; +using Infrastructure; +using NServiceBus.Logging; +using NServiceBus.Transport; 
+using Persistence; +using Configuration; +using Metrics; +using ServiceControl.Infrastructure; + +class AuditIngestionFaultPolicy { - using System; - using System.Diagnostics; - using System.Diagnostics.Metrics; - using System.IO; - using System.Runtime.InteropServices; - using System.Runtime.Versioning; - using System.Threading; - using System.Threading.Tasks; - using Infrastructure; - using NServiceBus.Logging; - using NServiceBus.Transport; - using Persistence; - using Configuration; - using ServiceControl.Infrastructure; - - class AuditIngestionFaultPolicy + public AuditIngestionFaultPolicy(IFailedAuditStorage failedAuditStorage, LoggingSettings settings, Func onCriticalError, IngestionMetrics metrics) { - readonly IFailedAuditStorage failedAuditStorage; - readonly string logPath; - readonly ImportFailureCircuitBreaker failureCircuitBreaker; + failureCircuitBreaker = new ImportFailureCircuitBreaker(onCriticalError); + this.failedAuditStorage = failedAuditStorage; + this.metrics = metrics; - public AuditIngestionFaultPolicy(IFailedAuditStorage failedAuditStorage, LoggingSettings settings, Func onCriticalError) + if (!AppEnvironment.RunningInContainer) { - failureCircuitBreaker = new ImportFailureCircuitBreaker(onCriticalError); - this.failedAuditStorage = failedAuditStorage; - - if (!AppEnvironment.RunningInContainer) - { - logPath = Path.Combine(settings.LogPath, @"FailedImports\Audit"); - Directory.CreateDirectory(logPath); - } + logPath = Path.Combine(settings.LogPath, @"FailedImports\Audit"); + Directory.CreateDirectory(logPath); } + } - public async Task OnError(ErrorContext errorContext, CancellationToken cancellationToken = default) - { - var tags = Telemetry.GetIngestedMessageTags(errorContext.Message.Headers, errorContext.Message.Body); - - //Same as recoverability policy in NServiceBusFactory - if (errorContext.ImmediateProcessingFailures < 3) - { - retryCounter.Add(1, tags); - return ErrorHandleResult.RetryRequired; - } + public async Task 
OnError(ErrorContext errorContext, CancellationToken cancellationToken = default) + { + using var errorMetrics = metrics.BeginErrorHandling(errorContext); - await StoreFailedMessageDocument(errorContext, cancellationToken); + //Same as recoverability policy in NServiceBusFactory + if (errorContext.ImmediateProcessingFailures < 3) + { + errorMetrics.Retry(); + return ErrorHandleResult.RetryRequired; + } - failedCounter.Add(1, tags); + await StoreFailedMessageDocument(errorContext, cancellationToken); - return ErrorHandleResult.Handled; - } + return ErrorHandleResult.Handled; + } - async Task StoreFailedMessageDocument(ErrorContext errorContext, CancellationToken cancellationToken) + async Task StoreFailedMessageDocument(ErrorContext errorContext, CancellationToken cancellationToken) + { + var failure = new FailedAuditImport { - var failure = new FailedAuditImport - { - Id = Guid.NewGuid().ToString(), - Message = new FailedTransportMessage - { - Id = errorContext.Message.MessageId, - Headers = errorContext.Message.Headers, - // At the moment we are taking a defensive copy of the body to avoid issues with the message body - // buffers being returned to the pool and potentially being overwritten. Once we know how RavenDB - // handles byte[] to ReadOnlyMemory conversion we might be able to remove this. - Body = errorContext.Message.Body.ToArray() - }, - ExceptionInfo = errorContext.Exception.ToFriendlyString() - }; - - try - { - await DoLogging(errorContext.Exception, failure, cancellationToken); - } - finally + Id = Guid.NewGuid().ToString(), + Message = new FailedTransportMessage { - failureCircuitBreaker.Increment(errorContext.Exception); - } + Id = errorContext.Message.MessageId, + Headers = errorContext.Message.Headers, + // At the moment we are taking a defensive copy of the body to avoid issues with the message body + // buffers being returned to the pool and potentially being overwritten. 
Once we know how RavenDB + // handles byte[] to ReadOnlyMemory conversion we might be able to remove this. + Body = errorContext.Message.Body.ToArray() + }, + ExceptionInfo = errorContext.Exception.ToFriendlyString() + }; + + try + { + await DoLogging(errorContext.Exception, failure, cancellationToken); } - - async Task DoLogging(Exception exception, FailedAuditImport failure, CancellationToken cancellationToken) + finally { - log.Error("Failed importing error message", exception); + failureCircuitBreaker.Increment(errorContext.Exception); + } + } + + async Task DoLogging(Exception exception, FailedAuditImport failure, CancellationToken cancellationToken) + { + log.Error("Failed importing error message", exception); - // Write to storage - await failedAuditStorage.SaveFailedAuditImport(failure); + // Write to storage + await failedAuditStorage.SaveFailedAuditImport(failure); - if (!AppEnvironment.RunningInContainer) + if (!AppEnvironment.RunningInContainer) + { + // Write to Log Path + var filePath = Path.Combine(logPath, failure.Id + ".txt"); + await File.WriteAllTextAsync(filePath, failure.ExceptionInfo, cancellationToken); + + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { - // Write to Log Path - var filePath = Path.Combine(logPath, failure.Id + ".txt"); - await File.WriteAllTextAsync(filePath, failure.ExceptionInfo, cancellationToken); - - if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - { - WriteToEventLog("A message import has failed. A log file has been written to " + filePath); - } + WriteToEventLog("A message import has failed. 
A log file has been written to " + filePath); } } + } - [SupportedOSPlatform("windows")] - void WriteToEventLog(string message) - { + [SupportedOSPlatform("windows")] + void WriteToEventLog(string message) + { #if DEBUG - EventSourceCreator.Create(); + EventSourceCreator.Create(); #endif - EventLog.WriteEntry(EventSourceCreator.SourceName, message, EventLogEntryType.Error); - } + EventLog.WriteEntry(EventSourceCreator.SourceName, message, EventLogEntryType.Error); + } - readonly Counter retryCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "retry"), description: "Audit ingestion retries count"); - readonly Counter failedCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "failed"), description: "Audit ingestion failure count"); + readonly IFailedAuditStorage failedAuditStorage; + readonly IngestionMetrics metrics; + readonly string logPath; + readonly ImportFailureCircuitBreaker failureCircuitBreaker; - static readonly ILog log = LogManager.GetLogger(); - } + static readonly ILog log = LogManager.GetLogger(); } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index a466f93be9..4a333a4b3e 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -2,7 +2,6 @@ { using System; using System.Collections.Generic; - using System.Diagnostics.Metrics; using System.Linq; using System.Threading.Tasks; using Infrastructure.Settings; @@ -14,8 +13,7 @@ using Persistence.UnitOfWork; using Recoverability; using SagaAudit; - using ServiceControl.Infrastructure; - using ServiceControl.Transports; + using Transports; public class AuditIngestor { @@ -31,6 +29,7 @@ ITransportCustomization transportCustomization { this.settings = settings; this.messageDispatcher = messageDispatcher; + var enrichers = new IEnrichImportedAuditMessages[] { new MessageTypeEnricher(), 
new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher(), new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), new DetectSuccessfulRetriesEnricher(), new SagaRelationshipsEnricher() }.Concat(auditEnrichers).ToArray(); logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue); @@ -52,7 +51,6 @@ public async Task Ingest(List contexts) if (settings.ForwardAuditMessages) { await Forward(stored, logQueueAddress); - forwardedMessagesCounter.Add(stored.Count); } foreach (var context in contexts) @@ -132,7 +130,6 @@ public async Task VerifyCanReachForwardingAddress() readonly Settings settings; readonly Lazy messageDispatcher; readonly string logQueueAddress; - readonly Counter forwardedMessagesCounter = Telemetry.Meter.CreateCounter(Telemetry.CreateInstrumentName("ingestion", "forwarded"), description: "Audit ingestion forwarded message count"); static readonly ILog Log = LogManager.GetLogger(); } diff --git a/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs new file mode 100644 index 0000000000..41d2994fa9 --- /dev/null +++ b/src/ServiceControl.Audit/Auditing/Metrics/BatchMetrics.cs @@ -0,0 +1,33 @@ +namespace ServiceControl.Audit.Auditing.Metrics; + +using System; +using System.Diagnostics; +using System.Diagnostics.Metrics; + +public record BatchMetrics(int MaxBatchSize, Histogram BatchDuration, Action IsSuccess) : IDisposable +{ + public void Dispose() + { + var isSuccess = actualBatchSize > 0; + + IsSuccess(isSuccess); + + string result; + + if (isSuccess) + { + result = actualBatchSize == MaxBatchSize ? 
"full" : "partial"; + } + else + { + result = "failed"; + } + + BatchDuration.Record(sw.Elapsed.TotalSeconds, new TagList { { "result", result } }); + } + + public void Complete(int size) => actualBatchSize = size; + + int actualBatchSize = -1; + readonly Stopwatch sw = Stopwatch.StartNew(); +} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/Metrics/ErrorMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/ErrorMetrics.cs new file mode 100644 index 0000000000..f01e6a9293 --- /dev/null +++ b/src/ServiceControl.Audit/Auditing/Metrics/ErrorMetrics.cs @@ -0,0 +1,21 @@ +namespace ServiceControl.Audit.Auditing.Metrics; + +using System; +using System.Diagnostics.Metrics; +using NServiceBus.Transport; + +public record ErrorMetrics(ErrorContext Context, Counter Failures) : IDisposable +{ + public void Dispose() + { + var tags = IngestionMetrics.GetMessageTags(Context.Message.Headers); + + tags.Add("result", retry ? "retry" : "stored-poison"); + + Failures.Add(1, tags); + } + + public void Retry() => retry = true; + + bool retry; +} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs new file mode 100644 index 0000000000..2c79c586d5 --- /dev/null +++ b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetrics.cs @@ -0,0 +1,74 @@ +namespace ServiceControl.Audit.Auditing.Metrics; + +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.Metrics; +using EndpointPlugin.Messages.SagaState; +using NServiceBus; +using NServiceBus.Transport; + +public class IngestionMetrics +{ + public const string MeterName = "Particular.ServiceControl.Audit"; + + public static readonly string BatchDurationInstrumentName = $"{InstrumentPrefix}.batch_duration_seconds"; + public static readonly string MessageDurationInstrumentName = $"{InstrumentPrefix}.message_duration_seconds"; + + public IngestionMetrics(IMeterFactory 
meterFactory) + { + var meter = meterFactory.Create(MeterName, MeterVersion); + + batchDuration = meter.CreateHistogram(BatchDurationInstrumentName, unit: "seconds", "Message batch processing duration in seconds"); + ingestionDuration = meter.CreateHistogram(MessageDurationInstrumentName, unit: "seconds", description: "Audit message processing duration in seconds"); + consecutiveBatchFailureGauge = meter.CreateGauge($"{InstrumentPrefix}.consecutive_batch_failure_total", description: "Consecutive audit ingestion batch failure"); + failureCounter = meter.CreateCounter($"{InstrumentPrefix}.failures_total", description: "Audit ingestion failure count"); + } + + public MessageMetrics BeginIngestion(MessageContext messageContext) => new(messageContext, ingestionDuration); + + public ErrorMetrics BeginErrorHandling(ErrorContext errorContext) => new(errorContext, failureCounter); + + public BatchMetrics BeginBatch(int maxBatchSize) => new(maxBatchSize, batchDuration, RecordBatchOutcome); + + public static TagList GetMessageTags(Dictionary headers) + { + var tags = new TagList(); + + if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageType)) + { + tags.Add("message.category", messageType == SagaUpdateMessageType ? 
"saga-update" : "audit-message"); + } + else + { + tags.Add("message.category", "control-message"); + } + + return tags; + } + + void RecordBatchOutcome(bool success) + { + if (success) + { + consecutiveBatchFailures = 0; + } + else + { + consecutiveBatchFailures++; + } + + consecutiveBatchFailureGauge.Record(consecutiveBatchFailures); + } + + long consecutiveBatchFailures; + + readonly Histogram batchDuration; + readonly Gauge consecutiveBatchFailureGauge; + readonly Histogram ingestionDuration; + readonly Counter failureCounter; + + const string MeterVersion = "0.1.0"; + const string InstrumentPrefix = "sc.audit.ingestion"; + + static readonly string SagaUpdateMessageType = typeof(SagaUpdatedMessage).FullName; +} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetricsConfiguration.cs b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetricsConfiguration.cs new file mode 100644 index 0000000000..b78b6cd62a --- /dev/null +++ b/src/ServiceControl.Audit/Auditing/Metrics/IngestionMetricsConfiguration.cs @@ -0,0 +1,19 @@ +namespace ServiceControl.Audit.Auditing.Metrics; + +using OpenTelemetry.Metrics; + +public static class IngestionMetricsConfiguration +{ + public static void AddIngestionMetrics(this MeterProviderBuilder builder) + { + builder.AddMeter(IngestionMetrics.MeterName); + + // Note: Views can be replaced by new InstrumentAdvice { HistogramBucketBoundaries = [...] 
}; once we can update to the latest OpenTelemetry packages + builder.AddView( + instrumentName: IngestionMetrics.MessageDurationInstrumentName, + new ExplicitBucketHistogramConfiguration { Boundaries = [0.01, 0.05, 0.1, 0.5, 1, 5] }); + builder.AddView( + instrumentName: IngestionMetrics.BatchDurationInstrumentName, + new ExplicitBucketHistogramConfiguration { Boundaries = [0.01, 0.05, 0.1, 0.5, 1, 5] }); + } +} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/Metrics/MessageMetrics.cs b/src/ServiceControl.Audit/Auditing/Metrics/MessageMetrics.cs new file mode 100644 index 0000000000..df85310365 --- /dev/null +++ b/src/ServiceControl.Audit/Auditing/Metrics/MessageMetrics.cs @@ -0,0 +1,25 @@ +namespace ServiceControl.Audit.Auditing.Metrics; + +using System; +using System.Diagnostics; +using System.Diagnostics.Metrics; +using NServiceBus.Transport; + +public record MessageMetrics(MessageContext Context, Histogram Duration) : IDisposable +{ + public void Skipped() => result = "skipped"; + + public void Success() => result = "success"; + + public void Dispose() + { + var tags = IngestionMetrics.GetMessageTags(Context.Headers); + + tags.Add("result", result); + Duration.Record(sw.Elapsed.TotalSeconds, tags); + } + + string result = "failed"; + + readonly Stopwatch sw = Stopwatch.StartNew(); +} \ No newline at end of file diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs index c38e023869..ccacf5db1c 100644 --- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs @@ -6,6 +6,7 @@ namespace ServiceControl.Audit; using System.Threading.Tasks; using Auditing; using Hosting; +using Auditing.Metrics; using Infrastructure; using Infrastructure.Settings; using Microsoft.AspNetCore.HttpLogging; @@ -21,22 +22,22 @@ namespace ServiceControl.Audit; using NServiceBus.Transport; using Persistence; 
using Transports; -using ServiceControl.Infrastructure; using OpenTelemetry.Metrics; using OpenTelemetry.Resources; static class HostApplicationBuilderExtensions { + static readonly string InstanceVersion = FileVersionInfo.GetVersionInfo(typeof(HostApplicationBuilderExtensions).Assembly.Location).ProductVersion; + public static void AddServiceControlAudit(this IHostApplicationBuilder builder, Func onCriticalError, Settings settings, EndpointConfiguration configuration) { - var version = FileVersionInfo.GetVersionInfo(typeof(HostApplicationBuilderExtensions).Assembly.Location).ProductVersion; var persistenceConfiguration = PersistenceConfigurationFactory.LoadPersistenceConfiguration(settings); var persistenceSettings = persistenceConfiguration.BuildPersistenceSettings(settings); - RecordStartup(version, settings, configuration, persistenceConfiguration); + RecordStartup(settings, configuration, persistenceConfiguration); builder.Logging.ClearProviders(); builder.Logging.AddNLog(); @@ -71,30 +72,7 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder, NServiceBusFactory.Configure(settings, transportCustomization, transportSettings, onCriticalError, configuration); builder.UseNServiceBus(configuration); - if (!string.IsNullOrEmpty(settings.OtlpEndpointUrl)) - { - if (!Uri.TryCreate(settings.OtlpEndpointUrl, UriKind.Absolute, out var otelMetricsUri)) - { - throw new UriFormatException($"Invalid OtlpEndpointUrl: {settings.OtlpEndpointUrl}"); - } - - builder.Services.AddOpenTelemetry() - .ConfigureResource(b => b.AddService( - serviceName: settings.InstanceName, - serviceVersion: version, - autoGenerateServiceInstanceId: true)) - .WithMetrics(b => - { - b.AddAuditIngestionMeters(); - b.AddOtlpExporter(e => - { - e.Endpoint = otelMetricsUri; - }); - }); - - var logger = LogManager.GetLogger(typeof(HostApplicationBuilderExtensions)); - logger.InfoFormat("OpenTelemetry metrics exporter enabled: {0}", settings.OtlpEndpointUrl); - } + 
builder.AddMetrics(settings); // Configure after the NServiceBus hosted service to ensure NServiceBus is already started if (settings.IngestAuditMessages) @@ -116,11 +94,42 @@ public static void AddServiceControlAuditInstallers(this IHostApplicationBuilder builder.Services.AddInstaller(persistenceSettings, persistenceConfiguration); } - static void RecordStartup(string version, Settings settings, EndpointConfiguration endpointConfiguration, IPersistenceConfiguration persistenceConfiguration) + public static void AddMetrics(this IHostApplicationBuilder builder, Settings settings) + { + builder.Services.AddSingleton(); + + if (!string.IsNullOrEmpty(settings.OtlpEndpointUrl)) + { + if (!Uri.TryCreate(settings.OtlpEndpointUrl, UriKind.Absolute, out var otelMetricsUri)) + { + throw new UriFormatException($"Invalid OtlpEndpointUrl: {settings.OtlpEndpointUrl}"); + } + + builder.Services.AddOpenTelemetry() + .ConfigureResource(b => b.AddService( + serviceName: settings.InstanceName, + serviceVersion: InstanceVersion, + autoGenerateServiceInstanceId: true)) + .WithMetrics(b => + { + b.AddIngestionMetrics(); + b.AddOtlpExporter(e => e.Endpoint = otelMetricsUri); + if (Debugger.IsAttached) + { + b.AddConsoleExporter(); + } + }); + + var logger = LogManager.GetLogger(typeof(HostApplicationBuilderExtensions)); + logger.InfoFormat("OpenTelemetry metrics exporter enabled: {0}", settings.OtlpEndpointUrl); + } + } + + static void RecordStartup(Settings settings, EndpointConfiguration endpointConfiguration, IPersistenceConfiguration persistenceConfiguration) { var startupMessage = $@" ------------------------------------------------------------- -ServiceControl Audit Version: {version} +ServiceControl Audit Version: {InstanceVersion} Audit Retention Period: {settings.AuditRetentionPeriod} Forwarding Audit Messages: {settings.ForwardAuditMessages} ServiceControl Logging Level: {settings.LoggingSettings.LogLevel} diff --git a/src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs 
b/src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs deleted file mode 100644 index bd8bc3c9d7..0000000000 --- a/src/ServiceControl.Audit/Infrastructure/DurationRecorder.cs +++ /dev/null @@ -1,12 +0,0 @@ -namespace ServiceControl.Infrastructure; - -using System; -using System.Diagnostics; -using System.Diagnostics.Metrics; - -record DurationRecorder(Histogram Histogram, TagList Tags = default) : IDisposable -{ - readonly Stopwatch sw = Stopwatch.StartNew(); - - public void Dispose() => Histogram.Record(sw.ElapsedMilliseconds, Tags); -} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Infrastructure/Telemetry.cs b/src/ServiceControl.Audit/Infrastructure/Telemetry.cs deleted file mode 100644 index 033590b91c..0000000000 --- a/src/ServiceControl.Audit/Infrastructure/Telemetry.cs +++ /dev/null @@ -1,30 +0,0 @@ -namespace ServiceControl.Infrastructure; - -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Diagnostics.Metrics; -using NServiceBus; -using OpenTelemetry.Metrics; - -static class Telemetry -{ - const string MeterName = "Particular.ServiceControl.Audit"; - public static readonly Meter Meter = new(MeterName, "0.1.0"); - - public static string CreateInstrumentName(string instrumentNamespace, string instrumentName) => $"sc.audit.{instrumentNamespace}.{instrumentName}".ToLower(); - - public static void AddAuditIngestionMeters(this MeterProviderBuilder builder) => builder.AddMeter(MeterName); - - public static TagList GetIngestedMessageTags(IDictionary headers, ReadOnlyMemory body) - { - var tags = new TagList { { "messaging.message.body.size", body.Length } }; - - if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageType)) - { - tags.Add("messaging.message.type", messageType); - } - - return tags; - } -} \ No newline at end of file diff --git a/src/ServiceControl.Audit/ServiceControl.Audit.csproj b/src/ServiceControl.Audit/ServiceControl.Audit.csproj index 6e0a889e9f..3d5245157b 
100644 --- a/src/ServiceControl.Audit/ServiceControl.Audit.csproj +++ b/src/ServiceControl.Audit/ServiceControl.Audit.csproj @@ -29,8 +29,10 @@ + +