From fdd69006afc59140225c86f091965ed7825f3c06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Tue, 22 Apr 2025 12:34:26 +0200 Subject: [PATCH 01/11] Refine test to show minimal metadata needed for errors to be imported successfully --- ...ng_message_with_missing_metadata_failed.cs | 38 ++++++++++--------- .../FailureDetails.cs | 7 +--- ...ectNewEndpointsFromErrorImportsEnricher.cs | 9 +---- .../Operations/ErrorIngestor.cs | 12 ++---- .../Operations/ErrorProcessor.cs | 19 +++------- 5 files changed, 31 insertions(+), 54 deletions(-) diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs index 6fad5fd287..522b595255 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs @@ -42,13 +42,13 @@ public async Task TimeSent_should_not_be_casted() var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z"); - await Define(ctx => { ctx.TimeSent = sentTime; }) + await Define(ctx => ctx.TimeSent = sentTime) .WithEndpoint() .Done(async c => { var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); failure = result; - return c.UniqueMessageId != null & result; + return (c.UniqueMessageId != null) & result; }) .Run(); @@ -61,22 +61,31 @@ public async Task Should_be_able_to_get_the_message_by_id() { FailedMessageView failure = null; - await Define() + var testStartTime = DateTime.UtcNow; + + var context = await Define() .WithEndpoint() .Done(async c => { var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); failure = result; - return c.UniqueMessageId != null & result; + return (c.UniqueMessageId != null) & result; }) .Run(); Assert.That(failure, Is.Not.Null); + + //No failure time will result in utc now being used + Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime)); + + // ServicePulse assumes that the receiving endpoint name is set making sure that its present + Assert.That(failure.ReceivingEndpoint, Is.Not.Null); + Assert.That(failure.ReceivingEndpoint.Name, Is.EqualTo(context.EndpointNameOfReceivingEndpoint)); } - public class Failing : EndpointConfigurationBuilder + class Failing : EndpointConfigurationBuilder { - public Failing() => EndpointSetup(c => { c.Recoverability().Delayed(x => x.NumberOfRetries(0)); }); + public Failing() => EndpointSetup(c => c.Recoverability().Delayed(x => x.NumberOfRetries(0))); class SendFailedMessage : DispatchRawMessages { @@ -89,30 +98,23 @@ protected override TransportOperations CreateMessage(MyContext context) var headers = new Dictionary { [Headers.MessageId] = context.MessageId, - [Headers.ProcessingEndpoint] = context.EndpointNameOfReceivingEndpoint, - ["NServiceBus.ExceptionInfo.ExceptionType"] = "2014-11-11 02:26:57:767462 Z", - ["NServiceBus.ExceptionInfo.Message"] = "An error occurred while attempting to extract logical messages from transport message NServiceBus.TransportMessage", - ["NServiceBus.ExceptionInfo.InnerExceptionType"] = "System.Exception", - ["NServiceBus.ExceptionInfo.Source"] = "NServiceBus.Core", - ["NServiceBus.ExceptionInfo.StackTrace"] = string.Empty, ["NServiceBus.FailedQ"] = Conventions.EndpointNamingConvention(typeof(Failing)), - ["NServiceBus.TimeOfFailure"] = "2014-11-11 02:26:58:000462 Z" + [Headers.ProcessingMachine] = "unknown", // This is needed for endpoint detection to work, endpoint name is detected from the FailedQ header }; + if (context.TimeSent.HasValue) { headers["NServiceBus.TimeSent"] = DateTimeOffsetHelper.ToWireFormattedString(context.TimeSent.Value); } - var outgoingMessage = new OutgoingMessage(context.MessageId, headers, new byte[0]); + var outgoingMessage = new OutgoingMessage(context.MessageId, headers, Array.Empty()); - return new TransportOperations( - new TransportOperation(outgoingMessage, new UnicastAddressTag("error")) - ); + return new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag("error"))); } } } - public class MyContext : ScenarioContext + class MyContext : ScenarioContext { public string MessageId { get; set; } diff --git a/src/ServiceControl.Persistence/FailureDetails.cs b/src/ServiceControl.Persistence/FailureDetails.cs index 81d7be97e7..180dd830a1 100644 --- a/src/ServiceControl.Persistence/FailureDetails.cs +++ b/src/ServiceControl.Persistence/FailureDetails.cs @@ -4,14 +4,9 @@ namespace ServiceControl.Contracts.Operations public class FailureDetails { - public FailureDetails() - { - TimeOfFailure = DateTime.UtcNow; - } - public string AddressOfFailingEndpoint { get; set; } - public DateTime TimeOfFailure { get; set; } + public DateTime TimeOfFailure { get; set; } = DateTime.UtcNow; public ExceptionDetails Exception { get; set; } } diff --git a/src/ServiceControl/Monitoring/DetectNewEndpointsFromErrorImportsEnricher.cs b/src/ServiceControl/Monitoring/DetectNewEndpointsFromErrorImportsEnricher.cs index 1e01e73b97..c067fa1746 100644 --- a/src/ServiceControl/Monitoring/DetectNewEndpointsFromErrorImportsEnricher.cs +++ b/src/ServiceControl/Monitoring/DetectNewEndpointsFromErrorImportsEnricher.cs @@ -5,13 +5,8 @@ using ServiceControl.Contracts.Operations; using ServiceControl.Persistence; - class DetectNewEndpointsFromErrorImportsEnricher : IEnrichImportedErrorMessages + class DetectNewEndpointsFromErrorImportsEnricher(IEndpointInstanceMonitoring monitoring) : IEnrichImportedErrorMessages { - public DetectNewEndpointsFromErrorImportsEnricher(IEndpointInstanceMonitoring monitoring) - { - this.monitoring = monitoring; - } - public void Enrich(ErrorEnricherContext context) { var sendingEndpoint = EndpointDetailsParser.SendingEndpoint(context.Headers); @@ -47,7 +42,5 @@ void TryAddEndpoint(EndpointDetails endpointDetails, ErrorEnricherContext contex context.Add(endpointDetails); } } - - IEndpointInstanceMonitoring monitoring; } } \ No newline at end of file diff --git a/src/ServiceControl/Operations/ErrorIngestor.cs b/src/ServiceControl/Operations/ErrorIngestor.cs index 660dcafbb1..88b3513738 100644 --- a/src/ServiceControl/Operations/ErrorIngestor.cs +++ b/src/ServiceControl/Operations/ErrorIngestor.cs @@ -37,13 +37,7 @@ public ErrorIngestor(Metrics metrics, bulkInsertDurationMeter = metrics.GetMeter("Error ingestion - bulk insert duration", FrequencyInMilliseconds); var ingestedMeter = metrics.GetCounter("Error ingestion - ingested"); - var enrichers = new IEnrichImportedErrorMessages[] - { - new MessageTypeEnricher(), - new EnrichWithTrackingIds(), - new ProcessingStatisticsEnricher() - - }.Concat(errorEnrichers).ToArray(); + var enrichers = new IEnrichImportedErrorMessages[] { new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher() }.Concat(errorEnrichers).ToArray(); errorProcessor = new ErrorProcessor(enrichers, failedMessageEnrichers.ToArray(), domainEvents, ingestedMeter); retryConfirmationProcessor = new RetryConfirmationProcessor(domainEvents); @@ -67,7 +61,6 @@ public async Task Ingest(List contexts, CancellationToken cancel } } - var storedFailed = await PersistFailedMessages(failedMessages, retriedMessages, cancellationToken); try @@ -77,6 +70,7 @@ public async Task Ingest(List contexts, CancellationToken cancel { announcerTasks.Add(errorProcessor.Announce(context)); } + foreach (var context in retriedMessages) { announcerTasks.Add(retryConfirmationProcessor.Announce(context)); @@ -90,6 +84,7 @@ public async Task Ingest(List contexts, CancellationToken cancel { Logger.Debug($"Forwarding {storedFailed.Count} messages"); } + await Forward(storedFailed, cancellationToken); if (Logger.IsDebugEnabled) { @@ -133,6 +128,7 @@ async Task> PersistFailedMessages(List> Process(IReadOnlyList contexts, IIngestionUnitOfWork unitOfWork) { var storedContexts = new List(contexts.Count); @@ -169,10 +163,7 @@ static void RecordKnownEndpoints(EndpointDetails observedEndpoint, Dictionary(); } } \ No newline at end of file From b61f8cf0f6c9f6cf3040a98173aa1620da5dd340 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Tue, 22 Apr 2025 12:39:14 +0200 Subject: [PATCH 02/11] Wording --- .../When_processing_message_with_missing_metadata_failed.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs index 522b595255..e93d552ccc 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs @@ -78,7 +78,7 @@ public async Task Should_be_able_to_get_the_message_by_id() //No failure time will result in utc now being used Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime)); - // ServicePulse assumes that the receiving endpoint name is set making sure that its present + // ServicePulse assumes that the receiving endpoint name Assert.That(failure.ReceivingEndpoint, Is.Not.Null); Assert.That(failure.ReceivingEndpoint.Name, Is.EqualTo(context.EndpointNameOfReceivingEndpoint)); } From 3a44196c997963f176cafcc34861be959b061194 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Tue, 22 Apr 2025 12:39:49 +0200 Subject: [PATCH 03/11] Better comment --- .../When_processing_message_with_missing_metadata_failed.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs index e93d552ccc..bf05777fc4 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs @@ -99,7 +99,7 @@ protected override TransportOperations CreateMessage(MyContext context) { [Headers.MessageId] = context.MessageId, ["NServiceBus.FailedQ"] = Conventions.EndpointNamingConvention(typeof(Failing)), - [Headers.ProcessingMachine] = "unknown", // This is needed for endpoint detection to work, endpoint name is detected from the FailedQ header + [Headers.ProcessingMachine] = "unknown", // This is needed for endpoint detection to work since "host" is required, endpoint name is detected from the FailedQ header }; if (context.TimeSent.HasValue) From 3f8d5b71930eb48a585cfa14f1a2776311fafcf8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Tue, 22 Apr 2025 12:43:08 +0200 Subject: [PATCH 04/11] More comments --- .../When_processing_message_with_missing_metadata_failed.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs index bf05777fc4..7bc6536156 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs @@ -78,7 +78,7 @@ public async Task Should_be_able_to_get_the_message_by_id() //No failure time will result in utc now being used Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime)); - // ServicePulse assumes that the receiving endpoint name + // ServicePulse assumes that the receiving endpoint name is present Assert.That(failure.ReceivingEndpoint, Is.Not.Null); Assert.That(failure.ReceivingEndpoint.Name, Is.EqualTo(context.EndpointNameOfReceivingEndpoint)); } From d786a34274a2c643a29ca29f61d4210d74100a1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Wed, 23 Apr 2025 09:46:39 +0200 Subject: [PATCH 05/11] Isolate changes --- src/ServiceControl.Persistence/FailureDetails.cs | 7 ++++++- .../DetectNewEndpointsFromErrorImportsEnricher.cs | 9 ++++++++- src/ServiceControl/Operations/ErrorIngestor.cs | 12 ++++++++---- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/src/ServiceControl.Persistence/FailureDetails.cs b/src/ServiceControl.Persistence/FailureDetails.cs index 180dd830a1..81d7be97e7 100644 --- a/src/ServiceControl.Persistence/FailureDetails.cs +++ b/src/ServiceControl.Persistence/FailureDetails.cs @@ -4,9 +4,14 @@ namespace ServiceControl.Contracts.Operations public class FailureDetails { + public FailureDetails() + { + TimeOfFailure = DateTime.UtcNow; + } + public string AddressOfFailingEndpoint { get; set; } - public DateTime TimeOfFailure { get; set; } = DateTime.UtcNow; + public DateTime TimeOfFailure { get; set; } public ExceptionDetails Exception { get; set; } } diff --git a/src/ServiceControl/Monitoring/DetectNewEndpointsFromErrorImportsEnricher.cs b/src/ServiceControl/Monitoring/DetectNewEndpointsFromErrorImportsEnricher.cs index c067fa1746..1e01e73b97 100644 --- a/src/ServiceControl/Monitoring/DetectNewEndpointsFromErrorImportsEnricher.cs +++ b/src/ServiceControl/Monitoring/DetectNewEndpointsFromErrorImportsEnricher.cs @@ -5,8 +5,13 @@ using ServiceControl.Contracts.Operations; using ServiceControl.Persistence; - class DetectNewEndpointsFromErrorImportsEnricher(IEndpointInstanceMonitoring monitoring) : IEnrichImportedErrorMessages + class DetectNewEndpointsFromErrorImportsEnricher : IEnrichImportedErrorMessages { + public DetectNewEndpointsFromErrorImportsEnricher(IEndpointInstanceMonitoring monitoring) + { + this.monitoring = monitoring; + } + public void Enrich(ErrorEnricherContext context) { var sendingEndpoint = EndpointDetailsParser.SendingEndpoint(context.Headers); @@ -42,5 +47,7 @@ void TryAddEndpoint(EndpointDetails endpointDetails, ErrorEnricherContext contex context.Add(endpointDetails); } } + + IEndpointInstanceMonitoring monitoring; } } \ No newline at end of file diff --git a/src/ServiceControl/Operations/ErrorIngestor.cs b/src/ServiceControl/Operations/ErrorIngestor.cs index 88b3513738..660dcafbb1 100644 --- a/src/ServiceControl/Operations/ErrorIngestor.cs +++ b/src/ServiceControl/Operations/ErrorIngestor.cs @@ -37,7 +37,13 @@ public ErrorIngestor(Metrics metrics, bulkInsertDurationMeter = metrics.GetMeter("Error ingestion - bulk insert duration", FrequencyInMilliseconds); var ingestedMeter = metrics.GetCounter("Error ingestion - ingested"); - var enrichers = new IEnrichImportedErrorMessages[] { new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher() }.Concat(errorEnrichers).ToArray(); + var enrichers = new IEnrichImportedErrorMessages[] + { + new MessageTypeEnricher(), + new EnrichWithTrackingIds(), + new ProcessingStatisticsEnricher() + + }.Concat(errorEnrichers).ToArray(); errorProcessor = new ErrorProcessor(enrichers, failedMessageEnrichers.ToArray(), domainEvents, ingestedMeter); retryConfirmationProcessor = new RetryConfirmationProcessor(domainEvents); @@ -61,6 +67,7 @@ public async Task Ingest(List contexts, CancellationToken cancel } } + var storedFailed = await PersistFailedMessages(failedMessages, retriedMessages, cancellationToken); try @@ -70,7 +77,6 @@ public async Task Ingest(List contexts, CancellationToken cancel { announcerTasks.Add(errorProcessor.Announce(context)); } - foreach (var context in retriedMessages) { announcerTasks.Add(retryConfirmationProcessor.Announce(context)); @@ -84,7 +90,6 @@ public async Task Ingest(List contexts, CancellationToken cancel { Logger.Debug($"Forwarding {storedFailed.Count} messages"); } - await Forward(storedFailed, cancellationToken); if (Logger.IsDebugEnabled) { @@ -128,7 +133,6 @@ async Task> PersistFailedMessages(List Date: Wed, 23 Apr 2025 09:47:43 +0200 Subject: [PATCH 06/11] More rollbacks --- .../Operations/ErrorProcessor.cs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/ServiceControl/Operations/ErrorProcessor.cs b/src/ServiceControl/Operations/ErrorProcessor.cs index 45689ce6b4..6ac04c8edd 100644 --- a/src/ServiceControl/Operations/ErrorProcessor.cs +++ b/src/ServiceControl/Operations/ErrorProcessor.cs @@ -15,11 +15,17 @@ using ServiceControl.Persistence; using ServiceControl.Persistence.UnitOfWork; - class ErrorProcessor(IEnrichImportedErrorMessages[] enrichers, - IFailedMessageEnricher[] failedMessageEnrichers, - IDomainEvents domainEvents, - Counter ingestedCounter) + class ErrorProcessor { + public ErrorProcessor(IEnrichImportedErrorMessages[] enrichers, IFailedMessageEnricher[] failedMessageEnrichers, IDomainEvents domainEvents, + Counter ingestedCounter) + { + this.enrichers = enrichers; + this.domainEvents = domainEvents; + this.ingestedCounter = ingestedCounter; + failedMessageFactory = new FailedMessageFactory(failedMessageEnrichers); + } + public async Task> Process(IReadOnlyList contexts, IIngestionUnitOfWork unitOfWork) { var storedContexts = new List(contexts.Count); @@ -163,7 +169,10 @@ static void RecordKnownEndpoints(EndpointDetails observedEndpoint, Dictionary(); } } \ No newline at end of file From 6c9622f7ea81d7e9061d0d8829be172664708d3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 24 Apr 2025 10:33:03 +0200 Subject: [PATCH 07/11] Separate tests for minimal required for ingestion vs minimal required for UX experience --- ...ng_failed_message_with_missing_headers.cs} | 103 ++++++++++-------- 1 file changed, 57 insertions(+), 46 deletions(-) rename src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/{When_processing_message_with_missing_metadata_failed.cs => When_ingesting_failed_message_with_missing_headers.cs} (50%) diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs similarity index 50% rename from src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs rename to src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs index 7bc6536156..67a5764c46 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs @@ -12,38 +12,44 @@ using NServiceBus.Transport; using NUnit.Framework; using ServiceControl.MessageFailures.Api; - using Conventions = NServiceBus.AcceptanceTesting.Customization.Conventions; - class When_processing_message_with_missing_metadata_failed : AcceptanceTest + class When_ingesting_failed_message_with_missing_headers : AcceptanceTest { [Test] - public async Task Null_TimeSent_should_not_be_cast_to_DateTimeMin() + public async Task TimeSent_should_not_be_casted() { FailedMessageView failure = null; - await Define() - .WithEndpoint() + var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z"); + + await Define(c => + { + c.AddMinimalRequiredHeaders(); + c.Headers.Add("NServiceBus.TimeSent", DateTimeOffsetHelper.ToWireFormattedString(sentTime)); + }) + .WithEndpoint() .Done(async c => { - var result = await this.TryGetSingle("/api/errors/", m => m.Id == c.UniqueMessageId); + var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); failure = result; - return result; + return (c.UniqueMessageId != null) & result; }) .Run(); Assert.That(failure, Is.Not.Null); - Assert.That(failure.TimeSent, Is.Null); + Assert.That(failure.TimeSent, Is.EqualTo(sentTime)); } [Test] - public async Task TimeSent_should_not_be_casted() + public async Task Should_be_ingested_when_minimal_required_headers_is_present() { FailedMessageView failure = null; - var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z"); - - await Define(ctx => ctx.TimeSent = sentTime) - .WithEndpoint() + await Define(c => + { + c.AddMinimalRequiredHeaders(); + }) + .WithEndpoint() .Done(async c => { var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); @@ -53,18 +59,28 @@ await Define(ctx => ctx.TimeSent = sentTime) .Run(); Assert.That(failure, Is.Not.Null); - Assert.That(failure.TimeSent, Is.EqualTo(sentTime)); + Assert.That(failure.TimeSent, Is.Null); } [Test] - public async Task Should_be_able_to_get_the_message_by_id() + public async Task Should_include_headers_required_by_ServicePulse() { FailedMessageView failure = null; var testStartTime = DateTime.UtcNow; - var context = await Define() - .WithEndpoint() + var context = await Define(c => + { + c.AddMinimalRequiredHeaders(); + + // This is needed for ServiceControl to be able to detect both endpoint (via failed q header) and host via the processing machine header + // Missing endpoint or host will case a null ref in ServicePulse + c.Headers[Headers.ProcessingMachine] = "MyMachine"; + + c.Headers["NServiceBus.ExceptionInfo.ExceptionType"] = "SomeExceptionType"; + c.Headers["NServiceBus.ExceptionInfo.Message"] = "Some message"; + }) + .WithEndpoint() .Done(async c => { var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); @@ -81,48 +97,43 @@ public async Task Should_be_able_to_get_the_message_by_id() // ServicePulse assumes that the receiving endpoint name is present Assert.That(failure.ReceivingEndpoint, Is.Not.Null); Assert.That(failure.ReceivingEndpoint.Name, Is.EqualTo(context.EndpointNameOfReceivingEndpoint)); + Assert.That(failure.ReceivingEndpoint.Host, Is.EqualTo("MyMachine")); + + // ServicePulse needs both an exception type and description to render the UI in a resonable way + Assert.That(failure.Exception.ExceptionType, Is.EqualTo("SomeExceptionType")); + Assert.That(failure.Exception.Message, Is.EqualTo("Some message")); } - class Failing : EndpointConfigurationBuilder + class MyContext : ScenarioContext { - public Failing() => EndpointSetup(c => c.Recoverability().Delayed(x => x.NumberOfRetries(0))); - - class SendFailedMessage : DispatchRawMessages - { - protected override TransportOperations CreateMessage(MyContext context) - { - context.EndpointNameOfReceivingEndpoint = Conventions.EndpointNamingConvention(typeof(Failing)); - context.MessageId = Guid.NewGuid().ToString(); - context.UniqueMessageId = DeterministicGuid.MakeId(context.MessageId, context.EndpointNameOfReceivingEndpoint).ToString(); + public string MessageId { get; } = Guid.NewGuid().ToString(); - var headers = new Dictionary - { - [Headers.MessageId] = context.MessageId, - ["NServiceBus.FailedQ"] = Conventions.EndpointNamingConvention(typeof(Failing)), - [Headers.ProcessingMachine] = "unknown", // This is needed for endpoint detection to work since "host" is required, endpoint name is detected from the FailedQ header - }; + public string EndpointNameOfReceivingEndpoint => "MyEndpoint"; - if (context.TimeSent.HasValue) - { - headers["NServiceBus.TimeSent"] = DateTimeOffsetHelper.ToWireFormattedString(context.TimeSent.Value); - } + public string UniqueMessageId => DeterministicGuid.MakeId(MessageId, EndpointNameOfReceivingEndpoint).ToString(); - var outgoingMessage = new OutgoingMessage(context.MessageId, headers, Array.Empty()); + public Dictionary Headers { get; } = []; - return new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag("error"))); - } + public void AddMinimalRequiredHeaders() + { + Headers["NServiceBus.FailedQ"] = EndpointNameOfReceivingEndpoint; + Headers[NServiceBus.Headers.MessageId] = MessageId; } } - class MyContext : ScenarioContext + class FailingEndpoint : EndpointConfigurationBuilder { - public string MessageId { get; set; } - - public string EndpointNameOfReceivingEndpoint { get; set; } + public FailingEndpoint() => EndpointSetup(c => c.Recoverability().Delayed(x => x.NumberOfRetries(0))); - public string UniqueMessageId { get; set; } + class SendFailedMessage : DispatchRawMessages + { + protected override TransportOperations CreateMessage(MyContext context) + { + var outgoingMessage = new OutgoingMessage(context.MessageId, context.Headers, Array.Empty()); - public DateTime? TimeSent { get; set; } + return new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag("error"))); + } + } } } } \ No newline at end of file From 59050c333369d21bf8f80f2517630563a1342c62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 24 Apr 2025 10:40:49 +0200 Subject: [PATCH 08/11] Improve assertions --- ...ingesting_failed_message_with_missing_headers.cs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs index 67a5764c46..683d972af0 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs @@ -45,6 +45,8 @@ public async Task Should_be_ingested_when_minimal_required_headers_is_present() { FailedMessageView failure = null; + var testStartTime = DateTime.UtcNow; + await Define(c => { c.AddMinimalRequiredHeaders(); @@ -60,6 +62,12 @@ await Define(c => Assert.That(failure, Is.Not.Null); Assert.That(failure.TimeSent, Is.Null); + + //No failure time will result in utc now being used + Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime)); + + // Both host and endpoint name is currently needed so this will be null since no host can be detected from the failed q header5 + Assert.That(failure.ReceivingEndpoint, Is.Null); } [Test] @@ -67,8 +75,6 @@ public async Task Should_include_headers_required_by_ServicePulse() { FailedMessageView failure = null; - var testStartTime = DateTime.UtcNow; - var context = await Define(c => { c.AddMinimalRequiredHeaders(); @@ -91,9 +97,6 @@ public async Task Should_include_headers_required_by_ServicePulse() Assert.That(failure, Is.Not.Null); - //No failure time will result in utc now being used - Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime)); - // ServicePulse assumes that the receiving endpoint name is present Assert.That(failure.ReceivingEndpoint, Is.Not.Null); Assert.That(failure.ReceivingEndpoint.Name, Is.EqualTo(context.EndpointNameOfReceivingEndpoint)); From d1a7326750930078e049ba1dd41f4e311f4ab093 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 24 Apr 2025 12:19:53 +0200 Subject: [PATCH 09/11] Cleanup --- ...ing_failed_message_with_missing_headers.cs | 54 +++++++++---------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs index 683d972af0..d2213a3f54 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs @@ -15,31 +15,6 @@ class When_ingesting_failed_message_with_missing_headers : AcceptanceTest { - [Test] - public async Task TimeSent_should_not_be_casted() - { - FailedMessageView failure = null; - - var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z"); - - await Define(c => - { - c.AddMinimalRequiredHeaders(); - c.Headers.Add("NServiceBus.TimeSent", DateTimeOffsetHelper.ToWireFormattedString(sentTime)); - }) - .WithEndpoint() - .Done(async c => - { - var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); - failure = result; - return (c.UniqueMessageId != null) & result; - }) - .Run(); - - Assert.That(failure, Is.Not.Null); - Assert.That(failure.TimeSent, Is.EqualTo(sentTime)); - } - [Test] public async Task Should_be_ingested_when_minimal_required_headers_is_present() { @@ -66,7 +41,7 @@ await Define(c => //No failure time will result in utc now being used Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime)); - // Both host and endpoint name is currently needed so this will be null since no host can be detected from the failed q header5 + // Both host and endpoint name is currently needed so this will be null since no host can be detected from the failed q header Assert.That(failure.ReceivingEndpoint, Is.Null); } @@ -80,7 +55,7 @@ public async Task Should_include_headers_required_by_ServicePulse() c.AddMinimalRequiredHeaders(); // This is needed for ServiceControl to be able to detect both endpoint (via failed q header) and host via the processing machine header - // Missing endpoint or host will case a null ref in ServicePulse + // Missing endpoint or host will cause a null ref in ServicePulse c.Headers[Headers.ProcessingMachine] = "MyMachine"; c.Headers["NServiceBus.ExceptionInfo.ExceptionType"] = "SomeExceptionType"; @@ -107,6 +82,31 @@ public async Task Should_include_headers_required_by_ServicePulse() Assert.That(failure.Exception.Message, Is.EqualTo("Some message")); } + [Test] + public async Task TimeSent_should_not_be_casted() + { + FailedMessageView failure = null; + + var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z"); + + await Define(c => + { + c.AddMinimalRequiredHeaders(); + c.Headers.Add("NServiceBus.TimeSent", DateTimeOffsetHelper.ToWireFormattedString(sentTime)); + }) + .WithEndpoint() + .Done(async c => + { + var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); + failure = result; + return (c.UniqueMessageId != null) & result; + }) + .Run(); + + Assert.That(failure, Is.Not.Null); + Assert.That(failure.TimeSent, Is.EqualTo(sentTime)); + } + class MyContext : ScenarioContext { public string MessageId { get; } = Guid.NewGuid().ToString(); From 821f4d94a2ad6b1745f3956d74ef6c9e49ac5246 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 24 Apr 2025 12:43:59 +0200 Subject: [PATCH 10/11] Consolidate done criteria --- ...ing_failed_message_with_missing_headers.cs | 235 +++++++++--------- 1 file changed, 112 insertions(+), 123 deletions(-) diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs index d2213a3f54..745d629e3c 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs @@ -1,141 +1,130 @@ -namespace ServiceControl.AcceptanceTests.Recoverability.MessageFailures +namespace ServiceControl.AcceptanceTests.Recoverability.MessageFailures; + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using AcceptanceTesting; +using AcceptanceTesting.EndpointTemplates; +using Infrastructure; +using NServiceBus; +using NServiceBus.AcceptanceTesting; +using NServiceBus.Routing; +using NServiceBus.Transport; +using NUnit.Framework; +using ServiceControl.MessageFailures.Api; + +class When_ingesting_failed_message_with_missing_headers : AcceptanceTest { - using System; - using System.Collections.Generic; - using System.Threading.Tasks; - using AcceptanceTesting; - using AcceptanceTesting.EndpointTemplates; - using Infrastructure; - using NServiceBus; - using NServiceBus.AcceptanceTesting; - using NServiceBus.Routing; - using NServiceBus.Transport; - using NUnit.Framework; - using ServiceControl.MessageFailures.Api; - - class When_ingesting_failed_message_with_missing_headers : AcceptanceTest + [Test] + public async Task Should_be_ingested_when_minimal_required_headers_is_present() { - [Test] - public async Task Should_be_ingested_when_minimal_required_headers_is_present() - { - FailedMessageView failure = null; - - var testStartTime = DateTime.UtcNow; - - await Define(c => - { - c.AddMinimalRequiredHeaders(); - }) - .WithEndpoint() - .Done(async c => - { - var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); - failure = result; - return (c.UniqueMessageId != null) & result; - }) - .Run(); - - Assert.That(failure, Is.Not.Null); - Assert.That(failure.TimeSent, Is.Null); - - //No failure time will result in utc now being used - Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime)); - - // Both host and endpoint name is currently needed so this will be null since no host can be detected from the failed q header - Assert.That(failure.ReceivingEndpoint, Is.Null); - } + var testStartTime = DateTime.UtcNow; - [Test] - public async Task Should_include_headers_required_by_ServicePulse() - { - FailedMessageView failure = null; - - var context = await Define(c => - { - c.AddMinimalRequiredHeaders(); - - // This is needed for ServiceControl to be able to detect both endpoint (via failed q header) and host via the processing machine header - // Missing endpoint or host will cause a null ref in ServicePulse - c.Headers[Headers.ProcessingMachine] = "MyMachine"; - - c.Headers["NServiceBus.ExceptionInfo.ExceptionType"] = "SomeExceptionType"; - c.Headers["NServiceBus.ExceptionInfo.Message"] = "Some message"; - }) - .WithEndpoint() - .Done(async c => - { - var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); - failure = result; - return (c.UniqueMessageId != null) & result; - }) - .Run(); - - Assert.That(failure, Is.Not.Null); - - // ServicePulse assumes that the receiving endpoint name is present - Assert.That(failure.ReceivingEndpoint, Is.Not.Null); - Assert.That(failure.ReceivingEndpoint.Name, Is.EqualTo(context.EndpointNameOfReceivingEndpoint)); - Assert.That(failure.ReceivingEndpoint.Host, Is.EqualTo("MyMachine")); - - // ServicePulse needs both an exception type and description to render the UI in a resonable way - Assert.That(failure.Exception.ExceptionType, Is.EqualTo("SomeExceptionType")); - Assert.That(failure.Exception.Message, Is.EqualTo("Some message")); - } + var context = await Define(c => c.AddMinimalRequiredHeaders()) + .WithEndpoint() + .Done(async c => await TryGetFailureFromApi(c)) + .Run(); - [Test] - public async Task TimeSent_should_not_be_casted() - { - FailedMessageView failure = null; - - var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z"); - - await Define(c => - { - c.AddMinimalRequiredHeaders(); - c.Headers.Add("NServiceBus.TimeSent", DateTimeOffsetHelper.ToWireFormattedString(sentTime)); - }) - .WithEndpoint() - .Done(async c => - { - var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); - failure = result; - return (c.UniqueMessageId != null) & result; - }) - .Run(); - - Assert.That(failure, Is.Not.Null); - Assert.That(failure.TimeSent, Is.EqualTo(sentTime)); - } + var failure = context.Failure; - class MyContext : ScenarioContext - { - public string MessageId { get; } = Guid.NewGuid().ToString(); + Assert.That(failure, Is.Not.Null); + Assert.That(failure.TimeSent, Is.Null); - public string EndpointNameOfReceivingEndpoint => "MyEndpoint"; + //No failure time will result in utc now being used + Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime)); - public string UniqueMessageId => DeterministicGuid.MakeId(MessageId, EndpointNameOfReceivingEndpoint).ToString(); + // Both host and endpoint name is currently needed so this will be null since no host can be detected from the failed q header + Assert.That(failure.ReceivingEndpoint, Is.Null); + } - public Dictionary Headers { get; } = []; + [Test] + public async Task Should_include_headers_required_by_ServicePulse() + { + var context = await Define(c => + { + c.AddMinimalRequiredHeaders(); + + // This is needed for ServiceControl to be able to detect both endpoint (via failed q header) and host via the processing machine header + // Missing endpoint or host will cause a null ref in ServicePulse + c.Headers[Headers.ProcessingMachine] = "MyMachine"; + + c.Headers["NServiceBus.ExceptionInfo.ExceptionType"] = "SomeExceptionType"; + c.Headers["NServiceBus.ExceptionInfo.Message"] = "Some message"; + }) + .WithEndpoint() + .Done(async c => await TryGetFailureFromApi(c)) + .Run(); + + var failure = context.Failure; + + Assert.That(failure, Is.Not.Null); + + // ServicePulse assumes that the receiving endpoint name is present + Assert.That(failure.ReceivingEndpoint, Is.Not.Null); + Assert.That(failure.ReceivingEndpoint.Name, Is.EqualTo(context.EndpointNameOfReceivingEndpoint)); + Assert.That(failure.ReceivingEndpoint.Host, Is.EqualTo("MyMachine")); + + // ServicePulse needs both an exception type and description to render the UI in a resonable way + Assert.That(failure.Exception.ExceptionType, Is.EqualTo("SomeExceptionType")); + Assert.That(failure.Exception.Message, Is.EqualTo("Some message")); + } + + [Test] + public async Task TimeSent_should_not_be_casted() + { + var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z"); - public void AddMinimalRequiredHeaders() + var context = await Define(c => { - Headers["NServiceBus.FailedQ"] = EndpointNameOfReceivingEndpoint; - Headers[NServiceBus.Headers.MessageId] = MessageId; - } - } + c.AddMinimalRequiredHeaders(); + c.Headers.Add("NServiceBus.TimeSent", DateTimeOffsetHelper.ToWireFormattedString(sentTime)); + }) + .WithEndpoint() + .Done(async c => await TryGetFailureFromApi(c)) + .Run(); + + var failure = context.Failure; + + Assert.That(failure, Is.Not.Null); + Assert.That(failure.TimeSent, Is.EqualTo(sentTime)); + } + + async Task TryGetFailureFromApi(TestContext context) + { + context.Failure = await this.TryGet($"/api/errors/last/{context.UniqueMessageId}"); + return context.Failure != null; + } + + class TestContext : ScenarioContext + { + public string MessageId { get; } = Guid.NewGuid().ToString(); - class FailingEndpoint : EndpointConfigurationBuilder + public string EndpointNameOfReceivingEndpoint => "MyEndpoint"; + + public string UniqueMessageId => DeterministicGuid.MakeId(MessageId, EndpointNameOfReceivingEndpoint).ToString(); + + public Dictionary Headers { get; } = []; + + public FailedMessageView Failure { get; set; } + + public void AddMinimalRequiredHeaders() { - public FailingEndpoint() => EndpointSetup(c => c.Recoverability().Delayed(x => x.NumberOfRetries(0))); + Headers["NServiceBus.FailedQ"] = EndpointNameOfReceivingEndpoint; + Headers[NServiceBus.Headers.MessageId] = MessageId; + } + } + + class FailingEndpoint : EndpointConfigurationBuilder + { + public FailingEndpoint() => EndpointSetup(); - class SendFailedMessage : DispatchRawMessages + class SendFailedMessage : DispatchRawMessages + { + protected override TransportOperations CreateMessage(TestContext context) { - protected override TransportOperations CreateMessage(MyContext context) - { - var outgoingMessage = new OutgoingMessage(context.MessageId, context.Headers, Array.Empty()); + var outgoingMessage = new OutgoingMessage(context.MessageId, context.Headers, Array.Empty()); - return new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag("error"))); - } + return new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag("error"))); } } } From e9d4989777448616898ee464ed9bf15fddf34257 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 24 Apr 2025 18:56:38 +0200 Subject: [PATCH 11/11] Do not require message id header for error ingestion --- ...ing_failed_message_with_missing_headers.cs | 25 +++++------ .../RavenRecoverabilityIngestionUnitOfWork.cs | 45 ++++++++++++------- .../TransportMessageExtensions.cs | 27 +---------- .../ProcessedMessage.cs | 13 ------ .../Operations/ErrorProcessor.cs | 1 + .../Operations/FailedMessageFactory.cs | 4 +- 6 files changed, 44 insertions(+), 71 deletions(-) diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs index 745d629e3c..ee17c12c96 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using AcceptanceTesting; using AcceptanceTesting.EndpointTemplates; @@ -72,7 +73,7 @@ public async Task Should_include_headers_required_by_ServicePulse() [Test] public async Task TimeSent_should_not_be_casted() { - var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z"); + var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z").ToUniversalTime(); var context = await Define(c => { @@ -91,27 +92,23 @@ public async Task TimeSent_should_not_be_casted() async Task TryGetFailureFromApi(TestContext context) { - context.Failure = await this.TryGet($"/api/errors/last/{context.UniqueMessageId}"); + var allFailures = await this.TryGetMany("/api/errors/"); + + context.Failure = allFailures.Items.SingleOrDefault(f => f.QueueAddress == context.EndpointNameOfReceivingEndpoint); + return context.Failure != null; } class TestContext : ScenarioContext { - public string MessageId { get; } = Guid.NewGuid().ToString(); - - public string EndpointNameOfReceivingEndpoint => "MyEndpoint"; - - public string UniqueMessageId => DeterministicGuid.MakeId(MessageId, EndpointNameOfReceivingEndpoint).ToString(); + // Endpoint name is made unique since we are using it to find the failure once ingestion is complete + public string EndpointNameOfReceivingEndpoint => $"MyEndpoint-{NUnit.Framework.TestContext.CurrentContext.Test.ID}"; public Dictionary Headers { get; } = []; public FailedMessageView Failure { get; set; } - public void AddMinimalRequiredHeaders() - { - Headers["NServiceBus.FailedQ"] = EndpointNameOfReceivingEndpoint; - Headers[NServiceBus.Headers.MessageId] = MessageId; - } + public void AddMinimalRequiredHeaders() => Headers["NServiceBus.FailedQ"] = EndpointNameOfReceivingEndpoint; } class FailingEndpoint : EndpointConfigurationBuilder @@ -122,7 +119,9 @@ class SendFailedMessage : DispatchRawMessages { protected override TransportOperations CreateMessage(TestContext context) { - var outgoingMessage = new OutgoingMessage(context.MessageId, context.Headers, Array.Empty()); + // we can't control the native message id so any guid will do here, we need to find the failed messsage using + // the endpoint name instead + var outgoingMessage = new OutgoingMessage(Guid.NewGuid().ToString(), context.Headers, Array.Empty()); return new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag("error"))); } diff --git a/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenRecoverabilityIngestionUnitOfWork.cs b/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenRecoverabilityIngestionUnitOfWork.cs index cf6ee57a13..71c0008b5c 100644 --- a/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenRecoverabilityIngestionUnitOfWork.cs +++ b/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenRecoverabilityIngestionUnitOfWork.cs @@ -32,7 +32,7 @@ public Task RecordFailedProcessingAttempt( FailedMessage.ProcessingAttempt processingAttempt, List groups) { - var uniqueMessageId = context.Headers.UniqueId(); + var uniqueMessageId = GetUniqueMessageId(context); var contentType = GetContentType(context.Headers, "text/xml"); var bodySize = context.Body.Length; @@ -61,7 +61,7 @@ public Task RecordFailedProcessingAttempt( var storeMessageCmd = CreateFailedMessagesPatchCommand(uniqueMessageId, processingAttempt, groups); parentUnitOfWork.AddCommand(storeMessageCmd); - AddStoreBodyCommands(context, contentType); + AddStoreBodyCommands(uniqueMessageId, context, contentType); return Task.CompletedTask; } @@ -71,10 +71,7 @@ public Task RecordSuccessfulRetry(string retriedMessageUniqueId) var failedMessageDocumentId = FailedMessageIdGenerator.MakeDocumentId(retriedMessageUniqueId); var failedMessageRetryDocumentId = FailedMessageRetry.MakeDocumentId(retriedMessageUniqueId); - var patchRequest = new PatchRequest - { - Script = $@"this.{nameof(FailedMessage.Status)} = {(int)FailedMessageStatus.Resolved};" - }; + var patchRequest = new PatchRequest { Script = $@"this.{nameof(FailedMessage.Status)} = {(int)FailedMessageStatus.Resolved};" }; expirationManager.EnableExpiration(patchRequest); @@ -84,6 +81,21 @@ public Task RecordSuccessfulRetry(string retriedMessageUniqueId) return Task.CompletedTask; } + static string GetUniqueMessageId(MessageContext context) + { + if (context.Headers.TryGetValue("ServiceControl.Retry.UniqueMessageId", out var existingUniqueMessageId)) + { + return existingUniqueMessageId; + } + + if (!context.Headers.TryGetValue(Headers.MessageId, out var messageId)) + { + messageId = context.NativeMessageId; + } + + return DeterministicGuid.MakeId(messageId, context.Headers.ProcessingEndpointName()).ToString(); + } + ICommandData CreateFailedMessagesPatchCommand(string uniqueMessageId, FailedMessage.ProcessingAttempt processingAttempt, List groups) { @@ -119,9 +131,9 @@ ICommandData CreateFailedMessagesPatchCommand(string uniqueMessageId, FailedMess ", Values = new Dictionary { - {"status", (int)FailedMessageStatus.Unresolved}, - {"failureGroups", groups}, - {"attempt", processingAttempt} + { "status", (int)FailedMessageStatus.Unresolved }, + { "failureGroups", groups }, + { "attempt", processingAttempt } }, }, patchIfMissing: new PatchRequest @@ -137,18 +149,17 @@ ICommandData CreateFailedMessagesPatchCommand(string uniqueMessageId, FailedMess ", Values = new Dictionary { - {"status", (int)FailedMessageStatus.Unresolved}, - {"failureGroups", groups}, - {"attempt", processingAttempt}, - {"uniqueMessageId", uniqueMessageId} + { "status", (int)FailedMessageStatus.Unresolved }, + { "failureGroups", groups }, + { "attempt", processingAttempt }, + { "uniqueMessageId", uniqueMessageId } } }); } - void AddStoreBodyCommands(MessageContext context, string contentType) + void AddStoreBodyCommands(string uniqueMessageId, MessageContext context, string contentType) { - var uniqueId = context.Headers.UniqueId(); - var documentId = FailedMessageIdGenerator.MakeDocumentId(uniqueId); + var documentId = FailedMessageIdGenerator.MakeDocumentId(uniqueMessageId); var stream = new ReadOnlyStream(context.Body); var putAttachmentCmd = new PutAttachmentCommandData(documentId, "body", stream, contentType, changeVector: null); @@ -160,9 +171,9 @@ static string GetContentType(IReadOnlyDictionary headers, string => headers.GetValueOrDefault(Headers.ContentType, defaultContentType); static int MaxProcessingAttempts = 10; + // large object heap starts above 85000 bytes and not above 85 KB! internal const int LargeObjectHeapThreshold = 85_000; static readonly Encoding utf8 = new UTF8Encoding(true, true); - } } \ No newline at end of file diff --git a/src/ServiceControl.Persistence/Infrastructure/TransportMessageExtensions.cs b/src/ServiceControl.Persistence/Infrastructure/TransportMessageExtensions.cs index 9ed1567ab5..d8ba148245 100644 --- a/src/ServiceControl.Persistence/Infrastructure/TransportMessageExtensions.cs +++ b/src/ServiceControl.Persistence/Infrastructure/TransportMessageExtensions.cs @@ -36,27 +36,6 @@ public static string ProcessingEndpointName(this IReadOnlyDictionary headers) - { - return headers.TryGetValue("ServiceControl.Retry.UniqueMessageId", out var existingUniqueMessageId) - ? existingUniqueMessageId - : DeterministicGuid.MakeId(headers.MessageId(), headers.ProcessingEndpointName()).ToString(); - } - - public static string ProcessingId(this IReadOnlyDictionary headers) - { - var messageId = headers.MessageId(); - var processingEndpointName = headers.ProcessingEndpointName(); - var processingStarted = headers.ProcessingStarted(); - - if (messageId == default || processingEndpointName == default || processingStarted == default) - { - return Guid.NewGuid().ToString(); - } - - return DeterministicGuid.MakeId(messageId, processingEndpointName, processingStarted).ToString(); - } - // NOTE: Duplicated from TransportMessage public static string MessageId(this IReadOnlyDictionary headers) { @@ -102,16 +81,12 @@ public static bool IsBinary(this IReadOnlyDictionary headers) return true; } + static string ReplyToAddress(this IReadOnlyDictionary headers) { return headers.TryGetValue(Headers.ReplyToAddress, out var destination) ? destination : null; } - static string ProcessingStarted(this IReadOnlyDictionary headers) - { - return headers.TryGetValue(Headers.ProcessingStarted, out var processingStarted) ? processingStarted : null; - } - static string ExtractQueue(string address) { var atIndex = address?.IndexOf("@", StringComparison.InvariantCulture); diff --git a/src/ServiceControl.Persistence/ProcessedMessage.cs b/src/ServiceControl.Persistence/ProcessedMessage.cs index 050e5ca6c7..525f964416 100644 --- a/src/ServiceControl.Persistence/ProcessedMessage.cs +++ b/src/ServiceControl.Persistence/ProcessedMessage.cs @@ -2,9 +2,6 @@ { using System; using System.Collections.Generic; - using NServiceBus; - using ServiceControl.Persistence; - using ServiceControl.Persistence.Infrastructure; public class ProcessedMessage { @@ -14,16 +11,6 @@ public ProcessedMessage() Headers = []; } - public ProcessedMessage(Dictionary headers, Dictionary metadata) - { - UniqueMessageId = headers.UniqueId(); - MessageMetadata = metadata; - Headers = headers; - - ProcessedAt = Headers.TryGetValue(NServiceBus.Headers.ProcessingEnded, out var processedAt) ? - DateTimeOffsetHelper.ToDateTimeOffset(processedAt).UtcDateTime : DateTime.UtcNow; // best guess - } - public string Id { get; set; } public string UniqueMessageId { get; set; } diff --git a/src/ServiceControl/Operations/ErrorProcessor.cs b/src/ServiceControl/Operations/ErrorProcessor.cs index 6ac04c8edd..9e8e8b11ea 100644 --- a/src/ServiceControl/Operations/ErrorProcessor.cs +++ b/src/ServiceControl/Operations/ErrorProcessor.cs @@ -116,6 +116,7 @@ async Task ProcessMessage(MessageContext context, IIngestionUnitOfWork unitOfWor var failureDetails = failedMessageFactory.ParseFailureDetails(context.Headers); var processingAttempt = failedMessageFactory.CreateProcessingAttempt( + messageId, context.Headers, new Dictionary(metadata), failureDetails); diff --git a/src/ServiceControl/Operations/FailedMessageFactory.cs b/src/ServiceControl/Operations/FailedMessageFactory.cs index 5c7abb0644..76d04a4e9a 100644 --- a/src/ServiceControl/Operations/FailedMessageFactory.cs +++ b/src/ServiceControl/Operations/FailedMessageFactory.cs @@ -59,14 +59,14 @@ static ExceptionDetails GetException(IReadOnlyDictionary headers return exceptionDetails; } - public FailedMessage.ProcessingAttempt CreateProcessingAttempt(Dictionary headers, Dictionary metadata, FailureDetails failureDetails) + public FailedMessage.ProcessingAttempt CreateProcessingAttempt(string messageId, Dictionary headers, Dictionary metadata, FailureDetails failureDetails) { return new FailedMessage.ProcessingAttempt { AttemptedAt = failureDetails.TimeOfFailure, FailureDetails = failureDetails, MessageMetadata = metadata, - MessageId = headers[Headers.MessageId], + MessageId = messageId, Headers = headers }; }