diff --git a/src/ServiceControl.AcceptanceTests/Monitoring/CustomChecks/When_critical_storage_threshold_reached.cs b/src/ServiceControl.AcceptanceTests/Monitoring/CustomChecks/When_critical_storage_threshold_reached.cs index f7fa1ac290..40358da9ad 100644 --- a/src/ServiceControl.AcceptanceTests/Monitoring/CustomChecks/When_critical_storage_threshold_reached.cs +++ b/src/ServiceControl.AcceptanceTests/Monitoring/CustomChecks/When_critical_storage_threshold_reached.cs @@ -9,6 +9,7 @@ using NServiceBus; using NServiceBus.AcceptanceTesting; using NUnit.Framework; + using Operations; using ServiceBus.Management.Infrastructure.Settings; [TestFixture] @@ -33,7 +34,7 @@ await Define() .WithEndpoint(b => b .When(context => { - return context.Logs.ToArray().Any(i => i.Message.StartsWith("Ensure started. Infrastructure started")); + return context.Logs.ToArray().Any(i => i.Message.StartsWith(ErrorIngestion.LogMessages.StartedInfrastructure)); }, (_, __) => { PersisterSettings.MinimumStorageLeftRequiredForIngestion = 100; @@ -43,8 +44,7 @@ await Define() .When(context => { return context.Logs.ToArray().Any(i => - i.Message.StartsWith( - "Shutting down due to failed persistence health check. Infrastructure shut down completed")); + i.Message.StartsWith(ErrorIngestion.LogMessages.StoppedInfrastructure)); }, (bus, c) => bus.SendLocal(new MyMessage()) ) .DoNotFailOnErrorMessages()) @@ -62,8 +62,7 @@ await Define() .When(context => { return context.Logs.ToArray().Any(i => - i.Message.StartsWith( - "Ensure started. Infrastructure started")); + i.Message.StartsWith(ErrorIngestion.LogMessages.StartedInfrastructure)); }, (session, context) => { PersisterSettings.MinimumStorageLeftRequiredForIngestion = 100; @@ -73,8 +72,7 @@ await Define() .When(context => { ingestionShutdown = context.Logs.ToArray().Any(i => - i.Message.StartsWith( - "Shutting down due to failed persistence health check. Infrastructure shut down completed")); + i.Message.StartsWith(ErrorIngestion.LogMessages.StoppedInfrastructure)); return ingestionShutdown; }, diff --git a/src/ServiceControl.Audit.AcceptanceTests.RavenDB/Auditing/When_critical_storage_threshold_reached.cs b/src/ServiceControl.Audit.AcceptanceTests.RavenDB/Auditing/When_critical_storage_threshold_reached.cs index dcd1483d9a..68126da497 100644 --- a/src/ServiceControl.Audit.AcceptanceTests.RavenDB/Auditing/When_critical_storage_threshold_reached.cs +++ b/src/ServiceControl.Audit.AcceptanceTests.RavenDB/Auditing/When_critical_storage_threshold_reached.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Threading.Tasks; using AcceptanceTesting.EndpointTemplates; + using Audit.Auditing; using Microsoft.Extensions.DependencyInjection; using NServiceBus; using NServiceBus.AcceptanceTesting; @@ -35,8 +36,7 @@ await Define() .When(context => { return context.Logs.ToArray().Any(i => - i.Message.StartsWith( - "Ensure started. Infrastructure started")); + i.Message.StartsWith(AuditIngestion.LogMessages.StartedInfrastructure)); }, (_, __) => { var databaseConfiguration = ServiceProvider.GetRequiredService(); @@ -47,8 +47,7 @@ await Define() .When(context => { return context.Logs.ToArray().Any(i => - i.Message.StartsWith( - "Shutting down due to failed persistence health check. Infrastructure shut down completed")); + i.Message.StartsWith(AuditIngestion.LogMessages.StoppedInfrastructure)); }, (bus, c) => bus.SendLocal(new MyMessage())) ) .Done(async c => await this.TryGetSingle( @@ -72,7 +71,7 @@ await Define() { return context.Logs.ToArray().Any(i => i.Message.StartsWith( - "Ensure started. Infrastructure started")); + AuditIngestion.LogMessages.StartedInfrastructure)); }, (session, context) => { var databaseConfiguration = ServiceProvider.GetRequiredService(); @@ -83,8 +82,7 @@ await Define() .When(context => { ingestionShutdown = context.Logs.ToArray().Any(i => - i.Message.StartsWith( - "Shutting down due to failed persistence health check. Infrastructure shut down completed")); + i.Message.StartsWith(AuditIngestion.LogMessages.StoppedInfrastructure)); return ingestionShutdown; }, diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 6a2a039cb9..93cd91ac6f 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -68,107 +68,123 @@ Task OnCriticalError(string failure, Exception exception) return watchdog.OnFailure(failure); } - async Task EnsureStarted(CancellationToken cancellationToken = default) + async Task EnsureStarted(CancellationToken cancellationToken) { try { - logger.Debug("Ensure started. Start/stop semaphore acquiring"); await startStopSemaphore.WaitAsync(cancellationToken); - logger.Debug("Ensure started. Start/stop semaphore acquired"); - if (!unitOfWorkFactory.CanIngestMore()) - { - if (queueIngestor != null) - { - var stoppable = queueIngestor; - queueIngestor = null; - logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down commencing"); - await stoppable.StopReceive(cancellationToken); - logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed"); - } + var canIngest = unitOfWorkFactory.CanIngestMore(); - return; - } + logger.DebugFormat("Ensure started {0}", canIngest); - if (queueIngestor != null) + if (canIngest) + { + await SetUpAndStartInfrastructure(cancellationToken); + } + else { - logger.Debug("Ensure started. Already started, skipping start up"); - return; //Already started + await StopAndTeardownInfrastructure(cancellationToken); + } + } + catch (Exception e) + { + try + { + await StopAndTeardownInfrastructure(cancellationToken); + } + catch (Exception teardownException) + { + throw new AggregateException(e, teardownException); } - logger.Info("Ensure started. Infrastructure starting"); + throw; + } + finally + { + startStopSemaphore.Release(); + } + } + async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken) + { + if (messageReceiver != null) + { + logger.Debug("Infrastructure already Started"); + return; + } + + try + { + logger.Info("Starting infrastructure"); transportInfrastructure = await transportCustomization.CreateTransportInfrastructure( inputEndpoint, transportSettings, OnMessage, errorHandlingPolicy.OnError, OnCriticalError, - TransportTransactionMode.ReceiveOnly); + TransportTransactionMode.ReceiveOnly + ); - queueIngestor = transportInfrastructure.Receivers[inputEndpoint]; + messageReceiver = transportInfrastructure.Receivers[inputEndpoint]; await auditIngestor.VerifyCanReachForwardingAddress(); + await messageReceiver.StartReceive(cancellationToken); - await queueIngestor.StartReceive(cancellationToken); + logger.Info(LogMessages.StartedInfrastructure); + } + catch (Exception e) + { + logger.Error("Failed to start infrastructure", e); + throw; + } + } - logger.Info("Ensure started. Infrastructure started"); + async Task StopAndTeardownInfrastructure(CancellationToken cancellationToken) + { + if (transportInfrastructure == null) + { + logger.Debug("Infrastructure already Stopped"); + return; } - catch + + try { - if (queueIngestor != null) + logger.Info("Stopping infrastructure"); + try { - try - { - await queueIngestor.StopReceive(cancellationToken); - } - catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) + if (messageReceiver != null) { - logger.Info("StopReceive cancelled"); + await messageReceiver.StopReceive(cancellationToken); } } + finally + { + await transportInfrastructure.Shutdown(cancellationToken); + } - queueIngestor = null; // Setting to null so that it doesn't exit when it retries in line 185 + messageReceiver = null; + transportInfrastructure = null; - throw; + logger.Info(LogMessages.StoppedInfrastructure); } - finally + catch (Exception e) { - logger.Debug("Ensure started. Start/stop semaphore releasing"); - startStopSemaphore.Release(); - logger.Debug("Ensure started. Start/stop semaphore released"); + logger.Error("Failed to stop infrastructure", e); + throw; } } - async Task EnsureStopped(CancellationToken cancellationToken = default) + async Task EnsureStopped(CancellationToken cancellationToken) { try { - logger.Info("Shutting down. Start/stop semaphore acquiring"); await startStopSemaphore.WaitAsync(cancellationToken); - logger.Info("Shutting down. Start/stop semaphore acquired"); - - if (queueIngestor == null) - { - logger.Info("Shutting down. Already stopped, skipping shut down"); - return; //Already stopped - } - - var stoppable = queueIngestor; - queueIngestor = null; - logger.Info("Shutting down. Infrastructure shut down commencing"); - await stoppable.StopReceive(cancellationToken); - logger.Info("Shutting down. Infrastructure shut down completed"); - } - catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken) - { - logger.Info("StopReceive cancelled"); + await StopAndTeardownInfrastructure(cancellationToken); } finally { - logger.Info("Shutting down. Start/stop semaphore releasing"); startStopSemaphore.Release(); - logger.Info("Shutting down. Start/stop semaphore released"); } } @@ -194,7 +210,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati public override async Task StartAsync(CancellationToken cancellationToken) { - await watchdog.Start(() => applicationLifetime.StopApplication()); + await watchdog.Start(() => applicationLifetime.StopApplication(), cancellationToken); await base.StartAsync(cancellationToken); } @@ -260,7 +276,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) { try { - await watchdog.Stop(); + await watchdog.Stop(cancellationToken); channel.Writer.Complete(); await base.StopAsync(cancellationToken); } @@ -281,7 +297,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) } TransportInfrastructure transportInfrastructure; - IMessageReceiver queueIngestor; + IMessageReceiver messageReceiver; long consecutiveBatchFailures = 0; readonly SemaphoreSlim startStopSemaphore = new(1); @@ -303,5 +319,11 @@ public override async Task StopAsync(CancellationToken cancellationToken) readonly IHostApplicationLifetime applicationLifetime; static readonly ILog logger = LogManager.GetLogger(); + + internal static class LogMessages + { + internal const string StartedInfrastructure = "Started infrastructure"; + internal const string StoppedInfrastructure = "Stopped infrastructure"; + } } } \ No newline at end of file diff --git a/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs b/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs index ffbd465bc0..3a430ecc72 100644 --- a/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs +++ b/src/ServiceControl.Infrastructure.Tests/WatchdogTests.cs @@ -1,6 +1,7 @@ namespace ServiceControl.Infrastructure.Tests { using System; + using System.Threading; using System.Threading.Tasks; using NServiceBus.Logging; using NUnit.Framework; @@ -27,17 +28,17 @@ public async Task It_shuts_down_gracefully() return Task.CompletedTask; }, x => { }, () => { }, TimeSpan.FromSeconds(1), log); - await dog.Start(() => { }); + await dog.Start(() => { }, CancellationToken.None); await started.Task; - await dog.Stop(); + await dog.Stop(TestContext.CurrentContext.CancellationToken); await stopped.Task; } [Test] - public async Task When_stop_fails_it_reports_the_failure() + public async Task When_stop_fails_stop_should_throw_identifying_ungraceful_stop() { string lastFailure = null; var started = new TaskCompletionSource(); @@ -48,13 +49,24 @@ public async Task When_stop_fails_it_reports_the_failure() return Task.CompletedTask; }, token => throw new Exception("Simulated"), x => lastFailure = x, () => lastFailure = null, TimeSpan.FromSeconds(1), log); - await dog.Start(() => { }); + await dog.Start(() => { }, CancellationToken.None); await started.Task; - await dog.Stop(); + // The following blocks the test: + // + // var ex = Assert.ThrowsAsync(async () => await dog.Stop(TestContext.CurrentContext.CancellationToken)); + // Assert.That(ex.Message, Is.EqualTo("Simulated")); - Assert.That(lastFailure, Is.EqualTo("Simulated")); + try + { + await dog.Stop(TestContext.CurrentContext.CancellationToken); + Assert.Fail("Should have thrown an exception"); + } + catch (Exception ex) + { + Assert.That(ex.Message, Is.EqualTo("Simulated")); + } } [Test] @@ -75,6 +87,7 @@ public async Task On_failure_triggers_stopping() { restarted.SetResult(true); } + startAttempts++; return Task.CompletedTask; }, token => @@ -83,7 +96,7 @@ public async Task On_failure_triggers_stopping() return Task.CompletedTask; }, x => { }, () => { }, TimeSpan.FromSeconds(1), log); - await dog.Start(() => { }); + await dog.Start(() => { }, CancellationToken.None); await started.Task; @@ -91,7 +104,7 @@ public async Task On_failure_triggers_stopping() await restarted.Task; - await dog.Stop(); + await dog.Stop(TestContext.CurrentContext.CancellationToken); } [Test] @@ -125,11 +138,11 @@ public async Task When_first_start_attempt_works_it_recovers_from_further_errors return Task.CompletedTask; }, token => Task.CompletedTask, x => lastFailure = x, () => lastFailure = null, TimeSpan.FromSeconds(1), log); - await dog.Start(() => { }); + await dog.Start(() => { }, CancellationToken.None); await recoveredFromError.Task; - await dog.Stop(); + await dog.Stop(TestContext.CurrentContext.CancellationToken); //Make sure failure is cleared Assert.That(lastFailure, Is.Null); @@ -144,7 +157,7 @@ public async Task When_first_start_attempt_fails_onFailedOnStartup_is_called() var dog = new Watchdog("test process", token => throw new Exception("Simulated"), token => Task.CompletedTask, x => lastFailure = x, () => lastFailure = null, TimeSpan.FromSeconds(1), log); - await dog.Start(() => { onStartupFailureCalled.SetResult(true); }); + await dog.Start(() => { onStartupFailureCalled.SetResult(true); }, CancellationToken.None); await onStartupFailureCalled.Task; diff --git a/src/ServiceControl.Infrastructure/Watchdog.cs b/src/ServiceControl.Infrastructure/Watchdog.cs index c91ae2b3bf..56d111cf78 100644 --- a/src/ServiceControl.Infrastructure/Watchdog.cs +++ b/src/ServiceControl.Infrastructure/Watchdog.cs @@ -12,14 +12,19 @@ public class Watchdog Action reportFailure; Action clearFailure; Task watchdog; - CancellationTokenSource shutdownTokenSource = new CancellationTokenSource(); + CancellationTokenSource shutdownTokenSource = new(); TimeSpan timeToWaitBetweenStartupAttempts; ILog log; string taskName; - public Watchdog(string taskName, Func ensureStarted, - Func ensureStopped, Action reportFailure, Action clearFailure, - TimeSpan timeToWaitBetweenStartupAttempts, ILog log) + public Watchdog( + string taskName, + Func ensureStarted, + Func ensureStopped, Action reportFailure, + Action clearFailure, + TimeSpan timeToWaitBetweenStartupAttempts, + ILog log + ) { this.taskName = taskName; this.ensureStopped = ensureStopped; @@ -36,47 +41,47 @@ public Task OnFailure(string failure) return ensureStopped(shutdownTokenSource.Token); } - public Task Start(Action onFailedOnStartup) + public Task Start(Action onFailedOnStartup, CancellationToken cancellationToken) { watchdog = Task.Run(async () => { log.Debug($"Starting watching {taskName}"); - bool? failedOnStartup = null; + bool startup = true; while (!shutdownTokenSource.IsCancellationRequested) { try { + // Host builder start is launching the loop. The watch dog loop task runs in isolation + // We want the start not to run to infinity. An NServiceBus endpoint should easily + // start within 15 seconds. + const int MaxStartDurationMs = 15000; + using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(shutdownTokenSource.Token); + cancellationTokenSource.CancelAfter(MaxStartDurationMs); + log.Debug($"Ensuring {taskName} is running"); - await ensureStarted(shutdownTokenSource.Token).ConfigureAwait(false); + await ensureStarted(cancellationTokenSource.Token).ConfigureAwait(false); clearFailure(); - - failedOnStartup ??= false; + startup = false; } - catch (OperationCanceledException e) when (!shutdownTokenSource.IsCancellationRequested) + catch (OperationCanceledException e) when (shutdownTokenSource.IsCancellationRequested) { - // Continue, as OCE is not from caller - log.Info("Start cancelled, retrying...", e); - continue; + log.Debug("Cancelled", e); + return; } catch (Exception e) { reportFailure(e.Message); - if (failedOnStartup == null) + if (startup) { - failedOnStartup = true; - log.Error($"Error during initial startup attempt for {taskName}.", e); - - //there was an error during startup hence we want to shut down the instance onFailedOnStartup(); + return; } - else - { - log.Error($"Error while trying to start {taskName}. Starting will be retried in {timeToWaitBetweenStartupAttempts}.", e); - } + + log.Error($"Error while trying to start {taskName}. Starting will be retried in {timeToWaitBetweenStartupAttempts}.", e); } try { @@ -87,25 +92,28 @@ public Task Start(Action onFailedOnStartup) //Ignore, no need to log cancellation of delay } } - try - { - log.Debug($"Stopping watching process {taskName}"); - //We don't pass the shutdown token here because it has already been cancelled and we want to ensure we stop the ingestion. - await ensureStopped(CancellationToken.None).ConfigureAwait(false); - } - catch (Exception e) - { - log.Error($"Error while trying to stop {taskName}.", e); - reportFailure(e.Message); - } - }); + }, cancellationToken); + return Task.CompletedTask; } - public Task Stop() + public async Task Stop(CancellationToken cancellationToken) { - shutdownTokenSource.Cancel(); - return watchdog; + try + { + log.Debug($"Stopping watching process {taskName}"); + await shutdownTokenSource.CancelAsync().ConfigureAwait(false); + await watchdog.ConfigureAwait(false); + } + catch (Exception e) + { + log.Error($"Error while trying to stop {taskName}.", e); + throw; + } + finally + { + await ensureStopped(cancellationToken).ConfigureAwait(false); + } } } } diff --git a/src/ServiceControl/Operations/ErrorIngestion.cs b/src/ServiceControl/Operations/ErrorIngestion.cs index 00676b95a7..6d0558d3ad 100644 --- a/src/ServiceControl/Operations/ErrorIngestion.cs +++ b/src/ServiceControl/Operations/ErrorIngestion.cs @@ -72,7 +72,7 @@ public ErrorIngestion( public override async Task StartAsync(CancellationToken cancellationToken) { - await watchdog.Start(() => applicationLifetime.StopApplication()); + await watchdog.Start(() => applicationLifetime.StopApplication(), cancellationToken); await base.StartAsync(cancellationToken); } @@ -132,7 +132,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) { try { - await watchdog.Stop(); + await watchdog.Stop(cancellationToken); channel.Writer.Complete(); await base.StopAsync(cancellationToken); } @@ -158,30 +158,57 @@ async Task EnsureStarted(CancellationToken cancellationToken = default) { await startStopSemaphore.WaitAsync(cancellationToken); - if (!unitOfWorkFactory.CanIngestMore()) + var canIngest = unitOfWorkFactory.CanIngestMore(); + + Logger.DebugFormat("Ensure started {0}", canIngest); + + if (canIngest) { - if (messageReceiver != null) - { - var stoppable = messageReceiver; - messageReceiver = null; - await stoppable.StopReceive(cancellationToken); - Logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed"); - } - return; + await SetUpAndStartInfrastructure(cancellationToken); } - - if (messageReceiver != null) + else + { + await StopAndTeardownInfrastructure(cancellationToken); + } + } + catch (Exception e) + { + try { - return; //Already started + await StopAndTeardownInfrastructure(cancellationToken); + } + catch (Exception teardownException) + { + throw new AggregateException(e, teardownException); } + throw; + } + finally + { + startStopSemaphore.Release(); + } + } + + async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken) + { + if (messageReceiver != null) + { + Logger.Debug("Infrastructure already Started"); + return; + } + + try + { + Logger.Info("Starting infrastructure"); transportInfrastructure = await transportCustomization.CreateTransportInfrastructure( errorQueue, transportSettings, OnMessage, errorHandlingPolicy.OnError, OnCriticalError, - TransportTransactionMode.ReceiveOnly); + TransportTransactionMode.ReceiveOnly + ); messageReceiver = transportInfrastructure.Receivers[errorQueue]; @@ -192,22 +219,45 @@ async Task EnsureStarted(CancellationToken cancellationToken = default) await messageReceiver.StartReceive(cancellationToken); - Logger.Info("Ensure started. Infrastructure started"); + Logger.Info(LogMessages.StartedInfrastructure); } - catch + catch (Exception e) { - if (messageReceiver != null) + Logger.Error("Failed to start infrastructure", e); + throw; + } + } + async Task StopAndTeardownInfrastructure(CancellationToken cancellationToken) + { + if (transportInfrastructure == null) + { + Logger.Debug("Infrastructure already Stopped"); + return; + } + try + { + Logger.Info("Stopping infrastructure"); + try + { + if (messageReceiver != null) + { + await messageReceiver.StopReceive(cancellationToken); + } + } + finally { - await messageReceiver.StopReceive(cancellationToken); + await transportInfrastructure.Shutdown(cancellationToken); } - messageReceiver = null; // Setting to null so that it doesn't exit when it retries in line 134 + messageReceiver = null; + transportInfrastructure = null; - throw; + Logger.Info(LogMessages.StoppedInfrastructure); } - finally + catch (Exception e) { - startStopSemaphore.Release(); + Logger.Error("Failed to stop infrastructure", e); + throw; } } @@ -238,14 +288,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) try { await startStopSemaphore.WaitAsync(cancellationToken); - - if (messageReceiver == null) - { - return; //Already stopped - } - var stoppable = messageReceiver; - messageReceiver = null; - await stoppable.StopReceive(cancellationToken); + await StopAndTeardownInfrastructure(cancellationToken); } finally { @@ -253,7 +296,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) } } - SemaphoreSlim startStopSemaphore = new SemaphoreSlim(1); + SemaphoreSlim startStopSemaphore = new(1); string errorQueue; ErrorIngestionFaultPolicy errorHandlingPolicy; TransportInfrastructure transportInfrastructure; @@ -272,5 +315,11 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) readonly IHostApplicationLifetime applicationLifetime; static readonly ILog Logger = LogManager.GetLogger(); + + internal static class LogMessages + { + internal const string StartedInfrastructure = "Started infrastructure"; + internal const string StoppedInfrastructure = "Stopped infrastructure"; + } } } \ No newline at end of file