Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NUnit.Framework;
using Operations;
using ServiceBus.Management.Infrastructure.Settings;

[TestFixture]
Expand All @@ -33,7 +34,7 @@ await Define<ScenarioContext>()
.WithEndpoint<Sender>(b => b
.When(context =>
{
return context.Logs.ToArray().Any(i => i.Message.StartsWith("Ensure started. Infrastructure started"));
return context.Logs.ToArray().Any(i => i.Message.StartsWith(ErrorIngestion.LogMessages.StartedInfrastructure));
}, (_, __) =>
{
PersisterSettings.MinimumStorageLeftRequiredForIngestion = 100;
Expand All @@ -43,8 +44,7 @@ await Define<ScenarioContext>()
.When(context =>
{
return context.Logs.ToArray().Any(i =>
i.Message.StartsWith(
"Shutting down due to failed persistence health check. Infrastructure shut down completed"));
i.Message.StartsWith(ErrorIngestion.LogMessages.StoppedInfrastructure));
}, (bus, c) => bus.SendLocal(new MyMessage())
)
.DoNotFailOnErrorMessages())
Expand All @@ -62,8 +62,7 @@ await Define<ScenarioContext>()
.When(context =>
{
return context.Logs.ToArray().Any(i =>
i.Message.StartsWith(
"Ensure started. Infrastructure started"));
i.Message.StartsWith(ErrorIngestion.LogMessages.StartedInfrastructure));
}, (session, context) =>
{
PersisterSettings.MinimumStorageLeftRequiredForIngestion = 100;
Expand All @@ -73,8 +72,7 @@ await Define<ScenarioContext>()
.When(context =>
{
ingestionShutdown = context.Logs.ToArray().Any(i =>
i.Message.StartsWith(
"Shutting down due to failed persistence health check. Infrastructure shut down completed"));
i.Message.StartsWith(ErrorIngestion.LogMessages.StoppedInfrastructure));

return ingestionShutdown;
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting.EndpointTemplates;
using Audit.Auditing;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
Expand Down Expand Up @@ -35,8 +36,7 @@ await Define<ScenarioContext>()
.When(context =>
{
return context.Logs.ToArray().Any(i =>
i.Message.StartsWith(
"Ensure started. Infrastructure started"));
i.Message.StartsWith(AuditIngestion.LogMessages.StartedInfrastructure));
}, (_, __) =>
{
var databaseConfiguration = ServiceProvider.GetRequiredService<DatabaseConfiguration>();
Expand All @@ -47,8 +47,7 @@ await Define<ScenarioContext>()
.When(context =>
{
return context.Logs.ToArray().Any(i =>
i.Message.StartsWith(
"Shutting down due to failed persistence health check. Infrastructure shut down completed"));
i.Message.StartsWith(AuditIngestion.LogMessages.StoppedInfrastructure));
}, (bus, c) => bus.SendLocal(new MyMessage()))
)
.Done(async c => await this.TryGetSingle<MessagesView>(
Expand All @@ -72,7 +71,7 @@ await Define<ScenarioContext>()
{
return context.Logs.ToArray().Any(i =>
i.Message.StartsWith(
"Ensure started. Infrastructure started"));
AuditIngestion.LogMessages.StartedInfrastructure));
}, (session, context) =>
{
var databaseConfiguration = ServiceProvider.GetRequiredService<DatabaseConfiguration>();
Expand All @@ -83,8 +82,7 @@ await Define<ScenarioContext>()
.When(context =>
{
ingestionShutdown = context.Logs.ToArray().Any(i =>
i.Message.StartsWith(
"Shutting down due to failed persistence health check. Infrastructure shut down completed"));
i.Message.StartsWith(AuditIngestion.LogMessages.StoppedInfrastructure));

return ingestionShutdown;
},
Expand Down
144 changes: 83 additions & 61 deletions src/ServiceControl.Audit/Auditing/AuditIngestion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,107 +68,123 @@ Task OnCriticalError(string failure, Exception exception)
return watchdog.OnFailure(failure);
}

async Task EnsureStarted(CancellationToken cancellationToken = default)
async Task EnsureStarted(CancellationToken cancellationToken)
{
try
{
logger.Debug("Ensure started. Start/stop semaphore acquiring");
await startStopSemaphore.WaitAsync(cancellationToken);
logger.Debug("Ensure started. Start/stop semaphore acquired");

if (!unitOfWorkFactory.CanIngestMore())
{
if (queueIngestor != null)
{
var stoppable = queueIngestor;
queueIngestor = null;
logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down commencing");
await stoppable.StopReceive(cancellationToken);
logger.Info("Shutting down due to failed persistence health check. Infrastructure shut down completed");
}
var canIngest = unitOfWorkFactory.CanIngestMore();

return;
}
logger.DebugFormat("Ensure started {0}", canIngest);

if (queueIngestor != null)
if (canIngest)
{
await SetUpAndStartInfrastructure(cancellationToken);
}
else
{
logger.Debug("Ensure started. Already started, skipping start up");
return; //Already started
await StopAndTeardownInfrastructure(cancellationToken);
}
}
catch (Exception e)
{
try
{
await StopAndTeardownInfrastructure(cancellationToken);
}
catch (Exception teardownException)
{
throw new AggregateException(e, teardownException);
}

logger.Info("Ensure started. Infrastructure starting");
throw;
}
finally
{
startStopSemaphore.Release();
}
}

async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken)
{
if (messageReceiver != null)
{
logger.Debug("Infrastructure already Started");
return;
}

try
{
logger.Info("Starting infrastructure");
transportInfrastructure = await transportCustomization.CreateTransportInfrastructure(
inputEndpoint,
transportSettings,
OnMessage,
errorHandlingPolicy.OnError,
OnCriticalError,
TransportTransactionMode.ReceiveOnly);
TransportTransactionMode.ReceiveOnly
);

queueIngestor = transportInfrastructure.Receivers[inputEndpoint];
messageReceiver = transportInfrastructure.Receivers[inputEndpoint];

await auditIngestor.VerifyCanReachForwardingAddress();
await messageReceiver.StartReceive(cancellationToken);

await queueIngestor.StartReceive(cancellationToken);
logger.Info(LogMessages.StartedInfrastructure);
}
catch (Exception e)
{
logger.Error("Failed to start infrastructure", e);
throw;
}
}

logger.Info("Ensure started. Infrastructure started");
async Task StopAndTeardownInfrastructure(CancellationToken cancellationToken)
{
if (transportInfrastructure == null)
{
logger.Debug("Infrastructure already Stopped");
return;
}
catch

try
{
if (queueIngestor != null)
logger.Info("Stopping infrastructure");
try
{
try
{
await queueIngestor.StopReceive(cancellationToken);
}
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
if (messageReceiver != null)
{
logger.Info("StopReceive cancelled");
await messageReceiver.StopReceive(cancellationToken);
}
}
finally
{
await transportInfrastructure.Shutdown(cancellationToken);
}

queueIngestor = null; // Setting to null so that it doesn't exit when it retries in line 185
messageReceiver = null;
transportInfrastructure = null;

throw;
logger.Info(LogMessages.StoppedInfrastructure);
}
finally
catch (Exception e)
{
logger.Debug("Ensure started. Start/stop semaphore releasing");
startStopSemaphore.Release();
logger.Debug("Ensure started. Start/stop semaphore released");
logger.Error("Failed to stop infrastructure", e);
throw;
}
}

async Task EnsureStopped(CancellationToken cancellationToken = default)
async Task EnsureStopped(CancellationToken cancellationToken)
{
try
{
logger.Info("Shutting down. Start/stop semaphore acquiring");
await startStopSemaphore.WaitAsync(cancellationToken);
logger.Info("Shutting down. Start/stop semaphore acquired");

if (queueIngestor == null)
{
logger.Info("Shutting down. Already stopped, skipping shut down");
return; //Already stopped
}

var stoppable = queueIngestor;
queueIngestor = null;
logger.Info("Shutting down. Infrastructure shut down commencing");
await stoppable.StopReceive(cancellationToken);
logger.Info("Shutting down. Infrastructure shut down completed");
}
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
{
logger.Info("StopReceive cancelled");
await StopAndTeardownInfrastructure(cancellationToken);
}
finally
{
logger.Info("Shutting down. Start/stop semaphore releasing");
startStopSemaphore.Release();
logger.Info("Shutting down. Start/stop semaphore released");
}
}

Expand All @@ -194,7 +210,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati

public override async Task StartAsync(CancellationToken cancellationToken)
{
await watchdog.Start(() => applicationLifetime.StopApplication());
await watchdog.Start(() => applicationLifetime.StopApplication(), cancellationToken);
await base.StartAsync(cancellationToken);
}

Expand Down Expand Up @@ -260,7 +276,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
{
try
{
await watchdog.Stop();
await watchdog.Stop(cancellationToken);
channel.Writer.Complete();
await base.StopAsync(cancellationToken);
}
Expand All @@ -281,7 +297,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
}

TransportInfrastructure transportInfrastructure;
IMessageReceiver queueIngestor;
IMessageReceiver messageReceiver;
long consecutiveBatchFailures = 0;

readonly SemaphoreSlim startStopSemaphore = new(1);
Expand All @@ -303,5 +319,11 @@ public override async Task StopAsync(CancellationToken cancellationToken)
readonly IHostApplicationLifetime applicationLifetime;

static readonly ILog logger = LogManager.GetLogger<AuditIngestion>();

internal static class LogMessages
{
internal const string StartedInfrastructure = "Started infrastructure";
internal const string StoppedInfrastructure = "Stopped infrastructure";
}
}
}
Loading