From 26893e8a92aa5968658fdcbe4a65ab989187756b Mon Sep 17 00:00:00 2001 From: Christian Date: Thu, 2 Oct 2025 09:03:57 -0700 Subject: [PATCH 1/4] EditHandler copy w/o NSB called from controller with some tests passing --- .../Recoverability/EditMessageTestsCopy.cs | 280 ++++++++++++++++++ .../HostApplicationBuilderExtensions.cs | 2 + .../Api/EditFailedMessagesController.cs | 15 +- .../Recoverability/Editing/EditHandlerCopy.cs | 131 ++++++++ 4 files changed, 426 insertions(+), 2 deletions(-) create mode 100644 src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTestsCopy.cs create mode 100644 src/ServiceControl/Recoverability/Editing/EditHandlerCopy.cs diff --git a/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTestsCopy.cs b/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTestsCopy.cs new file mode 100644 index 0000000000..e2083991dd --- /dev/null +++ b/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTestsCopy.cs @@ -0,0 +1,280 @@ +namespace ServiceControl.Persistence.Tests.Recoverability +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + using Contracts.Operations; + using MessageFailures; + using Microsoft.Extensions.DependencyInjection; + using NServiceBus.Testing; + using NServiceBus.Transport; + using NUnit.Framework; + using ServiceControl.Persistence.MessageRedirects; + using ServiceControl.Recoverability; + using ServiceControl.Recoverability.Editing; + + sealed class EditMessageTestsCopy : PersistenceTestBase + { + EditHandlerCopy handler; + readonly TestableUnicastDispatcherCopy dispatcher = new(); + readonly ErrorQueueNameCache errorQueueNameCache = new() + { + ResolvedErrorAddress = "errorQueueName" + }; + + public EditMessageTestsCopy() => + RegisterServices = services => services + .AddSingleton(dispatcher) + .AddSingleton(errorQueueNameCache) + .AddTransient(); + + [SetUp] + public void Setup() => handler = ServiceProvider.GetRequiredService(); + + [Test] + public async Task Should_discard_edit_when_failed_message_not_exists() + { + var message = CreateEditMessage("some-id"); + await handler.Handle(message, message.FailedMessageId); + + Assert.That(dispatcher.DispatchedMessages, Is.Empty); + } + + [Test] + [TestCase(FailedMessageStatus.RetryIssued)] + [TestCase(FailedMessageStatus.Archived)] + [TestCase(FailedMessageStatus.Resolved)] + public async Task Should_discard_edit_if_edited_message_not_unresolved(FailedMessageStatus status) + { + var failedMessageId = Guid.NewGuid().ToString("D"); + await CreateAndStoreFailedMessage(failedMessageId, status); + + var message = CreateEditMessage(failedMessageId); + await handler.Handle(message, message.FailedMessageId); + + var failedMessage = await ErrorMessageDataStore.ErrorBy(failedMessageId); + + var editFailedMessagesManager = await ErrorMessageDataStore.CreateEditFailedMessageManager(); + var editOperation = await editFailedMessagesManager.GetCurrentEditingMessageId(failedMessageId); + + Assert.Multiple(() => + { + Assert.That(failedMessage.Status, Is.EqualTo(status)); + Assert.That(editOperation, Is.Null); + Assert.That(dispatcher.DispatchedMessages, Is.Empty); + }); + } + + [Test] + public async Task Should_discard_edit_when_different_edit_already_exists() + { + var failedMessageId = Guid.NewGuid().ToString(); + var previousEdit = Guid.NewGuid().ToString(); + + _ = await CreateAndStoreFailedMessage(failedMessageId); + + using (var editFailedMessagesManager = await ErrorMessageDataStore.CreateEditFailedMessageManager()) + { + _ = await editFailedMessagesManager.GetFailedMessage(failedMessageId); + await editFailedMessagesManager.SetCurrentEditingMessageId(previousEdit); + await editFailedMessagesManager.SaveChanges(); + } + + var message = CreateEditMessage(failedMessageId); + + // Act + await handler.Handle(message, message.FailedMessageId); + + using (var editFailedMessagesManagerAssert = await ErrorMessageDataStore.CreateEditFailedMessageManager()) + { + var failedMessage = await editFailedMessagesManagerAssert.GetFailedMessage(failedMessageId); + var editId = await editFailedMessagesManagerAssert.GetCurrentEditingMessageId(failedMessageId); + + Assert.Multiple(() => + { + Assert.That(editId, Is.EqualTo(previousEdit)); + Assert.That(failedMessage.Status, Is.EqualTo(FailedMessageStatus.Unresolved)); + }); + } + + Assert.That(dispatcher.DispatchedMessages, Is.Empty); + } + + //[Test] + //public async Task Should_dispatch_edited_message_when_first_edit() + //{ + // var failedMessage = await CreateAndStoreFailedMessage(); + + // var newBodyContent = Encoding.UTF8.GetBytes("new body content"); + // var newHeaders = new Dictionary { { "someKey", "someValue" } }; + // var message = CreateEditMessage(failedMessage.UniqueMessageId, newBodyContent, newHeaders); + + // var handlerContent = message.FailedMessageId; + // await handler.Handle(message, handlerContent); + + // var dispatchedMessage = dispatcher.DispatchedMessages.Single(); + // Assert.Multiple(() => + // { + // Assert.That( + // dispatchedMessage.Item1.Destination, + // Is.EqualTo(failedMessage.ProcessingAttempts.Last().FailureDetails.AddressOfFailingEndpoint)); + // Assert.That(dispatchedMessage.Item1.Message.Body.ToArray(), Is.EqualTo(newBodyContent)); + // Assert.That(dispatchedMessage.Item1.Message.Headers["someKey"], Is.EqualTo("someValue")); + // }); + + // using (var x = await ErrorMessageDataStore.CreateEditFailedMessageManager()) + // { + // var failedMessage2 = await x.GetFailedMessage(failedMessage.UniqueMessageId); + // Assert.That(failedMessage2, Is.Not.Null, "Edited failed message"); + + // var editId = await x.GetCurrentEditingMessageId(failedMessage2.UniqueMessageId); + + // Assert.Multiple(() => + // { + // Assert.That(failedMessage2.Status, Is.EqualTo(FailedMessageStatus.Resolved), "Failed message status"); + // Assert.That(editId, Is.EqualTo(handlerContent.MessageId), "MessageId"); + // }); + // } + //} + + //[Test] + //public async Task Should_dispatch_edited_message_when_retrying() + //{ + // var failedMessageId = Guid.NewGuid().ToString(); + // await CreateAndStoreFailedMessage(failedMessageId); + + // var handlerContext = new TestableMessageHandlerContext(); + // var message = CreateEditMessage(failedMessageId); + // await handler.Handle(message, handlerContext); + // await handler.Handle(message, handlerContext); + + // Assert.That(dispatcher.DispatchedMessages, Has.Count.EqualTo(2), "Dispatched message count"); + //} + + //[Test] + //public async Task Should_dispatch_message_using_incoming_transaction() + //{ + // var failedMessage = await CreateAndStoreFailedMessage(); + // var message = CreateEditMessage(failedMessage.UniqueMessageId); + // var handlerContent = new TestableMessageHandlerContext(); + // var transportTransaction = new TransportTransaction(); + // handlerContent.Extensions.Set(transportTransaction); + + // await handler.Handle(message, handlerContent); + + // Assert.That(transportTransaction, Is.SameAs(dispatcher.DispatchedMessages.Single().Item2)); + //} + + //[Test] + //public async Task Should_route_to_redirect_route_if_exists() + //{ + // const string redirectAddress = "a different destination"; + // var failedMessage = await CreateAndStoreFailedMessage(); + // var message = CreateEditMessage(failedMessage.UniqueMessageId); + + // var redirects = await MessageRedirectsDataStore.GetOrCreate(); + // redirects.Redirects.Add(new MessageRedirect + // { + // FromPhysicalAddress = failedMessage.ProcessingAttempts.Last().FailureDetails.AddressOfFailingEndpoint, + // ToPhysicalAddress = redirectAddress + // }); + // await MessageRedirectsDataStore.Save(redirects); + + // await handler.Handle(message, new TestableInvokeHandlerContext()); + + // var sentMessage = dispatcher.DispatchedMessages.Single().Item1; + // Assert.That(sentMessage.Destination, Is.EqualTo(redirectAddress)); + //} + + //[Test] + //public async Task Should_mark_edited_message_with_edit_information() + //{ + // var messageFailure = await CreateAndStoreFailedMessage(); + // var message = CreateEditMessage(messageFailure.UniqueMessageId); + + // await handler.Handle(message, new TestableInvokeHandlerContext()); + + // var sentMessage = dispatcher.DispatchedMessages.Single(); + // Assert.That( + // "FailedMessages/" + sentMessage.Item1.Message.Headers["ServiceControl.EditOf"], + // Is.EqualTo(messageFailure.Id)); + //} + + //[Test] + //public async Task Should_assign_edited_message_new_message_id() + //{ + // var messageFailure = await CreateAndStoreFailedMessage(); + // var message = CreateEditMessage(messageFailure.UniqueMessageId); + + // await handler.Handle(message, new TestableInvokeHandlerContext()); + + // var sentMessage = dispatcher.DispatchedMessages.Single(); + // Assert.That( + // sentMessage.Item1.Message.MessageId, + // Is.Not.EqualTo(messageFailure.ProcessingAttempts.Last().MessageId)); + //} + + //[Test] + //public async Task Should_assign_correct_akcnowledgment_queue_address_when_editing_and_retyring() + //{ + // var messageFailure = await CreateAndStoreFailedMessage(); + // var message = CreateEditMessage(messageFailure.UniqueMessageId); + + // await handler.Handle(message, new TestableInvokeHandlerContext()); + + // var sentMessage = dispatcher.DispatchedMessages.Single(); + // Assert.That( + // sentMessage.Item1.Message.Headers["ServiceControl.Retry.AcknowledgementQueue"], + // Is.EqualTo(errorQueueNameCache.ResolvedErrorAddress)); + //} + + static EditAndSend CreateEditMessage(string failedMessageId, byte[] newBodyContent = null, Dictionary newHeaders = null) + { + return new EditAndSend + { + FailedMessageId = failedMessageId, + NewBody = Convert.ToBase64String(newBodyContent ?? Encoding.UTF8.GetBytes(Guid.NewGuid().ToString())), + NewHeaders = newHeaders ?? [] + }; + } + + async Task CreateAndStoreFailedMessage(string failedMessageId = null, FailedMessageStatus status = FailedMessageStatus.Unresolved) + { + failedMessageId ??= Guid.NewGuid().ToString(); + + var failedMessage = new FailedMessage + { + UniqueMessageId = failedMessageId, + Id = FailedMessageIdGenerator.MakeDocumentId(failedMessageId), + Status = status, + ProcessingAttempts = + [ + new FailedMessage.ProcessingAttempt + { + MessageId = Guid.NewGuid().ToString(), + FailureDetails = new FailureDetails + { + AddressOfFailingEndpoint = "OriginalEndpointAddress" + } + } + ] + }; + await ErrorMessageDataStore.StoreFailedMessagesForTestsOnly(new[] { failedMessage }); + return failedMessage; + } + } + + public sealed class TestableUnicastDispatcherCopy : IMessageDispatcher + { + public List<(UnicastTransportOperation, TransportTransaction)> DispatchedMessages { get; } = []; + + public Task Dispatch(TransportOperations outgoingMessages, TransportTransaction transaction, CancellationToken cancellationToken) + { + DispatchedMessages.AddRange(outgoingMessages.UnicastTransportOperations.Select(m => (m, transaction))); + return Task.CompletedTask; + } + } +} \ No newline at end of file diff --git a/src/ServiceControl/HostApplicationBuilderExtensions.cs b/src/ServiceControl/HostApplicationBuilderExtensions.cs index fbe99dada5..d94a148324 100644 --- a/src/ServiceControl/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl/HostApplicationBuilderExtensions.cs @@ -14,6 +14,7 @@ namespace Particular.ServiceControl using global::ServiceControl.Infrastructure.WebApi; using global::ServiceControl.Notifications.Email; using global::ServiceControl.Persistence; + using global::ServiceControl.Recoverability.Editing; using global::ServiceControl.Transports; using Licensing; using Microsoft.AspNetCore.HttpLogging; @@ -53,6 +54,7 @@ public static void AddServiceControl(this IHostApplicationBuilder hostBuilder, S services.AddSingleton(); services.AddSingleton(); + services.AddTransient(); services.AddSingleton(settings); services.AddHttpLogging(options => diff --git a/src/ServiceControl/MessageFailures/Api/EditFailedMessagesController.cs b/src/ServiceControl/MessageFailures/Api/EditFailedMessagesController.cs index 9c0413cbac..8ddd9aac86 100644 --- a/src/ServiceControl/MessageFailures/Api/EditFailedMessagesController.cs +++ b/src/ServiceControl/MessageFailures/Api/EditFailedMessagesController.cs @@ -6,11 +6,13 @@ using System.Text; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; + using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using NServiceBus; using Persistence; using Recoverability; using ServiceBus.Management.Infrastructure.Settings; + using ServiceControl.Recoverability.Editing; [ApiController] [Route("api")] @@ -18,6 +20,7 @@ public class EditFailedMessagesController( Settings settings, IErrorMessageDataStore store, IMessageSession session, + IServiceProvider serviceContainer, ILogger logger) : ControllerBase { @@ -67,12 +70,20 @@ public async Task Edit(string failedMessageId, [FromBody] EditMes // Encode the body in base64 so that the new body doesn't have to be escaped var base64String = Convert.ToBase64String(Encoding.UTF8.GetBytes(edit.MessageBody)); - await session.SendLocal(new EditAndSend + //await session.SendLocal(new EditAndSend + //{ + // FailedMessageId = failedMessageId, + // NewBody = base64String, + // NewHeaders = edit.MessageHeaders + //}); + + var editHandler = serviceContainer.GetService(); + await editHandler.Handle(new EditAndSend { FailedMessageId = failedMessageId, NewBody = base64String, NewHeaders = edit.MessageHeaders - }); + }, failedMessageId); return Accepted(); } diff --git a/src/ServiceControl/Recoverability/Editing/EditHandlerCopy.cs b/src/ServiceControl/Recoverability/Editing/EditHandlerCopy.cs new file mode 100644 index 0000000000..d5c3ad3dc7 --- /dev/null +++ b/src/ServiceControl/Recoverability/Editing/EditHandlerCopy.cs @@ -0,0 +1,131 @@ +namespace ServiceControl.Recoverability.Editing +{ + using System; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using MessageFailures; + using Microsoft.Extensions.Logging; + using NServiceBus; + using NServiceBus.Routing; + using NServiceBus.Support; + using NServiceBus.Transport; + using ServiceControl.Persistence; + using ServiceControl.Persistence.MessageRedirects; + + class EditHandlerCopy + { + public EditHandlerCopy(IErrorMessageDataStore store, IMessageRedirectsDataStore redirectsStore, IMessageDispatcher dispatcher, ErrorQueueNameCache errorQueueNameCache, ILogger logger) + { + this.store = store; + this.redirectsStore = redirectsStore; + this.dispatcher = dispatcher; + this.errorQueueNameCache = errorQueueNameCache; + this.logger = logger; + corruptedReplyToHeaderStrategy = new CorruptedReplyToHeaderStrategy(RuntimeEnvironment.MachineName, logger); + + } + + public async Task Handle(EditAndSend message, string messageId) + { + FailedMessage failedMessage; + using (var session = await store.CreateEditFailedMessageManager()) + { + failedMessage = await session.GetFailedMessage(message.FailedMessageId); + + if (failedMessage == null) + { + logger.LogWarning("Discarding edit {MessageId} because no message failure for id {FailedMessageId} has been found", messageId, message.FailedMessageId); + return; + } + + var editId = await session.GetCurrentEditingMessageId(message.FailedMessageId); + if (editId == null) + { + if (failedMessage.Status != FailedMessageStatus.Unresolved) + { + logger.LogWarning("Discarding edit {MessageId} because message failure {FailedMessageId} doesn't have state 'Unresolved'", messageId, message.FailedMessageId); + return; + } + + // create a retries document to prevent concurrent edits + await session.SetCurrentEditingMessageId(messageId); + } + else if (editId != messageId) + { + logger.LogWarning("Discarding edit & retry request because the failed message id {FailedMessageId} has already been edited by Message ID {EditedMessageId}", message.FailedMessageId, editId); + return; + } + + // the original failure is marked as resolved as any failures of the edited message are treated as a new message failure. + await session.SetFailedMessageAsResolved(); + + + await session.SaveChanges(); + } + + var redirects = await redirectsStore.GetOrCreate(); + + var attempt = failedMessage.ProcessingAttempts.Last(); + + var outgoingMessage = BuildMessage(message); + // mark the new message with a link to the original message id + outgoingMessage.Headers.Add("ServiceControl.EditOf", message.FailedMessageId); + outgoingMessage.Headers["ServiceControl.Retry.AcknowledgementQueue"] = errorQueueNameCache.ResolvedErrorAddress; + + var address = ApplyRedirect(attempt.FailureDetails.AddressOfFailingEndpoint, redirects); + + if (outgoingMessage.Headers.TryGetValue("ServiceControl.RetryTo", out var retryTo)) + { + outgoingMessage.Headers["ServiceControl.TargetEndpointAddress"] = address; + address = retryTo; + } + await DispatchEditedMessage(outgoingMessage, address); + } + + OutgoingMessage BuildMessage(EditAndSend message) + { + var messageId = CombGuid.Generate().ToString(); + var headers = HeaderFilter.RemoveErrorMessageHeaders(message.NewHeaders); + corruptedReplyToHeaderStrategy.FixCorruptedReplyToHeader(headers); + headers[Headers.MessageId] = Guid.NewGuid().ToString("D"); + + var body = Convert.FromBase64String(message.NewBody); + var outgoingMessage = new OutgoingMessage(messageId, headers, body); + return outgoingMessage; + } + + static string ApplyRedirect(string addressOfFailingEndpoint, MessageRedirectsCollection redirects) + { + var redirect = redirects[addressOfFailingEndpoint]; + if (redirect != null) + { + addressOfFailingEndpoint = redirect.ToPhysicalAddress; + } + + return addressOfFailingEndpoint; + } + + Task DispatchEditedMessage(OutgoingMessage editedMessage, string address) + { + AddressTag destination = new UnicastAddressTag(address); + //var transportTransaction = context.Extensions.GetOrCreate(); + + //return dispatcher.Dispatch( + // new TransportOperations(new TransportOperation(editedMessage, destination)), + // transportTransaction, + // context.CancellationToken); + + + return dispatcher.Dispatch( + new TransportOperations(new TransportOperation(editedMessage, destination)), new TransportTransaction(), CancellationToken.None); + } + + readonly CorruptedReplyToHeaderStrategy corruptedReplyToHeaderStrategy; + readonly IErrorMessageDataStore store; + readonly IMessageRedirectsDataStore redirectsStore; + readonly IMessageDispatcher dispatcher; + readonly ErrorQueueNameCache errorQueueNameCache; + readonly ILogger logger; + } +} \ No newline at end of file From 5cd2af4d93a50d2f01d27729ef98d85e18b01382 Mon Sep 17 00:00:00 2001 From: Christian Date: Thu, 2 Oct 2025 09:44:12 -0700 Subject: [PATCH 2/4] More tests passing --- .../Recoverability/EditMessageTestsCopy.cs | 94 +++++++++++-------- .../Recoverability/Editing/EditHandlerCopy.cs | 10 +- 2 files changed, 58 insertions(+), 46 deletions(-) diff --git a/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTestsCopy.cs b/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTestsCopy.cs index e2083991dd..98d2b5ae1a 100644 --- a/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTestsCopy.cs +++ b/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTestsCopy.cs @@ -103,56 +103,68 @@ public async Task Should_discard_edit_when_different_edit_already_exists() Assert.That(dispatcher.DispatchedMessages, Is.Empty); } - //[Test] - //public async Task Should_dispatch_edited_message_when_first_edit() - //{ - // var failedMessage = await CreateAndStoreFailedMessage(); + [Test] + public async Task Should_dispatch_edited_message_when_first_edit() + { + var failedMessage = await CreateAndStoreFailedMessage(); - // var newBodyContent = Encoding.UTF8.GetBytes("new body content"); - // var newHeaders = new Dictionary { { "someKey", "someValue" } }; - // var message = CreateEditMessage(failedMessage.UniqueMessageId, newBodyContent, newHeaders); + var newBodyContent = Encoding.UTF8.GetBytes("new body content"); + var newHeaders = new Dictionary { { "someKey", "someValue" } }; + var message = CreateEditMessage(failedMessage.UniqueMessageId, newBodyContent, newHeaders); - // var handlerContent = message.FailedMessageId; - // await handler.Handle(message, handlerContent); + await handler.Handle(message, message.FailedMessageId); - // var dispatchedMessage = dispatcher.DispatchedMessages.Single(); - // Assert.Multiple(() => - // { - // Assert.That( - // dispatchedMessage.Item1.Destination, - // Is.EqualTo(failedMessage.ProcessingAttempts.Last().FailureDetails.AddressOfFailingEndpoint)); - // Assert.That(dispatchedMessage.Item1.Message.Body.ToArray(), Is.EqualTo(newBodyContent)); - // Assert.That(dispatchedMessage.Item1.Message.Headers["someKey"], Is.EqualTo("someValue")); - // }); + var dispatchedMessage = dispatcher.DispatchedMessages.Single(); + Assert.Multiple(() => + { + Assert.That( + dispatchedMessage.Item1.Destination, + Is.EqualTo(failedMessage.ProcessingAttempts.Last().FailureDetails.AddressOfFailingEndpoint)); + Assert.That(dispatchedMessage.Item1.Message.Body.ToArray(), Is.EqualTo(newBodyContent)); + Assert.That(dispatchedMessage.Item1.Message.Headers["someKey"], Is.EqualTo("someValue")); + }); - // using (var x = await ErrorMessageDataStore.CreateEditFailedMessageManager()) - // { - // var failedMessage2 = await x.GetFailedMessage(failedMessage.UniqueMessageId); - // Assert.That(failedMessage2, Is.Not.Null, "Edited failed message"); + using (var x = await ErrorMessageDataStore.CreateEditFailedMessageManager()) + { + var failedMessage2 = await x.GetFailedMessage(failedMessage.UniqueMessageId); + Assert.That(failedMessage2, Is.Not.Null, "Edited failed message"); - // var editId = await x.GetCurrentEditingMessageId(failedMessage2.UniqueMessageId); + var editId = await x.GetCurrentEditingMessageId(failedMessage2.UniqueMessageId); - // Assert.Multiple(() => - // { - // Assert.That(failedMessage2.Status, Is.EqualTo(FailedMessageStatus.Resolved), "Failed message status"); - // Assert.That(editId, Is.EqualTo(handlerContent.MessageId), "MessageId"); - // }); - // } - //} + Assert.Multiple(() => + { + Assert.That(failedMessage2.Status, Is.EqualTo(FailedMessageStatus.Resolved), "Failed message status"); + Assert.That(editId, Is.EqualTo(message.FailedMessageId), "MessageId"); + }); + } + } - //[Test] - //public async Task Should_dispatch_edited_message_when_retrying() - //{ - // var failedMessageId = Guid.NewGuid().ToString(); - // await CreateAndStoreFailedMessage(failedMessageId); + [Test] + public async Task Should_dispatch_edited_message_when_retrying() + { + var failedMessageId = Guid.NewGuid().ToString(); + var controlMessageId = Guid.NewGuid().ToString(); + await CreateAndStoreFailedMessage(failedMessageId); - // var handlerContext = new TestableMessageHandlerContext(); - // var message = CreateEditMessage(failedMessageId); - // await handler.Handle(message, handlerContext); - // await handler.Handle(message, handlerContext); + var message = CreateEditMessage(failedMessageId); + await handler.Handle(message, controlMessageId); + await handler.Handle(message, controlMessageId); - // Assert.That(dispatcher.DispatchedMessages, Has.Count.EqualTo(2), "Dispatched message count"); - //} + Assert.That(dispatcher.DispatchedMessages, Has.Count.EqualTo(2), "Dispatched message count"); + } + + [Test] + public async Task Simulate_Two_Tabs_dispatching_two_control_messages_for_the_same_failed_message() + { + var failedMessageId = Guid.NewGuid().ToString(); + await CreateAndStoreFailedMessage(failedMessageId); + + var message = CreateEditMessage(failedMessageId); + await handler.Handle(message, Guid.NewGuid().ToString()); + await handler.Handle(message, Guid.NewGuid().ToString()); + + Assert.That(dispatcher.DispatchedMessages, Has.Count.EqualTo(1), "Dispatched message count"); + } //[Test] //public async Task Should_dispatch_message_using_incoming_transaction() diff --git a/src/ServiceControl/Recoverability/Editing/EditHandlerCopy.cs b/src/ServiceControl/Recoverability/Editing/EditHandlerCopy.cs index d5c3ad3dc7..b8a88e77a4 100644 --- a/src/ServiceControl/Recoverability/Editing/EditHandlerCopy.cs +++ b/src/ServiceControl/Recoverability/Editing/EditHandlerCopy.cs @@ -26,7 +26,7 @@ public EditHandlerCopy(IErrorMessageDataStore store, IMessageRedirectsDataStore } - public async Task Handle(EditAndSend message, string messageId) + public async Task Handle(EditAndSend message, string retryAttemptId) { FailedMessage failedMessage; using (var session = await store.CreateEditFailedMessageManager()) @@ -35,7 +35,7 @@ public async Task Handle(EditAndSend message, string messageId) if (failedMessage == null) { - logger.LogWarning("Discarding edit {MessageId} because no message failure for id {FailedMessageId} has been found", messageId, message.FailedMessageId); + logger.LogWarning("Discarding edit {MessageId} because no message failure for id {FailedMessageId} has been found", retryAttemptId, message.FailedMessageId); return; } @@ -44,14 +44,14 @@ public async Task Handle(EditAndSend message, string messageId) { if (failedMessage.Status != FailedMessageStatus.Unresolved) { - logger.LogWarning("Discarding edit {MessageId} because message failure {FailedMessageId} doesn't have state 'Unresolved'", messageId, message.FailedMessageId); + logger.LogWarning("Discarding edit {MessageId} because message failure {FailedMessageId} doesn't have state 'Unresolved'", retryAttemptId, message.FailedMessageId); return; } // create a retries document to prevent concurrent edits - await session.SetCurrentEditingMessageId(messageId); + await session.SetCurrentEditingMessageId(retryAttemptId); } - else if (editId != messageId) + else if (editId != retryAttemptId) { logger.LogWarning("Discarding edit & retry request because the failed message id {FailedMessageId} has already been edited by Message ID {EditedMessageId}", message.FailedMessageId, editId); return; From 2f37369ec688cd46cfabb5b7cc4fb21c9eeae1ec Mon Sep 17 00:00:00 2001 From: Christian Date: Thu, 2 Oct 2025 09:52:42 -0700 Subject: [PATCH 3/4] Dont not use failed message id as the attempt id Receive this as a parameter is something we might want to do as was only applicable for the NSB handler version --- .../MessageFailures/Api/EditFailedMessagesController.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceControl/MessageFailures/Api/EditFailedMessagesController.cs b/src/ServiceControl/MessageFailures/Api/EditFailedMessagesController.cs index 8ddd9aac86..8e9bbeabf4 100644 --- a/src/ServiceControl/MessageFailures/Api/EditFailedMessagesController.cs +++ b/src/ServiceControl/MessageFailures/Api/EditFailedMessagesController.cs @@ -83,7 +83,7 @@ await editHandler.Handle(new EditAndSend FailedMessageId = failedMessageId, NewBody = base64String, NewHeaders = edit.MessageHeaders - }, failedMessageId); + }, Guid.NewGuid().ToString()); return Accepted(); } From 0a398f678d0a878985a61e4c24698345f33614be Mon Sep 17 00:00:00 2001 From: Christian Date: Thu, 2 Oct 2025 10:45:32 -0700 Subject: [PATCH 4/4] address lint error --- .../Recoverability/EditMessageTestsCopy.cs | 4 +--- .../MessageFailures/Api/EditFailedMessagesController.cs | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTestsCopy.cs b/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTestsCopy.cs index 98d2b5ae1a..95fece1520 100644 --- a/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTestsCopy.cs +++ b/src/ServiceControl.Persistence.Tests/Recoverability/EditMessageTestsCopy.cs @@ -9,10 +9,8 @@ using Contracts.Operations; using MessageFailures; using Microsoft.Extensions.DependencyInjection; - using NServiceBus.Testing; using NServiceBus.Transport; using NUnit.Framework; - using ServiceControl.Persistence.MessageRedirects; using ServiceControl.Recoverability; using ServiceControl.Recoverability.Editing; @@ -139,7 +137,7 @@ public async Task Should_dispatch_edited_message_when_first_edit() } } - [Test] + [Test] public async Task Should_dispatch_edited_message_when_retrying() { var failedMessageId = Guid.NewGuid().ToString(); diff --git a/src/ServiceControl/MessageFailures/Api/EditFailedMessagesController.cs b/src/ServiceControl/MessageFailures/Api/EditFailedMessagesController.cs index 8e9bbeabf4..8a644fa4c6 100644 --- a/src/ServiceControl/MessageFailures/Api/EditFailedMessagesController.cs +++ b/src/ServiceControl/MessageFailures/Api/EditFailedMessagesController.cs @@ -19,7 +19,6 @@ public class EditFailedMessagesController( Settings settings, IErrorMessageDataStore store, - IMessageSession session, IServiceProvider serviceContainer, ILogger logger) : ControllerBase