Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@ sealed class EditMessageTests : PersistenceTestBase
{
EditHandler handler;
readonly TestableUnicastDispatcher dispatcher = new();
readonly ErrorQueueNameCache errorQueueNameCache = new()
{
ResolvedErrorAddress = "errorQueueName"
};

public EditMessageTests() =>
RegisterServices = services => services
.AddSingleton<IMessageDispatcher>(dispatcher)
.AddSingleton<ErrorQueueNameCache>(errorQueueNameCache)
.AddTransient<EditHandler>();

[SetUp]
Expand Down Expand Up @@ -212,6 +217,20 @@ public async Task Should_assign_edited_message_new_message_id()
Is.Not.EqualTo(messageFailure.ProcessingAttempts.Last().MessageId));
}

[Test]
public async Task Should_assign_correct_akcnowledgment_queue_address_when_editing_and_retyring()
{
var messageFailure = await CreateAndStoreFailedMessage();
var message = CreateEditMessage(messageFailure.UniqueMessageId);

await handler.Handle(message, new TestableInvokeHandlerContext());

var sentMessage = dispatcher.DispatchedMessages.Single();
Assert.That(
sentMessage.Item1.Message.Headers["ServiceControl.Retry.AcknowledgementQueue"],
Is.EqualTo(errorQueueNameCache.ResolvedErrorAddress));
}

static EditAndSend CreateEditMessage(string failedMessageId, byte[] newBodyContent = null, Dictionary<string, string> newHeaders = null)
{
return new EditAndSend
Expand Down
59 changes: 51 additions & 8 deletions src/ServiceControl.Persistence.Tests/RetryStateTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using ServiceControl.Persistence;
using ServiceControl.Recoverability;
using ServiceControl.Transports;
using QueueAddress = NServiceBus.Transport.QueueAddress;

[NonParallelizable]
class RetryStateTests : PersistenceTestBase
Expand Down Expand Up @@ -51,6 +52,29 @@ public async Task When_a_group_is_prepared_and_SC_is_started_the_group_is_marked
Assert.That(status.Failed, Is.True);
}

[Test]
public async Task When_the_dequeuer_is_created_then_the_error_address_is_cached()
{
var domainEvents = new FakeDomainEvents();
var errorQueueNameCache = new ErrorQueueNameCache();
var transportInfrastructure = new TestTransportInfrastructure(new Dictionary<string, IMessageReceiver>
{
["TestEndpoint.staging"] = null
})
{
TransportAddress = "TestAddress"
};

var transportCustomization = new TestTransportCustomization { TransportInfrastructure = transportInfrastructure };

var testReturnToSenderDequeuer = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint",
errorQueueNameCache, transportCustomization);

await testReturnToSenderDequeuer.StartAsync(new CancellationToken());

Assert.That(errorQueueNameCache.ResolvedErrorAddress, Is.EqualTo(transportInfrastructure.TransportAddress));
}

[Test]
public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarted_while_the_first_group_is_being_forwarded_then_the_count_still_matches()
{
Expand All @@ -60,7 +84,7 @@ public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarte
await CreateAFailedMessageAndMarkAsPartOfRetryBatch(retryManager, "Test-group", true, 2001);

var sender = new TestSender();
var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint"), retryManager, new Lazy<IMessageDispatcher>(() => sender));
var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization()), retryManager, new Lazy<IMessageDispatcher>(() => sender));

// Needs index RetryBatches_ByStatus_ReduceInitialBatchSize
CompleteDatabaseOperation();
Expand All @@ -74,7 +98,7 @@ public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarte

await documentManager.RebuildRetryOperationState();

processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint"), retryManager, new Lazy<IMessageDispatcher>(() => sender));
processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization()), retryManager, new Lazy<IMessageDispatcher>(() => sender));

await processor.ProcessBatches();

Expand All @@ -92,7 +116,7 @@ public async Task When_a_group_is_forwarded_the_status_is_Completed()

var sender = new TestSender();

var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint");
var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization());
var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager, new Lazy<IMessageDispatcher>(() => sender));

await processor.ProcessBatches(); // mark ready
Expand Down Expand Up @@ -122,7 +146,7 @@ public async Task When_there_is_one_poison_message_it_is_removed_from_batch_and_
}
};

var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint");
var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization());
var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager, new Lazy<IMessageDispatcher>(() => sender));

bool c;
Expand Down Expand Up @@ -163,7 +187,7 @@ public async Task When_a_group_has_one_batch_out_of_two_forwarded_the_status_is_

var sender = new TestSender();

var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(returnToSender, ErrorStore, domainEvents, "TestEndpoint"), retryManager, new Lazy<IMessageDispatcher>(() => sender));
var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(returnToSender, ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization()), retryManager, new Lazy<IMessageDispatcher>(() => sender));

CompleteDatabaseOperation();

Expand Down Expand Up @@ -281,8 +305,9 @@ class FakeApplicationLifetime : IHostApplicationLifetime

class TestReturnToSenderDequeuer : ReturnToSenderDequeuer
{
public TestReturnToSenderDequeuer(ReturnToSender returnToSender, IErrorMessageDataStore store, IDomainEvents domainEvents, string endpointName)
: base(returnToSender, store, domainEvents, new TestTransportCustomization(), null, new Settings { InstanceName = endpointName })
public TestReturnToSenderDequeuer(ReturnToSender returnToSender, IErrorMessageDataStore store, IDomainEvents domainEvents, string endpointName,
ErrorQueueNameCache cache, ITransportCustomization transportCustomization)
: base(returnToSender, store, domainEvents, transportCustomization, null, new Settings { InstanceName = endpointName }, cache)
{
}

Expand All @@ -294,10 +319,17 @@ public override Task Run(string forwardingBatchId, Predicate<MessageContext> fil

public class TestTransportCustomization : ITransportCustomization
{
public TransportInfrastructure TransportInfrastructure { get; set; }

public void AddTransportForAudit(IServiceCollection services, TransportSettings transportSettings) => throw new NotImplementedException();
public void AddTransportForMonitoring(IServiceCollection services, TransportSettings transportSettings) => throw new NotImplementedException();
public void AddTransportForPrimary(IServiceCollection services, TransportSettings transportSettings) => throw new NotImplementedException();
public Task<TransportInfrastructure> CreateTransportInfrastructure(string name, TransportSettings transportSettings, OnMessage onMessage = null, OnError onError = null, Func<string, Exception, Task> onCriticalError = null, NServiceBus.TransportTransactionMode preferredTransactionMode = NServiceBus.TransportTransactionMode.ReceiveOnly) => throw new NotImplementedException();

public Task<TransportInfrastructure> CreateTransportInfrastructure(string name,
TransportSettings transportSettings, OnMessage onMessage = null, OnError onError = null,
Func<string, Exception, Task> onCriticalError = null,
NServiceBus.TransportTransactionMode preferredTransactionMode =
NServiceBus.TransportTransactionMode.ReceiveOnly) => Task.FromResult(TransportInfrastructure);
public void CustomizeAuditEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TransportSettings transportSettings) => throw new NotImplementedException();
public void CustomizeMonitoringEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TransportSettings transportSettings) => throw new NotImplementedException();
public void CustomizePrimaryEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TransportSettings transportSettings) => throw new NotImplementedException();
Expand All @@ -319,5 +351,16 @@ public Task Dispatch(TransportOperations outgoingMessages, TransportTransaction
return Task.CompletedTask;
}
}

public class TestTransportInfrastructure : TransportInfrastructure
{
public TestTransportInfrastructure(IReadOnlyDictionary<string, IMessageReceiver> receivers = null) => Receivers = receivers ?? new Dictionary<string, IMessageReceiver>();

public string TransportAddress { get; set; }

public override Task Shutdown(CancellationToken cancellationToken = new CancellationToken()) => throw new NotImplementedException();

public override string ToTransportAddress(QueueAddress address) => TransportAddress;
}
}
}
8 changes: 6 additions & 2 deletions src/ServiceControl/Recoverability/Editing/EditHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

class EditHandler : IHandleMessages<EditAndSend>
{
public EditHandler(IErrorMessageDataStore store, IMessageRedirectsDataStore redirectsStore, IMessageDispatcher dispatcher)
public EditHandler(IErrorMessageDataStore store, IMessageRedirectsDataStore redirectsStore, IMessageDispatcher dispatcher, ErrorQueueNameCache errorQueueNameCache)
{
this.store = store;
this.redirectsStore = redirectsStore;
this.dispatcher = dispatcher;
this.errorQueueNameCache = errorQueueNameCache;
corruptedReplyToHeaderStrategy = new CorruptedReplyToHeaderStrategy(RuntimeEnvironment.MachineName);

}

public async Task Handle(EditAndSend message, IMessageHandlerContext context)
Expand Down Expand Up @@ -67,7 +69,8 @@ public async Task Handle(EditAndSend message, IMessageHandlerContext context)
var outgoingMessage = BuildMessage(message);
// mark the new message with a link to the original message id
outgoingMessage.Headers.Add("ServiceControl.EditOf", message.FailedMessageId);
outgoingMessage.Headers.Remove("ServiceControl.Retry.AcknowledgementQueue");
outgoingMessage.Headers["ServiceControl.Retry.AcknowledgementQueue"] = errorQueueNameCache.ResolvedErrorAddress;

var address = ApplyRedirect(attempt.FailureDetails.AddressOfFailingEndpoint, redirects);

if (outgoingMessage.Headers.TryGetValue("ServiceControl.RetryTo", out var retryTo))
Expand Down Expand Up @@ -116,6 +119,7 @@ Task DispatchEditedMessage(OutgoingMessage editedMessage, string address, IMessa
readonly IErrorMessageDataStore store;
readonly IMessageRedirectsDataStore redirectsStore;
readonly IMessageDispatcher dispatcher;
readonly ErrorQueueNameCache errorQueueNameCache;
static readonly ILog log = LogManager.GetLogger<EditHandler>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public override void Configure(Settings settings, ITransportCustomization transp

//Return to sender - registered both as singleton and hosted service because it is a dependency of the RetryProcessor
services.AddSingleton<ReturnToSender>();
services.AddSingleton<ErrorQueueNameCache>();
services.AddSingleton<ReturnToSenderDequeuer>();
services.AddHostedService(provider => provider.GetRequiredService<ReturnToSenderDequeuer>());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace ServiceControl.Recoverability;

using System;

class ErrorQueueNameCache
{
string resolvedErrorAddress;

public string ResolvedErrorAddress
{
get
{
if (string.IsNullOrEmpty(resolvedErrorAddress))
{
throw new InvalidOperationException($"{nameof(ResolvedErrorAddress)} is not set. Please set it before accessing.");
}

return resolvedErrorAddress;
}
set => resolvedErrorAddress = value;
}
}
Loading