From 2f92e71b02e0355fceaed439a9ed58470c802df0 Mon Sep 17 00:00:00 2001 From: Adam Furmanek Date: Thu, 15 May 2025 14:41:33 +0200 Subject: [PATCH] Setting proper queue name on edit & retry (#4973) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Setting proper queue name on edit & retry * Update src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTests.cs Co-authored-by: Laila Bougria * Update src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTests.cs Co-authored-by: Laila Bougria * ⚜️ small code improvements --------- Co-authored-by: Laila Bougria Co-authored-by: Ramon Smits --- .../Recoverability/EditMessageTests.cs | 19 + .../RetryStateTests.cs | 59 ++- .../Recoverability/Editing/EditHandler.cs | 8 +- .../Recoverability/RecoverabilityComponent.cs | 1 + .../Infrastructure/ErrorQueueNameCache.cs | 22 + .../Infrastructure/ReturnToSenderDequeuer.cs | 396 +++++++++--------- 6 files changed, 302 insertions(+), 203 deletions(-) create mode 100644 src/ServiceControl/Recoverability/Retrying/Infrastructure/ErrorQueueNameCache.cs diff --git a/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTests.cs b/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTests.cs index fcad889079..9a4f74c807 100644 --- a/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTests.cs +++ b/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTests.cs @@ -20,10 +20,15 @@ sealed class EditMessageTests : PersistenceTestBase { EditHandler handler; readonly TestableUnicastDispatcher dispatcher = new(); + readonly ErrorQueueNameCache errorQueueNameCache = new() + { + ResolvedErrorAddress = "errorQueueName" + }; public EditMessageTests() => RegisterServices = services => services .AddSingleton(dispatcher) + .AddSingleton(errorQueueNameCache) .AddTransient(); [SetUp] @@ -212,6 +217,20 @@ public async Task Should_assign_edited_message_new_message_id() Is.Not.EqualTo(messageFailure.ProcessingAttempts.Last().MessageId)); } + [Test] + public async Task Should_assign_correct_akcnowledgment_queue_address_when_editing_and_retyring() + { + var messageFailure = await CreateAndStoreFailedMessage(); + var message = CreateEditMessage(messageFailure.UniqueMessageId); + + await handler.Handle(message, new TestableInvokeHandlerContext()); + + var sentMessage = dispatcher.DispatchedMessages.Single(); + Assert.That( + sentMessage.Item1.Message.Headers["ServiceControl.Retry.AcknowledgementQueue"], + Is.EqualTo(errorQueueNameCache.ResolvedErrorAddress)); + } + static EditAndSend CreateEditMessage(string failedMessageId, byte[] newBodyContent = null, Dictionary newHeaders = null) { return new EditAndSend diff --git a/src/ServiceControl.Persistence.Tests/RetryStateTests.cs b/src/ServiceControl.Persistence.Tests/RetryStateTests.cs index dcfec5888b..3c9f426086 100644 --- a/src/ServiceControl.Persistence.Tests/RetryStateTests.cs +++ b/src/ServiceControl.Persistence.Tests/RetryStateTests.cs @@ -17,6 +17,7 @@ using ServiceControl.Persistence; using ServiceControl.Recoverability; using ServiceControl.Transports; + using QueueAddress = NServiceBus.Transport.QueueAddress; [NonParallelizable] class RetryStateTests : PersistenceTestBase @@ -51,6 +52,29 @@ public async Task When_a_group_is_prepared_and_SC_is_started_the_group_is_marked Assert.That(status.Failed, Is.True); } + [Test] + public async Task When_the_dequeuer_is_created_then_the_error_address_is_cached() + { + var domainEvents = new FakeDomainEvents(); + var errorQueueNameCache = new ErrorQueueNameCache(); + var transportInfrastructure = new TestTransportInfrastructure(new Dictionary + { + ["TestEndpoint.staging"] = null + }) + { + TransportAddress = "TestAddress" + }; + + var transportCustomization = new TestTransportCustomization { TransportInfrastructure = transportInfrastructure }; + + var testReturnToSenderDequeuer = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint", + errorQueueNameCache, transportCustomization); + + await testReturnToSenderDequeuer.StartAsync(new CancellationToken()); + + Assert.That(errorQueueNameCache.ResolvedErrorAddress, Is.EqualTo(transportInfrastructure.TransportAddress)); + } + [Test] public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarted_while_the_first_group_is_being_forwarded_then_the_count_still_matches() { @@ -60,7 +84,7 @@ public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarte await CreateAFailedMessageAndMarkAsPartOfRetryBatch(retryManager, "Test-group", true, 2001); var sender = new TestSender(); - var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint"), retryManager, new Lazy(() => sender)); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization()), retryManager, new Lazy(() => sender)); // Needs index RetryBatches_ByStatus_ReduceInitialBatchSize CompleteDatabaseOperation(); @@ -74,7 +98,7 @@ public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarte await documentManager.RebuildRetryOperationState(); - processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint"), retryManager, new Lazy(() => sender)); + processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization()), retryManager, new Lazy(() => sender)); await processor.ProcessBatches(); @@ -92,7 +116,7 @@ public async Task When_a_group_is_forwarded_the_status_is_Completed() var sender = new TestSender(); - var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint"); + var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization()); var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager, new Lazy(() => sender)); await processor.ProcessBatches(); // mark ready @@ -122,7 +146,7 @@ public async Task When_there_is_one_poison_message_it_is_removed_from_batch_and_ } }; - var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint"); + var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(ErrorStore), ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization()); var processor = new RetryProcessor(RetryBatchesStore, domainEvents, returnToSender, retryManager, new Lazy(() => sender)); bool c; @@ -163,7 +187,7 @@ public async Task When_a_group_has_one_batch_out_of_two_forwarded_the_status_is_ var sender = new TestSender(); - var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(returnToSender, ErrorStore, domainEvents, "TestEndpoint"), retryManager, new Lazy(() => sender)); + var processor = new RetryProcessor(RetryBatchesStore, domainEvents, new TestReturnToSenderDequeuer(returnToSender, ErrorStore, domainEvents, "TestEndpoint", new ErrorQueueNameCache(), new TestTransportCustomization()), retryManager, new Lazy(() => sender)); CompleteDatabaseOperation(); @@ -281,8 +305,9 @@ class FakeApplicationLifetime : IHostApplicationLifetime class TestReturnToSenderDequeuer : ReturnToSenderDequeuer { - public TestReturnToSenderDequeuer(ReturnToSender returnToSender, IErrorMessageDataStore store, IDomainEvents domainEvents, string endpointName) - : base(returnToSender, store, domainEvents, new TestTransportCustomization(), null, new Settings { InstanceName = endpointName }) + public TestReturnToSenderDequeuer(ReturnToSender returnToSender, IErrorMessageDataStore store, IDomainEvents domainEvents, string endpointName, + ErrorQueueNameCache cache, ITransportCustomization transportCustomization) + : base(returnToSender, store, domainEvents, transportCustomization, null, new Settings { InstanceName = endpointName }, cache) { } @@ -294,10 +319,17 @@ public override Task Run(string forwardingBatchId, Predicate fil public class TestTransportCustomization : ITransportCustomization { + public TransportInfrastructure TransportInfrastructure { get; set; } + public void AddTransportForAudit(IServiceCollection services, TransportSettings transportSettings) => throw new NotImplementedException(); public void AddTransportForMonitoring(IServiceCollection services, TransportSettings transportSettings) => throw new NotImplementedException(); public void AddTransportForPrimary(IServiceCollection services, TransportSettings transportSettings) => throw new NotImplementedException(); - public Task CreateTransportInfrastructure(string name, TransportSettings transportSettings, OnMessage onMessage = null, OnError onError = null, Func onCriticalError = null, NServiceBus.TransportTransactionMode preferredTransactionMode = NServiceBus.TransportTransactionMode.ReceiveOnly) => throw new NotImplementedException(); + + public Task CreateTransportInfrastructure(string name, + TransportSettings transportSettings, OnMessage onMessage = null, OnError onError = null, + Func onCriticalError = null, + NServiceBus.TransportTransactionMode preferredTransactionMode = + NServiceBus.TransportTransactionMode.ReceiveOnly) => Task.FromResult(TransportInfrastructure); public void CustomizeAuditEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TransportSettings transportSettings) => throw new NotImplementedException(); public void CustomizeMonitoringEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TransportSettings transportSettings) => throw new NotImplementedException(); public void CustomizePrimaryEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TransportSettings transportSettings) => throw new NotImplementedException(); @@ -319,5 +351,16 @@ public Task Dispatch(TransportOperations outgoingMessages, TransportTransaction return Task.CompletedTask; } } + + public class TestTransportInfrastructure : TransportInfrastructure + { + public TestTransportInfrastructure(IReadOnlyDictionary receivers = null) => Receivers = receivers ?? new Dictionary(); + + public string TransportAddress { get; set; } + + public override Task Shutdown(CancellationToken cancellationToken = new CancellationToken()) => throw new NotImplementedException(); + + public override string ToTransportAddress(QueueAddress address) => TransportAddress; + } } } \ No newline at end of file diff --git a/src/ServiceControl/Recoverability/Editing/EditHandler.cs b/src/ServiceControl/Recoverability/Editing/EditHandler.cs index eb263bd3be..d1e6bc4685 100644 --- a/src/ServiceControl/Recoverability/Editing/EditHandler.cs +++ b/src/ServiceControl/Recoverability/Editing/EditHandler.cs @@ -14,12 +14,14 @@ class EditHandler : IHandleMessages { - public EditHandler(IErrorMessageDataStore store, IMessageRedirectsDataStore redirectsStore, IMessageDispatcher dispatcher) + public EditHandler(IErrorMessageDataStore store, IMessageRedirectsDataStore redirectsStore, IMessageDispatcher dispatcher, ErrorQueueNameCache errorQueueNameCache) { this.store = store; this.redirectsStore = redirectsStore; this.dispatcher = dispatcher; + this.errorQueueNameCache = errorQueueNameCache; corruptedReplyToHeaderStrategy = new CorruptedReplyToHeaderStrategy(RuntimeEnvironment.MachineName); + } public async Task Handle(EditAndSend message, IMessageHandlerContext context) @@ -67,7 +69,8 @@ public async Task Handle(EditAndSend message, IMessageHandlerContext context) var outgoingMessage = BuildMessage(message); // mark the new message with a link to the original message id outgoingMessage.Headers.Add("ServiceControl.EditOf", message.FailedMessageId); - outgoingMessage.Headers.Remove("ServiceControl.Retry.AcknowledgementQueue"); + outgoingMessage.Headers["ServiceControl.Retry.AcknowledgementQueue"] = errorQueueNameCache.ResolvedErrorAddress; + var address = ApplyRedirect(attempt.FailureDetails.AddressOfFailingEndpoint, redirects); if (outgoingMessage.Headers.TryGetValue("ServiceControl.RetryTo", out var retryTo)) @@ -116,6 +119,7 @@ Task DispatchEditedMessage(OutgoingMessage editedMessage, string address, IMessa readonly IErrorMessageDataStore store; readonly IMessageRedirectsDataStore redirectsStore; readonly IMessageDispatcher dispatcher; + readonly ErrorQueueNameCache errorQueueNameCache; static readonly ILog log = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/ServiceControl/Recoverability/RecoverabilityComponent.cs b/src/ServiceControl/Recoverability/RecoverabilityComponent.cs index 1736572799..875619d35e 100644 --- a/src/ServiceControl/Recoverability/RecoverabilityComponent.cs +++ b/src/ServiceControl/Recoverability/RecoverabilityComponent.cs @@ -47,6 +47,7 @@ public override void Configure(Settings settings, ITransportCustomization transp //Return to sender - registered both as singleton and hosted service because it is a dependency of the RetryProcessor services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); services.AddHostedService(provider => provider.GetRequiredService()); diff --git a/src/ServiceControl/Recoverability/Retrying/Infrastructure/ErrorQueueNameCache.cs b/src/ServiceControl/Recoverability/Retrying/Infrastructure/ErrorQueueNameCache.cs new file mode 100644 index 0000000000..51adf69c19 --- /dev/null +++ b/src/ServiceControl/Recoverability/Retrying/Infrastructure/ErrorQueueNameCache.cs @@ -0,0 +1,22 @@ +namespace ServiceControl.Recoverability; + +using System; + +class ErrorQueueNameCache +{ + string resolvedErrorAddress; + + public string ResolvedErrorAddress + { + get + { + if (string.IsNullOrEmpty(resolvedErrorAddress)) + { + throw new InvalidOperationException($"{nameof(ResolvedErrorAddress)} is not set. Please set it before accessing."); + } + + return resolvedErrorAddress; + } + set => resolvedErrorAddress = value; + } +} diff --git a/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSenderDequeuer.cs b/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSenderDequeuer.cs index 648de99e02..81ef5562f3 100644 --- a/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSenderDequeuer.cs +++ b/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSenderDequeuer.cs @@ -1,256 +1,266 @@ -namespace ServiceControl.Recoverability +namespace ServiceControl.Recoverability; + +using System; +using System.Threading; +using System.Threading.Tasks; +using Infrastructure.DomainEvents; +using Microsoft.Extensions.Hosting; +using NServiceBus; +using NServiceBus.Logging; +using NServiceBus.Transport; +using Persistence; +using ServiceBus.Management.Infrastructure.Settings; +using ServiceControl.Transports; + +class ReturnToSenderDequeuer : IHostedService { - using System; - using System.Threading; - using System.Threading.Tasks; - using Infrastructure.DomainEvents; - using Microsoft.Extensions.Hosting; - using NServiceBus; - using NServiceBus.Logging; - using NServiceBus.Transport; - using Persistence; - using ServiceBus.Management.Infrastructure.Settings; - using ServiceControl.Transports; - - class ReturnToSenderDequeuer : IHostedService + public ReturnToSenderDequeuer( + ReturnToSender returnToSender, + IErrorMessageDataStore dataStore, + IDomainEvents domainEvents, + ITransportCustomization transportCustomization, + TransportSettings transportSettings, + Settings settings, + ErrorQueueNameCache errorQueueNameCache + ) { - public ReturnToSenderDequeuer(ReturnToSender returnToSender, IErrorMessageDataStore dataStore, IDomainEvents domainEvents, ITransportCustomization transportCustomization, TransportSettings transportSettings, Settings settings) - { - InputAddress = transportCustomization.ToTransportQualifiedQueueName(settings.StagingQueue); - this.returnToSender = returnToSender; - errorQueue = settings.ErrorQueue; - this.transportCustomization = transportCustomization; - this.transportSettings = transportSettings; - - faultManager = new CaptureIfMessageSendingFails(dataStore, domainEvents, IncrementCounterOrProlongTimer); - timer = new Timer(state => StopInternal().GetAwaiter().GetResult()); - } + InputAddress = transportCustomization.ToTransportQualifiedQueueName(settings.StagingQueue); + this.returnToSender = returnToSender; + errorQueue = settings.ErrorQueue; + this.transportCustomization = transportCustomization; + this.transportSettings = transportSettings; + this.errorQueueNameCache = errorQueueNameCache; + + faultManager = new CaptureIfMessageSendingFails(dataStore, domainEvents, IncrementCounterOrProlongTimer); + timer = new Timer(state => StopInternal().GetAwaiter().GetResult()); + } - public string InputAddress { get; } + public string InputAddress { get; } - public async Task StartAsync(CancellationToken cancellationToken) - { - transportInfrastructure = await transportCustomization.CreateTransportInfrastructure(InputAddress, transportSettings, Handle, faultManager.OnError, (_, __) => Task.CompletedTask, TransportTransactionMode.SendsAtomicWithReceive); - messageReceiver = transportInfrastructure.Receivers[InputAddress]; - messageDispatcher = transportInfrastructure.Dispatcher; + public async Task StartAsync(CancellationToken cancellationToken) + { + transportInfrastructure = await transportCustomization.CreateTransportInfrastructure(InputAddress, transportSettings, Handle, faultManager.OnError, (_, __) => Task.CompletedTask, TransportTransactionMode.SendsAtomicWithReceive); + messageReceiver = transportInfrastructure.Receivers[InputAddress]; + messageDispatcher = transportInfrastructure.Dispatcher; - errorQueueTransportAddress = transportInfrastructure.ToTransportAddress(new QueueAddress(errorQueue)); - } + errorQueueTransportAddress = transportInfrastructure.ToTransportAddress(new QueueAddress(errorQueue)); + errorQueueNameCache.ResolvedErrorAddress = errorQueueTransportAddress; + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + timer.Dispose(); + endedPrematurely = true; + await StopInternal(); + await transportInfrastructure.Shutdown(cancellationToken); + } + + bool IsCounting => targetMessageCount.HasValue; - public async Task StopAsync(CancellationToken cancellationToken) + async Task Handle(MessageContext message, CancellationToken cancellationToken) + { + if (Log.IsDebugEnabled) { - timer.Dispose(); - endedPrematurely = true; - await StopInternal(); - await transportInfrastructure.Shutdown(cancellationToken); + var stagingId = message.Headers["ServiceControl.Retry.StagingId"]; + Log.DebugFormat("Handling message with id {0} and staging id {1} in input queue {2}", message.NativeMessageId, stagingId, InputAddress); } - bool IsCounting => targetMessageCount.HasValue; + if (shouldProcess(message)) + { + await returnToSender.HandleMessage(message, messageDispatcher, errorQueueTransportAddress, cancellationToken); + IncrementCounterOrProlongTimer(); + } + else + { + Log.WarnFormat("Rejecting message from staging queue as it's not part of a fully staged batch: {0}", message.NativeMessageId); + } + } - async Task Handle(MessageContext message, CancellationToken cancellationToken) + void IncrementCounterOrProlongTimer() + { + if (IsCounting) + { + CountMessageAndStopIfReachedTarget(); + } + else { if (Log.IsDebugEnabled) { - var stagingId = message.Headers["ServiceControl.Retry.StagingId"]; - Log.DebugFormat("Handling message with id {0} and staging id {1} in input queue {2}", message.NativeMessageId, stagingId, InputAddress); + Log.Debug("Resetting timer"); } - if (shouldProcess(message)) - { - await returnToSender.HandleMessage(message, messageDispatcher, errorQueueTransportAddress, cancellationToken); - IncrementCounterOrProlongTimer(); - } - else - { - Log.WarnFormat("Rejecting message from staging queue as it's not part of a fully staged batch: {0}", message.NativeMessageId); - } + timer.Change(TimeSpan.FromSeconds(45), Timeout.InfiniteTimeSpan); + } + } + + void CountMessageAndStopIfReachedTarget() + { + var currentMessageCount = Interlocked.Increment(ref actualMessageCount); + if (Log.IsDebugEnabled) + { + Log.Debug($"Forwarding message {currentMessageCount} of {targetMessageCount}."); } - void IncrementCounterOrProlongTimer() + if (currentMessageCount >= targetMessageCount.GetValueOrDefault()) { - if (IsCounting) + if (Log.IsDebugEnabled) { - CountMessageAndStopIfReachedTarget(); + Log.DebugFormat("Target count reached. Shutting down forwarder"); } - else - { - if (Log.IsDebugEnabled) - { - Log.Debug("Resetting timer"); - } - timer.Change(TimeSpan.FromSeconds(45), Timeout.InfiniteTimeSpan); - } + // NOTE: This needs to run on a different thread or a deadlock will happen trying to shut down the receiver + _ = Task.Run(StopInternal); } + } - void CountMessageAndStopIfReachedTarget() + public virtual async Task Run(string forwardingBatchId, Predicate filter, int? expectedMessageCount, CancellationToken cancellationToken = default) + { + CancellationTokenRegistration? registration = null; + try { - var currentMessageCount = Interlocked.Increment(ref actualMessageCount); + shouldProcess = filter; + targetMessageCount = expectedMessageCount; + actualMessageCount = 0; + if (Log.IsDebugEnabled) { - Log.Debug($"Forwarding message {currentMessageCount} of {targetMessageCount}."); + Log.DebugFormat("Starting receiver"); } - if (currentMessageCount >= targetMessageCount.GetValueOrDefault()) + syncEvent = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + stopCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + registration = cancellationToken.Register(() => _ = syncEvent.TrySetResult(true)); + + await messageReceiver.StartReceive(cancellationToken); + + Log.Info($"Forwarder for batch {forwardingBatchId} started receiving messages from {messageReceiver.ReceiveAddress}."); + + if (!expectedMessageCount.HasValue) { if (Log.IsDebugEnabled) { - Log.DebugFormat("Target count reached. Shutting down forwarder"); + Log.Debug("Running in timeout mode. Starting timer."); } - // NOTE: This needs to run on a different thread or a deadlock will happen trying to shut down the receiver - _ = Task.Run(StopInternal); + timer.Change(TimeSpan.FromSeconds(45), Timeout.InfiniteTimeSpan); } } - - public virtual async Task Run(string forwardingBatchId, Predicate filter, int? expectedMessageCount, CancellationToken cancellationToken = default) + finally { - CancellationTokenRegistration? registration = null; - try + if (Log.IsDebugEnabled) { - shouldProcess = filter; - targetMessageCount = expectedMessageCount; - actualMessageCount = 0; - - if (Log.IsDebugEnabled) - { - Log.DebugFormat("Starting receiver"); - } - - syncEvent = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - stopCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - registration = cancellationToken.Register(() => _ = syncEvent.TrySetResult(true)); - - await messageReceiver.StartReceive(cancellationToken); - - Log.Info($"Forwarder for batch {forwardingBatchId} started receiving messages from {messageReceiver.ReceiveAddress}."); - - if (!expectedMessageCount.HasValue) - { - if (Log.IsDebugEnabled) - { - Log.Debug("Running in timeout mode. Starting timer."); - } - - timer.Change(TimeSpan.FromSeconds(45), Timeout.InfiniteTimeSpan); - } + Log.DebugFormat($"Waiting for forwarder for batch {forwardingBatchId} to finish."); } - finally - { - if (Log.IsDebugEnabled) - { - Log.DebugFormat($"Waiting for forwarder for batch {forwardingBatchId} to finish."); - } - await syncEvent.Task; - registration?.Dispose(); - await messageReceiver.StopReceive(cancellationToken); + await syncEvent.Task; + registration?.Dispose(); + await messageReceiver.StopReceive(cancellationToken); - Log.Info($"Forwarder for batch {forwardingBatchId} finished forwarding all messages."); + Log.Info($"Forwarder for batch {forwardingBatchId} finished forwarding all messages."); - stopCompletionSource.TrySetResult(true); - } + stopCompletionSource.TrySetResult(true); + } - if (endedPrematurely || cancellationToken.IsCancellationRequested) - { - throw new Exception("We are in the process of shutting down. Safe to ignore."); - } + if (endedPrematurely || cancellationToken.IsCancellationRequested) + { + throw new Exception("We are in the process of shutting down. Safe to ignore."); } + } - async Task StopInternal() + async Task StopInternal() + { + if (Log.IsDebugEnabled) { - if (Log.IsDebugEnabled) - { - Log.Debug("Completing forwarding."); - } + Log.Debug("Completing forwarding."); + } - syncEvent?.TrySetResult(true); - await (stopCompletionSource?.Task ?? Task.CompletedTask); - if (Log.IsDebugEnabled) - { - Log.Debug("Forwarding completed."); - } + syncEvent?.TrySetResult(true); + await (stopCompletionSource?.Task ?? Task.CompletedTask); + if (Log.IsDebugEnabled) + { + Log.Debug("Forwarding completed."); + } + } + + Timer timer; + TaskCompletionSource syncEvent; + TaskCompletionSource stopCompletionSource; + bool endedPrematurely; + int? targetMessageCount; + int actualMessageCount; + Predicate shouldProcess; + CaptureIfMessageSendingFails faultManager; + ReturnToSender returnToSender; + readonly string errorQueue; + string errorQueueTransportAddress; + readonly ITransportCustomization transportCustomization; + readonly TransportSettings transportSettings; + readonly ErrorQueueNameCache errorQueueNameCache; + TransportInfrastructure transportInfrastructure; + IMessageDispatcher messageDispatcher; + IMessageReceiver messageReceiver; + + static readonly ILog Log = LogManager.GetLogger(typeof(ReturnToSenderDequeuer)); + + class CaptureIfMessageSendingFails + { + public CaptureIfMessageSendingFails(IErrorMessageDataStore dataStore, IDomainEvents domainEvents, Action executeOnFailure) + { + this.dataStore = dataStore; + this.executeOnFailure = executeOnFailure; + this.domainEvents = domainEvents; } - Timer timer; - TaskCompletionSource syncEvent; - TaskCompletionSource stopCompletionSource; - bool endedPrematurely; - int? targetMessageCount; - int actualMessageCount; - Predicate shouldProcess; - CaptureIfMessageSendingFails faultManager; - ReturnToSender returnToSender; - readonly string errorQueue; - string errorQueueTransportAddress; - readonly ITransportCustomization transportCustomization; - readonly TransportSettings transportSettings; - TransportInfrastructure transportInfrastructure; - IMessageDispatcher messageDispatcher; - IMessageReceiver messageReceiver; - - static readonly ILog Log = LogManager.GetLogger(typeof(ReturnToSenderDequeuer)); - - class CaptureIfMessageSendingFails + public async Task OnError(ErrorContext errorContext, CancellationToken cancellationToken = default) { - public CaptureIfMessageSendingFails(IErrorMessageDataStore dataStore, IDomainEvents domainEvents, Action executeOnFailure) - { - this.dataStore = dataStore; - this.executeOnFailure = executeOnFailure; - this.domainEvents = domainEvents; - } + // We are currently not propagating the cancellation token further since it would require to change + // the data store APIs and domain handlers to take a cancellation token. If this is needed it can be done + // at a later time. + _ = cancellationToken; - public async Task OnError(ErrorContext errorContext, CancellationToken cancellationToken = default) + try { - // We are currently not propagating the cancellation token further since it would require to change - // the data store APIs and domain handlers to take a cancellation token. If this is needed it can be done - // at a later time. - _ = cancellationToken; + var message = errorContext.Message; + var destination = message.Headers["ServiceControl.TargetEndpointAddress"]; + var messageUniqueId = message.Headers["ServiceControl.Retry.UniqueMessageId"]; + Log.Warn($"Failed to send '{messageUniqueId}' message to '{destination}' for retry. Attempting to revert message status to unresolved so it can be tried again.", errorContext.Exception); + + await dataStore.RevertRetry(messageUniqueId); + string reason; try { - var message = errorContext.Message; - var destination = message.Headers["ServiceControl.TargetEndpointAddress"]; - var messageUniqueId = message.Headers["ServiceControl.Retry.UniqueMessageId"]; - Log.Warn($"Failed to send '{messageUniqueId}' message to '{destination}' for retry. Attempting to revert message status to unresolved so it can be tried again.", errorContext.Exception); - - await dataStore.RevertRetry(messageUniqueId); - - string reason; - try - { - reason = errorContext.Exception.GetBaseException().Message; - } - catch (Exception) - { - reason = "Failed to retrieve reason!"; - } - - await domainEvents.Raise(new MessagesSubmittedForRetryFailed - { - Reason = reason, - FailedMessageId = messageUniqueId, - Destination = destination - }, cancellationToken); - } - catch (Exception ex) - { - // If something goes wrong here we just ignore, not the end of the world! - Log.Error("A failure occurred when trying to handle a retry failure.", ex); + reason = errorContext.Exception.GetBaseException().Message; } - finally + catch (Exception) { - executeOnFailure(); + reason = "Failed to retrieve reason!"; } - return ErrorHandleResult.Handled; + await domainEvents.Raise(new MessagesSubmittedForRetryFailed + { + Reason = reason, + FailedMessageId = messageUniqueId, + Destination = destination + }, cancellationToken); + } + catch (Exception ex) + { + // If something goes wrong here we just ignore, not the end of the world! + Log.Error("A failure occurred when trying to handle a retry failure.", ex); + } + finally + { + executeOnFailure(); } - readonly Action executeOnFailure; - readonly IErrorMessageDataStore dataStore; - readonly IDomainEvents domainEvents; - static readonly ILog Log = LogManager.GetLogger(typeof(CaptureIfMessageSendingFails)); + return ErrorHandleResult.Handled; } + readonly Action executeOnFailure; + readonly IErrorMessageDataStore dataStore; + readonly IDomainEvents domainEvents; + static readonly ILog Log = LogManager.GetLogger(typeof(CaptureIfMessageSendingFails)); } -} \ No newline at end of file + +}