From c407a390d375acf753ecefc78f1893e562d76730 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Thu, 6 Feb 2025 23:45:18 +0100 Subject: [PATCH 01/14] Support cancellation of audit ingestion --- .../InMemoryAuditIngestionUnitOfWorkFactory.cs | 3 ++- .../UnitOfWork/RavenAuditUnitOfWorkFactory.cs | 2 +- .../PersistenceTestFixture.cs | 2 +- .../UnitOfWork/IAuditIngestionUnitOfWorkFactory.cs | 3 ++- .../Auditing/ImportFailedAudits.cs | 13 ++++++++++--- 5 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs b/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs index ee654999a1..f07e7036de 100644 --- a/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs +++ b/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs @@ -1,5 +1,6 @@ namespace ServiceControl.Audit.Persistence.InMemory { + using System.Threading; using System.Threading.Tasks; using ServiceControl.Audit.Auditing.BodyStorage; using ServiceControl.Audit.Persistence.UnitOfWork; @@ -12,7 +13,7 @@ public InMemoryAuditIngestionUnitOfWorkFactory(InMemoryAuditDataStore dataStore, bodyStorageEnricher = new BodyStorageEnricher(bodyStorage, settings); } - public ValueTask StartNew(int batchSize) + public ValueTask StartNew(int batchSize, CancellationToken cancellationToken) { //The batchSize argument is ignored: the in-memory storage implementation doesn't support batching. return new ValueTask(new InMemoryAuditIngestionUnitOfWork(dataStore, bodyStorageEnricher)); diff --git a/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs b/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs index 3f9c56a6c8..78bc6798c8 100644 --- a/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs +++ b/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs @@ -13,7 +13,7 @@ class RavenAuditIngestionUnitOfWorkFactory( MinimumRequiredStorageState customCheckState) : IAuditIngestionUnitOfWorkFactory { - public async ValueTask StartNew(int batchSize) + public async ValueTask StartNew(int batchSize, CancellationToken cancellationToken) { var timedCancellationSource = new CancellationTokenSource(databaseConfiguration.BulkInsertCommitTimeout); var bulkInsert = (await documentStoreProvider.GetDocumentStore(timedCancellationSource.Token)) diff --git a/src/ServiceControl.Audit.Persistence.Tests/PersistenceTestFixture.cs b/src/ServiceControl.Audit.Persistence.Tests/PersistenceTestFixture.cs index 9aa9205675..eab82cd25c 100644 --- a/src/ServiceControl.Audit.Persistence.Tests/PersistenceTestFixture.cs +++ b/src/ServiceControl.Audit.Persistence.Tests/PersistenceTestFixture.cs @@ -64,7 +64,7 @@ protected string GetManifestPath() configuration.AuditIngestionUnitOfWorkFactory; protected ValueTask StartAuditUnitOfWork(int batchSize) => - AuditIngestionUnitOfWorkFactory.StartNew(batchSize); + AuditIngestionUnitOfWorkFactory.StartNew(batchSize, TestContext.CurrentContext.CancellationToken); protected IServiceProvider ServiceProvider => configuration.ServiceProvider; diff --git a/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWorkFactory.cs b/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWorkFactory.cs index 25a5859e50..71dceb269b 100644 --- a/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWorkFactory.cs +++ b/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWorkFactory.cs @@ -1,10 +1,11 @@ namespace ServiceControl.Audit.Persistence.UnitOfWork { + using System.Threading; using System.Threading.Tasks; public interface IAuditIngestionUnitOfWorkFactory { - ValueTask StartNew(int batchSize); //Throws if not enough space or some other problem preventing from writing data + ValueTask StartNew(int batchSize, CancellationToken cancellationToken); //Throws if not enough space or some other problem preventing from writing data bool CanIngestMore(); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs b/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs index 5e51add78a..ee0f0dd6ef 100644 --- a/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs +++ b/src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs @@ -23,7 +23,7 @@ public ImportFailedAudits( public async Task Run(CancellationToken cancellationToken = default) { - await auditIngestor.VerifyCanReachForwardingAddress(); + await auditIngestor.VerifyCanReachForwardingAddress(cancellationToken); var succeeded = 0; var failed = 0; @@ -33,11 +33,18 @@ await failedAuditStore.ProcessFailedMessages( { try { - var messageContext = new MessageContext(transportMessage.Id, transportMessage.Headers, transportMessage.Body, EmptyTransaction, settings.AuditQueue, EmptyContextBag); + var messageContext = new MessageContext( + transportMessage.Id, + transportMessage.Headers, + transportMessage.Body, + EmptyTransaction, + settings.AuditQueue, + EmptyContextBag + ); var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); messageContext.SetTaskCompletionSource(taskCompletionSource); - await auditIngestor.Ingest([messageContext]); + await auditIngestor.Ingest([messageContext], cancellationToken); await taskCompletionSource.Task; From faaf0ef7eae5615c82489d58dc62c7da5d4d160e Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Fri, 7 Feb 2025 00:04:38 +0100 Subject: [PATCH 02/14] Support cancellation for error ingestion --- .../UnitOfWork/RavenIngestionUnitOfWork.cs | 8 ++++---- .../Expiration/MessageExpiryTests.cs | 12 ++++++------ .../AttachmentsBodyStorageTests.cs | 2 +- .../MonitoringDataStoreTests.cs | 2 +- .../RetryConfirmationProcessorTests.cs | 6 +++--- .../UnitOfWork/FallbackIngestionUnitOfWork.cs | 7 ++++++- .../UnitOfWork/IIngestionUnitOfWork.cs | 3 ++- .../UnitOfWork/IngestionUnitOfWorkBase.cs | 3 ++- .../Operations/ErrorIngestion.cs | 4 ++-- .../Operations/ErrorIngestor.cs | 19 ++++++++++--------- .../Operations/ImportFailedErrors.cs | 16 +++++++++++----- 11 files changed, 48 insertions(+), 34 deletions(-) diff --git a/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenIngestionUnitOfWork.cs b/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenIngestionUnitOfWork.cs index 2481999816..64693498ff 100644 --- a/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenIngestionUnitOfWork.cs +++ b/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenIngestionUnitOfWork.cs @@ -1,8 +1,8 @@ namespace ServiceControl.Persistence.RavenDB { using System.Collections.Concurrent; + using System.Threading; using System.Threading.Tasks; - using Raven.Client.Documents; using Raven.Client.Documents.Commands.Batches; using ServiceControl.Persistence.UnitOfWork; @@ -22,12 +22,12 @@ public RavenIngestionUnitOfWork(IRavenSessionProvider sessionProvider, Expiratio internal void AddCommand(ICommandData command) => commands.Enqueue(command); - public override async Task Complete() + public override async Task Complete(CancellationToken cancellationToken) { - using var session = await sessionProvider.OpenSession(); + using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken); // not really interested in the batch results since a batch is atomic session.Advanced.Defer(commands.ToArray()); - await session.SaveChangesAsync(); + await session.SaveChangesAsync(cancellationToken); } } } \ No newline at end of file diff --git a/src/ServiceControl.Persistence.Tests.RavenDB/Expiration/MessageExpiryTests.cs b/src/ServiceControl.Persistence.Tests.RavenDB/Expiration/MessageExpiryTests.cs index f59bca7ea6..e7c9a45400 100644 --- a/src/ServiceControl.Persistence.Tests.RavenDB/Expiration/MessageExpiryTests.cs +++ b/src/ServiceControl.Persistence.Tests.RavenDB/Expiration/MessageExpiryTests.cs @@ -38,7 +38,7 @@ public async Task SingleMessageMarkedAsArchiveShouldExpire() { await uow.Recoverability.RecordFailedProcessingAttempt(context, attempt, []); - await uow.Complete(); + await uow.Complete(TestContext.CurrentContext.CancellationToken); } CompleteDatabaseOperation(); @@ -68,7 +68,7 @@ public async Task AllMessagesInUnArchivedGroupShouldNotExpire() await uow.Recoverability.RecordFailedProcessingAttempt(contextA, attemptA, [new FailedMessage.FailureGroup { Id = groupIdA }]); await uow.Recoverability.RecordFailedProcessingAttempt(contextB, attemptB, [new FailedMessage.FailureGroup { Id = groupIdB }]); - await uow.Complete(); + await uow.Complete(TestContext.CurrentContext.CancellationToken); } CompleteDatabaseOperation(); @@ -99,7 +99,7 @@ public async Task AllMessagesInArchivedGroupShouldExpire() { await uow.Recoverability.RecordFailedProcessingAttempt(context, attempt, [new FailedMessage.FailureGroup { Id = groupId }]); - await uow.Complete(); + await uow.Complete(TestContext.CurrentContext.CancellationToken); } CompleteDatabaseOperation(); @@ -122,7 +122,7 @@ public async Task SingleMessageMarkedAsResolvedShouldExpire() { await uow.Recoverability.RecordFailedProcessingAttempt(context, attempt, []); - await uow.Complete(); + await uow.Complete(TestContext.CurrentContext.CancellationToken); } CompleteDatabaseOperation(); @@ -145,7 +145,7 @@ public async Task RetryConfirmationProcessingShouldTriggerExpiration() { await uow.Recoverability.RecordFailedProcessingAttempt(context, attempt, []); - await uow.Complete(); + await uow.Complete(TestContext.CurrentContext.CancellationToken); } CompleteDatabaseOperation(); @@ -158,7 +158,7 @@ public async Task RetryConfirmationProcessingShouldTriggerExpiration() { await uow.Recoverability.RecordSuccessfulRetry(errors.Results.First().Id); - await uow.Complete(); + await uow.Complete(TestContext.CurrentContext.CancellationToken); } await WaitUntil(async () => (await GetAllMessages()).Results.Count == 0, "Retry confirmation should cause message removal."); diff --git a/src/ServiceControl.Persistence.Tests/BodyStorage/AttachmentsBodyStorageTests.cs b/src/ServiceControl.Persistence.Tests/BodyStorage/AttachmentsBodyStorageTests.cs index 7304657a48..39ecf8a60b 100644 --- a/src/ServiceControl.Persistence.Tests/BodyStorage/AttachmentsBodyStorageTests.cs +++ b/src/ServiceControl.Persistence.Tests/BodyStorage/AttachmentsBodyStorageTests.cs @@ -70,7 +70,7 @@ async Task RunTest(Func, string> getIdToQuery) var groups = new List(); await uow.Recoverability.RecordFailedProcessingAttempt(context, processingAttempt, groups); - await uow.Complete(); + await uow.Complete(cancellationSource.Token); } CompleteDatabaseOperation(); diff --git a/src/ServiceControl.Persistence.Tests/MonitoringDataStoreTests.cs b/src/ServiceControl.Persistence.Tests/MonitoringDataStoreTests.cs index fdd9144a96..68b58fe2c0 100644 --- a/src/ServiceControl.Persistence.Tests/MonitoringDataStoreTests.cs +++ b/src/ServiceControl.Persistence.Tests/MonitoringDataStoreTests.cs @@ -134,7 +134,7 @@ public async Task Unit_of_work_detects_endpoint() { await unitOfWork.Monitoring.RecordKnownEndpoint(knownEndpoint); - await unitOfWork.Complete(); + await unitOfWork.Complete(TestContext.CurrentContext.CancellationToken); } CompleteDatabaseOperation(); diff --git a/src/ServiceControl.Persistence.Tests/Recoverability/RetryConfirmationProcessorTests.cs b/src/ServiceControl.Persistence.Tests/Recoverability/RetryConfirmationProcessorTests.cs index 5c5a02ecee..e9d720a9e1 100644 --- a/src/ServiceControl.Persistence.Tests/Recoverability/RetryConfirmationProcessorTests.cs +++ b/src/ServiceControl.Persistence.Tests/Recoverability/RetryConfirmationProcessorTests.cs @@ -49,7 +49,7 @@ public async Task Should_handle_multiple_retry_confirmations_in_the_error_ingest var unitOfWork = await UnitOfWorkFactory.StartNew(); await Processor.Process(messageContexts, unitOfWork); - Assert.DoesNotThrowAsync(() => unitOfWork.Complete()); + Assert.DoesNotThrowAsync(() => unitOfWork.Complete(TestContext.CurrentContext.CancellationToken)); } [Test] @@ -71,7 +71,7 @@ public async Task Should_handle_retry_confirmation_followed_by_legacy_command() var unitOfWork = await UnitOfWorkFactory.StartNew(); await Processor.Process(messageContexts, unitOfWork); - await unitOfWork.Complete(); + await unitOfWork.Complete(TestContext.CurrentContext.CancellationToken); Assert.DoesNotThrowAsync( () => Handler.Handle(CreateLegacyRetryConfirmationCommand(), new TestableInvokeHandlerContext())); @@ -89,7 +89,7 @@ public async Task Should_handle_legacy_retry_confirmation_command_followed_by_ne var unitOfWork = await UnitOfWorkFactory.StartNew(); await Processor.Process(messageContexts, unitOfWork); - Assert.DoesNotThrowAsync(() => unitOfWork.Complete()); + Assert.DoesNotThrowAsync(() => unitOfWork.Complete(TestContext.CurrentContext.CancellationToken)); } static MarkMessageFailureResolvedByRetry CreateLegacyRetryConfirmationCommand() diff --git a/src/ServiceControl.Persistence/UnitOfWork/FallbackIngestionUnitOfWork.cs b/src/ServiceControl.Persistence/UnitOfWork/FallbackIngestionUnitOfWork.cs index 5b98d607c6..4fbb1e2885 100644 --- a/src/ServiceControl.Persistence/UnitOfWork/FallbackIngestionUnitOfWork.cs +++ b/src/ServiceControl.Persistence/UnitOfWork/FallbackIngestionUnitOfWork.cs @@ -1,5 +1,6 @@ namespace ServiceControl.Persistence.UnitOfWork { + using System.Threading; using System.Threading.Tasks; // HINT: This allows an implementor to provide only part of the implementation and allow the other part @@ -18,7 +19,11 @@ public FallbackIngestionUnitOfWork(IIngestionUnitOfWork primary, IIngestionUnitO Recoverability = primary.Recoverability ?? fallback.Recoverability; } - public override Task Complete() => Task.WhenAll(primary.Complete(), fallback.Complete()); + public override Task Complete(CancellationToken cancellationToken) + => Task.WhenAll( + primary.Complete(cancellationToken), + fallback.Complete(cancellationToken) + ); protected override void Dispose(bool disposing) { diff --git a/src/ServiceControl.Persistence/UnitOfWork/IIngestionUnitOfWork.cs b/src/ServiceControl.Persistence/UnitOfWork/IIngestionUnitOfWork.cs index 93e0b9ea1e..a14bda0369 100644 --- a/src/ServiceControl.Persistence/UnitOfWork/IIngestionUnitOfWork.cs +++ b/src/ServiceControl.Persistence/UnitOfWork/IIngestionUnitOfWork.cs @@ -1,12 +1,13 @@ namespace ServiceControl.Persistence.UnitOfWork { using System; + using System.Threading; using System.Threading.Tasks; public interface IIngestionUnitOfWork : IDisposable { IMonitoringIngestionUnitOfWork Monitoring { get; } IRecoverabilityIngestionUnitOfWork Recoverability { get; } - Task Complete(); + Task Complete(CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/src/ServiceControl.Persistence/UnitOfWork/IngestionUnitOfWorkBase.cs b/src/ServiceControl.Persistence/UnitOfWork/IngestionUnitOfWorkBase.cs index a87e431170..e277b0491b 100644 --- a/src/ServiceControl.Persistence/UnitOfWork/IngestionUnitOfWorkBase.cs +++ b/src/ServiceControl.Persistence/UnitOfWork/IngestionUnitOfWorkBase.cs @@ -1,6 +1,7 @@ namespace ServiceControl.Persistence.UnitOfWork { using System; + using System.Threading; using System.Threading.Tasks; public abstract class IngestionUnitOfWorkBase : IIngestionUnitOfWork @@ -19,6 +20,6 @@ public void Dispose() public IMonitoringIngestionUnitOfWork Monitoring { get; protected set; } public IRecoverabilityIngestionUnitOfWork Recoverability { get; protected set; } - public virtual Task Complete() => Task.CompletedTask; + public virtual Task Complete(CancellationToken cancellationToken) => Task.CompletedTask; } } \ No newline at end of file diff --git a/src/ServiceControl/Operations/ErrorIngestion.cs b/src/ServiceControl/Operations/ErrorIngestion.cs index 6d0558d3ad..1da987376b 100644 --- a/src/ServiceControl/Operations/ErrorIngestion.cs +++ b/src/ServiceControl/Operations/ErrorIngestion.cs @@ -96,7 +96,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) batchSizeMeter.Mark(contexts.Count); using (batchDurationMeter.Measure()) { - await ingestor.Ingest(contexts); + await ingestor.Ingest(contexts, stoppingToken); } } catch (Exception e) @@ -214,7 +214,7 @@ async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken) if (settings.ForwardErrorMessages) { - await ingestor.VerifyCanReachForwardingAddress(); + await ingestor.VerifyCanReachForwardingAddress(cancellationToken); } await messageReceiver.StartReceive(cancellationToken); diff --git a/src/ServiceControl/Operations/ErrorIngestor.cs b/src/ServiceControl/Operations/ErrorIngestor.cs index a2f13028cf..660dcafbb1 100644 --- a/src/ServiceControl/Operations/ErrorIngestor.cs +++ b/src/ServiceControl/Operations/ErrorIngestor.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; + using System.Threading; using System.Threading.Tasks; using Contracts.Operations; using Infrastructure.DomainEvents; @@ -49,7 +50,7 @@ public ErrorIngestor(Metrics metrics, logQueueAddress = new UnicastAddressTag(transportCustomization.ToTransportQualifiedQueueName(this.settings.ErrorLogQueue)); } - public async Task Ingest(List contexts) + public async Task Ingest(List contexts, CancellationToken cancellationToken) { var failedMessages = new List(contexts.Count); var retriedMessages = new List(contexts.Count); @@ -67,7 +68,7 @@ public async Task Ingest(List contexts) } - var storedFailed = await PersistFailedMessages(failedMessages, retriedMessages); + var storedFailed = await PersistFailedMessages(failedMessages, retriedMessages, cancellationToken); try { @@ -89,7 +90,7 @@ public async Task Ingest(List contexts) { Logger.Debug($"Forwarding {storedFailed.Count} messages"); } - await Forward(storedFailed); + await Forward(storedFailed, cancellationToken); if (Logger.IsDebugEnabled) { Logger.Debug("Forwarded messages"); @@ -113,7 +114,7 @@ public async Task Ingest(List contexts) } } - async Task> PersistFailedMessages(List failedMessageContexts, List retriedMessageContexts) + async Task> PersistFailedMessages(List failedMessageContexts, List retriedMessageContexts, CancellationToken cancellationToken) { var stopwatch = Stopwatch.StartNew(); @@ -130,7 +131,7 @@ async Task> PersistFailedMessages(List> PersistFailedMessages(List messageContexts) + Task Forward(IReadOnlyCollection messageContexts, CancellationToken cancellationToken) { var transportOperations = new TransportOperation[messageContexts.Count]; //We could allocate based on the actual number of ProcessedMessages but this should be OK var index = 0; @@ -177,11 +178,11 @@ Task Forward(IReadOnlyCollection messageContexts) return anyContext != null ? messageDispatcher.Value.Dispatch( new TransportOperations(transportOperations), - anyContext.TransportTransaction) + anyContext.TransportTransaction, cancellationToken) : Task.CompletedTask; } - public async Task VerifyCanReachForwardingAddress() + public async Task VerifyCanReachForwardingAddress(CancellationToken cancellationToken) { try { @@ -193,7 +194,7 @@ public async Task VerifyCanReachForwardingAddress() ) ); - await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction()); + await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction(), cancellationToken); } catch (Exception e) { diff --git a/src/ServiceControl/Operations/ImportFailedErrors.cs b/src/ServiceControl/Operations/ImportFailedErrors.cs index a98df1adec..b834248927 100644 --- a/src/ServiceControl/Operations/ImportFailedErrors.cs +++ b/src/ServiceControl/Operations/ImportFailedErrors.cs @@ -16,17 +16,23 @@ public async Task Run(CancellationToken cancellationToken = default) { if (settings.ForwardErrorMessages) { - await errorIngestor.VerifyCanReachForwardingAddress(); + await errorIngestor.VerifyCanReachForwardingAddress(cancellationToken); } await store.ProcessFailedErrorImports(async transportMessage => { - var messageContext = new MessageContext(transportMessage.Id, transportMessage.Headers, transportMessage.Body, EmptyTransaction, settings.ErrorQueue, EmptyContextBag); - var taskCompletionSource = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var messageContext = new MessageContext( + transportMessage.Id, + transportMessage.Headers, + transportMessage.Body, + EmptyTransaction, + settings.ErrorQueue, + EmptyContextBag + ); + var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); messageContext.SetTaskCompletionSource(taskCompletionSource); - await errorIngestor.Ingest([messageContext]); + await errorIngestor.Ingest([messageContext], cancellationToken); await taskCompletionSource.Task; }, cancellationToken); } From d8362725b3e6cc3204834e779d4d6ec40a7bc73f Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Fri, 7 Feb 2025 00:15:07 +0100 Subject: [PATCH 03/14] Added code comments on how lifetime is managed for `CancellationTokenSource` with `RavenAuditIngestionUnitOfWork` --- .../UnitOfWork/RavenAuditUnitOfWorkFactory.cs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs b/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs index 78bc6798c8..eddfba1125 100644 --- a/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs +++ b/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs @@ -15,13 +15,19 @@ class RavenAuditIngestionUnitOfWorkFactory( { public async ValueTask StartNew(int batchSize, CancellationToken cancellationToken) { - var timedCancellationSource = new CancellationTokenSource(databaseConfiguration.BulkInsertCommitTimeout); - var bulkInsert = (await documentStoreProvider.GetDocumentStore(timedCancellationSource.Token)) - .BulkInsert(new BulkInsertOptions { SkipOverwriteIfUnchanged = true, }, timedCancellationSource.Token); + // DO NOT USE using var, will be disposed by RavenAuditIngestionUnitOfWork + var lifetimeForwardedTimedCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + lifetimeForwardedTimedCancellationSource.CancelAfter(databaseConfiguration.BulkInsertCommitTimeout); + var bulkInsert = (await documentStoreProvider.GetDocumentStore(lifetimeForwardedTimedCancellationSource.Token)) + .BulkInsert(new BulkInsertOptions { SkipOverwriteIfUnchanged = true, }, lifetimeForwardedTimedCancellationSource.Token); return new RavenAuditIngestionUnitOfWork( - bulkInsert, timedCancellationSource, databaseConfiguration.AuditRetentionPeriod, new RavenAttachmentsBodyStorage(sessionProvider, bulkInsert, databaseConfiguration.MaxBodySizeToStore) + bulkInsert, + lifetimeForwardedTimedCancellationSource, // Transfer ownership for disposal + databaseConfiguration.AuditRetentionPeriod, + new RavenAttachmentsBodyStorage(sessionProvider, bulkInsert, databaseConfiguration.MaxBodySizeToStore) ); + // Intentionally not disposing CTS! } public bool CanIngestMore() => customCheckState.CanIngestMore; From e9e7317f602f435fe415e1321d3834f4e8933025 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Fri, 7 Feb 2025 00:36:32 +0100 Subject: [PATCH 04/14] started undertaking adding support for cancellation in domain events but pretty much all storage API's aren't supporting cancellation.... --- .../MessageFailures/When_a_message_fails_to_import.cs | 3 ++- src/ServiceControl.DomainEvents/DomainEvents.cs | 7 ++++--- .../Interfaces/IDomainEvents.cs | 3 ++- .../Interfaces/IDomainHandler.cs | 3 ++- src/ServiceControl.Persistence.Tests/FakeDomainEvents.cs | 3 ++- .../Recoverability/Archiving/InMemoryArchive.cs | 9 +++++---- .../Recoverability/Archiving/InMemoryUnarchive.cs | 9 +++++---- src/ServiceControl.UnitTests/FakeDomainEvents.cs | 3 ++- .../Monitoring/EndpointInstanceMonitoringTests.cs | 3 ++- src/ServiceControl/EventLog/AuditEventLogWriter.cs | 5 +++-- .../ExternalIntegrations/IntegrationEventWriter.cs | 3 ++- .../Infrastructure/SignalR/GlobalEventHandler.cs | 5 +++-- .../Infrastructure/SignalR/ServicePulseNotifier.cs | 5 +++-- src/ServiceControl/Monitoring/MonitoringDataPersister.cs | 9 +++++---- .../Notifications/Email/CustomChecksMailNotification.cs | 9 +++++---- .../Recoverability/Retrying/FailedMessageRetryCleaner.cs | 3 ++- .../Retrying/History/StoreHistoryHandler.cs | 3 ++- .../Retrying/Infrastructure/ReturnToSenderDequeuer.cs | 2 +- 18 files changed, 52 insertions(+), 35 deletions(-) diff --git a/src/ServiceControl.AcceptanceTests.RavenDB/Recoverability/MessageFailures/When_a_message_fails_to_import.cs b/src/ServiceControl.AcceptanceTests.RavenDB/Recoverability/MessageFailures/When_a_message_fails_to_import.cs index fc3092c825..bcc703e4ff 100644 --- a/src/ServiceControl.AcceptanceTests.RavenDB/Recoverability/MessageFailures/When_a_message_fails_to_import.cs +++ b/src/ServiceControl.AcceptanceTests.RavenDB/Recoverability/MessageFailures/When_a_message_fails_to_import.cs @@ -1,6 +1,7 @@ namespace ServiceControl.AcceptanceTests.RavenDB.Recoverability.MessageFailures { using System; + using System.Threading; using System.Threading.Tasks; using AcceptanceTesting; using AcceptanceTesting.EndpointTemplates; @@ -75,7 +76,7 @@ public async Task It_can_be_reimported() class MessageFailedHandler(MyContext scenarioContext) : IDomainHandler { - public Task Handle(MessageFailed domainEvent) + public Task Handle(MessageFailed domainEvent, CancellationToken cancellationToken) { scenarioContext.MessageFailedEventPublished = true; return Task.CompletedTask; diff --git a/src/ServiceControl.DomainEvents/DomainEvents.cs b/src/ServiceControl.DomainEvents/DomainEvents.cs index 5714b464d1..5443c784d7 100644 --- a/src/ServiceControl.DomainEvents/DomainEvents.cs +++ b/src/ServiceControl.DomainEvents/DomainEvents.cs @@ -1,6 +1,7 @@ namespace ServiceControl.Infrastructure.DomainEvents { using System; + using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using NServiceBus.Logging; @@ -12,14 +13,14 @@ public class DomainEvents : IDomainEvents readonly IServiceProvider serviceProvider; public DomainEvents(IServiceProvider serviceProvider) => this.serviceProvider = serviceProvider; - public async Task Raise(T domainEvent) where T : IDomainEvent + public async Task Raise(T domainEvent, CancellationToken cancellationToken) where T : IDomainEvent { var handlers = serviceProvider.GetServices>(); foreach (var handler in handlers) { try { - await handler.Handle(domainEvent) + await handler.Handle(domainEvent, cancellationToken) .ConfigureAwait(false); } catch (Exception e) @@ -34,7 +35,7 @@ await handler.Handle(domainEvent) { try { - await handler.Handle(domainEvent) + await handler.Handle(domainEvent, cancellationToken) .ConfigureAwait(false); } catch (Exception e) diff --git a/src/ServiceControl.DomainEvents/Interfaces/IDomainEvents.cs b/src/ServiceControl.DomainEvents/Interfaces/IDomainEvents.cs index 7e7ea4cf3b..812c18fdde 100644 --- a/src/ServiceControl.DomainEvents/Interfaces/IDomainEvents.cs +++ b/src/ServiceControl.DomainEvents/Interfaces/IDomainEvents.cs @@ -1,9 +1,10 @@ namespace ServiceControl.Infrastructure.DomainEvents { + using System.Threading; using System.Threading.Tasks; public interface IDomainEvents { - Task Raise(T domainEvent) where T : IDomainEvent; + Task Raise(T domainEvent, CancellationToken cancellationToken = default) where T : IDomainEvent; } } \ No newline at end of file diff --git a/src/ServiceControl.DomainEvents/Interfaces/IDomainHandler.cs b/src/ServiceControl.DomainEvents/Interfaces/IDomainHandler.cs index f7d49d97ea..c664652928 100644 --- a/src/ServiceControl.DomainEvents/Interfaces/IDomainHandler.cs +++ b/src/ServiceControl.DomainEvents/Interfaces/IDomainHandler.cs @@ -1,9 +1,10 @@ namespace ServiceControl.Infrastructure.DomainEvents { + using System.Threading; using System.Threading.Tasks; public interface IDomainHandler where T : IDomainEvent { - Task Handle(T domainEvent); + Task Handle(T domainEvent, CancellationToken cancellationToken = default); } } \ No newline at end of file diff --git a/src/ServiceControl.Persistence.Tests/FakeDomainEvents.cs b/src/ServiceControl.Persistence.Tests/FakeDomainEvents.cs index d0e26cb57b..0718354ab6 100644 --- a/src/ServiceControl.Persistence.Tests/FakeDomainEvents.cs +++ b/src/ServiceControl.Persistence.Tests/FakeDomainEvents.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Text.Json; using System.Text.Json.Serialization; + using System.Threading; using System.Threading.Tasks; using NUnit.Framework; using ServiceControl.Infrastructure.DomainEvents; @@ -12,7 +13,7 @@ class FakeDomainEvents : IDomainEvents { public List RaisedEvents { get; } = []; - public Task Raise(T domainEvent) where T : IDomainEvent + public Task Raise(T domainEvent, CancellationToken cancellationToken) where T : IDomainEvent { RaisedEvents.Add(domainEvent); TestContext.Out.WriteLine($"Raised DomainEvent {typeof(T).Name}:"); diff --git a/src/ServiceControl.Persistence/Recoverability/Archiving/InMemoryArchive.cs b/src/ServiceControl.Persistence/Recoverability/Archiving/InMemoryArchive.cs index 7237ce0b00..91c9b81f22 100644 --- a/src/ServiceControl.Persistence/Recoverability/Archiving/InMemoryArchive.cs +++ b/src/ServiceControl.Persistence/Recoverability/Archiving/InMemoryArchive.cs @@ -1,6 +1,7 @@ namespace ServiceControl.Recoverability { using System; + using System.Threading; using System.Threading.Tasks; using Infrastructure.DomainEvents; @@ -51,7 +52,7 @@ public Task Start() ArchiveType = ArchiveType, Progress = GetProgress(), StartTime = Started - }); + }, CancellationToken.None); } public Task BatchArchived(int numberOfMessagesArchivedInBatch) @@ -68,7 +69,7 @@ public Task BatchArchived(int numberOfMessagesArchivedInBatch) Progress = GetProgress(), StartTime = Started, Last = Last.Value - }); + }, CancellationToken.None); } public Task FinalizeArchive() @@ -84,7 +85,7 @@ public Task FinalizeArchive() Progress = GetProgress(), StartTime = Started, Last = Last.Value - }); + }, CancellationToken.None); } public Task Complete() @@ -103,7 +104,7 @@ public Task Complete() Last = Last.Value, CompletionTime = CompletionTime.Value, GroupName = GroupName - }); + }, CancellationToken.None); } public bool NeedsAcknowledgement() diff --git a/src/ServiceControl.Persistence/Recoverability/Archiving/InMemoryUnarchive.cs b/src/ServiceControl.Persistence/Recoverability/Archiving/InMemoryUnarchive.cs index d7bebd73bc..0fa0b1f766 100644 --- a/src/ServiceControl.Persistence/Recoverability/Archiving/InMemoryUnarchive.cs +++ b/src/ServiceControl.Persistence/Recoverability/Archiving/InMemoryUnarchive.cs @@ -1,6 +1,7 @@ namespace ServiceControl.Recoverability { using System; + using System.Threading; using System.Threading.Tasks; using Infrastructure.DomainEvents; @@ -51,7 +52,7 @@ public Task Start() ArchiveType = ArchiveType, Progress = GetProgress(), StartTime = Started - }); + }, CancellationToken.None); } public Task BatchUnarchived(int numberOfMessagesUnarchivedInBatch) @@ -68,7 +69,7 @@ public Task BatchUnarchived(int numberOfMessagesUnarchivedInBatch) Progress = GetProgress(), StartTime = Started, Last = Last.Value - }); + }, CancellationToken.None); } public Task FinalizeUnarchive() @@ -84,7 +85,7 @@ public Task FinalizeUnarchive() Progress = GetProgress(), StartTime = Started, Last = Last.Value - }); + }, CancellationToken.None); } public Task Complete() @@ -103,7 +104,7 @@ public Task Complete() Last = Last.Value, CompletionTime = CompletionTime.Value, GroupName = GroupName - }); + }, CancellationToken.None); } internal bool NeedsAcknowledgement() diff --git a/src/ServiceControl.UnitTests/FakeDomainEvents.cs b/src/ServiceControl.UnitTests/FakeDomainEvents.cs index b50724fd13..ae662f4496 100644 --- a/src/ServiceControl.UnitTests/FakeDomainEvents.cs +++ b/src/ServiceControl.UnitTests/FakeDomainEvents.cs @@ -1,6 +1,7 @@ namespace ServiceControl.UnitTests.Operations { using System.Collections.Generic; + using System.Threading; using System.Threading.Tasks; using ServiceControl.Infrastructure.DomainEvents; @@ -8,7 +9,7 @@ class FakeDomainEvents : IDomainEvents { public List RaisedEvents { get; } = []; - public Task Raise(T domainEvent) where T : IDomainEvent + public Task Raise(T domainEvent, CancellationToken cancellationToken = default) where T : IDomainEvent { RaisedEvents.Add(domainEvent); return Task.CompletedTask; diff --git a/src/ServiceControl.UnitTests/Monitoring/EndpointInstanceMonitoringTests.cs b/src/ServiceControl.UnitTests/Monitoring/EndpointInstanceMonitoringTests.cs index d1ad39c947..a0662eafa3 100644 --- a/src/ServiceControl.UnitTests/Monitoring/EndpointInstanceMonitoringTests.cs +++ b/src/ServiceControl.UnitTests/Monitoring/EndpointInstanceMonitoringTests.cs @@ -1,6 +1,7 @@ namespace ServiceControl.UnitTests.Monitoring { using System; + using System.Threading; using System.Threading.Tasks; using NUnit.Framework; using ServiceControl.Infrastructure.DomainEvents; @@ -33,7 +34,7 @@ public async Task When_endpoint_removed_should_stay_removed() class FakeDomainEvents : IDomainEvents { - public Task Raise(T domainEvent) where T : IDomainEvent + public Task Raise(T domainEvent, CancellationToken cancellationToken) where T : IDomainEvent { return Task.CompletedTask; } diff --git a/src/ServiceControl/EventLog/AuditEventLogWriter.cs b/src/ServiceControl/EventLog/AuditEventLogWriter.cs index caabe47c3f..84e1708e77 100644 --- a/src/ServiceControl/EventLog/AuditEventLogWriter.cs +++ b/src/ServiceControl/EventLog/AuditEventLogWriter.cs @@ -1,5 +1,6 @@ namespace ServiceControl.EventLog { + using System.Threading; using System.Threading.Tasks; using Contracts.EventLog; using Infrastructure.DomainEvents; @@ -19,7 +20,7 @@ public AuditEventLogWriter(GlobalEventHandler broadcaster, IErrorMessageDataStor this.mappings = mappings; } - public async Task Handle(IDomainEvent message) + public async Task Handle(IDomainEvent message, CancellationToken cancellationToken) { if (!mappings.HasMapping(message)) { @@ -41,7 +42,7 @@ await broadcaster.Broadcast(new EventLogItemAdded // The reason is because this data is not useful for end users, so for now we just empty it. // At the moment too much data is being populated in this field, and this has significant down sides to the amount of data we are sending down to ServicePulse (it actually crashes it). RelatedTo = [] - }); + }, cancellationToken); } readonly GlobalEventHandler broadcaster; diff --git a/src/ServiceControl/ExternalIntegrations/IntegrationEventWriter.cs b/src/ServiceControl/ExternalIntegrations/IntegrationEventWriter.cs index 768c0442e9..e4af301c7c 100644 --- a/src/ServiceControl/ExternalIntegrations/IntegrationEventWriter.cs +++ b/src/ServiceControl/ExternalIntegrations/IntegrationEventWriter.cs @@ -2,6 +2,7 @@ { using System.Collections.Generic; using System.Linq; + using System.Threading; using System.Threading.Tasks; using Infrastructure.DomainEvents; using NServiceBus.Logging; @@ -15,7 +16,7 @@ public IntegrationEventWriter(IExternalIntegrationRequestsDataStore store, IEnum this.eventPublishers = eventPublishers; } - public async Task Handle(IDomainEvent message) + public async Task Handle(IDomainEvent message, CancellationToken cancellationToken) { var dispatchContexts = eventPublishers .Where(p => p.Handles(message)) diff --git a/src/ServiceControl/Infrastructure/SignalR/GlobalEventHandler.cs b/src/ServiceControl/Infrastructure/SignalR/GlobalEventHandler.cs index d40ba4a50a..61a026d150 100644 --- a/src/ServiceControl/Infrastructure/SignalR/GlobalEventHandler.cs +++ b/src/ServiceControl/Infrastructure/SignalR/GlobalEventHandler.cs @@ -1,14 +1,15 @@ namespace ServiceControl.Infrastructure.SignalR { + using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR; class GlobalEventHandler(IHubContext hubContext) { - public Task Broadcast(IUserInterfaceEvent @event) + public Task Broadcast(IUserInterfaceEvent @event, CancellationToken cancellationToken) { var typeName = @event.GetType().Name; - return hubContext.Clients.All.SendAsync("PushEnvelope", new Envelope { Types = [typeName], Message = @event }); + return hubContext.Clients.All.SendAsync("PushEnvelope", new Envelope { Types = [typeName], Message = @event }, cancellationToken); } } } \ No newline at end of file diff --git a/src/ServiceControl/Infrastructure/SignalR/ServicePulseNotifier.cs b/src/ServiceControl/Infrastructure/SignalR/ServicePulseNotifier.cs index 64702ccb36..84f204e485 100644 --- a/src/ServiceControl/Infrastructure/SignalR/ServicePulseNotifier.cs +++ b/src/ServiceControl/Infrastructure/SignalR/ServicePulseNotifier.cs @@ -1,5 +1,6 @@ namespace ServiceControl.Infrastructure.SignalR { + using System.Threading; using System.Threading.Tasks; using DomainEvents; @@ -8,11 +9,11 @@ namespace ServiceControl.Infrastructure.SignalR /// class ServicePulseNotifier(GlobalEventHandler broadcaster) : IDomainHandler { - public async Task Handle(IDomainEvent domainEvent) + public async Task Handle(IDomainEvent domainEvent, CancellationToken cancellationToken) { if (domainEvent is IUserInterfaceEvent userInterfaceEvent) { - await broadcaster.Broadcast(userInterfaceEvent); + await broadcaster.Broadcast(userInterfaceEvent, cancellationToken); } } } diff --git a/src/ServiceControl/Monitoring/MonitoringDataPersister.cs b/src/ServiceControl/Monitoring/MonitoringDataPersister.cs index fd41851f02..3707e314a4 100644 --- a/src/ServiceControl/Monitoring/MonitoringDataPersister.cs +++ b/src/ServiceControl/Monitoring/MonitoringDataPersister.cs @@ -1,5 +1,6 @@ namespace ServiceControl.Monitoring { + using System.Threading; using System.Threading.Tasks; using Contracts.EndpointControl; using Contracts.HeartbeatMonitoring; @@ -19,22 +20,22 @@ public MonitoringDataPersister(IMonitoringDataStore monitoringDataStore, IEndpoi _endpointInstanceMonitoring = endpointInstanceMonitoring; } - public Task Handle(EndpointDetected domainEvent) + public Task Handle(EndpointDetected domainEvent, CancellationToken cancellationToken) { return _monitoringDataStore.CreateIfNotExists(domainEvent.Endpoint); } - public Task Handle(HeartbeatingEndpointDetected domainEvent) + public Task Handle(HeartbeatingEndpointDetected domainEvent, CancellationToken cancellationToken) { return _monitoringDataStore.CreateOrUpdate(domainEvent.Endpoint, _endpointInstanceMonitoring); } - public Task Handle(MonitoringEnabledForEndpoint domainEvent) + public Task Handle(MonitoringEnabledForEndpoint domainEvent, CancellationToken cancellationToken) { return _monitoringDataStore.UpdateEndpointMonitoring(domainEvent.Endpoint, true); } - public Task Handle(MonitoringDisabledForEndpoint domainEvent) + public Task Handle(MonitoringDisabledForEndpoint domainEvent, CancellationToken cancellationToken) { return _monitoringDataStore.UpdateEndpointMonitoring(domainEvent.Endpoint, false); } diff --git a/src/ServiceControl/Notifications/Email/CustomChecksMailNotification.cs b/src/ServiceControl/Notifications/Email/CustomChecksMailNotification.cs index 18a752dbdd..5368119fbb 100644 --- a/src/ServiceControl/Notifications/Email/CustomChecksMailNotification.cs +++ b/src/ServiceControl/Notifications/Email/CustomChecksMailNotification.cs @@ -2,6 +2,7 @@ { using System; using System.Linq; + using System.Threading; using System.Threading.Tasks; using Contracts.CustomChecks; using Infrastructure.DomainEvents; @@ -42,7 +43,7 @@ public CustomChecksMailNotification(IMessageSession messageSession, Settings set } } - public Task Handle(CustomCheckFailed domainEvent) + public Task Handle(CustomCheckFailed domainEvent, CancellationToken cancellationToken) { if (IsHealthCheck(domainEvent.CustomCheckId)) { @@ -61,13 +62,13 @@ public Task Handle(CustomCheckFailed domainEvent) {domainEvent.CustomCheckId} failed on {domainEvent.FailedAt}. {domainEvent.FailureReason}" - }); + }, cancellationToken: cancellationToken); } return Task.CompletedTask; } - public Task Handle(CustomCheckSucceeded domainEvent) + public Task Handle(CustomCheckSucceeded domainEvent, CancellationToken cancellationToken) { if (IsHealthCheck(domainEvent.CustomCheckId)) { @@ -84,7 +85,7 @@ public Task Handle(CustomCheckSucceeded domainEvent) Body = $@"{domainEvent.Category} check for ServiceControl instance {instanceName} at {instanceAddress}. {domainEvent.CustomCheckId} succeeded on {domainEvent.SucceededAt}." - }); + }, cancellationToken: cancellationToken); } return Task.CompletedTask; diff --git a/src/ServiceControl/Recoverability/Retrying/FailedMessageRetryCleaner.cs b/src/ServiceControl/Recoverability/Retrying/FailedMessageRetryCleaner.cs index e2d39648c8..e2b55765a6 100644 --- a/src/ServiceControl/Recoverability/Retrying/FailedMessageRetryCleaner.cs +++ b/src/ServiceControl/Recoverability/Retrying/FailedMessageRetryCleaner.cs @@ -1,5 +1,6 @@ namespace ServiceControl.Recoverability { + using System.Threading; using System.Threading.Tasks; using Contracts.MessageFailures; using Infrastructure.DomainEvents; @@ -14,7 +15,7 @@ public FailedMessageRetryCleaner(IErrorMessageDataStore dataStore) this.dataStore = dataStore; } - public Task Handle(MessageFailed message) + public Task Handle(MessageFailed message, CancellationToken cancellationToken) { if (message.RepeatedFailure) { diff --git a/src/ServiceControl/Recoverability/Retrying/History/StoreHistoryHandler.cs b/src/ServiceControl/Recoverability/Retrying/History/StoreHistoryHandler.cs index c1939f9ed7..707fc1d340 100644 --- a/src/ServiceControl/Recoverability/Retrying/History/StoreHistoryHandler.cs +++ b/src/ServiceControl/Recoverability/Retrying/History/StoreHistoryHandler.cs @@ -1,5 +1,6 @@ namespace ServiceControl.Recoverability.Retrying { + using System.Threading; using System.Threading.Tasks; using Infrastructure.DomainEvents; using ServiceBus.Management.Infrastructure.Settings; @@ -13,7 +14,7 @@ public StoreHistoryHandler(IRetryHistoryDataStore store, Settings settings) this.settings = settings; } - public Task Handle(RetryOperationCompleted message) + public Task Handle(RetryOperationCompleted message, CancellationToken cancellationToken) { return store.RecordRetryOperationCompleted( message.RequestId, diff --git a/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSenderDequeuer.cs b/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSenderDequeuer.cs index edd08e220d..648de99e02 100644 --- a/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSenderDequeuer.cs +++ b/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSenderDequeuer.cs @@ -231,7 +231,7 @@ await domainEvents.Raise(new MessagesSubmittedForRetryFailed Reason = reason, FailedMessageId = messageUniqueId, Destination = destination - }); + }, cancellationToken); } catch (Exception ex) { From 06cfbd8dc51e9ab0bca4eed92bb36b58ad78d3f0 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Fri, 7 Feb 2025 13:58:09 +0100 Subject: [PATCH 05/14] More cancellation token propagation --- src/ServiceControl/CustomChecks/DeleteCustomCheckHandler.cs | 2 +- .../MessageFailures/Handlers/ArchiveMessageHandler.cs | 2 +- .../Handlers/LegacyMessageFailureResolvedHandler.cs | 4 ++-- .../MessageFailures/Handlers/MessageFailureResolvedHandler.cs | 2 +- .../Handlers/UnArchiveMessagesByRangeHandler.cs | 2 +- .../MessageFailures/Handlers/UnArchiveMessagesHandler.cs | 2 +- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/ServiceControl/CustomChecks/DeleteCustomCheckHandler.cs b/src/ServiceControl/CustomChecks/DeleteCustomCheckHandler.cs index 95d2d973e6..79a3e1aa81 100644 --- a/src/ServiceControl/CustomChecks/DeleteCustomCheckHandler.cs +++ b/src/ServiceControl/CustomChecks/DeleteCustomCheckHandler.cs @@ -17,7 +17,7 @@ public async Task Handle(DeleteCustomCheck message, IMessageHandlerContext conte { await customChecksDataStore.DeleteCustomCheck(message.Id); - await domainEvents.Raise(new CustomCheckDeleted { Id = message.Id }); + await domainEvents.Raise(new CustomCheckDeleted { Id = message.Id }, context.CancellationToken); } ICustomChecksDataStore customChecksDataStore; diff --git a/src/ServiceControl/MessageFailures/Handlers/ArchiveMessageHandler.cs b/src/ServiceControl/MessageFailures/Handlers/ArchiveMessageHandler.cs index b75b0bec29..2e317cba54 100644 --- a/src/ServiceControl/MessageFailures/Handlers/ArchiveMessageHandler.cs +++ b/src/ServiceControl/MessageFailures/Handlers/ArchiveMessageHandler.cs @@ -26,7 +26,7 @@ public async Task Handle(ArchiveMessage message, IMessageHandlerContext context) await domainEvents.Raise(new FailedMessageArchived { FailedMessageId = failedMessageId - }); + }, context.CancellationToken); await dataStore.FailedMessageMarkAsArchived(failedMessageId); } diff --git a/src/ServiceControl/MessageFailures/Handlers/LegacyMessageFailureResolvedHandler.cs b/src/ServiceControl/MessageFailures/Handlers/LegacyMessageFailureResolvedHandler.cs index 1cf465463f..6c9e5f62db 100644 --- a/src/ServiceControl/MessageFailures/Handlers/LegacyMessageFailureResolvedHandler.cs +++ b/src/ServiceControl/MessageFailures/Handlers/LegacyMessageFailureResolvedHandler.cs @@ -27,7 +27,7 @@ await domainEvents.Raise(new MessageFailureResolvedByRetry { AlternativeFailedMessageIds = message.AlternativeFailedMessageIds, FailedMessageId = message.FailedMessageId - }); + }, context.CancellationToken); } // This is only needed because we might get this from legacy not yet converted instances @@ -38,7 +38,7 @@ await domainEvents.Raise(new MessageFailureResolvedByRetry { AlternativeFailedMessageIds = message.AlternativeFailedMessageIds, FailedMessageId = message.FailedMessageId - }); + }, context.CancellationToken); } async Task MarkAsResolvedByRetry(string primaryId, string[] messageAlternativeFailedMessageIds) diff --git a/src/ServiceControl/MessageFailures/Handlers/MessageFailureResolvedHandler.cs b/src/ServiceControl/MessageFailures/Handlers/MessageFailureResolvedHandler.cs index 7c92c831c3..251123319d 100644 --- a/src/ServiceControl/MessageFailures/Handlers/MessageFailureResolvedHandler.cs +++ b/src/ServiceControl/MessageFailures/Handlers/MessageFailureResolvedHandler.cs @@ -44,7 +44,7 @@ public async Task Handle(MarkPendingRetryAsResolved message, IMessageHandlerCont await domainEvents.Raise(new MessageFailureResolvedManually { FailedMessageId = message.FailedMessageId - }); + }, context.CancellationToken); } IErrorMessageDataStore dataStore; diff --git a/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesByRangeHandler.cs b/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesByRangeHandler.cs index 04543de5b9..2415e085bd 100644 --- a/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesByRangeHandler.cs +++ b/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesByRangeHandler.cs @@ -23,7 +23,7 @@ await domainEvents.Raise(new FailedMessagesUnArchived { DocumentIds = ids, MessagesCount = ids.Length - }); + }, context.CancellationToken); } diff --git a/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesHandler.cs b/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesHandler.cs index dd4988730f..cb601ddbb4 100644 --- a/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesHandler.cs +++ b/src/ServiceControl/MessageFailures/Handlers/UnArchiveMessagesHandler.cs @@ -18,7 +18,7 @@ await domainEvents.Raise(new FailedMessagesUnArchived { DocumentIds = ids, MessagesCount = ids.Length - }); + }, context.CancellationToken); } } } \ No newline at end of file From 7fb0a01725fcc85c29409f49e23eb8c3da8973f9 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Fri, 7 Feb 2025 14:38:31 +0100 Subject: [PATCH 06/14] OnMessage: Invoke `TrySetCanceled` when cancelled to return ASAP --- src/ServiceControl/Operations/ErrorIngestion.cs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/ServiceControl/Operations/ErrorIngestion.cs b/src/ServiceControl/Operations/ErrorIngestion.cs index 1da987376b..8f60e7528a 100644 --- a/src/ServiceControl/Operations/ErrorIngestion.cs +++ b/src/ServiceControl/Operations/ErrorIngestion.cs @@ -269,7 +269,11 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati } var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - messageContext.SetTaskCompletionSource(taskCompletionSource); + + // Ideally we want to propagate the cancellationToken to the batch handling + // but cancellation in only cancelled when endpointInstance.Stop is cancelled, not when invoked. + // Not much shutdown speed to gain but this will ensure endpoint.Stop will return. + await using var cancellationTokenRegistration = cancellationToken.Register(() => _ = taskCompletionSource.TrySetCanceled()); receivedMeter.Mark(); From e2e28a233c55ccbb71e056d94d52740620e5ba4c Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Thu, 6 Mar 2025 12:19:10 +0100 Subject: [PATCH 07/14] Fix API's are rebase with forcing accepting all changes from master # Conflicts: # src/ServiceControl.Audit/Auditing/AuditIngestion.cs # src/ServiceControl.Audit/Auditing/AuditIngestor.cs --- .../Auditing/AuditIngestion.cs | 7 ++++--- .../Auditing/AuditIngestor.cs | 16 ++++++++-------- .../Auditing/AuditPersister.cs | 5 +++-- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 2b76b152ea..2fb065f9d8 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -27,7 +27,8 @@ public AuditIngestion( AuditIngestor auditIngestor, IAuditIngestionUnitOfWorkFactory unitOfWorkFactory, IHostApplicationLifetime applicationLifetime, - IngestionMetrics metrics) + IngestionMetrics metrics + ) { inputEndpoint = settings.AuditQueue; this.transportCustomization = transportCustomization; @@ -132,7 +133,7 @@ async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken) messageReceiver = transportInfrastructure.Receivers[inputEndpoint]; - await auditIngestor.VerifyCanReachForwardingAddress(); + await auditIngestor.VerifyCanReachForwardingAddress(cancellationToken); await messageReceiver.StartReceive(cancellationToken); logger.Info(LogMessages.StartedInfrastructure); @@ -236,7 +237,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) contexts.Add(context); } - await auditIngestor.Ingest(contexts); + await auditIngestor.Ingest(contexts, stoppingToken); batchMetrics.Complete(contexts.Count); } diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index 4a333a4b3e..f783a61dea 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using System.Linq; + using System.Threading; using System.Threading.Tasks; using Infrastructure.Settings; using Monitoring; @@ -42,15 +43,15 @@ ITransportCustomization transportCustomization ); } - public async Task Ingest(List contexts) + public async Task Ingest(List contexts, CancellationToken cancellationToken) { - var stored = await auditPersister.Persist(contexts); + var stored = await auditPersister.Persist(contexts, cancellationToken); try { if (settings.ForwardAuditMessages) { - await Forward(stored, logQueueAddress); + await Forward(stored, logQueueAddress, cancellationToken); } foreach (var context in contexts) @@ -67,7 +68,7 @@ public async Task Ingest(List contexts) } } - Task Forward(IReadOnlyCollection messageContexts, string forwardingAddress) + Task Forward(IReadOnlyCollection messageContexts, string forwardingAddress, CancellationToken cancellationToken) { var transportOperations = new TransportOperation[messageContexts.Count]; //We could allocate based on the actual number of ProcessedMessages but this should be OK var index = 0; @@ -96,12 +97,11 @@ Task Forward(IReadOnlyCollection messageContexts, string forward return anyContext != null ? messageDispatcher.Value.Dispatch( new TransportOperations(transportOperations), - anyContext.TransportTransaction - ) + anyContext.TransportTransaction, cancellationToken) : Task.CompletedTask; } - public async Task VerifyCanReachForwardingAddress() + public async Task VerifyCanReachForwardingAddress(CancellationToken cancellationToken) { if (!settings.ForwardAuditMessages) { @@ -118,7 +118,7 @@ public async Task VerifyCanReachForwardingAddress() ) ); - await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction()); + await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction(), cancellationToken); } catch (Exception e) { diff --git a/src/ServiceControl.Audit/Auditing/AuditPersister.cs b/src/ServiceControl.Audit/Auditing/AuditPersister.cs index 008c86820a..aad348c8f8 100644 --- a/src/ServiceControl.Audit/Auditing/AuditPersister.cs +++ b/src/ServiceControl.Audit/Auditing/AuditPersister.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using System.Text.Json; + using System.Threading; using System.Threading.Tasks; using Infrastructure; using Monitoring; @@ -21,14 +22,14 @@ class AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory, IMessageSession messageSession, Lazy messageDispatcher) { - public async Task> Persist(IReadOnlyList contexts) + public async Task> Persist(IReadOnlyList contexts, CancellationToken cancellationToken) { var storedContexts = new List(contexts.Count); IAuditIngestionUnitOfWork unitOfWork = null; try { // deliberately not using the using statement because we dispose async explicitly - unitOfWork = await unitOfWorkFactory.StartNew(contexts.Count); + unitOfWork = await unitOfWorkFactory.StartNew(contexts.Count, cancellationToken); var inserts = new List(contexts.Count); foreach (var context in contexts) { From 2400885fc421428d0f15736ae55a35abab7c3d6b Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Thu, 6 Mar 2025 12:30:37 +0100 Subject: [PATCH 08/14] Suggestion by feedback to pass CT's to RecordProcessedMessage, RecordSagaSnapshot and RecordKnownEndpoint --- .../InMemoryAttachmentsBodyStorage.cs | 3 ++- .../InMemoryAuditIngestionUnitOfWork.cs | 9 +++++---- .../RavenAttachmentsBodyStorage.cs | 5 +++-- .../RavenAuditIngestionUnitOfWork.cs | 8 ++++---- .../BodyStorageEnricher.cs | 13 +++++++------ .../IBodyStorage.cs | 3 ++- .../UnitOfWork/IAuditIngestionUnitOfWork.cs | 7 ++++--- .../BodyStorage/BodyStorageEnricherTests.cs | 19 ++++++++++--------- .../Auditing/AuditPersister.cs | 6 +++--- 9 files changed, 40 insertions(+), 33 deletions(-) diff --git a/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAttachmentsBodyStorage.cs b/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAttachmentsBodyStorage.cs index 3ce71f7acb..74419d4d03 100644 --- a/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAttachmentsBodyStorage.cs +++ b/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAttachmentsBodyStorage.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.IO; using System.Linq; + using System.Threading; using System.Threading.Tasks; using ServiceControl.Audit.Auditing.BodyStorage; @@ -15,7 +16,7 @@ public InMemoryAttachmentsBodyStorage() messageBodies = []; } - public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream) + public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken) { var messageBody = messageBodies.FirstOrDefault(w => w.BodyId == bodyId); diff --git a/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWork.cs b/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWork.cs index 8d9551a795..961e39801a 100644 --- a/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWork.cs +++ b/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWork.cs @@ -1,6 +1,7 @@ namespace ServiceControl.Audit.Persistence.InMemory { using System; + using System.Threading; using System.Threading.Tasks; using Auditing.BodyStorage; using ServiceControl.Audit.Auditing; @@ -15,21 +16,21 @@ class InMemoryAuditIngestionUnitOfWork( { public ValueTask DisposeAsync() => ValueTask.CompletedTask; - public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint) + public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken) { dataStore.knownEndpoints.Add(knownEndpoint); return Task.CompletedTask; } - public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory body) + public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory body, CancellationToken cancellationToken) { if (!body.IsEmpty) { - await bodyStorageEnricher.StoreAuditMessageBody(body, processedMessage); + await bodyStorageEnricher.StoreAuditMessageBody(body, processedMessage, cancellationToken); } await dataStore.SaveProcessedMessage(processedMessage); } - public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot) => dataStore.SaveSagaSnapshot(sagaSnapshot); + public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken) => dataStore.SaveSagaSnapshot(sagaSnapshot); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit.Persistence.RavenDB/RavenAttachmentsBodyStorage.cs b/src/ServiceControl.Audit.Persistence.RavenDB/RavenAttachmentsBodyStorage.cs index dc31013c55..28f6c68252 100644 --- a/src/ServiceControl.Audit.Persistence.RavenDB/RavenAttachmentsBodyStorage.cs +++ b/src/ServiceControl.Audit.Persistence.RavenDB/RavenAttachmentsBodyStorage.cs @@ -1,6 +1,7 @@ namespace ServiceControl.Audit.Persistence.RavenDB { using System.IO; + using System.Threading; using System.Threading.Tasks; using Auditing.BodyStorage; using Raven.Client.Documents.BulkInsert; @@ -11,7 +12,7 @@ class RavenAttachmentsBodyStorage( int settingsMaxBodySizeToStore) : IBodyStorage { - public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream) + public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken) { if (bodySize > settingsMaxBodySizeToStore) { @@ -19,7 +20,7 @@ public Task Store(string bodyId, string contentType, int bodySize, Stream bodySt } return bulkInsert.AttachmentsFor(bodyId) - .StoreAsync("body", bodyStream, contentType); + .StoreAsync("body", bodyStream, contentType, cancellationToken); } public async Task TryFetch(string bodyId) diff --git a/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditIngestionUnitOfWork.cs b/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditIngestionUnitOfWork.cs index 9dfd2bace8..abc554152a 100644 --- a/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditIngestionUnitOfWork.cs +++ b/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditIngestionUnitOfWork.cs @@ -22,7 +22,7 @@ class RavenAuditIngestionUnitOfWork( IBodyStorage bodyStorage) : IAuditIngestionUnitOfWork { - public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory body) + public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory body, CancellationToken cancellationToken) { processedMessage.MessageMetadata["ContentLength"] = body.Length; if (!body.IsEmpty) @@ -37,7 +37,7 @@ public async Task RecordProcessedMessage(ProcessedMessage processedMessage, Read await using var stream = new ReadOnlyStream(body); var contentType = processedMessage.Headers.GetValueOrDefault(Headers.ContentType, "text/xml"); - await bodyStorage.Store(processedMessage.Id, contentType, body.Length, stream); + await bodyStorage.Store(processedMessage.Id, contentType, body.Length, stream, cancellationToken); } } @@ -47,10 +47,10 @@ MetadataAsDictionary GetExpirationMetadata() => [Constants.Documents.Metadata.Expires] = DateTime.UtcNow.Add(auditRetentionPeriod) }; - public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot) + public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken) => bulkInsert.StoreAsync(sagaSnapshot, GetExpirationMetadata()); - public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint) + public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken) => bulkInsert.StoreAsync(knownEndpoint, GetExpirationMetadata()); public async ValueTask DisposeAsync() diff --git a/src/ServiceControl.Audit.Persistence/BodyStorageEnricher.cs b/src/ServiceControl.Audit.Persistence/BodyStorageEnricher.cs index e5a3f57ebf..03e4ae479f 100644 --- a/src/ServiceControl.Audit.Persistence/BodyStorageEnricher.cs +++ b/src/ServiceControl.Audit.Persistence/BodyStorageEnricher.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using System.Text; + using System.Threading; using System.Threading.Tasks; using NServiceBus; using NServiceBus.Logging; @@ -11,7 +12,7 @@ public class BodyStorageEnricher(IBodyStorage bodyStorage, PersistenceSettings settings) { - public async ValueTask StoreAuditMessageBody(ReadOnlyMemory body, ProcessedMessage processedMessage) + public async ValueTask StoreAuditMessageBody(ReadOnlyMemory body, ProcessedMessage processedMessage, CancellationToken cancellationToken) { var bodySize = body.Length; processedMessage.MessageMetadata.Add("ContentLength", bodySize); @@ -23,7 +24,7 @@ public async ValueTask StoreAuditMessageBody(ReadOnlyMemory body, Processe var contentType = GetContentType(processedMessage.Headers, "text/xml"); processedMessage.MessageMetadata.Add("ContentType", contentType); - var stored = await TryStoreBody(body, processedMessage, bodySize, contentType); + var stored = await TryStoreBody(body, processedMessage, bodySize, contentType, cancellationToken); if (!stored) { processedMessage.MessageMetadata.Add("BodyNotStored", true); @@ -33,7 +34,7 @@ public async ValueTask StoreAuditMessageBody(ReadOnlyMemory body, Processe static string GetContentType(IReadOnlyDictionary headers, string defaultContentType) => headers.GetValueOrDefault(Headers.ContentType, defaultContentType); - async ValueTask TryStoreBody(ReadOnlyMemory body, ProcessedMessage processedMessage, int bodySize, string contentType) + async ValueTask TryStoreBody(ReadOnlyMemory body, ProcessedMessage processedMessage, int bodySize, string contentType, CancellationToken cancellationToken) { var bodyId = MessageId(processedMessage.Headers); var bodyUrl = string.Format(BodyUrlFormatString, bodyId); @@ -71,7 +72,7 @@ async ValueTask TryStoreBody(ReadOnlyMemory body, ProcessedMessage p if (useBodyStore) { - await StoreBodyInBodyStorage(body, bodyId, contentType, bodySize); + await StoreBodyInBodyStorage(body, bodyId, contentType, bodySize, cancellationToken); storedInBodyStorage = true; } } @@ -80,10 +81,10 @@ async ValueTask TryStoreBody(ReadOnlyMemory body, ProcessedMessage p return storedInBodyStorage; } - async Task StoreBodyInBodyStorage(ReadOnlyMemory body, string bodyId, string contentType, int bodySize) + async Task StoreBodyInBodyStorage(ReadOnlyMemory body, string bodyId, string contentType, int bodySize, CancellationToken cancellationToken) { await using var bodyStream = new ReadOnlyStream(body); - await bodyStorage.Store(bodyId, contentType, bodySize, bodyStream); + await bodyStorage.Store(bodyId, contentType, bodySize, bodyStream, cancellationToken); } static string MessageId(IReadOnlyDictionary headers) diff --git a/src/ServiceControl.Audit.Persistence/IBodyStorage.cs b/src/ServiceControl.Audit.Persistence/IBodyStorage.cs index 2ae86066d3..ba87f5fbeb 100644 --- a/src/ServiceControl.Audit.Persistence/IBodyStorage.cs +++ b/src/ServiceControl.Audit.Persistence/IBodyStorage.cs @@ -1,11 +1,12 @@ namespace ServiceControl.Audit.Auditing.BodyStorage { using System.IO; + using System.Threading; using System.Threading.Tasks; public interface IBodyStorage { - Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream); + Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken); Task TryFetch(string bodyId); } diff --git a/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWork.cs b/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWork.cs index 2ecea8ebf3..116f3ae013 100644 --- a/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWork.cs +++ b/src/ServiceControl.Audit.Persistence/UnitOfWork/IAuditIngestionUnitOfWork.cs @@ -1,6 +1,7 @@ namespace ServiceControl.Audit.Persistence.UnitOfWork { using System; + using System.Threading; using System.Threading.Tasks; using Auditing; using Monitoring; @@ -8,8 +9,8 @@ public interface IAuditIngestionUnitOfWork : IAsyncDisposable { - Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory body = default); - Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot); - Task RecordKnownEndpoint(KnownEndpoint knownEndpoint); + Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory body = default, CancellationToken cancellationToken = default); + Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken = default); + Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken = default); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit.UnitTests/BodyStorage/BodyStorageEnricherTests.cs b/src/ServiceControl.Audit.UnitTests/BodyStorage/BodyStorageEnricherTests.cs index 85fbf642a5..84a8b94580 100644 --- a/src/ServiceControl.Audit.UnitTests/BodyStorage/BodyStorageEnricherTests.cs +++ b/src/ServiceControl.Audit.UnitTests/BodyStorage/BodyStorageEnricherTests.cs @@ -4,6 +4,7 @@ namespace ServiceControl.UnitTests.BodyStorage using System.Collections.Generic; using System.IO; using System.Text; + using System.Threading; using System.Threading.Tasks; using Audit.Auditing; using Audit.Auditing.BodyStorage; @@ -33,7 +34,7 @@ public async Task Should_remove_body_when_above_threshold() var message = new ProcessedMessage(headers, metadata); - await enricher.StoreAuditMessageBody(body, message); + await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken); Assert.Multiple(() => { @@ -63,7 +64,7 @@ public async Task Should_remove_body_when_above_threshold_and_binary() var message = new ProcessedMessage(headers, metadata); - await enricher.StoreAuditMessageBody(body, message); + await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken); Assert.Multiple(() => { @@ -95,7 +96,7 @@ public async Task Should_store_body_in_metadata_when_below_large_object_heap_and var message = new ProcessedMessage(headers, metadata); - await enricher.StoreAuditMessageBody(body, message); + await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken); Assert.Multiple(() => { @@ -127,7 +128,7 @@ public async Task Should_store_body_in_body_property_when_full_text_disabled_and var message = new ProcessedMessage(headers, metadata); - await enricher.StoreAuditMessageBody(body, message); + await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken); Assert.Multiple(() => { @@ -159,7 +160,7 @@ public async Task Should_store_body_in_storage_when_above_large_object_heap_but_ var message = new ProcessedMessage(headers, metadata); - await enricher.StoreAuditMessageBody(body, message); + await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken); Assert.Multiple(() => { @@ -190,7 +191,7 @@ public async Task Should_store_body_in_storage_when_below_threshold_and_binary() var message = new ProcessedMessage(headers, metadata); - await enricher.StoreAuditMessageBody(body, message); + await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken); Assert.Multiple(() => { @@ -221,7 +222,7 @@ public async Task Should_store_body_in_storage_when_below_threshold() var message = new ProcessedMessage(headers, metadata); - await enricher.StoreAuditMessageBody(body, message); + await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken); Assert.Multiple(() => { @@ -251,7 +252,7 @@ public async Task Should_store_body_in_storage_when_encoding_fails() var message = new ProcessedMessage(headers, metadata); - await enricher.StoreAuditMessageBody(body, message); + await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken); Assert.That(fakeStorage.StoredBodySize, Is.GreaterThan(0)); } @@ -260,7 +261,7 @@ class FakeBodyStorage : IBodyStorage { public int StoredBodySize { get; set; } - public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream) + public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken) { StoredBodySize = bodySize; return Task.CompletedTask; diff --git a/src/ServiceControl.Audit/Auditing/AuditPersister.cs b/src/ServiceControl.Audit/Auditing/AuditPersister.cs index aad348c8f8..e50f18b086 100644 --- a/src/ServiceControl.Audit/Auditing/AuditPersister.cs +++ b/src/ServiceControl.Audit/Auditing/AuditPersister.cs @@ -60,11 +60,11 @@ public async Task> Persist(IReadOnlyList> Persist(IReadOnlyList Date: Fri, 7 Mar 2025 17:22:13 +0100 Subject: [PATCH 09/14] Fixes tests --- src/ServiceControl/Operations/ErrorIngestion.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ServiceControl/Operations/ErrorIngestion.cs b/src/ServiceControl/Operations/ErrorIngestion.cs index 8f60e7528a..1d0c75da0c 100644 --- a/src/ServiceControl/Operations/ErrorIngestion.cs +++ b/src/ServiceControl/Operations/ErrorIngestion.cs @@ -269,6 +269,7 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati } var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + messageContext.SetTaskCompletionSource(taskCompletionSource); // Ideally we want to propagate the cancellationToken to the batch handling // but cancellation in only cancelled when endpointInstance.Stop is cancelled, not when invoked. From 13fa363c98045b8f284c2f6f35b6d50b07f2dbc9 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Fri, 7 Mar 2025 17:42:00 +0100 Subject: [PATCH 10/14] Cancel OnMessage immediately IHostedService.StopAsync is invoked --- src/ServiceControl/Operations/ErrorIngestion.cs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/ServiceControl/Operations/ErrorIngestion.cs b/src/ServiceControl/Operations/ErrorIngestion.cs index 1d0c75da0c..5bb9a77d6d 100644 --- a/src/ServiceControl/Operations/ErrorIngestion.cs +++ b/src/ServiceControl/Operations/ErrorIngestion.cs @@ -132,6 +132,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) { try { + await stoppingToken.CancelAsync(); await watchdog.Stop(cancellationToken); channel.Writer.Complete(); await base.StopAsync(cancellationToken); @@ -261,8 +262,11 @@ async Task StopAndTeardownInfrastructure(CancellationToken cancellationToken) } } - async Task OnMessage(MessageContext messageContext, CancellationToken cancellationToken) + async Task OnMessage(MessageContext messageContext, CancellationToken cancellationTokenParent) { + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationTokenParent, stoppingToken.Token); + var cancellationToken = cts.Token; + if (settings.MessageFilter != null && settings.MessageFilter(messageContext)) { return; @@ -307,6 +311,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) TransportInfrastructure transportInfrastructure; IMessageReceiver messageReceiver; + readonly CancellationTokenSource stoppingToken = new(); readonly Settings settings; readonly ITransportCustomization transportCustomization; readonly TransportSettings transportSettings; From 011ed4e9dcb72f226df18ef287347e26be171ba8 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Fri, 7 Mar 2025 17:55:02 +0100 Subject: [PATCH 11/14] Alternative to previous commit, stop receviers and shutdown with already cancelled token. This should terminate ASAP BUT still cleanup all resources if properly implemented by transports/persisters. --- src/ServiceControl.Audit/Auditing/AuditIngestion.cs | 2 +- src/ServiceControl/Operations/ErrorIngestion.cs | 9 ++------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 2fb065f9d8..0b969e356e 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -185,7 +185,7 @@ async Task EnsureStopped(CancellationToken cancellationToken) try { await startStopSemaphore.WaitAsync(cancellationToken); - await StopAndTeardownInfrastructure(cancellationToken); + await StopAndTeardownInfrastructure(new CancellationToken(canceled: true)); } finally { diff --git a/src/ServiceControl/Operations/ErrorIngestion.cs b/src/ServiceControl/Operations/ErrorIngestion.cs index 5bb9a77d6d..1e1d812e65 100644 --- a/src/ServiceControl/Operations/ErrorIngestion.cs +++ b/src/ServiceControl/Operations/ErrorIngestion.cs @@ -132,7 +132,6 @@ public override async Task StopAsync(CancellationToken cancellationToken) { try { - await stoppingToken.CancelAsync(); await watchdog.Stop(cancellationToken); channel.Writer.Complete(); await base.StopAsync(cancellationToken); @@ -262,11 +261,8 @@ async Task StopAndTeardownInfrastructure(CancellationToken cancellationToken) } } - async Task OnMessage(MessageContext messageContext, CancellationToken cancellationTokenParent) + async Task OnMessage(MessageContext messageContext, CancellationToken cancellationToken) { - using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationTokenParent, stoppingToken.Token); - var cancellationToken = cts.Token; - if (settings.MessageFilter != null && settings.MessageFilter(messageContext)) { return; @@ -297,7 +293,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) try { await startStopSemaphore.WaitAsync(cancellationToken); - await StopAndTeardownInfrastructure(cancellationToken); + await StopAndTeardownInfrastructure(new CancellationToken(canceled: true)); } finally { @@ -311,7 +307,6 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) TransportInfrastructure transportInfrastructure; IMessageReceiver messageReceiver; - readonly CancellationTokenSource stoppingToken = new(); readonly Settings settings; readonly ITransportCustomization transportCustomization; readonly TransportSettings transportSettings; From 5e93c9015fb4fcc8ddad29dcc06ece799bf5141a Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Tue, 11 Mar 2025 10:41:52 +0100 Subject: [PATCH 12/14] Add clarifying code comment on why a CancellationToken is passed in the cancelled state --- src/ServiceControl.Audit/Auditing/AuditIngestion.cs | 3 +++ src/ServiceControl/Operations/ErrorIngestion.cs | 3 +++ 2 files changed, 6 insertions(+) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 0b969e356e..e36f2036cb 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -185,6 +185,9 @@ async Task EnsureStopped(CancellationToken cancellationToken) try { await startStopSemaphore.WaitAsync(cancellationToken); + + // By passing a CancellationToken in the cancelled state we stop receivers ASAP and + // still correctly stop/shutdown await StopAndTeardownInfrastructure(new CancellationToken(canceled: true)); } finally diff --git a/src/ServiceControl/Operations/ErrorIngestion.cs b/src/ServiceControl/Operations/ErrorIngestion.cs index 1e1d812e65..5f0ed7f4ee 100644 --- a/src/ServiceControl/Operations/ErrorIngestion.cs +++ b/src/ServiceControl/Operations/ErrorIngestion.cs @@ -293,6 +293,9 @@ async Task EnsureStopped(CancellationToken cancellationToken = default) try { await startStopSemaphore.WaitAsync(cancellationToken); + + // By passing a CancellationToken in the cancelled state we stop receivers ASAP and + // still correctly stop/shutdown await StopAndTeardownInfrastructure(new CancellationToken(canceled: true)); } finally From 0589fba2d0a8bd6f0200b7c1dba29422fa2207dc Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Tue, 11 Mar 2025 17:38:49 +0100 Subject: [PATCH 13/14] More cancellation as test --- .../InMemoryAttachmentsBodyStorage.cs | 2 +- .../InMemoryAuditDataStore.cs | 25 ++++---- .../RavenAttachmentsBodyStorage.cs | 6 +- .../RavenAuditDataStore.cs | 63 ++++++++++--------- .../EmbeddedLifecycleTests.cs | 2 +- .../RetentionTests.cs | 12 ++-- .../AuditCountingTests.cs | 6 +- .../AuditTests.cs | 18 +++--- .../BodyStorageTests.cs | 2 +- .../KnownEndpointsTests.cs | 4 +- .../SagaHistoryTests.cs | 6 +- .../IAuditDataStore.cs | 19 +++--- .../IBodyStorage.cs | 2 +- .../IFailedAuditStorage.cs | 3 +- .../BodyStorage/BodyStorageEnricherTests.cs | 2 +- .../MessagesView/GetMessagesController.cs | 33 +++++----- .../MessagesConversationController.cs | 5 +- .../KnownEndpointsController.cs | 5 +- .../SagaAudit/SagasController.cs | 5 +- 19 files changed, 116 insertions(+), 104 deletions(-) diff --git a/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAttachmentsBodyStorage.cs b/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAttachmentsBodyStorage.cs index 74419d4d03..b46d3b1fac 100644 --- a/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAttachmentsBodyStorage.cs +++ b/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAttachmentsBodyStorage.cs @@ -44,7 +44,7 @@ public Task Store(string bodyId, string contentType, int bodySize, Stream bodySt return Task.CompletedTask; } - public async Task TryFetch(string bodyId) + public async Task TryFetch(string bodyId, CancellationToken cancellationToken) { var messageBody = messageBodies.FirstOrDefault(w => w.BodyId == bodyId); diff --git a/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditDataStore.cs b/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditDataStore.cs index a1358291ac..622cbb5437 100644 --- a/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditDataStore.cs +++ b/src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditDataStore.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using System.Linq; + using System.Threading; using System.Threading.Tasks; using ServiceControl.Audit.Auditing; using ServiceControl.Audit.Auditing.BodyStorage; @@ -29,7 +30,7 @@ public InMemoryAuditDataStore(IBodyStorage bodyStorage) failedAuditImports = []; } - public async Task> QuerySagaHistoryById(Guid input) + public async Task> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken) { var sagaHistory = sagaHistories.FirstOrDefault(w => w.SagaId == input); @@ -41,7 +42,7 @@ public async Task> QuerySagaHistoryById(Guid input) return await Task.FromResult(new QueryResult(sagaHistory, new QueryStatsInfo(string.Empty, 1))); } - public async Task>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo) + public async Task>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken) { var matched = messageViews .Where(w => !w.IsSystemMessage || includeSystemMessages) @@ -50,7 +51,7 @@ public async Task>> GetMessages(bool includeSyst return await Task.FromResult(new QueryResult>(matched, new QueryStatsInfo(string.Empty, matched.Count))); } - public async Task>> QueryMessages(string keyword, PagingInfo pagingInfo, SortInfo sortInfo) + public async Task>> QueryMessages(string keyword, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken) { var messages = GetMessageIdsMatchingQuery(keyword); @@ -60,7 +61,7 @@ public async Task>> QueryMessages(string keyword return await Task.FromResult(new QueryResult>(matched, new QueryStatsInfo(string.Empty, matched.Count()))); } - public async Task>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo) + public async Task>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken) { var messages = GetMessageIdsMatchingQuery(keyword); @@ -68,25 +69,25 @@ public async Task>> QueryMessagesByReceivingEndp return await Task.FromResult(new QueryResult>(matched, new QueryStatsInfo(string.Empty, matched.Count))); } - public async Task>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo) + public async Task>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken) { var matched = messageViews.Where(w => w.ReceivingEndpoint.Name == endpointName).ToList(); return await Task.FromResult(new QueryResult>(matched, new QueryStatsInfo(string.Empty, matched.Count))); } - public async Task>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo) + public async Task>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken) { var matched = messageViews.Where(w => w.ConversationId == conversationId).ToList(); return await Task.FromResult(new QueryResult>(matched, new QueryStatsInfo(string.Empty, matched.Count))); } - public async Task GetMessageBody(string messageId) + public async Task GetMessageBody(string messageId, CancellationToken cancellationToken) { var result = await GetMessageBodyFromMetadata(messageId); if (!result.Found) { - var fromAttachments = await GetMessageBodyFromAttachments(messageId); + var fromAttachments = await GetMessageBodyFromAttachments(messageId, cancellationToken); if (fromAttachments.Found) { return fromAttachments; @@ -121,9 +122,9 @@ IList GetMessageIdsMatchingQuery(string keyword) .Select(pm => pm.MessageMetadata["MessageId"] as string) .ToList(); } - async Task GetMessageBodyFromAttachments(string messageId) + async Task GetMessageBodyFromAttachments(string messageId, CancellationToken cancellationToken) { - var fromBodyStorage = await bodyStorage.TryFetch(messageId); + var fromBodyStorage = await bodyStorage.TryFetch(messageId, cancellationToken); if (fromBodyStorage.HasResult) { @@ -165,7 +166,7 @@ Task GetMessageBodyFromMetadata(string messageId) return Task.FromResult(MessageBodyView.FromString(body, contentType, bodySize, string.Empty)); } - public async Task>> QueryKnownEndpoints() + public async Task>> QueryKnownEndpoints(CancellationToken cancellationToken) { var knownEndpointsView = knownEndpoints .Select(x => new KnownEndpointsView @@ -184,7 +185,7 @@ public async Task>> QueryKnownEndpoints() return await Task.FromResult(new QueryResult>(knownEndpointsView, new QueryStatsInfo(string.Empty, knownEndpointsView.Count))); } - public Task>> QueryAuditCounts(string endpointName) + public Task>> QueryAuditCounts(string endpointName, CancellationToken cancellationToken) { var results = messageViews .Where(m => m.ReceivingEndpoint.Name == endpointName && !m.IsSystemMessage) diff --git a/src/ServiceControl.Audit.Persistence.RavenDB/RavenAttachmentsBodyStorage.cs b/src/ServiceControl.Audit.Persistence.RavenDB/RavenAttachmentsBodyStorage.cs index 28f6c68252..a8f801490e 100644 --- a/src/ServiceControl.Audit.Persistence.RavenDB/RavenAttachmentsBodyStorage.cs +++ b/src/ServiceControl.Audit.Persistence.RavenDB/RavenAttachmentsBodyStorage.cs @@ -23,10 +23,10 @@ public Task Store(string bodyId, string contentType, int bodySize, Stream bodySt .StoreAsync("body", bodyStream, contentType, cancellationToken); } - public async Task TryFetch(string bodyId) + public async Task TryFetch(string bodyId, CancellationToken cancellationToken) { - using var session = await sessionProvider.OpenSession(); - var result = await session.Advanced.Attachments.GetAsync($"MessageBodies/{bodyId}", "body"); + using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken); + var result = await session.Advanced.Attachments.GetAsync($"MessageBodies/{bodyId}", "body", cancellationToken); if (result == null) { diff --git a/src/ServiceControl.Audit.Persistence.RavenDB/RavenAuditDataStore.cs b/src/ServiceControl.Audit.Persistence.RavenDB/RavenAuditDataStore.cs index a963d6015b..9f6a120613 100644 --- a/src/ServiceControl.Audit.Persistence.RavenDB/RavenAuditDataStore.cs +++ b/src/ServiceControl.Audit.Persistence.RavenDB/RavenAuditDataStore.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using System.Linq; + using System.Threading; using System.Threading.Tasks; using Auditing.MessagesView; using Extensions; @@ -19,48 +20,48 @@ class RavenAuditDataStore(IRavenSessionProvider sessionProvider, DatabaseConfiguration databaseConfiguration) : IAuditDataStore { - public async Task> QuerySagaHistoryById(Guid input) + public async Task> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken) { - using var session = await sessionProvider.OpenSession(); + using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken); var sagaHistory = await session.Query() .Statistics(out var stats) - .SingleOrDefaultAsync(x => x.SagaId == input); + .SingleOrDefaultAsync(x => x.SagaId == input, token: cancellationToken); return sagaHistory == null ? QueryResult.Empty() : new QueryResult(sagaHistory, new QueryStatsInfo($"{stats.ResultEtag}", stats.TotalResults)); } - public async Task>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo) + public async Task>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken) { - using var session = await sessionProvider.OpenSession(); + using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken); var results = await session.Query(GetIndexName(isFullTextSearchEnabled)) .Statistics(out var stats) .IncludeSystemMessagesWhere(includeSystemMessages) .Sort(sortInfo) .Paging(pagingInfo) .ToMessagesView() - .ToListAsync(); + .ToListAsync(token: cancellationToken); return new QueryResult>(results, stats.ToQueryStatsInfo()); } - public async Task>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo) + public async Task>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken) { - using var session = await sessionProvider.OpenSession(); + using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken); var results = await session.Query(GetIndexName(isFullTextSearchEnabled)) .Statistics(out var stats) .Search(x => x.Query, searchParam) .Sort(sortInfo) .Paging(pagingInfo) .ToMessagesView() - .ToListAsync(); + .ToListAsync(token: cancellationToken); return new QueryResult>(results, stats.ToQueryStatsInfo()); } - public async Task>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo) + public async Task>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken) { - using var session = await sessionProvider.OpenSession(); + using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken); var results = await session.Query(GetIndexName(isFullTextSearchEnabled)) .Statistics(out var stats) .Search(x => x.Query, keyword) @@ -68,14 +69,14 @@ public async Task>> QueryMessagesByReceivingEndp .Sort(sortInfo) .Paging(pagingInfo) .ToMessagesView() - .ToListAsync(); + .ToListAsync(token: cancellationToken); return new QueryResult>(results, stats.ToQueryStatsInfo()); } - public async Task>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo) + public async Task>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken) { - using var session = await sessionProvider.OpenSession(); + using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken); var results = await session.Query(GetIndexName(isFullTextSearchEnabled)) .Statistics(out var stats) .IncludeSystemMessagesWhere(includeSystemMessages) @@ -83,29 +84,29 @@ public async Task>> QueryMessagesByReceivingEndp .Sort(sortInfo) .Paging(pagingInfo) .ToMessagesView() - .ToListAsync(); + .ToListAsync(token: cancellationToken); return new QueryResult>(results, stats.ToQueryStatsInfo()); } - public async Task>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo) + public async Task>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken) { - using var session = await sessionProvider.OpenSession(); + using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken); var results = await session.Query(GetIndexName(isFullTextSearchEnabled)) .Statistics(out var stats) .Where(m => m.ConversationId == conversationId) .Sort(sortInfo) .Paging(pagingInfo) .ToMessagesView() - .ToListAsync(); + .ToListAsync(token: cancellationToken); return new QueryResult>(results, stats.ToQueryStatsInfo()); } - public async Task GetMessageBody(string messageId) + public async Task GetMessageBody(string messageId, CancellationToken cancellationToken) { - using var session = await sessionProvider.OpenSession(); - var result = await session.Advanced.Attachments.GetAsync(messageId, "body"); + using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken); + var result = await session.Advanced.Attachments.GetAsync(messageId, "body", cancellationToken); if (result == null) { @@ -120,10 +121,10 @@ public async Task GetMessageBody(string messageId) ); } - public async Task>> QueryKnownEndpoints() + public async Task>> QueryKnownEndpoints(CancellationToken cancellationToken) { - using var session = await sessionProvider.OpenSession(); - var endpoints = await session.Advanced.LoadStartingWithAsync(KnownEndpoint.CollectionName, pageSize: 1024); + using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken); + var endpoints = await session.Advanced.LoadStartingWithAsync(KnownEndpoint.CollectionName, pageSize: 1024, token: cancellationToken); var knownEndpoints = endpoints .Select(x => new KnownEndpointsView @@ -142,11 +143,11 @@ public async Task>> QueryKnownEndpoints() return new QueryResult>(knownEndpoints, new QueryStatsInfo(string.Empty, knownEndpoints.Count)); } - public async Task>> QueryAuditCounts(string endpointName) + public async Task>> QueryAuditCounts(string endpointName, CancellationToken cancellationToken) { var indexName = GetIndexName(isFullTextSearchEnabled); - using var session = await sessionProvider.OpenSession(); + using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken); // Maximum should really be 31 queries if there are 30 days of audit data, but default limit is 30 session.Advanced.MaxNumberOfRequestsPerSession = 40; @@ -155,7 +156,7 @@ public async Task>> QueryAuditCounts(string endpoi var oldestMsg = await session.Query(indexName) .Where(m => m.ReceivingEndpointName == endpointName) .OrderBy(m => m.ProcessedAt) - .FirstOrDefaultAsync(); + .FirstOrDefaultAsync(token: cancellationToken); if (oldestMsg != null) { @@ -173,11 +174,15 @@ public async Task>> QueryAuditCounts(string endpoi .Statistics(out var stats) .Where(m => m.ReceivingEndpointName == endpointName && !m.IsSystemMessage && m.ProcessedAt >= date && m.ProcessedAt < nextDate) .Take(0) - .ToArrayAsync(); + .ToArrayAsync(token: cancellationToken); if (stats.TotalResults > 0) { - results.Add(new AuditCount { UtcDate = date, Count = stats.TotalResults }); + results.Add(new AuditCount + { + UtcDate = date, + Count = stats.TotalResults + }); } } } diff --git a/src/ServiceControl.Audit.Persistence.Tests.RavenDB/EmbeddedLifecycleTests.cs b/src/ServiceControl.Audit.Persistence.Tests.RavenDB/EmbeddedLifecycleTests.cs index e75cc4ffbc..95194198ed 100644 --- a/src/ServiceControl.Audit.Persistence.Tests.RavenDB/EmbeddedLifecycleTests.cs +++ b/src/ServiceControl.Audit.Persistence.Tests.RavenDB/EmbeddedLifecycleTests.cs @@ -35,7 +35,7 @@ public override async Task Setup() [Test] public async Task Verify_embedded_database() { - await DataStore.QueryKnownEndpoints(); + await DataStore.QueryKnownEndpoints(TestContext.CurrentContext.CancellationToken); Assert.Multiple(() => { diff --git a/src/ServiceControl.Audit.Persistence.Tests.RavenDB/RetentionTests.cs b/src/ServiceControl.Audit.Persistence.Tests.RavenDB/RetentionTests.cs index 90ceaa1904..29c6938bd9 100644 --- a/src/ServiceControl.Audit.Persistence.Tests.RavenDB/RetentionTests.cs +++ b/src/ServiceControl.Audit.Persistence.Tests.RavenDB/RetentionTests.cs @@ -32,11 +32,11 @@ await IngestProcessedMessagesAudits( message ); - var queryResultBeforeExpiration = await DataStore.QueryMessages("MyMessageId", new PagingInfo(), new SortInfo("Id", "asc")); + var queryResultBeforeExpiration = await DataStore.QueryMessages("MyMessageId", new PagingInfo(), new SortInfo("Id", "asc"), TestContext.CurrentContext.CancellationToken); await Task.Delay(4000); - var queryResultAfterExpiration = await DataStore.QueryMessages("MyMessageId", new PagingInfo(), new SortInfo("Id", "asc")); + var queryResultAfterExpiration = await DataStore.QueryMessages("MyMessageId", new PagingInfo(), new SortInfo("Id", "asc"), TestContext.CurrentContext.CancellationToken); Assert.That(queryResultBeforeExpiration.Results, Has.Count.EqualTo(1)); Assert.Multiple(() => @@ -62,11 +62,11 @@ await IngestKnownEndpoints( knownEndpoint ); - var queryResultBeforeExpiration = await DataStore.QueryKnownEndpoints(); + var queryResultBeforeExpiration = await DataStore.QueryKnownEndpoints(TestContext.CurrentContext.CancellationToken); await Task.Delay(4000); - var queryResultAfterExpiration = await DataStore.QueryKnownEndpoints(); + var queryResultAfterExpiration = await DataStore.QueryKnownEndpoints(TestContext.CurrentContext.CancellationToken); Assert.That(queryResultBeforeExpiration.Results, Has.Count.EqualTo(1)); Assert.Multiple(() => @@ -88,11 +88,11 @@ await IngestSagaAudits( new SagaSnapshot { SagaId = sagaId } ); - var queryResultBeforeExpiration = await DataStore.QuerySagaHistoryById(sagaId); + var queryResultBeforeExpiration = await DataStore.QuerySagaHistoryById(sagaId, TestContext.CurrentContext.CancellationToken); await Task.Delay(4000); - var queryResultAfterExpiration = await DataStore.QuerySagaHistoryById(sagaId); + var queryResultAfterExpiration = await DataStore.QuerySagaHistoryById(sagaId, TestContext.CurrentContext.CancellationToken); Assert.That(queryResultBeforeExpiration.Results, Is.Not.Null); Assert.Multiple(() => diff --git a/src/ServiceControl.Audit.Persistence.Tests/AuditCountingTests.cs b/src/ServiceControl.Audit.Persistence.Tests/AuditCountingTests.cs index 712bc022e7..e067bd33b9 100644 --- a/src/ServiceControl.Audit.Persistence.Tests/AuditCountingTests.cs +++ b/src/ServiceControl.Audit.Persistence.Tests/AuditCountingTests.cs @@ -46,9 +46,9 @@ public async Task ShouldCountAuditedMessages() await IngestProcessedMessagesAudits(messages); - var endpointA = (await DataStore.QueryAuditCounts("EndpointA")).Results; - var endpointB = (await DataStore.QueryAuditCounts("EndpointB")).Results; - var sysMsgEndpoint = (await DataStore.QueryAuditCounts("SystemEndpoint")).Results; + var endpointA = (await DataStore.QueryAuditCounts("EndpointA", TestContext.CurrentContext.CancellationToken)).Results; + var endpointB = (await DataStore.QueryAuditCounts("EndpointB", TestContext.CurrentContext.CancellationToken)).Results; + var sysMsgEndpoint = (await DataStore.QueryAuditCounts("SystemEndpoint", TestContext.CurrentContext.CancellationToken)).Results; Assert.That(sysMsgEndpoint, Is.Empty); diff --git a/src/ServiceControl.Audit.Persistence.Tests/AuditTests.cs b/src/ServiceControl.Audit.Persistence.Tests/AuditTests.cs index eac4e83e87..212a3b8621 100644 --- a/src/ServiceControl.Audit.Persistence.Tests/AuditTests.cs +++ b/src/ServiceControl.Audit.Persistence.Tests/AuditTests.cs @@ -30,7 +30,7 @@ await IngestProcessedMessagesAudits( message ); - var queryResult = await DataStore.QueryMessages("MyMessageId", new PagingInfo(), new SortInfo("Id", "asc")); + var queryResult = await DataStore.QueryMessages("MyMessageId", new PagingInfo(), new SortInfo("Id", "asc"), TestContext.CurrentContext.CancellationToken); Assert.That(queryResult.Results, Has.Count.EqualTo(1)); Assert.That(queryResult.Results[0].MessageId, Is.EqualTo("MyMessageId")); @@ -40,7 +40,7 @@ await IngestProcessedMessagesAudits( public async Task Handles_no_results_gracefully() { var nonExistingMessage = Guid.NewGuid().ToString(); - var queryResult = await DataStore.QueryMessages(nonExistingMessage, new PagingInfo(), new SortInfo("Id", "asc")); + var queryResult = await DataStore.QueryMessages(nonExistingMessage, new PagingInfo(), new SortInfo("Id", "asc"), TestContext.CurrentContext.CancellationToken); Assert.That(queryResult.Results, Is.Empty); } @@ -58,7 +58,7 @@ await IngestProcessedMessagesAudits( ); var queryResult = await DataStore.QueryMessagesByConversationId(conversationId, new PagingInfo(), - new SortInfo("message_id", "asc")); + new SortInfo("message_id", "asc"), TestContext.CurrentContext.CancellationToken); Assert.That(queryResult.Results, Has.Count.EqualTo(2)); } @@ -73,7 +73,7 @@ await IngestProcessedMessagesAudits( ); var queryResult = await DataStore.QueryMessages("MyMessageType", new PagingInfo(), - new SortInfo("message_id", "asc")); + new SortInfo("message_id", "asc"), TestContext.CurrentContext.CancellationToken); Assert.That(queryResult.Results, Has.Count.EqualTo(2)); } @@ -93,7 +93,7 @@ public async Task Can_roundtrip_message_body() var bodyId = GetBodyId(processedMessage); - var retrievedMessage = await DataStore.GetMessageBody(bodyId); + var retrievedMessage = await DataStore.GetMessageBody(bodyId, TestContext.CurrentContext.CancellationToken); Assert.That(retrievedMessage, Is.Not.Null); Assert.Multiple(() => @@ -130,7 +130,7 @@ public async Task Does_respect_max_message_body() var bodyId = GetBodyId(processedMessage); - var retrievedMessage = await DataStore.GetMessageBody(bodyId); + var retrievedMessage = await DataStore.GetMessageBody(bodyId, TestContext.CurrentContext.CancellationToken); Assert.That(retrievedMessage, Is.Not.Null); Assert.Multiple(() => @@ -158,7 +158,7 @@ public async Task Deduplicates_messages_in_same_batch() await configuration.CompleteDBOperation(); - var queryResult = await DataStore.GetMessages(false, new PagingInfo(), new SortInfo("message_id", "asc")); + var queryResult = await DataStore.GetMessages(false, new PagingInfo(), new SortInfo("message_id", "asc"), TestContext.CurrentContext.CancellationToken); Assert.That(queryResult.QueryStats.TotalCount, Is.EqualTo(1)); } @@ -182,7 +182,7 @@ public async Task Deduplicates_messages_in_different_batches() await configuration.CompleteDBOperation(); - var queryResult = await DataStore.GetMessages(false, new PagingInfo(), new SortInfo("message_id", "asc")); + var queryResult = await DataStore.GetMessages(false, new PagingInfo(), new SortInfo("message_id", "asc"), TestContext.CurrentContext.CancellationToken); Assert.That(queryResult.QueryStats.TotalCount, Is.EqualTo(1)); } @@ -205,7 +205,7 @@ public async Task Does_not_deduplicate_with_different_processing_started_header( await configuration.CompleteDBOperation(); - var queryResult = await DataStore.GetMessages(false, new PagingInfo(), new SortInfo("message_id", "asc")); + var queryResult = await DataStore.GetMessages(false, new PagingInfo(), new SortInfo("message_id", "asc"), TestContext.CurrentContext.CancellationToken); Assert.That(queryResult.QueryStats.TotalCount, Is.EqualTo(2)); } diff --git a/src/ServiceControl.Audit.Persistence.Tests/BodyStorageTests.cs b/src/ServiceControl.Audit.Persistence.Tests/BodyStorageTests.cs index 4fd52097ec..2f63b14c4c 100644 --- a/src/ServiceControl.Audit.Persistence.Tests/BodyStorageTests.cs +++ b/src/ServiceControl.Audit.Persistence.Tests/BodyStorageTests.cs @@ -11,7 +11,7 @@ class BodyStorageTests : PersistenceTestFixture public async Task Handles_no_results_gracefully() { var nonExistentBodyId = Guid.NewGuid().ToString(); - var result = await BodyStorage.TryFetch(nonExistentBodyId); + var result = await BodyStorage.TryFetch(nonExistentBodyId, TestContext.CurrentContext.CancellationToken); Assert.That(result.HasResult, Is.False); } diff --git a/src/ServiceControl.Audit.Persistence.Tests/KnownEndpointsTests.cs b/src/ServiceControl.Audit.Persistence.Tests/KnownEndpointsTests.cs index 1ea0c5307a..3164a7cb44 100644 --- a/src/ServiceControl.Audit.Persistence.Tests/KnownEndpointsTests.cs +++ b/src/ServiceControl.Audit.Persistence.Tests/KnownEndpointsTests.cs @@ -22,7 +22,7 @@ public async Task Basic_Roundtrip() await IngestKnownEndpoints(ingestedEndpoint); - var endpoints = await DataStore.QueryKnownEndpoints(); + var endpoints = await DataStore.QueryKnownEndpoints(TestContext.CurrentContext.CancellationToken); Assert.That(endpoints.Results, Has.Count.EqualTo(1)); var endpoint = endpoints.Results[0]; @@ -48,7 +48,7 @@ public async Task Can_query_many_known_endpoints() await IngestKnownEndpoints(knownEndpoints); - var queryResult = await DataStore.QueryKnownEndpoints(); + var queryResult = await DataStore.QueryKnownEndpoints(TestContext.CurrentContext.CancellationToken); Assert.Multiple(() => { diff --git a/src/ServiceControl.Audit.Persistence.Tests/SagaHistoryTests.cs b/src/ServiceControl.Audit.Persistence.Tests/SagaHistoryTests.cs index 4a649b057d..0b25b90f7e 100644 --- a/src/ServiceControl.Audit.Persistence.Tests/SagaHistoryTests.cs +++ b/src/ServiceControl.Audit.Persistence.Tests/SagaHistoryTests.cs @@ -21,7 +21,7 @@ await IngestSagaAudits( } ); - var queryResult = await DataStore.QuerySagaHistoryById(sagaId); + var queryResult = await DataStore.QuerySagaHistoryById(sagaId, TestContext.CurrentContext.CancellationToken); Assert.Multiple(() => { @@ -36,7 +36,7 @@ await IngestSagaAudits( public async Task Handles_no_results_gracefully() { var nonExistentSagaId = Guid.NewGuid(); - var queryResult = await DataStore.QuerySagaHistoryById(nonExistentSagaId); + var queryResult = await DataStore.QuerySagaHistoryById(nonExistentSagaId, TestContext.CurrentContext.CancellationToken); Assert.That(queryResult.Results, Is.Null); } @@ -53,7 +53,7 @@ await IngestSagaAudits( new SagaSnapshot { SagaId = sagaId } ); - var queryResult = await DataStore.QuerySagaHistoryById(sagaId); + var queryResult = await DataStore.QuerySagaHistoryById(sagaId, TestContext.CurrentContext.CancellationToken); Assert.That(queryResult.Results.Changes, Has.Count.EqualTo(2)); } diff --git a/src/ServiceControl.Audit.Persistence/IAuditDataStore.cs b/src/ServiceControl.Audit.Persistence/IAuditDataStore.cs index 7df26204e1..9408ceeac7 100644 --- a/src/ServiceControl.Audit.Persistence/IAuditDataStore.cs +++ b/src/ServiceControl.Audit.Persistence/IAuditDataStore.cs @@ -2,6 +2,7 @@ { using System; using System.Collections.Generic; + using System.Threading; using System.Threading.Tasks; using ServiceControl.Audit.Auditing; using ServiceControl.Audit.Auditing.MessagesView; @@ -11,14 +12,14 @@ public interface IAuditDataStore { - Task>> QueryKnownEndpoints(); - Task> QuerySagaHistoryById(Guid input); - Task>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo); - Task>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo); - Task>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo); - Task>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo); - Task>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo); - Task GetMessageBody(string messageId); - Task>> QueryAuditCounts(string endpointName); + Task>> QueryKnownEndpoints(CancellationToken cancellationToken); + Task> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken); + Task>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken); + Task>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken); + Task>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken); + Task>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken); + Task>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken); + Task GetMessageBody(string messageId, CancellationToken cancellationToken); + Task>> QueryAuditCounts(string endpointName, CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit.Persistence/IBodyStorage.cs b/src/ServiceControl.Audit.Persistence/IBodyStorage.cs index ba87f5fbeb..ae95cb1d3f 100644 --- a/src/ServiceControl.Audit.Persistence/IBodyStorage.cs +++ b/src/ServiceControl.Audit.Persistence/IBodyStorage.cs @@ -7,7 +7,7 @@ public interface IBodyStorage { Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken); - Task TryFetch(string bodyId); + Task TryFetch(string bodyId, CancellationToken cancellationToken); } public class StreamResult diff --git a/src/ServiceControl.Audit.Persistence/IFailedAuditStorage.cs b/src/ServiceControl.Audit.Persistence/IFailedAuditStorage.cs index 54346eb31c..86ef3aedcd 100644 --- a/src/ServiceControl.Audit.Persistence/IFailedAuditStorage.cs +++ b/src/ServiceControl.Audit.Persistence/IFailedAuditStorage.cs @@ -11,7 +11,8 @@ public interface IFailedAuditStorage Task ProcessFailedMessages( Func, CancellationToken, Task> onMessage, - CancellationToken cancellationToken); + CancellationToken cancellationToken + ); Task GetFailedAuditsCount(); } diff --git a/src/ServiceControl.Audit.UnitTests/BodyStorage/BodyStorageEnricherTests.cs b/src/ServiceControl.Audit.UnitTests/BodyStorage/BodyStorageEnricherTests.cs index 84a8b94580..05ce6871cc 100644 --- a/src/ServiceControl.Audit.UnitTests/BodyStorage/BodyStorageEnricherTests.cs +++ b/src/ServiceControl.Audit.UnitTests/BodyStorage/BodyStorageEnricherTests.cs @@ -267,7 +267,7 @@ public Task Store(string bodyId, string contentType, int bodySize, Stream bodySt return Task.CompletedTask; } - public Task TryFetch(string bodyId) + public Task TryFetch(string bodyId, CancellationToken cancellationToken) { throw new NotImplementedException(); } diff --git a/src/ServiceControl.Audit/Auditing/MessagesView/GetMessagesController.cs b/src/ServiceControl.Audit/Auditing/MessagesView/GetMessagesController.cs index 9894835c3d..7c33da8d3e 100644 --- a/src/ServiceControl.Audit/Auditing/MessagesView/GetMessagesController.cs +++ b/src/ServiceControl.Audit/Auditing/MessagesView/GetMessagesController.cs @@ -2,6 +2,7 @@ namespace ServiceControl.Audit.Auditing.MessagesView { using System; using System.Collections.Generic; + using System.Threading; using System.Threading.Tasks; using Infrastructure; using Infrastructure.WebApi; @@ -14,36 +15,36 @@ public class GetMessagesController(IAuditDataStore dataStore) : ControllerBase { [Route("messages")] [HttpGet] - public async Task> GetAllMessages([FromQuery] PagingInfo pagingInfo, [FromQuery] SortInfo sortInfo, [FromQuery(Name = "include_system_messages")] bool includeSystemMessages) + public async Task> GetAllMessages([FromQuery] PagingInfo pagingInfo, [FromQuery] SortInfo sortInfo, [FromQuery(Name = "include_system_messages")] bool includeSystemMessages, CancellationToken cancellationToken) { - var result = await dataStore.GetMessages(includeSystemMessages, pagingInfo, sortInfo); + var result = await dataStore.GetMessages(includeSystemMessages, pagingInfo, sortInfo, cancellationToken); Response.WithQueryStatsAndPagingInfo(result.QueryStats, pagingInfo); return result.Results; } [Route("endpoints/{endpoint}/messages")] [HttpGet] - public async Task> GetEndpointMessages([FromQuery] PagingInfo pagingInfo, [FromQuery] SortInfo sortInfo, [FromQuery(Name = "include_system_messages")] bool includeSystemMessages, string endpoint) + public async Task> GetEndpointMessages([FromQuery] PagingInfo pagingInfo, [FromQuery] SortInfo sortInfo, [FromQuery(Name = "include_system_messages")] bool includeSystemMessages, string endpoint, CancellationToken cancellationToken) { - var result = await dataStore.QueryMessagesByReceivingEndpoint(includeSystemMessages, endpoint, pagingInfo, sortInfo); + var result = await dataStore.QueryMessagesByReceivingEndpoint(includeSystemMessages, endpoint, pagingInfo, sortInfo, cancellationToken); Response.WithQueryStatsAndPagingInfo(result.QueryStats, pagingInfo); return result.Results; } [Route("endpoints/{endpoint}/audit-count")] [HttpGet] - public async Task> GetEndpointAuditCounts([FromQuery] PagingInfo pagingInfo, string endpoint) + public async Task> GetEndpointAuditCounts([FromQuery] PagingInfo pagingInfo, string endpoint, CancellationToken cancellationToken) { - var result = await dataStore.QueryAuditCounts(endpoint); + var result = await dataStore.QueryAuditCounts(endpoint, cancellationToken); Response.WithQueryStatsAndPagingInfo(result.QueryStats, pagingInfo); return result.Results; } [Route("messages/{id}/body")] [HttpGet] - public async Task Get(string id) + public async Task Get(string id, CancellationToken cancellationToken) { - var result = await dataStore.GetMessageBody(id); + var result = await dataStore.GetMessageBody(id, cancellationToken); if (result.Found == false) { @@ -67,36 +68,36 @@ public async Task Get(string id) [Route("messages/search")] [HttpGet] - public async Task> Search([FromQuery] PagingInfo pagingInfo, [FromQuery] SortInfo sortInfo, string q) + public async Task> Search([FromQuery] PagingInfo pagingInfo, [FromQuery] SortInfo sortInfo, string q, CancellationToken cancellationToken) { - var result = await dataStore.QueryMessages(q, pagingInfo, sortInfo); + var result = await dataStore.QueryMessages(q, pagingInfo, sortInfo, cancellationToken); Response.WithQueryStatsAndPagingInfo(result.QueryStats, pagingInfo); return result.Results; } [Route("messages/search/{keyword}")] [HttpGet] - public async Task> SearchByKeyWord([FromQuery] PagingInfo pagingInfo, [FromQuery] SortInfo sortInfo, string keyword) + public async Task> SearchByKeyWord([FromQuery] PagingInfo pagingInfo, [FromQuery] SortInfo sortInfo, string keyword, CancellationToken cancellationToken) { - var result = await dataStore.QueryMessages(keyword, pagingInfo, sortInfo); + var result = await dataStore.QueryMessages(keyword, pagingInfo, sortInfo, cancellationToken); Response.WithQueryStatsAndPagingInfo(result.QueryStats, pagingInfo); return result.Results; } [Route("endpoints/{endpoint}/messages/search")] [HttpGet] - public async Task> Search([FromQuery] PagingInfo pagingInfo, [FromQuery] SortInfo sortInfo, string endpoint, string q) + public async Task> Search([FromQuery] PagingInfo pagingInfo, [FromQuery] SortInfo sortInfo, string endpoint, string q, CancellationToken cancellationToken) { - var result = await dataStore.QueryMessagesByReceivingEndpointAndKeyword(endpoint, q, pagingInfo, sortInfo); + var result = await dataStore.QueryMessagesByReceivingEndpointAndKeyword(endpoint, q, pagingInfo, sortInfo, cancellationToken); Response.WithQueryStatsAndPagingInfo(result.QueryStats, pagingInfo); return result.Results; } [Route("endpoints/{endpoint}/messages/search/{keyword}")] [HttpGet] - public async Task> SearchByKeyword([FromQuery] PagingInfo pagingInfo, [FromQuery] SortInfo sortInfo, string endpoint, string keyword) + public async Task> SearchByKeyword([FromQuery] PagingInfo pagingInfo, [FromQuery] SortInfo sortInfo, string endpoint, string keyword, CancellationToken cancellationToken) { - var result = await dataStore.QueryMessagesByReceivingEndpointAndKeyword(endpoint, keyword, pagingInfo, sortInfo); + var result = await dataStore.QueryMessagesByReceivingEndpointAndKeyword(endpoint, keyword, pagingInfo, sortInfo, cancellationToken); Response.WithQueryStatsAndPagingInfo(result.QueryStats, pagingInfo); return result.Results; } diff --git a/src/ServiceControl.Audit/Auditing/MessagesView/MessagesConversationController.cs b/src/ServiceControl.Audit/Auditing/MessagesView/MessagesConversationController.cs index b1b270d656..4fcc04cd88 100644 --- a/src/ServiceControl.Audit/Auditing/MessagesView/MessagesConversationController.cs +++ b/src/ServiceControl.Audit/Auditing/MessagesView/MessagesConversationController.cs @@ -1,6 +1,7 @@ namespace ServiceControl.Audit.Auditing.MessagesView { using System.Collections.Generic; + using System.Threading; using System.Threading.Tasks; using Infrastructure; using Infrastructure.WebApi; @@ -13,9 +14,9 @@ public class MessagesConversationController(IAuditDataStore dataStore) : Control { [Route("conversations/{conversationId}")] [HttpGet] - public async Task> Get([FromQuery] PagingInfo pagingInfo, [FromQuery] SortInfo sortInfo, string conversationId) + public async Task> Get([FromQuery] PagingInfo pagingInfo, [FromQuery] SortInfo sortInfo, string conversationId, CancellationToken cancellationToken) { - var result = await dataStore.QueryMessagesByConversationId(conversationId, pagingInfo, sortInfo); + var result = await dataStore.QueryMessagesByConversationId(conversationId, pagingInfo, sortInfo, cancellationToken); Response.WithQueryStatsAndPagingInfo(result.QueryStats, pagingInfo); return result.Results; } diff --git a/src/ServiceControl.Audit/Monitoring/KnownEndpoints/KnownEndpointsController.cs b/src/ServiceControl.Audit/Monitoring/KnownEndpoints/KnownEndpointsController.cs index 5977ade83d..2ed0b2a278 100644 --- a/src/ServiceControl.Audit/Monitoring/KnownEndpoints/KnownEndpointsController.cs +++ b/src/ServiceControl.Audit/Monitoring/KnownEndpoints/KnownEndpointsController.cs @@ -1,6 +1,7 @@ namespace ServiceControl.Audit.Monitoring { using System.Collections.Generic; + using System.Threading; using System.Threading.Tasks; using Infrastructure; using Infrastructure.WebApi; @@ -13,9 +14,9 @@ public class KnownEndpointsController(IAuditDataStore dataStore) : ControllerBas { [Route("endpoints/known")] [HttpGet] - public async Task> GetAll([FromQuery] PagingInfo pagingInfo) + public async Task> GetAll([FromQuery] PagingInfo pagingInfo, CancellationToken cancellationToken) { - var result = await dataStore.QueryKnownEndpoints(); + var result = await dataStore.QueryKnownEndpoints(cancellationToken); Response.WithQueryStatsAndPagingInfo(result.QueryStats, pagingInfo); return result.Results; } diff --git a/src/ServiceControl.Audit/SagaAudit/SagasController.cs b/src/ServiceControl.Audit/SagaAudit/SagasController.cs index a06a05fda1..cd2356784d 100644 --- a/src/ServiceControl.Audit/SagaAudit/SagasController.cs +++ b/src/ServiceControl.Audit/SagaAudit/SagasController.cs @@ -1,6 +1,7 @@ namespace ServiceControl.Audit.SagaAudit { using System; + using System.Threading; using System.Threading.Tasks; using Infrastructure; using Infrastructure.WebApi; @@ -14,9 +15,9 @@ public class SagasController(IAuditDataStore dataStore) : ControllerBase { [Route("sagas/{id}")] [HttpGet] - public async Task Sagas([FromQuery] PagingInfo pagingInfo, Guid id) + public async Task Sagas([FromQuery] PagingInfo pagingInfo, Guid id, CancellationToken cancellationToken) { - var result = await dataStore.QuerySagaHistoryById(id); + var result = await dataStore.QuerySagaHistoryById(id, cancellationToken); Response.WithQueryStatsAndPagingInfo(result.QueryStats, pagingInfo); return result.Results; } From 90694a2beb93b5a57f0202634733ec77cbbfb9b8 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Wed, 12 Mar 2025 10:32:45 +0100 Subject: [PATCH 14/14] APIApprovels HTTP API routes --- .../APIApprovals.HttpApiRoutes.approved.txt | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.HttpApiRoutes.approved.txt b/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.HttpApiRoutes.approved.txt index 9483473d76..67a59c4d88 100644 --- a/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.HttpApiRoutes.approved.txt +++ b/src/ServiceControl.Audit.UnitTests/ApprovalFiles/APIApprovals.HttpApiRoutes.approved.txt @@ -1,15 +1,15 @@ GET / => ServiceControl.Audit.Infrastructure.WebApi.RootController:Urls() GET /configuration => ServiceControl.Audit.Infrastructure.WebApi.RootController:Config() GET /connection => ServiceControl.Audit.Connection.ConnectionController:GetConnectionDetails() -GET /conversations/{conversationId} => ServiceControl.Audit.Auditing.MessagesView.MessagesConversationController:Get(PagingInfo pagingInfo, SortInfo sortInfo, String conversationId) -GET /endpoints/{endpoint}/audit-count => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:GetEndpointAuditCounts(PagingInfo pagingInfo, String endpoint) -GET /endpoints/{endpoint}/messages => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:GetEndpointMessages(PagingInfo pagingInfo, SortInfo sortInfo, Boolean includeSystemMessages, String endpoint) -GET /endpoints/{endpoint}/messages/search => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:Search(PagingInfo pagingInfo, SortInfo sortInfo, String endpoint, String q) -GET /endpoints/{endpoint}/messages/search/{keyword} => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:SearchByKeyword(PagingInfo pagingInfo, SortInfo sortInfo, String endpoint, String keyword) -GET /endpoints/known => ServiceControl.Audit.Monitoring.KnownEndpointsController:GetAll(PagingInfo pagingInfo) +GET /conversations/{conversationId} => ServiceControl.Audit.Auditing.MessagesView.MessagesConversationController:Get(PagingInfo pagingInfo, SortInfo sortInfo, String conversationId, CancellationToken cancellationToken) +GET /endpoints/{endpoint}/audit-count => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:GetEndpointAuditCounts(PagingInfo pagingInfo, String endpoint, CancellationToken cancellationToken) +GET /endpoints/{endpoint}/messages => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:GetEndpointMessages(PagingInfo pagingInfo, SortInfo sortInfo, Boolean includeSystemMessages, String endpoint, CancellationToken cancellationToken) +GET /endpoints/{endpoint}/messages/search => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:Search(PagingInfo pagingInfo, SortInfo sortInfo, String endpoint, String q, CancellationToken cancellationToken) +GET /endpoints/{endpoint}/messages/search/{keyword} => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:SearchByKeyword(PagingInfo pagingInfo, SortInfo sortInfo, String endpoint, String keyword, CancellationToken cancellationToken) +GET /endpoints/known => ServiceControl.Audit.Monitoring.KnownEndpointsController:GetAll(PagingInfo pagingInfo, CancellationToken cancellationToken) GET /instance-info => ServiceControl.Audit.Infrastructure.WebApi.RootController:Config() -GET /messages => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:GetAllMessages(PagingInfo pagingInfo, SortInfo sortInfo, Boolean includeSystemMessages) -GET /messages/{id}/body => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:Get(String id) -GET /messages/search => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:Search(PagingInfo pagingInfo, SortInfo sortInfo, String q) -GET /messages/search/{keyword} => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:SearchByKeyWord(PagingInfo pagingInfo, SortInfo sortInfo, String keyword) -GET /sagas/{id} => ServiceControl.Audit.SagaAudit.SagasController:Sagas(PagingInfo pagingInfo, Guid id) +GET /messages => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:GetAllMessages(PagingInfo pagingInfo, SortInfo sortInfo, Boolean includeSystemMessages, CancellationToken cancellationToken) +GET /messages/{id}/body => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:Get(String id, CancellationToken cancellationToken) +GET /messages/search => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:Search(PagingInfo pagingInfo, SortInfo sortInfo, String q, CancellationToken cancellationToken) +GET /messages/search/{keyword} => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:SearchByKeyWord(PagingInfo pagingInfo, SortInfo sortInfo, String keyword, CancellationToken cancellationToken) +GET /sagas/{id} => ServiceControl.Audit.SagaAudit.SagasController:Sagas(PagingInfo pagingInfo, Guid id, CancellationToken cancellationToken)