From 427e89dd3e6832d2d7ca4b6324c72210aa787841 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Fri, 14 Feb 2025 17:36:19 +0100 Subject: [PATCH 01/14] Watchdog stop passed CancellationToken.None which could block stop till infinity --- .../Auditing/AuditIngestion.cs | 2 +- .../WatchdogTests.cs | 8 ++-- src/ServiceControl.Infrastructure/Watchdog.cs | 41 +++++++++++-------- .../Operations/ErrorIngestion.cs | 2 +- 4 files changed, 30 insertions(+), 23 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 6a2a039cb9..83c5a8cdd6 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -260,7 +260,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) { try { - await watchdog.Stop(); + await watchdog.Stop(cancellationToken); channel.Writer.Complete(); await base.StopAsync(cancellationToken); } diff --git a/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs b/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs index ffbd465bc0..512ea8a937 100644 --- a/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs +++ b/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs @@ -31,7 +31,7 @@ public async Task It_shuts_down_gracefully() await started.Task; - await dog.Stop(); + await dog.Stop(TestContext.CurrentContext.CancellationToken); await stopped.Task; } @@ -52,7 +52,7 @@ public async Task When_stop_fails_it_reports_the_failure() await started.Task; - await dog.Stop(); + await dog.Stop(TestContext.CurrentContext.CancellationToken); Assert.That(lastFailure, Is.EqualTo("Simulated")); } @@ -91,7 +91,7 @@ public async Task On_failure_triggers_stopping() await restarted.Task; - await dog.Stop(); + await dog.Stop(TestContext.CurrentContext.CancellationToken); } [Test] @@ -129,7 +129,7 @@ public async Task When_first_start_attempt_works_it_recovers_from_further_errors await 
recoveredFromError.Task; - await dog.Stop(); + await dog.Stop(TestContext.CurrentContext.CancellationToken); //Make sure failure is cleared Assert.That(lastFailure, Is.Null); diff --git a/src/ServiceControl.Infrastructure/Watchdog.cs b/src/ServiceControl.Infrastructure/Watchdog.cs index c91ae2b3bf..4a7cca769e 100644 --- a/src/ServiceControl.Infrastructure/Watchdog.cs +++ b/src/ServiceControl.Infrastructure/Watchdog.cs @@ -17,9 +17,14 @@ public class Watchdog ILog log; string taskName; - public Watchdog(string taskName, Func ensureStarted, - Func ensureStopped, Action reportFailure, Action clearFailure, - TimeSpan timeToWaitBetweenStartupAttempts, ILog log) + public Watchdog( + string taskName, + Func ensureStarted, + Func ensureStopped, Action reportFailure, + Action clearFailure, + TimeSpan timeToWaitBetweenStartupAttempts, + ILog log + ) { this.taskName = taskName; this.ensureStopped = ensureStopped; @@ -87,25 +92,27 @@ public Task Start(Action onFailedOnStartup) //Ignore, no need to log cancellation of delay } } - try - { - log.Debug($"Stopping watching process {taskName}"); - //We don't pass the shutdown token here because it has already been cancelled and we want to ensure we stop the ingestion. 
- await ensureStopped(CancellationToken.None).ConfigureAwait(false); - } - catch (Exception e) - { - log.Error($"Error while trying to stop {taskName}.", e); - reportFailure(e.Message); - } }); return Task.CompletedTask; } - public Task Stop() + public async Task Stop(CancellationToken cancellationToken) { - shutdownTokenSource.Cancel(); - return watchdog; + try + { + log.Debug($"Stopping watching process {taskName}"); + await shutdownTokenSource.CancelAsync().ConfigureAwait(false); + await watchdog.ConfigureAwait(false); + } + catch (Exception e) + { + log.Error($"Error while trying to stop {taskName}.", e); + throw; + } + finally + { + await ensureStopped(cancellationToken).ConfigureAwait(false); + } } } } diff --git a/src/ServiceControl/Operations/ErrorIngestion.cs b/src/ServiceControl/Operations/ErrorIngestion.cs index 00676b95a7..3a35a9e286 100644 --- a/src/ServiceControl/Operations/ErrorIngestion.cs +++ b/src/ServiceControl/Operations/ErrorIngestion.cs @@ -132,7 +132,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) { try { - await watchdog.Stop(); + await watchdog.Stop(cancellationToken); channel.Writer.Complete(); await base.StopAsync(cancellationToken); } From fa6cae59b6166eff2b3d355646d9ecad19f41c2a Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Fri, 14 Feb 2025 19:16:13 +0100 Subject: [PATCH 02/14] Ignored watchdog test that seems wrong --- src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs b/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs index 512ea8a937..9566e84f1d 100644 --- a/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs +++ b/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs @@ -36,7 +36,7 @@ public async Task It_shuts_down_gracefully() await stopped.Task; } - [Test] + [Test, Ignore("When ensurestopped throws an exception, stop should also throw an exception as 
that is an ungraceful stop")] public async Task When_stop_fails_it_reports_the_failure() { string lastFailure = null; From 4ecfcc6a8b68ce5d5b813c75616c5c9e76b5e650 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Fri, 14 Feb 2025 19:16:19 +0100 Subject: [PATCH 03/14] Fix leaking transportInfrastructure instance --- .../Auditing/AuditIngestion.cs | 92 +++++++++++++------ src/ServiceControl.Infrastructure/Watchdog.cs | 32 +++---- 2 files changed, 76 insertions(+), 48 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 83c5a8cdd6..920c693dce 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -78,65 +78,97 @@ async Task EnsureStarted(CancellationToken cancellationToken = default) if (!unitOfWorkFactory.CanIngestMore()) { - if (queueIngestor != null) - { - var stoppable = queueIngestor; - queueIngestor = null; - logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down commencing"); - await stoppable.StopReceive(cancellationToken); - logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed"); - } - + await StopAndTeardownInfrastructure(cancellationToken); return; } - if (queueIngestor != null) + await SetUpAndStartInfrastructure(cancellationToken); + } + catch (Exception e) + { + try { - logger.Debug("Ensure started. Already started, skipping start up"); - return; //Already started + await StopAndTeardownInfrastructure(cancellationToken); } + catch (Exception teardownException) + { + throw new AggregateException(e, teardownException); + } + + throw; + } + finally + { + logger.Debug("Ensure started. Start/stop semaphore releasing"); + startStopSemaphore.Release(); + logger.Debug("Ensure started. Start/stop semaphore released"); + } + } - logger.Info("Ensure started. 
Infrastructure starting"); + async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken) + { + if (queueIngestor != null) + { + logger.Debug("Infrastructure already Started"); + return; + } + try + { + logger.Info("Starting infrastructure"); transportInfrastructure = await transportCustomization.CreateTransportInfrastructure( inputEndpoint, transportSettings, OnMessage, errorHandlingPolicy.OnError, OnCriticalError, - TransportTransactionMode.ReceiveOnly); + TransportTransactionMode.ReceiveOnly + ); queueIngestor = transportInfrastructure.Receivers[inputEndpoint]; await auditIngestor.VerifyCanReachForwardingAddress(); - await queueIngestor.StartReceive(cancellationToken); - logger.Info("Ensure started. Infrastructure started"); + logger.Info("Started infrastructure"); } - catch + catch (Exception e) { - if (queueIngestor != null) + logger.Error("Failed to start infrastructure", e); + throw; + } + } + + async Task StopAndTeardownInfrastructure(CancellationToken cancellationToken) + { + if (transportInfrastructure == null) + { + logger.Debug("Infrastructure already Stopped"); + return; + } + + try + { + logger.Info("Stopping infrastructure"); + try { - try + if (queueIngestor != null) { await queueIngestor.StopReceive(cancellationToken); } - catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) - { - logger.Info("StopReceive cancelled"); - } + } + finally + { + await transportInfrastructure.Shutdown(cancellationToken); } - queueIngestor = null; // Setting to null so that it doesn't exit when it retries in line 185 - - throw; + queueIngestor = null; + logger.Info("Stopped infrastructure"); } - finally + catch (Exception e) { - logger.Debug("Ensure started. Start/stop semaphore releasing"); - startStopSemaphore.Release(); - logger.Debug("Ensure started. 
Start/stop semaphore released"); + logger.Error("Failed to stop infrastructure", e); + throw; } } diff --git a/src/ServiceControl.Infrastructure/Watchdog.cs b/src/ServiceControl.Infrastructure/Watchdog.cs index 4a7cca769e..d629e55818 100644 --- a/src/ServiceControl.Infrastructure/Watchdog.cs +++ b/src/ServiceControl.Infrastructure/Watchdog.cs @@ -12,7 +12,7 @@ public class Watchdog Action reportFailure; Action clearFailure; Task watchdog; - CancellationTokenSource shutdownTokenSource = new CancellationTokenSource(); + CancellationTokenSource shutdownTokenSource = new(); TimeSpan timeToWaitBetweenStartupAttempts; ILog log; string taskName; @@ -47,41 +47,37 @@ public Task Start(Action onFailedOnStartup) { log.Debug($"Starting watching {taskName}"); - bool? failedOnStartup = null; + bool startup = true; while (!shutdownTokenSource.IsCancellationRequested) { try { + using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(shutdownTokenSource.Token); + cancellationTokenSource.CancelAfter(15000); + log.Debug($"Ensuring {taskName} is running"); - await ensureStarted(shutdownTokenSource.Token).ConfigureAwait(false); + await ensureStarted(cancellationTokenSource.Token).ConfigureAwait(false); clearFailure(); - - failedOnStartup ??= false; + startup = false; } - catch (OperationCanceledException e) when (!shutdownTokenSource.IsCancellationRequested) + catch (OperationCanceledException e) when (shutdownTokenSource.IsCancellationRequested) { - // Continue, as OCE is not from caller - log.Info("Start cancelled, retrying...", e); - continue; + log.Debug("Cancelled", e); + return; } catch (Exception e) { reportFailure(e.Message); - if (failedOnStartup == null) + if (startup) { - failedOnStartup = true; - log.Error($"Error during initial startup attempt for {taskName}.", e); - - //there was an error during startup hence we want to shut down the instance onFailedOnStartup(); + return; } - else - { - log.Error($"Error while trying to start {taskName}. 
Starting will be retried in {timeToWaitBetweenStartupAttempts}.", e); - } + + log.Error($"Error while trying to start {taskName}. Starting will be retried in {timeToWaitBetweenStartupAttempts}.", e); } try { From 1327bead8beb603bf982ca1c08bc6fad1ba500b2 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Mon, 17 Feb 2025 19:55:34 +0100 Subject: [PATCH 04/14] Thanks who ever wrote a test with dependency on log output message without making this dependency visible :yakshaving: --- .../When_critical_storage_threshold_reached.cs | 12 +++++------- src/ServiceControl.Audit/Auditing/AuditIngestion.cs | 10 ++++++++-- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/ServiceControl.Audit.AcceptanceTests.RavenDB/Auditing/When_critical_storage_threshold_reached.cs b/src/ServiceControl.Audit.AcceptanceTests.RavenDB/Auditing/When_critical_storage_threshold_reached.cs index dcd1483d9a..68126da497 100644 --- a/src/ServiceControl.Audit.AcceptanceTests.RavenDB/Auditing/When_critical_storage_threshold_reached.cs +++ b/src/ServiceControl.Audit.AcceptanceTests.RavenDB/Auditing/When_critical_storage_threshold_reached.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Threading.Tasks; using AcceptanceTesting.EndpointTemplates; + using Audit.Auditing; using Microsoft.Extensions.DependencyInjection; using NServiceBus; using NServiceBus.AcceptanceTesting; @@ -35,8 +36,7 @@ await Define() .When(context => { return context.Logs.ToArray().Any(i => - i.Message.StartsWith( - "Ensure started. Infrastructure started")); + i.Message.StartsWith(AuditIngestion.LogMessages.StartedInfrastructure)); }, (_, __) => { var databaseConfiguration = ServiceProvider.GetRequiredService(); @@ -47,8 +47,7 @@ await Define() .When(context => { return context.Logs.ToArray().Any(i => - i.Message.StartsWith( - "Shutting down due to failed persistence health check. 
Infrastructure shut down completed")); + i.Message.StartsWith(AuditIngestion.LogMessages.StoppedInfrastructure)); }, (bus, c) => bus.SendLocal(new MyMessage())) ) .Done(async c => await this.TryGetSingle( @@ -72,7 +71,7 @@ await Define() { return context.Logs.ToArray().Any(i => i.Message.StartsWith( - "Ensure started. Infrastructure started")); + AuditIngestion.LogMessages.StartedInfrastructure)); }, (session, context) => { var databaseConfiguration = ServiceProvider.GetRequiredService(); @@ -83,8 +82,7 @@ await Define() .When(context => { ingestionShutdown = context.Logs.ToArray().Any(i => - i.Message.StartsWith( - "Shutting down due to failed persistence health check. Infrastructure shut down completed")); + i.Message.StartsWith(AuditIngestion.LogMessages.StoppedInfrastructure)); return ingestionShutdown; }, diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 920c693dce..a1a12a76a3 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -130,7 +130,7 @@ async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken) await auditIngestor.VerifyCanReachForwardingAddress(); await queueIngestor.StartReceive(cancellationToken); - logger.Info("Started infrastructure"); + logger.Info(LogMessages.StartedInfrastructure); } catch (Exception e) { @@ -163,7 +163,7 @@ async Task StopAndTeardownInfrastructure(CancellationToken cancellationToken) } queueIngestor = null; - logger.Info("Stopped infrastructure"); + logger.Info(LogMessages.StoppedInfrastructure); } catch (Exception e) { @@ -335,5 +335,11 @@ public override async Task StopAsync(CancellationToken cancellationToken) readonly IHostApplicationLifetime applicationLifetime; static readonly ILog logger = LogManager.GetLogger(); + + internal static class LogMessages + { + internal const string StartedInfrastructure = "Started infrastructure"; + internal const string 
StoppedInfrastructure = "Stopped infrastructure"; + } } } \ No newline at end of file From aa3557a6d38be7c519d414ed29ac62a997343578 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Mon, 17 Feb 2025 19:56:45 +0100 Subject: [PATCH 05/14] Improve readability --- src/ServiceControl.Audit/Auditing/AuditIngestion.cs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index a1a12a76a3..d652c1d52f 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -76,13 +76,18 @@ async Task EnsureStarted(CancellationToken cancellationToken = default) await startStopSemaphore.WaitAsync(cancellationToken); logger.Debug("Ensure started. Start/stop semaphore acquired"); - if (!unitOfWorkFactory.CanIngestMore()) + var canIngest = unitOfWorkFactory.CanIngestMore(); + + logger.DebugFormat("Ensure started {0}", canIngest); + + if (canIngest) + { + await SetUpAndStartInfrastructure(cancellationToken); + } + else { await StopAndTeardownInfrastructure(cancellationToken); - return; } - - await SetUpAndStartInfrastructure(cancellationToken); } catch (Exception e) { From 58877b85be3992bc57134acb428c9e2ea9dd1cc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 20 Feb 2025 07:59:54 +0100 Subject: [PATCH 06/14] Add cancel on start to the watchdog for completeness --- src/ServiceControl.Audit/Auditing/AuditIngestion.cs | 2 +- .../WatchdogTests.cs | 11 ++++++----- src/ServiceControl.Infrastructure/Watchdog.cs | 5 +++-- src/ServiceControl/Operations/ErrorIngestion.cs | 2 +- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index d652c1d52f..e141a99f9b 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ 
b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -231,7 +231,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati public override async Task StartAsync(CancellationToken cancellationToken) { - await watchdog.Start(() => applicationLifetime.StopApplication()); + await watchdog.Start(() => applicationLifetime.StopApplication(), cancellationToken); await base.StartAsync(cancellationToken); } diff --git a/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs b/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs index 9566e84f1d..39cd6951c6 100644 --- a/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs +++ b/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs @@ -1,6 +1,7 @@ namespace ServiceControl.Infrastructure.Tests { using System; + using System.Threading; using System.Threading.Tasks; using NServiceBus.Logging; using NUnit.Framework; @@ -27,7 +28,7 @@ public async Task It_shuts_down_gracefully() return Task.CompletedTask; }, x => { }, () => { }, TimeSpan.FromSeconds(1), log); - await dog.Start(() => { }); + await dog.Start(() => { }, CancellationToken.None); await started.Task; @@ -48,7 +49,7 @@ public async Task When_stop_fails_it_reports_the_failure() return Task.CompletedTask; }, token => throw new Exception("Simulated"), x => lastFailure = x, () => lastFailure = null, TimeSpan.FromSeconds(1), log); - await dog.Start(() => { }); + await dog.Start(() => { }, CancellationToken.None); await started.Task; @@ -83,7 +84,7 @@ public async Task On_failure_triggers_stopping() return Task.CompletedTask; }, x => { }, () => { }, TimeSpan.FromSeconds(1), log); - await dog.Start(() => { }); + await dog.Start(() => { }, CancellationToken.None); await started.Task; @@ -125,7 +126,7 @@ public async Task When_first_start_attempt_works_it_recovers_from_further_errors return Task.CompletedTask; }, token => Task.CompletedTask, x => lastFailure = x, () => lastFailure = null, TimeSpan.FromSeconds(1), log); - await 
dog.Start(() => { }); + await dog.Start(() => { }, CancellationToken.None); await recoveredFromError.Task; @@ -144,7 +145,7 @@ public async Task When_first_start_attempt_fails_onFailedOnStartup_is_called() var dog = new Watchdog("test process", token => throw new Exception("Simulated"), token => Task.CompletedTask, x => lastFailure = x, () => lastFailure = null, TimeSpan.FromSeconds(1), log); - await dog.Start(() => { onStartupFailureCalled.SetResult(true); }); + await dog.Start(() => { onStartupFailureCalled.SetResult(true); }, CancellationToken.None); await onStartupFailureCalled.Task; diff --git a/src/ServiceControl.Infrastructure/Watchdog.cs b/src/ServiceControl.Infrastructure/Watchdog.cs index d629e55818..dada3b9daa 100644 --- a/src/ServiceControl.Infrastructure/Watchdog.cs +++ b/src/ServiceControl.Infrastructure/Watchdog.cs @@ -41,7 +41,7 @@ public Task OnFailure(string failure) return ensureStopped(shutdownTokenSource.Token); } - public Task Start(Action onFailedOnStartup) + public Task Start(Action onFailedOnStartup, CancellationToken cancellationToken) { watchdog = Task.Run(async () => { @@ -88,7 +88,8 @@ public Task Start(Action onFailedOnStartup) //Ignore, no need to log cancellation of delay } } - }); + }, cancellationToken); + return Task.CompletedTask; } diff --git a/src/ServiceControl/Operations/ErrorIngestion.cs b/src/ServiceControl/Operations/ErrorIngestion.cs index 3a35a9e286..196be414e5 100644 --- a/src/ServiceControl/Operations/ErrorIngestion.cs +++ b/src/ServiceControl/Operations/ErrorIngestion.cs @@ -72,7 +72,7 @@ public ErrorIngestion( public override async Task StartAsync(CancellationToken cancellationToken) { - await watchdog.Start(() => applicationLifetime.StopApplication()); + await watchdog.Start(() => applicationLifetime.StopApplication(), cancellationToken); await base.StartAsync(cancellationToken); } From 2531c4f2ea014d58eb7a4f9828fd4fd77e42c9da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 20 Feb 2025 
08:14:49 +0100 Subject: [PATCH 07/14] Remove unneeded defaults --- src/ServiceControl.Audit/Auditing/AuditIngestion.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index e141a99f9b..ce0a35c63c 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -68,7 +68,7 @@ Task OnCriticalError(string failure, Exception exception) return watchdog.OnFailure(failure); } - async Task EnsureStarted(CancellationToken cancellationToken = default) + async Task EnsureStarted(CancellationToken cancellationToken) { try { @@ -177,7 +177,7 @@ async Task StopAndTeardownInfrastructure(CancellationToken cancellationToken) } } - async Task EnsureStopped(CancellationToken cancellationToken = default) + async Task EnsureStopped(CancellationToken cancellationToken) { try { From 5be3091bbc9ac7005bfee12db2ae3350ca0018dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 20 Feb 2025 08:22:22 +0100 Subject: [PATCH 08/14] Better logging --- .../Auditing/AuditIngestion.cs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index ce0a35c63c..471ffd63d5 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -76,18 +76,18 @@ async Task EnsureStarted(CancellationToken cancellationToken) await startStopSemaphore.WaitAsync(cancellationToken); logger.Debug("Ensure started. Start/stop semaphore acquired"); - var canIngest = unitOfWorkFactory.CanIngestMore(); - - logger.DebugFormat("Ensure started {0}", canIngest); - - if (canIngest) - { - await SetUpAndStartInfrastructure(cancellationToken); - } - else + if (!unitOfWorkFactory.CanIngestMore()) { + logger.Warn("Ensure started. 
Storage unable to ingest more. Ingestion will be paused"); + await StopAndTeardownInfrastructure(cancellationToken); + + return; } + + logger.Debug("Ensure started. Storage able to ingest. Ingestion will be started"); + + await SetUpAndStartInfrastructure(cancellationToken); } catch (Exception e) { From 84dfb71b52ee2baad989d691c7c99363b6b798cd Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Thu, 20 Feb 2025 09:30:30 +0100 Subject: [PATCH 09/14] Revert "Better logging" This reverts commit 5be3091bbc9ac7005bfee12db2ae3350ca0018dd. --- .../Auditing/AuditIngestion.cs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 471ffd63d5..ce0a35c63c 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -76,18 +76,18 @@ async Task EnsureStarted(CancellationToken cancellationToken) await startStopSemaphore.WaitAsync(cancellationToken); logger.Debug("Ensure started. Start/stop semaphore acquired"); - if (!unitOfWorkFactory.CanIngestMore()) - { - logger.Warn("Ensure started. Storage unable to ingest more. Ingestion will be paused"); + var canIngest = unitOfWorkFactory.CanIngestMore(); - await StopAndTeardownInfrastructure(cancellationToken); + logger.DebugFormat("Ensure started {0}", canIngest); - return; + if (canIngest) + { + await SetUpAndStartInfrastructure(cancellationToken); + } + else + { + await StopAndTeardownInfrastructure(cancellationToken); } - - logger.Debug("Ensure started. Storage able to ingest. 
Ingestion will be started"); - - await SetUpAndStartInfrastructure(cancellationToken); } catch (Exception e) { From 41e31a1d9fe903b8db42f8b9bfdb9f16bf493e42 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Thu, 20 Feb 2025 09:39:04 +0100 Subject: [PATCH 10/14] Why 15 seconds and why even the CTS --- src/ServiceControl.Infrastructure/Watchdog.cs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/ServiceControl.Infrastructure/Watchdog.cs b/src/ServiceControl.Infrastructure/Watchdog.cs index dada3b9daa..56d111cf78 100644 --- a/src/ServiceControl.Infrastructure/Watchdog.cs +++ b/src/ServiceControl.Infrastructure/Watchdog.cs @@ -53,8 +53,12 @@ public Task Start(Action onFailedOnStartup, CancellationToken cancellationToken) { try { + // Host builder start is launching the loop. The watch dog loop task runs in isolation + // We want the start not to run to infinity. An NServiceBus endpoint should easily + // start within 15 seconds. + const int MaxStartDurationMs = 15000; using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(shutdownTokenSource.Token); - cancellationTokenSource.CancelAfter(15000); + cancellationTokenSource.CancelAfter(MaxStartDurationMs); log.Debug($"Ensuring {taskName} is running"); await ensureStarted(cancellationTokenSource.Token).ConfigureAwait(false); From e601ac44064e034f9b0833d3bd264898a069dd01 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Mon, 3 Mar 2025 17:31:51 +0100 Subject: [PATCH 11/14] Refactored tests to ensure stop throws when it stops ungracefully --- .../WatchdogTests.cs | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs b/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs index 39cd6951c6..3a430ecc72 100644 --- a/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs +++ b/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs @@ -37,8 +37,8 @@ public async Task 
It_shuts_down_gracefully() await stopped.Task; } - [Test, Ignore("When ensurestopped throws an exception, stop should also throw an exception as that is an ungraceful stop")] - public async Task When_stop_fails_it_reports_the_failure() + [Test] + public async Task When_stop_fails_stop_should_throw_identifying_ungraceful_stop() { string lastFailure = null; var started = new TaskCompletionSource(); @@ -53,9 +53,20 @@ public async Task When_stop_fails_it_reports_the_failure() await started.Task; - await dog.Stop(TestContext.CurrentContext.CancellationToken); + // The following blocks the test: + // + // var ex = Assert.ThrowsAsync(async () => await dog.Stop(TestContext.CurrentContext.CancellationToken)); + // Assert.That(ex.Message, Is.EqualTo("Simulated")); - Assert.That(lastFailure, Is.EqualTo("Simulated")); + try + { + await dog.Stop(TestContext.CurrentContext.CancellationToken); + Assert.Fail("Should have thrown an exception"); + } + catch (Exception ex) + { + Assert.That(ex.Message, Is.EqualTo("Simulated")); + } } [Test] @@ -76,6 +87,7 @@ public async Task On_failure_triggers_stopping() { restarted.SetResult(true); } + startAttempts++; return Task.CompletedTask; }, token => From 286ec83976bc74ecc64f9416122ea1d8d88486fd Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Tue, 4 Mar 2025 15:48:39 +0100 Subject: [PATCH 12/14] - Applied changes from audit to error - Removed duplication in EnsureStopped - Removed log events around semaphore handling --- .../Auditing/AuditIngestion.cs | 41 ++----- .../Operations/ErrorIngestion.cs | 109 +++++++++++++----- 2 files changed, 89 insertions(+), 61 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index ce0a35c63c..93cd91ac6f 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -72,9 +72,7 @@ async Task EnsureStarted(CancellationToken cancellationToken) { try { - 
logger.Debug("Ensure started. Start/stop semaphore acquiring"); await startStopSemaphore.WaitAsync(cancellationToken); - logger.Debug("Ensure started. Start/stop semaphore acquired"); var canIngest = unitOfWorkFactory.CanIngestMore(); @@ -104,15 +102,13 @@ async Task EnsureStarted(CancellationToken cancellationToken) } finally { - logger.Debug("Ensure started. Start/stop semaphore releasing"); startStopSemaphore.Release(); - logger.Debug("Ensure started. Start/stop semaphore released"); } } async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken) { - if (queueIngestor != null) + if (messageReceiver != null) { logger.Debug("Infrastructure already Started"); return; @@ -130,10 +126,10 @@ async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken) TransportTransactionMode.ReceiveOnly ); - queueIngestor = transportInfrastructure.Receivers[inputEndpoint]; + messageReceiver = transportInfrastructure.Receivers[inputEndpoint]; await auditIngestor.VerifyCanReachForwardingAddress(); - await queueIngestor.StartReceive(cancellationToken); + await messageReceiver.StartReceive(cancellationToken); logger.Info(LogMessages.StartedInfrastructure); } @@ -157,9 +153,9 @@ async Task StopAndTeardownInfrastructure(CancellationToken cancellationToken) logger.Info("Stopping infrastructure"); try { - if (queueIngestor != null) + if (messageReceiver != null) { - await queueIngestor.StopReceive(cancellationToken); + await messageReceiver.StopReceive(cancellationToken); } } finally @@ -167,7 +163,9 @@ async Task StopAndTeardownInfrastructure(CancellationToken cancellationToken) await transportInfrastructure.Shutdown(cancellationToken); } - queueIngestor = null; + messageReceiver = null; + transportInfrastructure = null; + logger.Info(LogMessages.StoppedInfrastructure); } catch (Exception e) @@ -181,31 +179,12 @@ async Task EnsureStopped(CancellationToken cancellationToken) { try { - logger.Info("Shutting down. 
Start/stop semaphore acquiring"); await startStopSemaphore.WaitAsync(cancellationToken); - logger.Info("Shutting down. Start/stop semaphore acquired"); - - if (queueIngestor == null) - { - logger.Info("Shutting down. Already stopped, skipping shut down"); - return; //Already stopped - } - - var stoppable = queueIngestor; - queueIngestor = null; - logger.Info("Shutting down. Infrastructure shut down commencing"); - await stoppable.StopReceive(cancellationToken); - logger.Info("Shutting down. Infrastructure shut down completed"); - } - catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) - { - logger.Info("StopReceive cancelled"); + await StopAndTeardownInfrastructure(cancellationToken); } finally { - logger.Info("Shutting down. Start/stop semaphore releasing"); startStopSemaphore.Release(); - logger.Info("Shutting down. Start/stop semaphore released"); } } @@ -318,7 +297,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) } TransportInfrastructure transportInfrastructure; - IMessageReceiver queueIngestor; + IMessageReceiver messageReceiver; long consecutiveBatchFailures = 0; readonly SemaphoreSlim startStopSemaphore = new(1); diff --git a/src/ServiceControl/Operations/ErrorIngestion.cs b/src/ServiceControl/Operations/ErrorIngestion.cs index 196be414e5..6d0558d3ad 100644 --- a/src/ServiceControl/Operations/ErrorIngestion.cs +++ b/src/ServiceControl/Operations/ErrorIngestion.cs @@ -158,30 +158,57 @@ async Task EnsureStarted(CancellationToken cancellationToken = default) { await startStopSemaphore.WaitAsync(cancellationToken); - if (!unitOfWorkFactory.CanIngestMore()) + var canIngest = unitOfWorkFactory.CanIngestMore(); + + Logger.DebugFormat("Ensure started {0}", canIngest); + + if (canIngest) { - if (messageReceiver != null) - { - var stoppable = messageReceiver; - messageReceiver = null; - await stoppable.StopReceive(cancellationToken); - Logger.Info("Shutting down due to failed persistence health check. 
Infrastructure shut down completed"); - } - return; + await SetUpAndStartInfrastructure(cancellationToken); } - - if (messageReceiver != null) + else + { + await StopAndTeardownInfrastructure(cancellationToken); + } + } + catch (Exception e) + { + try { - return; //Already started + await StopAndTeardownInfrastructure(cancellationToken); + } + catch (Exception teardownException) + { + throw new AggregateException(e, teardownException); } + throw; + } + finally + { + startStopSemaphore.Release(); + } + } + + async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken) + { + if (messageReceiver != null) + { + Logger.Debug("Infrastructure already Started"); + return; + } + + try + { + Logger.Info("Starting infrastructure"); transportInfrastructure = await transportCustomization.CreateTransportInfrastructure( errorQueue, transportSettings, OnMessage, errorHandlingPolicy.OnError, OnCriticalError, - TransportTransactionMode.ReceiveOnly); + TransportTransactionMode.ReceiveOnly + ); messageReceiver = transportInfrastructure.Receivers[errorQueue]; @@ -192,22 +219,45 @@ async Task EnsureStarted(CancellationToken cancellationToken = default) await messageReceiver.StartReceive(cancellationToken); - Logger.Info("Ensure started. 
Infrastructure started"); + Logger.Info(LogMessages.StartedInfrastructure); } - catch + catch (Exception e) { - if (messageReceiver != null) + Logger.Error("Failed to start infrastructure", e); + throw; + } + } + async Task StopAndTeardownInfrastructure(CancellationToken cancellationToken) + { + if (transportInfrastructure == null) + { + Logger.Debug("Infrastructure already Stopped"); + return; + } + try + { + Logger.Info("Stopping infrastructure"); + try + { + if (messageReceiver != null) + { + await messageReceiver.StopReceive(cancellationToken); + } + } + finally { - await messageReceiver.StopReceive(cancellationToken); + await transportInfrastructure.Shutdown(cancellationToken); } - messageReceiver = null; // Setting to null so that it doesn't exit when it retries in line 134 + messageReceiver = null; + transportInfrastructure = null; - throw; + Logger.Info(LogMessages.StoppedInfrastructure); } - finally + catch (Exception e) { - startStopSemaphore.Release(); + Logger.Error("Failed to stop infrastructure", e); + throw; } } @@ -238,14 +288,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) try { await startStopSemaphore.WaitAsync(cancellationToken); - - if (messageReceiver == null) - { - return; //Already stopped - } - var stoppable = messageReceiver; - messageReceiver = null; - await stoppable.StopReceive(cancellationToken); + await StopAndTeardownInfrastructure(cancellationToken); } finally { @@ -253,7 +296,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) } } - SemaphoreSlim startStopSemaphore = new SemaphoreSlim(1); + SemaphoreSlim startStopSemaphore = new(1); string errorQueue; ErrorIngestionFaultPolicy errorHandlingPolicy; TransportInfrastructure transportInfrastructure; @@ -272,5 +315,11 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) readonly IHostApplicationLifetime applicationLifetime; static readonly ILog Logger = LogManager.GetLogger(); + + internal static class 
LogMessages + { + internal const string StartedInfrastructure = "Started infrastructure"; + internal const string StoppedInfrastructure = "Stopped infrastructure"; + } } } \ No newline at end of file From a3ec0988dd327054a574faf299d715bdd7586de8 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Tue, 4 Mar 2025 17:10:14 +0100 Subject: [PATCH 13/14] Fix failing test --- .../When_critical_storage_threshold_reached.cs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/ServiceControl.AcceptanceTests/Monitoring/CustomChecks/When_critical_storage_threshold_reached.cs b/src/ServiceControl.AcceptanceTests/Monitoring/CustomChecks/When_critical_storage_threshold_reached.cs index f7fa1ac290..742f899780 100644 --- a/src/ServiceControl.AcceptanceTests/Monitoring/CustomChecks/When_critical_storage_threshold_reached.cs +++ b/src/ServiceControl.AcceptanceTests/Monitoring/CustomChecks/When_critical_storage_threshold_reached.cs @@ -9,6 +9,7 @@ using NServiceBus; using NServiceBus.AcceptanceTesting; using NUnit.Framework; + using Operations; using ServiceBus.Management.Infrastructure.Settings; [TestFixture] @@ -33,7 +34,7 @@ await Define() .WithEndpoint(b => b .When(context => { - return context.Logs.ToArray().Any(i => i.Message.StartsWith("Ensure started. Infrastructure started")); + return context.Logs.ToArray().Any(i => i.Message.StartsWith(ErrorIngestion.LogMessages.StoppedInfrastructure)); }, (_, __) => { PersisterSettings.MinimumStorageLeftRequiredForIngestion = 100; @@ -43,8 +44,7 @@ await Define() .When(context => { return context.Logs.ToArray().Any(i => - i.Message.StartsWith( - "Shutting down due to failed persistence health check. 
Infrastructure shut down completed")); + i.Message.StartsWith(ErrorIngestion.LogMessages.StoppedInfrastructure)); }, (bus, c) => bus.SendLocal(new MyMessage()) ) .DoNotFailOnErrorMessages()) @@ -62,8 +62,7 @@ await Define() .When(context => { return context.Logs.ToArray().Any(i => - i.Message.StartsWith( - "Ensure started. Infrastructure started")); + i.Message.StartsWith(ErrorIngestion.LogMessages.StartedInfrastructure)); }, (session, context) => { PersisterSettings.MinimumStorageLeftRequiredForIngestion = 100; @@ -73,8 +72,7 @@ await Define() .When(context => { ingestionShutdown = context.Logs.ToArray().Any(i => - i.Message.StartsWith( - "Shutting down due to failed persistence health check. Infrastructure shut down completed")); + i.Message.StartsWith(ErrorIngestion.LogMessages.StoppedInfrastructure)); return ingestionShutdown; }, From a0dd49ad843d0be2962d84ef86ac93c86cec7651 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Tue, 4 Mar 2025 17:26:14 +0100 Subject: [PATCH 14/14] fixup! Fix failing test --- .../CustomChecks/When_critical_storage_threshold_reached.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceControl.AcceptanceTests/Monitoring/CustomChecks/When_critical_storage_threshold_reached.cs b/src/ServiceControl.AcceptanceTests/Monitoring/CustomChecks/When_critical_storage_threshold_reached.cs index 742f899780..40358da9ad 100644 --- a/src/ServiceControl.AcceptanceTests/Monitoring/CustomChecks/When_critical_storage_threshold_reached.cs +++ b/src/ServiceControl.AcceptanceTests/Monitoring/CustomChecks/When_critical_storage_threshold_reached.cs @@ -34,7 +34,7 @@ await Define() .WithEndpoint(b => b .When(context => { - return context.Logs.ToArray().Any(i => i.Message.StartsWith(ErrorIngestion.LogMessages.StoppedInfrastructure)); + return context.Logs.ToArray().Any(i => i.Message.StartsWith(ErrorIngestion.LogMessages.StartedInfrastructure)); }, (_, __) => { PersisterSettings.MinimumStorageLeftRequiredForIngestion = 100;