From eb6e9dce74cf26d486dbff7f5ec5a036b95c4a4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Fri, 31 Jan 2025 14:15:29 +0100 Subject: [PATCH 01/23] Emit metrics via otel instead of custom format --- .../ServiceControlComponentRunner.cs | 8 ++- src/ServiceControl.Audit/App.config | 2 +- .../Auditing/AuditIngestion.cs | 29 ++++---- .../Auditing/AuditIngestor.cs | 23 +----- .../Auditing/AuditMetrics.cs | 9 +++ .../Auditing/AuditPersister.cs | 71 +++++++------------ .../HostApplicationBuilderExtensions.cs | 38 ++++++++-- .../Metrics/MetricsReporterHostedService.cs | 30 -------- .../MetricsServiceCollectionExtensions.cs | 14 ---- .../Infrastructure/Settings/Settings.cs | 8 ++- .../ServiceControl.Audit.csproj | 5 +- 11 files changed, 97 insertions(+), 140 deletions(-) create mode 100644 src/ServiceControl.Audit/Auditing/AuditMetrics.cs delete mode 100644 src/ServiceControl.Audit/Infrastructure/Metrics/MetricsReporterHostedService.cs delete mode 100644 src/ServiceControl.Audit/Infrastructure/Metrics/MetricsServiceCollectionExtensions.cs diff --git a/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs b/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs index 35990ee062..12c10ddab0 100644 --- a/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs +++ b/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs @@ -51,17 +51,19 @@ async Task InitializeServiceControl(ScenarioContext context) TransportConnectionString = transportToUse.ConnectionString, MaximumConcurrencyLevel = 2, ServiceControlQueueAddress = "SHOULDNOTBEUSED", + OtelMetricsUrl = "http://localhost:4317", MessageFilter = messageContext => { var id = messageContext.NativeMessageId; var headers = messageContext.Headers; - var log = NServiceBus.Logging.LogManager.GetLogger(); - headers.TryGetValue(Headers.MessageId, out var originalMessageId); + headers.TryGetValue(Headers.MessageId, + out var originalMessageId); log.Debug($"OnMessage for message '{id}'({originalMessageId ?? string.Empty})."); //Do not filter out CC, SA and HB messages as they can't be stamped - if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageTypes) + if (headers.TryGetValue(Headers.EnclosedMessageTypes, + out var messageTypes) && (messageTypes.StartsWith("ServiceControl.Contracts") || messageTypes.StartsWith("ServiceControl.EndpointPlugin"))) { return false; diff --git a/src/ServiceControl.Audit/App.config b/src/ServiceControl.Audit/App.config index 83610fa6ee..7cf230c9f0 100644 --- a/src/ServiceControl.Audit/App.config +++ b/src/ServiceControl.Audit/App.config @@ -8,7 +8,7 @@ These settings are only here so that we can debug ServiceControl while developin - + diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 56d4244e84..676930e539 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; + using System.Diagnostics.Metrics; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -14,18 +15,14 @@ using Persistence; using Persistence.UnitOfWork; using ServiceControl.Infrastructure; - using ServiceControl.Infrastructure.Metrics; using Transports; class AuditIngestion : IHostedService { - static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000; - public AuditIngestion( Settings settings, ITransportCustomization transportCustomization, TransportSettings transportSettings, - Metrics metrics, IFailedAuditStorage failedImportsStorage, AuditIngestionCustomCheck.State ingestionState, AuditIngestor auditIngestor, @@ -40,10 +37,6 @@ public AuditIngestion( this.settings = settings; this.applicationLifetime = applicationLifetime; - batchSizeMeter = metrics.GetMeter("Audit ingestion - batch size"); - batchDurationMeter = metrics.GetMeter("Audit ingestion - batch processing duration", FrequencyInMilliseconds); - receivedMeter = metrics.GetCounter("Audit ingestion - received"); - if (!transportSettings.MaxConcurrency.HasValue) { throw new ArgumentException("MaxConcurrency is not set in TransportSettings"); @@ -102,6 +95,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default) await stoppable.StopReceive(cancellationToken); logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed"); } + return; } @@ -168,6 +162,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) logger.Info("Shutting down. Already stopped, skipping shut down"); return; //Already stopped } + var stoppable = queueIngestor; queueIngestor = null; logger.Info("Shutting down. Infrastructure shut down commencing"); @@ -196,7 +191,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); messageContext.SetTaskCompletionSource(taskCompletionSource); - receivedMeter.Mark(); + receivedMeter.Add(1); await channel.Writer.WriteAsync(messageContext, cancellationToken); await taskCompletionSource.Task; @@ -217,11 +212,11 @@ async Task Loop() contexts.Add(context); } - batchSizeMeter.Mark(contexts.Count); - using (batchDurationMeter.Measure()) - { - await auditIngestor.Ingest(contexts); - } + batchSizeMeter.Record(contexts.Count); + var sw = Stopwatch.StartNew(); + + await auditIngestor.Ingest(contexts); + batchDurationMeter.Record(sw.ElapsedMilliseconds); } catch (OperationCanceledException) { @@ -261,9 +256,9 @@ async Task Loop() readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory; readonly Settings settings; readonly Channel channel; - readonly Meter batchSizeMeter; - readonly Meter batchDurationMeter; - readonly Counter receivedMeter; + readonly Histogram batchSizeMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_size"); + readonly Histogram batchDurationMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_duration_ms"); + readonly Counter receivedMeter = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.received"); readonly Watchdog watchdog; readonly Task ingestionWorker; readonly IHostApplicationLifetime applicationLifetime; diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index 84d110a1f2..c2ec2623d1 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -14,13 +14,11 @@ using Persistence.UnitOfWork; using Recoverability; using SagaAudit; - using ServiceControl.Infrastructure.Metrics; using ServiceControl.Transports; public class AuditIngestor { public AuditIngestor( - Metrics metrics, Settings settings, IAuditIngestionUnitOfWorkFactory unitOfWorkFactory, EndpointInstanceMonitoring endpointInstanceMonitoring, @@ -32,26 +30,11 @@ ITransportCustomization transportCustomization { this.settings = settings; this.messageDispatcher = messageDispatcher; - - var ingestedAuditMeter = metrics.GetCounter("Audit ingestion - ingested audit"); - var ingestedSagaAuditMeter = metrics.GetCounter("Audit ingestion - ingested saga audit"); - var auditBulkInsertDurationMeter = metrics.GetMeter("Audit ingestion - audit bulk insert duration", FrequencyInMilliseconds); - var sagaAuditBulkInsertDurationMeter = metrics.GetMeter("Audit ingestion - saga audit bulk insert duration", FrequencyInMilliseconds); - var bulkInsertCommitDurationMeter = metrics.GetMeter("Audit ingestion - bulk insert commit duration", FrequencyInMilliseconds); - - var enrichers = new IEnrichImportedAuditMessages[] - { - new MessageTypeEnricher(), - new EnrichWithTrackingIds(), - new ProcessingStatisticsEnricher(), - new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), - new DetectSuccessfulRetriesEnricher(), - new SagaRelationshipsEnricher() - }.Concat(auditEnrichers).ToArray(); + var enrichers = new IEnrichImportedAuditMessages[] { new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher(), new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), new DetectSuccessfulRetriesEnricher(), new SagaRelationshipsEnricher() }.Concat(auditEnrichers).ToArray(); logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue); - auditPersister = new AuditPersister(unitOfWorkFactory, enrichers, ingestedAuditMeter, ingestedSagaAuditMeter, auditBulkInsertDurationMeter, sagaAuditBulkInsertDurationMeter, bulkInsertCommitDurationMeter, messageSession, messageDispatcher); + auditPersister = new AuditPersister(unitOfWorkFactory, enrichers, messageSession, messageDispatcher); } public async Task Ingest(List contexts) @@ -71,6 +54,7 @@ public async Task Ingest(List contexts) { Log.Debug($"Forwarding {stored.Count} messages"); } + await Forward(stored, logQueueAddress); if (Log.IsDebugEnabled) { @@ -159,7 +143,6 @@ public async Task VerifyCanReachForwardingAddress() readonly Lazy messageDispatcher; readonly string logQueueAddress; - static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000; static readonly ILog Log = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/AuditMetrics.cs b/src/ServiceControl.Audit/Auditing/AuditMetrics.cs new file mode 100644 index 0000000000..3e02448fa1 --- /dev/null +++ b/src/ServiceControl.Audit/Auditing/AuditMetrics.cs @@ -0,0 +1,9 @@ +namespace ServiceControl.Audit.Auditing; + +using System.Diagnostics.Metrics; + +static class AuditMetrics +{ + public static readonly Meter Meter = new("ServiceControl", "0.1.0"); + public static readonly string Prefix = "particular.servicecontrol.audit"; +} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/AuditPersister.cs b/src/ServiceControl.Audit/Auditing/AuditPersister.cs index 0a5d0d9938..54e7f0c2da 100644 --- a/src/ServiceControl.Audit/Auditing/AuditPersister.cs +++ b/src/ServiceControl.Audit/Auditing/AuditPersister.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; + using System.Diagnostics.Metrics; using System.Text.Json; using System.Threading.Tasks; using Infrastructure; @@ -15,29 +16,13 @@ using ServiceControl.Audit.Persistence.Monitoring; using ServiceControl.EndpointPlugin.Messages.SagaState; using ServiceControl.Infrastructure; - using ServiceControl.Infrastructure.Metrics; using ServiceControl.SagaAudit; - class AuditPersister + class AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory, + IEnrichImportedAuditMessages[] enrichers, + IMessageSession messageSession, + Lazy messageDispatcher) { - public AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory, - IEnrichImportedAuditMessages[] enrichers, - Counter ingestedAuditMeter, Counter ingestedSagaAuditMeter, Meter auditBulkInsertDurationMeter, - Meter sagaAuditBulkInsertDurationMeter, Meter bulkInsertCommitDurationMeter, IMessageSession messageSession, - Lazy messageDispatcher) - { - this.unitOfWorkFactory = unitOfWorkFactory; - this.enrichers = enrichers; - - this.ingestedAuditMeter = ingestedAuditMeter; - this.ingestedSagaAuditMeter = ingestedSagaAuditMeter; - this.auditBulkInsertDurationMeter = auditBulkInsertDurationMeter; - this.sagaAuditBulkInsertDurationMeter = sagaAuditBulkInsertDurationMeter; - this.bulkInsertCommitDurationMeter = bulkInsertCommitDurationMeter; - this.messageSession = messageSession; - this.messageDispatcher = messageDispatcher; - } - public async Task> Persist(IReadOnlyList contexts) { var stopwatch = Stopwatch.StartNew(); @@ -51,7 +36,6 @@ public async Task> Persist(IReadOnlyList(contexts.Count); @@ -89,12 +73,13 @@ public async Task> Persist(IReadOnlyList> Persist(IReadOnlyList> Persist(IReadOnlyList messageDispatcher; + readonly Counter ingestedAuditMeter = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.ingested_audit_messages"); // metrics.GetCounter("Audit ingestion - ingested audit"); + readonly Counter ingestedSagaAuditMeter = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.ingested_saga_audits"); // metrics.GetCounter("Audit ingestion - ingested audit"); + readonly Histogram auditBulkInsertDurationMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.audit_bulk_insert_duration_ms"); // metrics.GetCounter("Audit ingestion - ingested audit"); + readonly Histogram sagaAuditBulkInsertDurationMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.saga_bulk_insert_duration_ms"); // metrics.GetCounter("Audit ingestion - ingested audit"); + readonly Histogram bulkInsertCommitDurationMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.audit_commit_duration_ms"); // metrics.GetCounter("Audit ingestion - ingested audit"); - readonly IEnrichImportedAuditMessages[] enrichers; - readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory; static readonly ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs index 8968565a50..1840ba6908 100644 --- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs @@ -5,8 +5,8 @@ namespace ServiceControl.Audit; using System.Threading; using System.Threading.Tasks; using Auditing; +using Azure.Monitor.OpenTelemetry.Exporter; using Infrastructure; -using Infrastructure.Metrics; using Infrastructure.Settings; using Microsoft.AspNetCore.HttpLogging; using Microsoft.Extensions.DependencyInjection; @@ -20,6 +20,8 @@ namespace ServiceControl.Audit; using NServiceBus.Transport; using Persistence; using Transports; +using OpenTelemetry.Metrics; +using OpenTelemetry.Resources; static class HostApplicationBuilderExtensions { @@ -61,13 +63,38 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder, // directly and to make things more complex of course the order of registration still matters ;) services.AddSingleton(provider => new Lazy(provider.GetRequiredService)); - services.AddMetrics(settings.PrintMetrics); - services.AddPersistence(persistenceSettings, persistenceConfiguration); NServiceBusFactory.Configure(settings, transportCustomization, transportSettings, onCriticalError, configuration); builder.UseNServiceBus(configuration); + if (!string.IsNullOrEmpty(settings.OtelMetricsUrl)) + { + builder.Services.AddOpenTelemetry() + .ConfigureResource(b => b.AddService(serviceName: settings.InstanceName)) + .WithMetrics(b => + { + b.AddMeter("ServiceControl"); + + if (Uri.TryCreate(settings.OtelMetricsUrl, UriKind.Absolute, out var uri)) + { + b.AddOtlpExporter(e => + { + e.Endpoint = uri; + }); + } + else + { + b.AddAzureMonitorMetricExporter(o => + { + o.ConnectionString = settings.OtelMetricsUrl; + }); + } + + b.AddConsoleExporter(); + }); + } + // Configure after the NServiceBus hosted service to ensure NServiceBus is already started if (settings.IngestAuditMessages) { @@ -101,9 +128,6 @@ static void RecordStartup(Settings settings, EndpointConfiguration endpointConfi var logger = LogManager.GetLogger(typeof(HostApplicationBuilderExtensions)); logger.Info(startupMessage); - endpointConfiguration.GetSettings().AddStartupDiagnosticsSection("Startup", new - { - Settings = settings - }); + endpointConfiguration.GetSettings().AddStartupDiagnosticsSection("Startup", new { Settings = settings }); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsReporterHostedService.cs b/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsReporterHostedService.cs deleted file mode 100644 index c378a8f4d6..0000000000 --- a/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsReporterHostedService.cs +++ /dev/null @@ -1,30 +0,0 @@ -namespace ServiceControl.Audit.Infrastructure.Metrics -{ - using System; - using System.Threading; - using System.Threading.Tasks; - using Microsoft.Extensions.Hosting; - using NServiceBus.Logging; - using ServiceControl.Infrastructure.Metrics; - - class MetricsReporterHostedService : IHostedService - { - readonly Metrics metrics; - MetricsReporter reporter; - - public MetricsReporterHostedService(Metrics metrics) => this.metrics = metrics; - - public Task StartAsync(CancellationToken cancellationToken) - { - var metricsLog = LogManager.GetLogger("Metrics"); - - reporter = new MetricsReporter(metrics, x => metricsLog.Info(x), TimeSpan.FromSeconds(5)); - - reporter.Start(); - - return Task.CompletedTask; - } - - public Task StopAsync(CancellationToken cancellationToken) => reporter.Stop(); - } -} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsServiceCollectionExtensions.cs b/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsServiceCollectionExtensions.cs deleted file mode 100644 index a7bc6b1dd5..0000000000 --- a/src/ServiceControl.Audit/Infrastructure/Metrics/MetricsServiceCollectionExtensions.cs +++ /dev/null @@ -1,14 +0,0 @@ -namespace ServiceControl.Audit.Infrastructure.Metrics -{ - using Microsoft.Extensions.DependencyInjection; - using ServiceControl.Infrastructure.Metrics; - - static class MetricsServiceCollectionExtensions - { - public static void AddMetrics(this IServiceCollection services, bool printMetrics) - { - services.AddSingleton(new Metrics { Enabled = printMetrics }); - services.AddHostedService(); - } - } -} \ No newline at end of file diff --git a/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs b/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs index 183b695555..b77fcc295d 100644 --- a/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs +++ b/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs @@ -43,7 +43,9 @@ public Settings(string transportType = null, string persisterType = null, Loggin { Hostname = SettingsReader.Read(SettingsRootNamespace, "Hostname", "localhost"); Port = SettingsReader.Read(SettingsRootNamespace, "Port", 44444); - }; + } + + ; MaximumConcurrencyLevel = SettingsReader.Read(SettingsRootNamespace, "MaximumConcurrencyLevel"); ServiceControlQueueAddress = SettingsReader.Read(SettingsRootNamespace, "ServiceControlQueueAddress"); @@ -79,8 +81,7 @@ void LoadAuditQueueInformation() } } - [JsonIgnore] - public Func AssemblyLoadContextResolver { get; set; } + [JsonIgnore] public Func AssemblyLoadContextResolver { get; set; } public LoggingSettings LoggingSettings { get; } @@ -109,6 +110,7 @@ public string RootUrl public int Port { get; set; } public bool PrintMetrics => SettingsReader.Read(SettingsRootNamespace, "PrintMetrics"); + public string OtelMetricsUrl { get; set; } = SettingsReader.Read(SettingsRootNamespace, nameof(OtelMetricsUrl)); public string Hostname { get; private set; } public string VirtualDirectory => SettingsReader.Read(SettingsRootNamespace, "VirtualDirectory", string.Empty); diff --git a/src/ServiceControl.Audit/ServiceControl.Audit.csproj b/src/ServiceControl.Audit/ServiceControl.Audit.csproj index 3303782e3f..c1c7ee7075 100644 --- a/src/ServiceControl.Audit/ServiceControl.Audit.csproj +++ b/src/ServiceControl.Audit/ServiceControl.Audit.csproj @@ -18,17 +18,20 @@ - + + + + From 2d0880b3111f13fca91e4be46d627c5bb753a16a Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Fri, 31 Jan 2025 14:50:56 +0100 Subject: [PATCH 02/23] Minimize diff by reverting whitespace changes --- .../ServiceControlComponentRunner.cs | 6 ++---- .../Auditing/AuditIngestion.cs | 2 -- .../Auditing/AuditIngestor.cs | 18 +++++++++++++++--- .../HostApplicationBuilderExtensions.cs | 5 ++++- .../Infrastructure/Settings/Settings.cs | 7 +++---- 5 files changed, 24 insertions(+), 14 deletions(-) diff --git a/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs b/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs index 12c10ddab0..ba6f54ad66 100644 --- a/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs +++ b/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs @@ -57,13 +57,11 @@ async Task InitializeServiceControl(ScenarioContext context) var id = messageContext.NativeMessageId; var headers = messageContext.Headers; var log = NServiceBus.Logging.LogManager.GetLogger(); - headers.TryGetValue(Headers.MessageId, - out var originalMessageId); + headers.TryGetValue(Headers.MessageId, out var originalMessageId); log.Debug($"OnMessage for message '{id}'({originalMessageId ?? string.Empty})."); //Do not filter out CC, SA and HB messages as they can't be stamped - if (headers.TryGetValue(Headers.EnclosedMessageTypes, - out var messageTypes) + if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageTypes) && (messageTypes.StartsWith("ServiceControl.Contracts") || messageTypes.StartsWith("ServiceControl.EndpointPlugin"))) { return false; diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 676930e539..3c5b47a288 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -95,7 +95,6 @@ async Task EnsureStarted(CancellationToken cancellationToken = default) await stoppable.StopReceive(cancellationToken); logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed"); } - return; } @@ -162,7 +161,6 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) logger.Info("Shutting down. Already stopped, skipping shut down"); return; //Already stopped } - var stoppable = queueIngestor; queueIngestor = null; logger.Info("Shutting down. Infrastructure shut down commencing"); diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index c2ec2623d1..487b7e0e1a 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -30,11 +30,24 @@ ITransportCustomization transportCustomization { this.settings = settings; this.messageDispatcher = messageDispatcher; - var enrichers = new IEnrichImportedAuditMessages[] { new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher(), new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), new DetectSuccessfulRetriesEnricher(), new SagaRelationshipsEnricher() }.Concat(auditEnrichers).ToArray(); + var enrichers = new IEnrichImportedAuditMessages[] + { + new MessageTypeEnricher(), + new EnrichWithTrackingIds(), + new ProcessingStatisticsEnricher(), + new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), + new DetectSuccessfulRetriesEnricher(), + new SagaRelationshipsEnricher() + }.Concat(auditEnrichers).ToArray(); logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue); - auditPersister = new AuditPersister(unitOfWorkFactory, enrichers, messageSession, messageDispatcher); + auditPersister = new AuditPersister( + unitOfWorkFactory, + enrichers, + messageSession, + messageDispatcher + ); } public async Task Ingest(List contexts) @@ -54,7 +67,6 @@ public async Task Ingest(List contexts) { Log.Debug($"Forwarding {stored.Count} messages"); } - await Forward(stored, logQueueAddress); if (Log.IsDebugEnabled) { diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs index 1840ba6908..f90fe923f9 100644 --- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs @@ -128,6 +128,9 @@ static void RecordStartup(Settings settings, EndpointConfiguration endpointConfi var logger = LogManager.GetLogger(typeof(HostApplicationBuilderExtensions)); logger.Info(startupMessage); - endpointConfiguration.GetSettings().AddStartupDiagnosticsSection("Startup", new { Settings = settings }); + endpointConfiguration.GetSettings().AddStartupDiagnosticsSection("Startup", new + { + Settings = settings + }); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs b/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs index b77fcc295d..d6a84f0662 100644 --- a/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs +++ b/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs @@ -43,9 +43,7 @@ public Settings(string transportType = null, string persisterType = null, Loggin { Hostname = SettingsReader.Read(SettingsRootNamespace, "Hostname", "localhost"); Port = SettingsReader.Read(SettingsRootNamespace, "Port", 44444); - } - - ; + }; MaximumConcurrencyLevel = SettingsReader.Read(SettingsRootNamespace, "MaximumConcurrencyLevel"); ServiceControlQueueAddress = SettingsReader.Read(SettingsRootNamespace, "ServiceControlQueueAddress"); @@ -81,7 +79,8 @@ void LoadAuditQueueInformation() } } - [JsonIgnore] public Func AssemblyLoadContextResolver { get; set; } + [JsonIgnore] + public Func AssemblyLoadContextResolver { get; set; } public LoggingSettings LoggingSettings { get; } From 9652aa03c587f6b288b53ef4e10b90f4b19fe7b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Fri, 31 Jan 2025 14:52:20 +0100 Subject: [PATCH 03/23] Approvals --- .../APIApprovals.PlatformSampleSettings.approved.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt b/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt index c1dbbb4ec1..9862349774 100644 --- a/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt +++ b/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.PlatformSampleSettings.approved.txt @@ -12,6 +12,7 @@ "ApiUrl": "http://localhost:8888/api", "Port": 8888, "PrintMetrics": false, + "OtelMetricsUrl": null, "Hostname": "localhost", "VirtualDirectory": "", "TransportType": "LearningTransport", From 77b2c2c08d220db1ddeb1246608f15c1371d80bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Fri, 31 Jan 2025 15:05:51 +0100 Subject: [PATCH 04/23] Fix formatting --- src/ServiceControl.Audit/Auditing/AuditIngestor.cs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index 487b7e0e1a..fb6f2a7240 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -2,7 +2,6 @@ { using System; using System.Collections.Generic; - using System.Diagnostics; using System.Linq; using System.Threading.Tasks; using Infrastructure.Settings; @@ -30,15 +29,7 @@ ITransportCustomization transportCustomization { this.settings = settings; this.messageDispatcher = messageDispatcher; - var enrichers = new IEnrichImportedAuditMessages[] - { - new MessageTypeEnricher(), - new EnrichWithTrackingIds(), - new ProcessingStatisticsEnricher(), - new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), - new DetectSuccessfulRetriesEnricher(), - new SagaRelationshipsEnricher() - }.Concat(auditEnrichers).ToArray(); + var enrichers = new IEnrichImportedAuditMessages[] { new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher(), new DetectNewEndpointsFromAuditImportsEnricher(endpointInstanceMonitoring), new DetectSuccessfulRetriesEnricher(), new SagaRelationshipsEnricher() }.Concat(auditEnrichers).ToArray(); logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue); @@ -67,6 +58,7 @@ public async Task Ingest(List contexts) { Log.Debug($"Forwarding {stored.Count} messages"); } + await Forward(stored, logQueueAddress); if (Log.IsDebugEnabled) { From 03690a08672dadd7e70ffc8a439edd0bb6e3b4e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Fri, 31 Jan 2025 15:07:09 +0100 Subject: [PATCH 05/23] Apply suggestions from code review Co-authored-by: Ramon Smits --- src/ServiceControl.Audit/Auditing/AuditPersister.cs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditPersister.cs b/src/ServiceControl.Audit/Auditing/AuditPersister.cs index 54e7f0c2da..adae02c6b0 100644 --- a/src/ServiceControl.Audit/Auditing/AuditPersister.cs +++ b/src/ServiceControl.Audit/Auditing/AuditPersister.cs @@ -75,8 +75,6 @@ public async Task> Persist(IReadOnlyList> Persist(IReadOnlyList> Persist(IReadOnlyList Date: Fri, 31 Jan 2025 15:21:50 +0100 Subject: [PATCH 06/23] fix formatting --- src/ServiceControl.Audit/Auditing/AuditPersister.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditPersister.cs b/src/ServiceControl.Audit/Auditing/AuditPersister.cs index adae02c6b0..16eae2f401 100644 --- a/src/ServiceControl.Audit/Auditing/AuditPersister.cs +++ b/src/ServiceControl.Audit/Auditing/AuditPersister.cs @@ -130,7 +130,7 @@ public async Task> Persist(IReadOnlyList Date: Sun, 2 Feb 2025 17:50:13 +0100 Subject: [PATCH 07/23] Only use standar otel --- src/Directory.Packages.props | 3 +++ .../HostApplicationBuilderExtensions.cs | 22 ++++--------------- .../ServiceControl.Audit.csproj | 1 - 3 files changed, 7 insertions(+), 19 deletions(-) diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index 37eaf6fc3b..c1132dbdf8 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -50,6 +50,9 @@ + + + diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs index f90fe923f9..d981b55d3b 100644 --- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs @@ -5,7 +5,6 @@ namespace ServiceControl.Audit; using System.Threading; using System.Threading.Tasks; using Auditing; -using Azure.Monitor.OpenTelemetry.Exporter; using Infrastructure; using Infrastructure.Settings; using Microsoft.AspNetCore.HttpLogging; @@ -76,20 +75,10 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder, { b.AddMeter("ServiceControl"); - if (Uri.TryCreate(settings.OtelMetricsUrl, UriKind.Absolute, out var uri)) + b.AddOtlpExporter(e => { - b.AddOtlpExporter(e => - { - e.Endpoint = uri; - }); - } - else - { - b.AddAzureMonitorMetricExporter(o => - { - o.ConnectionString = settings.OtelMetricsUrl; - }); - } + e.Endpoint = new Uri(settings.OtelMetricsUrl); + }); b.AddConsoleExporter(); }); @@ -128,9 +117,6 @@ static void RecordStartup(Settings settings, EndpointConfiguration endpointConfi var logger = LogManager.GetLogger(typeof(HostApplicationBuilderExtensions)); logger.Info(startupMessage); - endpointConfiguration.GetSettings().AddStartupDiagnosticsSection("Startup", new - { - Settings = settings - }); + endpointConfiguration.GetSettings().AddStartupDiagnosticsSection("Startup", new { Settings = settings }); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/ServiceControl.Audit.csproj b/src/ServiceControl.Audit/ServiceControl.Audit.csproj index c1c7ee7075..084b5a48ce 100644 --- a/src/ServiceControl.Audit/ServiceControl.Audit.csproj +++ b/src/ServiceControl.Audit/ServiceControl.Audit.csproj @@ -23,7 +23,6 @@ - From 7616151d636aa3cacf889ab6fda0049aa1c3c6f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Sun, 2 Feb 2025 20:05:41 +0100 Subject: [PATCH 08/23] Set instance id --- src/Directory.Packages.props | 1 - .../HostApplicationBuilderExtensions.cs | 21 +++++++++++-------- .../ServiceControl.Audit.csproj | 1 - 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index c1132dbdf8..044ee33716 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -50,7 +50,6 @@ - diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs index d981b55d3b..d1f08569dc 100644 --- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs @@ -29,10 +29,11 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder, Settings settings, EndpointConfiguration configuration) { + var version = FileVersionInfo.GetVersionInfo(typeof(HostApplicationBuilderExtensions).Assembly.Location).ProductVersion; var persistenceConfiguration = PersistenceConfigurationFactory.LoadPersistenceConfiguration(settings); var persistenceSettings = persistenceConfiguration.BuildPersistenceSettings(settings); - RecordStartup(settings, configuration, persistenceConfiguration); + RecordStartup(version, settings, configuration, persistenceConfiguration); builder.Logging.ClearProviders(); builder.Logging.AddNLog(); @@ -69,18 +70,22 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder, if (!string.IsNullOrEmpty(settings.OtelMetricsUrl)) { + if (!Uri.TryCreate(settings.OtelMetricsUrl, UriKind.Absolute, out var otelMetricsUri)) + { + throw new UriFormatException($"Invalid OtelMetricsUrl: {settings.OtelMetricsUrl}"); + } builder.Services.AddOpenTelemetry() - .ConfigureResource(b => b.AddService(serviceName: settings.InstanceName)) + .ConfigureResource(b => b.AddService( + serviceName: "Particular.ServiceControl.Audit", + serviceVersion: version, + serviceInstanceId: settings.InstanceName)) .WithMetrics(b => { b.AddMeter("ServiceControl"); - b.AddOtlpExporter(e => { - e.Endpoint = new Uri(settings.OtelMetricsUrl); + e.Endpoint = otelMetricsUri; }); - - b.AddConsoleExporter(); }); } @@ -100,10 +105,8 @@ public static void AddServiceControlAuditInstallers(this IHostApplicationBuilder builder.Services.AddInstaller(persistenceSettings, persistenceConfiguration); } - static void RecordStartup(Settings settings, EndpointConfiguration endpointConfiguration, IPersistenceConfiguration persistenceConfiguration) + static void RecordStartup(string version, Settings settings, EndpointConfiguration endpointConfiguration, IPersistenceConfiguration persistenceConfiguration) { - var version = FileVersionInfo.GetVersionInfo(typeof(HostApplicationBuilderExtensions).Assembly.Location).ProductVersion; - var startupMessage = $@" ------------------------------------------------------------- ServiceControl Audit Version: {version} diff --git a/src/ServiceControl.Audit/ServiceControl.Audit.csproj b/src/ServiceControl.Audit/ServiceControl.Audit.csproj index 084b5a48ce..8f41ba97b1 100644 --- a/src/ServiceControl.Audit/ServiceControl.Audit.csproj +++ b/src/ServiceControl.Audit/ServiceControl.Audit.csproj @@ -28,7 +28,6 @@ - From e6d1f40219228c155d6c7438befcfc3e1158e5b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Mon, 3 Feb 2025 08:29:51 +0100 Subject: [PATCH 09/23] Set unit --- .../TestSupport/ServiceControlComponentRunner.cs | 1 - src/ServiceControl.Audit/Auditing/AuditIngestion.cs | 4 +++- src/ServiceControl.Audit/Auditing/AuditPersister.cs | 10 +++++----- .../{Auditing => Infrastructure}/AuditMetrics.cs | 4 ++-- 4 files changed, 10 insertions(+), 9 deletions(-) rename src/ServiceControl.Audit/{Auditing => Infrastructure}/AuditMetrics.cs (55%) diff --git a/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs b/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs index ba6f54ad66..0866b504db 100644 --- a/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs +++ b/src/ServiceControl.Audit.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs @@ -51,7 +51,6 @@ async Task InitializeServiceControl(ScenarioContext context) TransportConnectionString = transportToUse.ConnectionString, MaximumConcurrencyLevel = 2, ServiceControlQueueAddress = "SHOULDNOTBEUSED", - OtelMetricsUrl = "http://localhost:4317", MessageFilter = messageContext => { var id = messageContext.NativeMessageId; diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 3c5b47a288..93f93ca665 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -95,6 +95,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default) await stoppable.StopReceive(cancellationToken); logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed"); } + return; } @@ -161,6 +162,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) logger.Info("Shutting down. Already stopped, skipping shut down"); return; //Already stopped } + var stoppable = queueIngestor; queueIngestor = null; logger.Info("Shutting down. Infrastructure shut down commencing"); @@ -255,7 +257,7 @@ async Task Loop() readonly Settings settings; readonly Channel channel; readonly Histogram batchSizeMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_size"); - readonly Histogram batchDurationMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_duration_ms"); + readonly Histogram batchDurationMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_duration", unit: "ms"); readonly Counter receivedMeter = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.received"); readonly Watchdog watchdog; readonly Task ingestionWorker; diff --git a/src/ServiceControl.Audit/Auditing/AuditPersister.cs b/src/ServiceControl.Audit/Auditing/AuditPersister.cs index 16eae2f401..1afceeab00 100644 --- a/src/ServiceControl.Audit/Auditing/AuditPersister.cs +++ b/src/ServiceControl.Audit/Auditing/AuditPersister.cs @@ -283,11 +283,11 @@ await messageDispatcher.Value.Dispatch(new TransportOperations(messagesToEmit.To } } - readonly Counter ingestedAuditMeter = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.ingested_audit_messages"); // metrics.GetCounter("Audit ingestion - ingested audit"); - readonly Counter ingestedSagaAuditMeter = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.ingested_saga_audits"); // metrics.GetCounter("Audit ingestion - ingested audit"); - readonly Histogram auditBulkInsertDurationMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.audit_bulk_insert_duration_ms"); // metrics.GetCounter("Audit ingestion - ingested audit"); - readonly Histogram sagaAuditBulkInsertDurationMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.saga_bulk_insert_duration_ms"); // metrics.GetCounter("Audit ingestion - ingested audit"); - readonly Histogram bulkInsertCommitDurationMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.audit_commit_duration_ms"); // metrics.GetCounter("Audit ingestion - ingested audit"); + readonly Counter ingestedAuditMeter = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.stored_audit_messages"); + readonly Counter ingestedSagaAuditMeter = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.stored_saga_audits"); + readonly Histogram auditBulkInsertDurationMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.bulk_insert_duration_audit", unit: "ms"); + readonly Histogram sagaAuditBulkInsertDurationMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.bulk_insert_duration_sagas", unit: "ms"); + readonly Histogram bulkInsertCommitDurationMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.commit_duration_audit", unit: "ms"); static readonly ILog Logger = LogManager.GetLogger(); } diff --git a/src/ServiceControl.Audit/Auditing/AuditMetrics.cs b/src/ServiceControl.Audit/Infrastructure/AuditMetrics.cs similarity index 55% rename from src/ServiceControl.Audit/Auditing/AuditMetrics.cs rename to src/ServiceControl.Audit/Infrastructure/AuditMetrics.cs index 3e02448fa1..25a9fa658f 100644 --- a/src/ServiceControl.Audit/Auditing/AuditMetrics.cs +++ b/src/ServiceControl.Audit/Infrastructure/AuditMetrics.cs @@ -1,9 +1,9 @@ -namespace ServiceControl.Audit.Auditing; +namespace ServiceControl.Audit; using System.Diagnostics.Metrics; static class AuditMetrics { - public static readonly Meter Meter = new("ServiceControl", "0.1.0"); + public static readonly Meter Meter = new("Particular.ServiceControl", "0.1.0"); public static readonly string Prefix = "particular.servicecontrol.audit"; } \ No newline at end of file From 88d2e7165a559b85079e20644581d7107ed71666 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Mon, 3 Feb 2025 08:35:02 +0100 Subject: [PATCH 10/23] Better metrics names --- .../Auditing/AuditIngestion.cs | 14 ++++++------- .../Auditing/AuditPersister.cs | 20 +++++++++---------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 93f93ca665..8d68570636 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -191,7 +191,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); messageContext.SetTaskCompletionSource(taskCompletionSource); - receivedMeter.Add(1); + receivedAudits.Add(1); await channel.Writer.WriteAsync(messageContext, cancellationToken); await taskCompletionSource.Task; @@ -212,11 +212,11 @@ async Task Loop() contexts.Add(context); } - batchSizeMeter.Record(contexts.Count); + auditBatchSize.Record(contexts.Count); var sw = Stopwatch.StartNew(); await auditIngestor.Ingest(contexts); - batchDurationMeter.Record(sw.ElapsedMilliseconds); + auditBatchDuration.Record(sw.ElapsedMilliseconds); } catch (OperationCanceledException) { @@ -247,7 +247,7 @@ async Task Loop() TransportInfrastructure transportInfrastructure; IMessageReceiver queueIngestor; - readonly SemaphoreSlim startStopSemaphore = new SemaphoreSlim(1); + readonly SemaphoreSlim startStopSemaphore = new(1); readonly string inputEndpoint; readonly ITransportCustomization transportCustomization; readonly TransportSettings transportSettings; @@ -256,9 +256,9 @@ async Task Loop() readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory; readonly Settings settings; readonly Channel channel; - readonly Histogram batchSizeMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_size"); - readonly Histogram batchDurationMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_duration", unit: "ms"); - readonly Counter receivedMeter = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.received"); + readonly Histogram auditBatchSize = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_size_audits"); + readonly Histogram auditBatchDuration = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_duration_audits", unit: "ms"); + readonly Counter receivedAudits = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.received_audits"); readonly Watchdog watchdog; readonly Task ingestionWorker; readonly IHostApplicationLifetime applicationLifetime; diff --git a/src/ServiceControl.Audit/Auditing/AuditPersister.cs b/src/ServiceControl.Audit/Auditing/AuditPersister.cs index 1afceeab00..a11e534f98 100644 --- a/src/ServiceControl.Audit/Auditing/AuditPersister.cs +++ b/src/ServiceControl.Audit/Auditing/AuditPersister.cs @@ -75,9 +75,9 @@ public async Task> Persist(IReadOnlyList> Persist(IReadOnlyList> Persist(IReadOnlyList ingestedAuditMeter = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.stored_audit_messages"); - readonly Counter ingestedSagaAuditMeter = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.stored_saga_audits"); - readonly Histogram auditBulkInsertDurationMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.bulk_insert_duration_audit", unit: "ms"); - readonly Histogram sagaAuditBulkInsertDurationMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.bulk_insert_duration_sagas", unit: "ms"); - readonly Histogram bulkInsertCommitDurationMeter = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.commit_duration_audit", unit: "ms"); + readonly Counter storedAudits = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.stored_audit_messages"); + readonly Counter storedSagas = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.stored_saga_audits"); + readonly Histogram auditBulkInsertDuration = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.bulk_insert_duration_audits", unit: "ms"); + readonly Histogram sagaAuditBulkInsertDuration = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.bulk_insert_duration_sagas", unit: "ms"); + readonly Histogram auditCommitDuration = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.commit_duration_audits", unit: "ms"); static readonly ILog Logger = LogManager.GetLogger(); } From da4c35baae74e99ed8ad876b8f58705b1a542289 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Mon, 3 Feb 2025 08:50:16 +0100 Subject: [PATCH 11/23] Emit body size --- src/ServiceControl.Audit/Auditing/AuditIngestion.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 8d68570636..6e5593d516 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -210,6 +210,7 @@ async Task Loop() while (channel.Reader.TryRead(out var context)) { contexts.Add(context); + auditMessageSize.Record(context.Body.Length / 1024.0); } auditBatchSize.Record(contexts.Count); @@ -258,6 +259,7 @@ async Task Loop() readonly Channel channel; readonly Histogram auditBatchSize = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_size_audits"); readonly Histogram auditBatchDuration = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_duration_audits", unit: "ms"); + readonly Histogram auditMessageSize = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.audit_message_size", unit: "kilobytes"); readonly Counter receivedAudits = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.received_audits"); readonly Watchdog watchdog; readonly Task ingestionWorker; From 786559b1e71d6ed731d6207ba983eecb2c64ac27 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Mon, 3 Feb 2025 16:09:12 +0100 Subject: [PATCH 12/23] Fix meter name --- src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs | 2 +- src/ServiceControl.Audit/Infrastructure/AuditMetrics.cs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs index d1f08569dc..318cd1ba11 100644 --- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs @@ -81,7 +81,7 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder, serviceInstanceId: settings.InstanceName)) .WithMetrics(b => { - b.AddMeter("ServiceControl"); + b.AddMeter(AuditMetrics.MeterName); b.AddOtlpExporter(e => { e.Endpoint = otelMetricsUri; diff --git a/src/ServiceControl.Audit/Infrastructure/AuditMetrics.cs b/src/ServiceControl.Audit/Infrastructure/AuditMetrics.cs index 25a9fa658f..4e07ae3453 100644 --- a/src/ServiceControl.Audit/Infrastructure/AuditMetrics.cs +++ b/src/ServiceControl.Audit/Infrastructure/AuditMetrics.cs @@ -4,6 +4,7 @@ namespace ServiceControl.Audit; static class AuditMetrics { - public static readonly Meter Meter = new("Particular.ServiceControl", "0.1.0"); + public const string MeterName = "Particular.ServiceControl"; + public static readonly Meter Meter = new(MeterName, "0.1.0"); public static readonly string Prefix = "particular.servicecontrol.audit"; } \ No newline at end of file From a2011fcec2f82a0c139aacb9065993b1b04fcec4 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Mon, 3 Feb 2025 16:09:51 +0100 Subject: [PATCH 13/23] Log that OpenTelemetry metrics exporter is enabled --- src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs index 318cd1ba11..47ac4e2976 100644 --- a/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Audit/HostApplicationBuilderExtensions.cs @@ -87,6 +87,9 @@ public static void AddServiceControlAudit(this IHostApplicationBuilder builder, e.Endpoint = otelMetricsUri; }); }); + var logger = LogManager.GetLogger(typeof(HostApplicationBuilderExtensions)); + logger.InfoFormat("OpenTelemetry metrics exporter enabled: {0}", settings.OtelMetricsUrl); + } // Configure after the NServiceBus hosted service to ensure NServiceBus is already started From 3835af4bbf6f4f7cc93981a93b09ab08be2f6d9b Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Mon, 3 Feb 2025 14:11:47 +0100 Subject: [PATCH 14/23] Added logging to diagnose hanging ingestion --- .../Auditing/AuditIngestion.cs | 30 +++++++++++++++---- .../Auditing/AuditIngestor.cs | 5 +++- .../Auditing/AuditPersister.cs | 10 +++++-- 3 files changed, 37 insertions(+), 8 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 6e5593d516..b590e814f5 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -54,7 +54,7 @@ public AuditIngestion( watchdog = new Watchdog("audit message ingestion", EnsureStarted, EnsureStopped, ingestionState.ReportError, ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, logger); - ingestionWorker = Task.Run(() => Loop(), CancellationToken.None); + ingestionWorker = Task.Run(() => LoopWithTryCatch(), CancellationToken.None); } public Task StartAsync(CancellationToken _) => watchdog.Start(() => applicationLifetime.StopApplication()); @@ -197,6 +197,21 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati await taskCompletionSource.Task; } + async Task LoopWithTryCatch() + { + // TODO: Done to prevent conflicts with Otel branch, needs to becombine with Loop when merging to master + try + { + await Loop(); + } + catch (Exception e) + { + logger.Fatal("Loop interrupted", e); + applicationLifetime.StopApplication(); + throw; + } + } + async Task Loop() { var contexts = new List(transportSettings.MaxConcurrency.Value); @@ -219,10 +234,12 @@ async Task Loop() await auditIngestor.Ingest(contexts); auditBatchDuration.Record(sw.ElapsedMilliseconds); } - catch (OperationCanceledException) + catch (OperationCanceledException e) { - //Do nothing as we are shutting down - continue; + logger.Info("Ingesting messages failed", e); + // continue loop, do nothing as we are shutting down + // TODO: Assumption here is that OCE equals a shutdown which is definitely not the case + // We likely need to invoke `TrySetException` } catch (Exception e) // show must go on { @@ -234,7 +251,10 @@ async Task Loop() // signal all message handling tasks to terminate foreach (var context in contexts) { - context.GetTaskCompletionSource().TrySetException(e); + if (!context.GetTaskCompletionSource().TrySetException(e)) + { + logger.Error("Loop TrySetException failed"); + } } } finally diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index fb6f2a7240..f8f553b5a7 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -68,7 +68,10 @@ public async Task Ingest(List contexts) foreach (var context in contexts) { - context.GetTaskCompletionSource().TrySetResult(true); + if (!context.GetTaskCompletionSource().TrySetResult(true)) + { + Log.Warn("TrySetResult failed"); + } } } catch (Exception e) diff --git a/src/ServiceControl.Audit/Auditing/AuditPersister.cs b/src/ServiceControl.Audit/Auditing/AuditPersister.cs index a11e534f98..9a68795a91 100644 --- a/src/ServiceControl.Audit/Auditing/AuditPersister.cs +++ b/src/ServiceControl.Audit/Auditing/AuditPersister.cs @@ -210,7 +210,10 @@ void ProcessSagaAuditMessage(MessageContext context) } // releasing the failed message context early so that they can be retried outside the current batch - context.GetTaskCompletionSource().TrySetException(e); + if (!context.GetTaskCompletionSource().TrySetException(e)) + { + Logger.Warn("ProcessSagaAuditMessage TrySetException failed"); + } } } @@ -279,7 +282,10 @@ await messageDispatcher.Value.Dispatch(new TransportOperations(messagesToEmit.To } // releasing the failed message context early so that they can be retried outside the current batch - context.GetTaskCompletionSource().TrySetException(e); + if (!context.GetTaskCompletionSource().TrySetException(e)) + { + Logger.Warn("ProcessAuditMessage TrySetException failed"); + } } } From 6a26fdcad03eec7d5565e31fcbb9f39051748ed2 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Mon, 3 Feb 2025 14:13:58 +0100 Subject: [PATCH 15/23] NLog layout: Added processtime and padding to threadid, level, and logger --- src/ServiceControl.Infrastructure/LoggingConfigurator.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceControl.Infrastructure/LoggingConfigurator.cs b/src/ServiceControl.Infrastructure/LoggingConfigurator.cs index 6cfbf98e54..7a041434fc 100644 --- a/src/ServiceControl.Infrastructure/LoggingConfigurator.cs +++ b/src/ServiceControl.Infrastructure/LoggingConfigurator.cs @@ -22,7 +22,7 @@ public static void ConfigureLogging(LoggingSettings loggingSettings) } var nlogConfig = new LoggingConfiguration(); - var simpleLayout = new SimpleLayout("${longdate}|${threadid}|${level}|${logger}|${message}${onexception:|${exception:format=tostring}}"); + var simpleLayout = new SimpleLayout("${longdate}|${processtime}|${threadid:padding=2}|${level:padding=5}|${logger:padding=70}|${message}${onexception:|${exception:format=tostring}}"); var fileTarget = new FileTarget { From 24c4ef3d805dcff6987248f9b0fbe65433c3f47b Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Tue, 4 Feb 2025 15:40:33 +0100 Subject: [PATCH 16/23] Incorrectly handled OCE can result in hanging ingestion Fixed by properly handling Stop/Start CancellationTokens and validating thrown OCE against host token and added fatal exception handling. --- .../Auditing/AuditIngestion.cs | 121 ++++++++++-------- 1 file changed, 66 insertions(+), 55 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index b590e814f5..c0e381779e 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -52,15 +52,27 @@ public AuditIngestion( errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError); - watchdog = new Watchdog("audit message ingestion", EnsureStarted, EnsureStopped, ingestionState.ReportError, ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, logger); - - ingestionWorker = Task.Run(() => LoopWithTryCatch(), CancellationToken.None); + watchdog = new Watchdog( + "audit message ingestion", + EnsureStarted, + EnsureStopped, + ingestionState.ReportError, + ingestionState.Clear, + settings.TimeToRestartAuditIngestionAfterFailure, + logger + ); } - public Task StartAsync(CancellationToken _) => watchdog.Start(() => applicationLifetime.StopApplication()); + public async Task StartAsync(CancellationToken cancellationToken) + { + stopSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + ingestionWorker = Loop(stopSource.Token); + await watchdog.Start(() => applicationLifetime.StopApplication()); + } public async Task StopAsync(CancellationToken cancellationToken) { + await stopSource.CancelAsync(); await watchdog.Stop(); channel.Writer.Complete(); await ingestionWorker; @@ -197,76 +209,74 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati await taskCompletionSource.Task; } - async Task LoopWithTryCatch() + async Task Loop(CancellationToken cancellationToken) { - // TODO: Done to prevent conflicts with Otel branch, needs to becombine with Loop when merging to master try { - await Loop(); - } - catch (Exception e) - { - logger.Fatal("Loop interrupted", e); - applicationLifetime.StopApplication(); - throw; - } - } + var contexts = new List(transportSettings.MaxConcurrency.Value); - async Task Loop() - { - var contexts = new List(transportSettings.MaxConcurrency.Value); - - while (await channel.Reader.WaitToReadAsync()) - { - // will only enter here if there is something to read. - try + while (await channel.Reader.WaitToReadAsync(cancellationToken)) { - // as long as there is something to read this will fetch up to MaximumConcurrency items - while (channel.Reader.TryRead(out var context)) + // will only enter here if there is something to read. + try { - contexts.Add(context); - auditMessageSize.Record(context.Body.Length / 1024.0); - } + // as long as there is something to read this will fetch up to MaximumConcurrency items + while (channel.Reader.TryRead(out var context)) + { + contexts.Add(context); + auditMessageSize.Record(context.Body.Length / 1024D); + } - auditBatchSize.Record(contexts.Count); - var sw = Stopwatch.StartNew(); + auditBatchSize.Record(contexts.Count); + var sw = Stopwatch.StartNew(); - await auditIngestor.Ingest(contexts); - auditBatchDuration.Record(sw.ElapsedMilliseconds); - } - catch (OperationCanceledException e) - { - logger.Info("Ingesting messages failed", e); - // continue loop, do nothing as we are shutting down - // TODO: Assumption here is that OCE equals a shutdown which is definitely not the case - // We likely need to invoke `TrySetException` - } - catch (Exception e) // show must go on - { - if (logger.IsInfoEnabled) + await auditIngestor.Ingest(contexts); + auditBatchDuration.Record(sw.ElapsedMilliseconds); + } + catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) { - logger.Info("Ingesting messages failed", e); + logger.Debug("Cancelled by host"); + return; // No point in continueing as WaitToReadAsync will throw OCE } - - // signal all message handling tasks to terminate - foreach (var context in contexts) + catch (Exception e) // show must go on { - if (!context.GetTaskCompletionSource().TrySetException(e)) + if (logger.IsInfoEnabled) { - logger.Error("Loop TrySetException failed"); + logger.Info("Ingesting messages failed", e); + } + + // signal all message handling tasks to terminate + foreach (var context in contexts) + { + if (!context.GetTaskCompletionSource().TrySetException(e)) + { + logger.Error("Loop TrySetException failed"); + } } } + finally + { + contexts.Clear(); + } } - finally - { - contexts.Clear(); - } + // will fall out here when writer is completed + } + catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) + { + logger.Debug("Cancelled by host"); + } + catch (Exception e) + { + // Might the next exception scope throw an exception, consider this fatal as that cannot be an OCE + logger.Fatal("Loop interrupted", e); + applicationLifetime.StopApplication(); + throw; } - // will fall out here when writer is completed } TransportInfrastructure transportInfrastructure; IMessageReceiver queueIngestor; + Task ingestionWorker; readonly SemaphoreSlim startStopSemaphore = new(1); readonly string inputEndpoint; @@ -282,9 +292,10 @@ async Task Loop() readonly Histogram auditMessageSize = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.audit_message_size", unit: "kilobytes"); readonly Counter receivedAudits = AuditMetrics.Meter.CreateCounter($"{AuditMetrics.Prefix}.received_audits"); readonly Watchdog watchdog; - readonly Task ingestionWorker; readonly IHostApplicationLifetime applicationLifetime; + CancellationTokenSource stopSource; + static readonly ILog logger = LogManager.GetLogger(); } } \ No newline at end of file From d8d5730706fabe30b7528bc13257adcfa57b218d Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Tue, 4 Feb 2025 15:50:25 +0100 Subject: [PATCH 17/23] New cancellation token allows further improvement to reduce shutdown duration to close within 30 seconds --- ...InMemoryAuditIngestionUnitOfWorkFactory.cs | 3 ++- .../UnitOfWork/RavenAuditUnitOfWorkFactory.cs | 5 +++-- .../IAuditIngestionUnitOfWorkFactory.cs | 3 ++- .../Auditing/AuditIngestion.cs | 4 ++-- .../Auditing/AuditIngestor.cs | 19 +++++++++++-------- .../Auditing/AuditPersister.cs | 5 +++-- .../Auditing/ImportFailedAudits.cs | 4 ++-- 7 files changed, 25 insertions(+), 18 deletions(-) diff --git a/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs b/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs index ee654999a1..f07e7036de 100644 --- a/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs +++ b/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs @@ -1,5 +1,6 @@ namespace ServiceControl.Audit.Persistence.InMemory { + using System.Threading; using System.Threading.Tasks; using ServiceControl.Audit.Auditing.BodyStorage; using ServiceControl.Audit.Persistence.UnitOfWork; @@ -12,7 +13,7 @@ public InMemoryAuditIngestionUnitOfWorkFactory(InMemoryAuditDataStore dataStore, bodyStorageEnricher = new BodyStorageEnricher(bodyStorage, settings); } - public ValueTask StartNew(int batchSize) + public ValueTask StartNew(int batchSize, CancellationToken cancellationToken) { //The batchSize argument is ignored: the in-memory storage implementation doesn't support batching. return new ValueTask(new InMemoryAuditIngestionUnitOfWork(dataStore, bodyStorageEnricher)); diff --git a/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs b/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs index 3f9c56a6c8..8a9747fbc0 100644 --- a/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs +++ b/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs @@ -13,9 +13,10 @@ class RavenAuditIngestionUnitOfWorkFactory( MinimumRequiredStorageState customCheckState) : IAuditIngestionUnitOfWorkFactory { - public async ValueTask StartNew(int batchSize) + public async ValueTask StartNew(int batchSize, CancellationToken cancellationToken) { - var timedCancellationSource = new CancellationTokenSource(databaseConfiguration.BulkInsertCommitTimeout); + var timedCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + timedCancellationSource.CancelAfter(databaseConfiguration.BulkInsertCommitTimeout); var bulkInsert = (await documentStoreProvider.GetDocumentStore(timedCancellationSource.Token)) .BulkInsert(new BulkInsertOptions { SkipOverwriteIfUnchanged = true, }, timedCancellationSource.Token); diff --git a/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWorkFactory.cs b/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWorkFactory.cs index 25a5859e50..1d3470bdea 100644 --- a/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWorkFactory.cs +++ b/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWorkFactory.cs @@ -1,10 +1,11 @@ namespace ServiceControl.Audit.Persistence.UnitOfWork { + using System.Threading; using System.Threading.Tasks; public interface IAuditIngestionUnitOfWorkFactory { - ValueTask StartNew(int batchSize); //Throws if not enough space or some other problem preventing from writing data + ValueTask StartNew(int batchSize, CancellationToken cancellationToken = default); //Throws if not enough space or some other problem preventing from writing data bool CanIngestMore(); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index c0e381779e..fa179a5254 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -129,7 +129,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default) queueIngestor = transportInfrastructure.Receivers[inputEndpoint]; - await auditIngestor.VerifyCanReachForwardingAddress(); + await auditIngestor.VerifyCanReachForwardingAddress(cancellationToken); await queueIngestor.StartReceive(cancellationToken); @@ -230,7 +230,7 @@ async Task Loop(CancellationToken cancellationToken) auditBatchSize.Record(contexts.Count); var sw = Stopwatch.StartNew(); - await auditIngestor.Ingest(contexts); + await auditIngestor.Ingest(contexts, cancellationToken); auditBatchDuration.Record(sw.ElapsedMilliseconds); } catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index f8f553b5a7..62ee58360f 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using System.Linq; + using System.Threading; using System.Threading.Tasks; using Infrastructure.Settings; using Monitoring; @@ -41,14 +42,14 @@ ITransportCustomization transportCustomization ); } - public async Task Ingest(List contexts) + public async Task Ingest(List contexts, CancellationToken cancellationToken) { if (Log.IsDebugEnabled) { Log.Debug($"Ingesting {contexts.Count} message contexts"); } - var stored = await auditPersister.Persist(contexts); + var stored = await auditPersister.Persist(contexts, cancellationToken); try { @@ -59,7 +60,7 @@ public async Task Ingest(List contexts) Log.Debug($"Forwarding {stored.Count} messages"); } - await Forward(stored, logQueueAddress); + await Forward(stored, logQueueAddress, cancellationToken); if (Log.IsDebugEnabled) { Log.Debug("Forwarded messages"); @@ -86,7 +87,7 @@ public async Task Ingest(List contexts) } } - Task Forward(IReadOnlyCollection messageContexts, string forwardingAddress) + Task Forward(IReadOnlyCollection messageContexts, string forwardingAddress, CancellationToken cancellationToken) { var transportOperations = new TransportOperation[messageContexts.Count]; //We could allocate based on the actual number of ProcessedMessages but this should be OK var index = 0; @@ -103,7 +104,8 @@ Task Forward(IReadOnlyCollection messageContexts, string forward var outgoingMessage = new OutgoingMessage( messageContext.NativeMessageId, messageContext.Headers, - messageContext.Body); + messageContext.Body + ); // Forwarded messages should last as long as possible outgoingMessage.Headers.Remove(Headers.TimeToBeReceived); @@ -115,12 +117,13 @@ Task Forward(IReadOnlyCollection messageContexts, string forward return anyContext != null ? messageDispatcher.Value.Dispatch( new TransportOperations(transportOperations), - anyContext.TransportTransaction + anyContext.TransportTransaction, + cancellationToken ) : Task.CompletedTask; } - public async Task VerifyCanReachForwardingAddress() + public async Task VerifyCanReachForwardingAddress(CancellationToken cancellationToken) { if (!settings.ForwardAuditMessages) { @@ -137,7 +140,7 @@ public async Task VerifyCanReachForwardingAddress() ) ); - await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction()); + await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction(), cancellationToken); } catch (Exception e) { diff --git a/src/ServiceControl.Audit/Auditing/AuditPersister.cs b/src/ServiceControl.Audit/Auditing/AuditPersister.cs index 9a68795a91..520910eae0 100644 --- a/src/ServiceControl.Audit/Auditing/AuditPersister.cs +++ b/src/ServiceControl.Audit/Auditing/AuditPersister.cs @@ -5,6 +5,7 @@ using System.Diagnostics; using System.Diagnostics.Metrics; using System.Text.Json; + using System.Threading; using System.Threading.Tasks; using Infrastructure; using Monitoring; @@ -23,7 +24,7 @@ class AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory, IMessageSession messageSession, Lazy messageDispatcher) { - public async Task> Persist(IReadOnlyList contexts) + public async Task> Persist(IReadOnlyList contexts, CancellationToken cancellationToken) { var stopwatch = Stopwatch.StartNew(); @@ -37,7 +38,7 @@ public async Task> Persist(IReadOnlyList(contexts.Count); foreach (var context in contexts) { diff --git a/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs b/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs index 7da46df703..5c3b67f17e 100644 --- a/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs +++ b/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs @@ -23,7 +23,7 @@ public ImportFailedAudits( public async Task Run(CancellationToken cancellationToken = default) { - await auditIngestor.VerifyCanReachForwardingAddress(); + await auditIngestor.VerifyCanReachForwardingAddress(cancellationToken); var succeeded = 0; var failed = 0; @@ -37,7 +37,7 @@ await failedAuditStore.ProcessFailedMessages( var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); messageContext.SetTaskCompletionSource(taskCompletionSource); - await auditIngestor.Ingest([messageContext]); + await auditIngestor.Ingest([messageContext], cancellationToken); await taskCompletionSource.Task; From 251f167e3ab5bae712a4b7785a8729d5136ba903 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Wed, 5 Feb 2025 16:51:36 +0100 Subject: [PATCH 18/23] Moved TrySetResult to loop so that both TrySetResult and TrySetException are managed in the same code --- src/ServiceControl.Audit/Auditing/AuditIngestion.cs | 10 ++++++++++ src/ServiceControl.Audit/Auditing/AuditIngestor.cs | 8 -------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index fa179a5254..ec554ce094 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -231,6 +231,16 @@ async Task Loop(CancellationToken cancellationToken) var sw = Stopwatch.StartNew(); await auditIngestor.Ingest(contexts, cancellationToken); + + foreach (var context in contexts) + { + // Some items that faulted could already have been set + if (!context.GetTaskCompletionSource().TrySetResult(true)) + { + logger.Warn("TrySetResult failed"); + } + } + auditBatchDuration.Record(sw.ElapsedMilliseconds); } catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index 62ee58360f..d36e4c984e 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -66,14 +66,6 @@ public async Task Ingest(List contexts, CancellationToken cancel Log.Debug("Forwarded messages"); } } - - foreach (var context in contexts) - { - if (!context.GetTaskCompletionSource().TrySetResult(true)) - { - Log.Warn("TrySetResult failed"); - } - } } catch (Exception e) { From 4f78bdf8305bf80e3e93b63e7a1dab17809f719f Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Wed, 5 Feb 2025 16:55:57 +0100 Subject: [PATCH 19/23] Improve message --- src/ServiceControl.Audit/Auditing/AuditIngestion.cs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index ec554ce094..b868752a87 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -250,10 +250,9 @@ async Task Loop(CancellationToken cancellationToken) } catch (Exception e) // show must go on { - if (logger.IsInfoEnabled) - { - logger.Info("Ingesting messages failed", e); - } + logger.Warn("Batch processing failed", e); + + // Signal circuitbreaker, throttle whatever // signal all message handling tasks to terminate foreach (var context in contexts) From 4c00e6e066c22e689456d1cd2f58c749f74e5598 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Wed, 5 Feb 2025 17:19:53 +0100 Subject: [PATCH 20/23] Log as INF/WRN/ERR when processing duration exceeds their corresponding thresholds --- .../Auditing/AuditIngestion.cs | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index b868752a87..5131053f93 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -217,6 +217,8 @@ async Task Loop(CancellationToken cancellationToken) while (await channel.Reader.WaitToReadAsync(cancellationToken)) { + var sw = Stopwatch.StartNew(); + // TODO: Add timeout handling, if processing takes over for example 1 minute // will only enter here if there is something to read. try { @@ -228,7 +230,6 @@ async Task Loop(CancellationToken cancellationToken) } auditBatchSize.Record(contexts.Count); - var sw = Stopwatch.StartNew(); await auditIngestor.Ingest(contexts, cancellationToken); @@ -265,6 +266,24 @@ async Task Loop(CancellationToken cancellationToken) } finally { + const int infoThreshold = 5000; + const int warnThreshold = 15000; + const int errorThreshold = 60000; + var elapsed = sw.ElapsedMilliseconds; + + if (elapsed > errorThreshold) + { + logger.ErrorFormat("Processing duration {0} exceeded {1}", elapsed, errorThreshold); + } + else if (elapsed > warnThreshold) + { + logger.WarnFormat("Processing duration {0} exceeded {1}", elapsed, warnThreshold); + } + else if (elapsed > infoThreshold) + { + logger.InfoFormat("Processing duration {0} exceeded {1}", elapsed, infoThreshold); + } + contexts.Clear(); } } From 76f26ecfb456aa65396cc9a966119aaf27e9ad4a Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Thu, 6 Feb 2025 00:14:28 +0100 Subject: [PATCH 21/23] Report sequential failure count and timestamp of last success --- src/ServiceControl.Audit/Auditing/AuditIngestion.cs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 5131053f93..142608a8fa 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -215,6 +215,9 @@ async Task Loop(CancellationToken cancellationToken) { var contexts = new List(transportSettings.MaxConcurrency.Value); + long sequentialFailureCount = 0; + DateTime lastSuccess = DateTime.MinValue; + while (await channel.Reader.WaitToReadAsync(cancellationToken)) { var sw = Stopwatch.StartNew(); @@ -243,6 +246,10 @@ async Task Loop(CancellationToken cancellationToken) } auditBatchDuration.Record(sw.ElapsedMilliseconds); + + // No locking for consistency needed, just write, don't care about multi-threading + sequentialFailureCount = 0; + lastSuccess = DateTime.UtcNow; } catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) { @@ -251,7 +258,8 @@ async Task Loop(CancellationToken cancellationToken) } catch (Exception e) // show must go on { - logger.Warn("Batch processing failed", e); + Interlocked.Increment(ref sequentialFailureCount); + logger.Warn($"Batch processing failed [#{sequentialFailureCount} @{lastSuccess:O}] ", e); // Signal circuitbreaker, throttle whatever @@ -314,6 +322,7 @@ async Task Loop(CancellationToken cancellationToken) readonly AuditIngestionFaultPolicy errorHandlingPolicy; readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory; readonly Settings settings; + [Obsolete] readonly Channel channel; readonly Histogram auditBatchSize = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_size_audits"); readonly Histogram auditBatchDuration = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_duration_audits", unit: "ms"); From 4c1b13c85269eb5b0fbdf2e63ce250410c81d0b2 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Thu, 6 Feb 2025 01:42:54 +0100 Subject: [PATCH 22/23] fixup! Moved TrySetResult to loop so that both TrySetResult and TrySetException are managed in the same code --- src/ServiceControl.Audit/Auditing/AuditIngestion.cs | 10 ---------- src/ServiceControl.Audit/Auditing/AuditIngestor.cs | 8 ++++++++ 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 142608a8fa..8f644bbda7 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -235,16 +235,6 @@ async Task Loop(CancellationToken cancellationToken) auditBatchSize.Record(contexts.Count); await auditIngestor.Ingest(contexts, cancellationToken); - - foreach (var context in contexts) - { - // Some items that faulted could already have been set - if (!context.GetTaskCompletionSource().TrySetResult(true)) - { - logger.Warn("TrySetResult failed"); - } - } - auditBatchDuration.Record(sw.ElapsedMilliseconds); // No locking for consistency needed, just write, don't care about multi-threading diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index d36e4c984e..62ee58360f 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -66,6 +66,14 @@ public async Task Ingest(List contexts, CancellationToken cancel Log.Debug("Forwarded messages"); } } + + foreach (var context in contexts) + { + if (!context.GetTaskCompletionSource().TrySetResult(true)) + { + Log.Warn("TrySetResult failed"); + } + } } catch (Exception e) { From a2f471e8aefbf2af91cc2886c075b914b30f7568 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Thu, 6 Feb 2025 01:48:25 +0100 Subject: [PATCH 23/23] fixup! Report sequential failure count and timestamp of last success --- src/ServiceControl.Audit/Auditing/AuditIngestion.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 8f644bbda7..602c2ff34e 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -312,7 +312,6 @@ async Task Loop(CancellationToken cancellationToken) readonly AuditIngestionFaultPolicy errorHandlingPolicy; readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory; readonly Settings settings; - [Obsolete] readonly Channel channel; readonly Histogram auditBatchSize = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_size_audits"); readonly Histogram auditBatchDuration = AuditMetrics.Meter.CreateHistogram($"{AuditMetrics.Prefix}.batch_duration_audits", unit: "ms");