Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace ServiceControl.AcceptanceTests.RavenDB.Recoverability.MessageFailures
{
using System;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.EndpointTemplates;
Expand Down Expand Up @@ -75,7 +76,7 @@ public async Task It_can_be_reimported()

class MessageFailedHandler(MyContext scenarioContext) : IDomainHandler<MessageFailed>
{
public Task Handle(MessageFailed domainEvent)
public Task Handle(MessageFailed domainEvent, CancellationToken cancellationToken)
{
scenarioContext.MessageFailedEventPublished = true;
return Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ServiceControl.Audit.Auditing.BodyStorage;

Expand All @@ -15,7 +16,7 @@ public InMemoryAttachmentsBodyStorage()
messageBodies = [];
}

public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream)
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
{
var messageBody = messageBodies.FirstOrDefault(w => w.BodyId == bodyId);

Expand Down Expand Up @@ -43,7 +44,7 @@ public Task Store(string bodyId, string contentType, int bodySize, Stream bodySt
return Task.CompletedTask;
}

public async Task<StreamResult> TryFetch(string bodyId)
public async Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken)
{
var messageBody = messageBodies.FirstOrDefault(w => w.BodyId == bodyId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ServiceControl.Audit.Auditing;
using ServiceControl.Audit.Auditing.BodyStorage;
Expand All @@ -29,7 +30,7 @@ public InMemoryAuditDataStore(IBodyStorage bodyStorage)
failedAuditImports = [];
}

public async Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input)
public async Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken)
{
var sagaHistory = sagaHistories.FirstOrDefault(w => w.SagaId == input);

Expand All @@ -41,7 +42,7 @@ public async Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input)
return await Task.FromResult(new QueryResult<SagaHistory>(sagaHistory, new QueryStatsInfo(string.Empty, 1)));
}

public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo)
public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
{
var matched = messageViews
.Where(w => !w.IsSystemMessage || includeSystemMessages)
Expand All @@ -50,7 +51,7 @@ public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSyst
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count)));
}

public async Task<QueryResult<IList<MessagesView>>> QueryMessages(string keyword, PagingInfo pagingInfo, SortInfo sortInfo)
public async Task<QueryResult<IList<MessagesView>>> QueryMessages(string keyword, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
{
var messages = GetMessageIdsMatchingQuery(keyword);

Expand All @@ -60,33 +61,33 @@ public async Task<QueryResult<IList<MessagesView>>> QueryMessages(string keyword
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count())));
}

public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo)
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
{
var messages = GetMessageIdsMatchingQuery(keyword);

var matched = messageViews.Where(w => w.ReceivingEndpoint.Name == endpoint && messages.Contains(w.MessageId)).ToList();
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count)));
}

public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo)
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
{
var matched = messageViews.Where(w => w.ReceivingEndpoint.Name == endpointName).ToList();
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count)));
}

public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo)
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
{
var matched = messageViews.Where(w => w.ConversationId == conversationId).ToList();
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count)));
}

public async Task<MessageBodyView> GetMessageBody(string messageId)
public async Task<MessageBodyView> GetMessageBody(string messageId, CancellationToken cancellationToken)
{
var result = await GetMessageBodyFromMetadata(messageId);

if (!result.Found)
{
var fromAttachments = await GetMessageBodyFromAttachments(messageId);
var fromAttachments = await GetMessageBodyFromAttachments(messageId, cancellationToken);
if (fromAttachments.Found)
{
return fromAttachments;
Expand Down Expand Up @@ -121,9 +122,9 @@ IList<string> GetMessageIdsMatchingQuery(string keyword)
.Select(pm => pm.MessageMetadata["MessageId"] as string)
.ToList();
}
async Task<MessageBodyView> GetMessageBodyFromAttachments(string messageId)
async Task<MessageBodyView> GetMessageBodyFromAttachments(string messageId, CancellationToken cancellationToken)
{
var fromBodyStorage = await bodyStorage.TryFetch(messageId);
var fromBodyStorage = await bodyStorage.TryFetch(messageId, cancellationToken);

if (fromBodyStorage.HasResult)
{
Expand Down Expand Up @@ -165,7 +166,7 @@ Task<MessageBodyView> GetMessageBodyFromMetadata(string messageId)
return Task.FromResult(MessageBodyView.FromString(body, contentType, bodySize, string.Empty));
}

public async Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints()
public async Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints(CancellationToken cancellationToken)
{
var knownEndpointsView = knownEndpoints
.Select(x => new KnownEndpointsView
Expand All @@ -184,7 +185,7 @@ public async Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints()
return await Task.FromResult(new QueryResult<IList<KnownEndpointsView>>(knownEndpointsView, new QueryStatsInfo(string.Empty, knownEndpointsView.Count)));
}

public Task<QueryResult<IList<AuditCount>>> QueryAuditCounts(string endpointName)
public Task<QueryResult<IList<AuditCount>>> QueryAuditCounts(string endpointName, CancellationToken cancellationToken)
{
var results = messageViews
.Where(m => m.ReceivingEndpoint.Name == endpointName && !m.IsSystemMessage)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace ServiceControl.Audit.Persistence.InMemory
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Auditing.BodyStorage;
using ServiceControl.Audit.Auditing;
Expand All @@ -15,21 +16,21 @@ class InMemoryAuditIngestionUnitOfWork(
{
public ValueTask DisposeAsync() => ValueTask.CompletedTask;

public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint)
public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken)
{
dataStore.knownEndpoints.Add(knownEndpoint);
return Task.CompletedTask;
}

public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body)
public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body, CancellationToken cancellationToken)
{
if (!body.IsEmpty)
{
await bodyStorageEnricher.StoreAuditMessageBody(body, processedMessage);
await bodyStorageEnricher.StoreAuditMessageBody(body, processedMessage, cancellationToken);
}
await dataStore.SaveProcessedMessage(processedMessage);
}

public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot) => dataStore.SaveSagaSnapshot(sagaSnapshot);
public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken) => dataStore.SaveSagaSnapshot(sagaSnapshot);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace ServiceControl.Audit.Persistence.InMemory
{
using System.Threading;
using System.Threading.Tasks;
using ServiceControl.Audit.Auditing.BodyStorage;
using ServiceControl.Audit.Persistence.UnitOfWork;
Expand All @@ -12,7 +13,7 @@ public InMemoryAuditIngestionUnitOfWorkFactory(InMemoryAuditDataStore dataStore,
bodyStorageEnricher = new BodyStorageEnricher(bodyStorage, settings);
}

public ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize)
public ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken)
{
//The batchSize argument is ignored: the in-memory storage implementation doesn't support batching.
return new ValueTask<IAuditIngestionUnitOfWork>(new InMemoryAuditIngestionUnitOfWork(dataStore, bodyStorageEnricher));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace ServiceControl.Audit.Persistence.RavenDB
{
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Auditing.BodyStorage;
using Raven.Client.Documents.BulkInsert;
Expand All @@ -11,21 +12,21 @@ class RavenAttachmentsBodyStorage(
int settingsMaxBodySizeToStore)
: IBodyStorage
{
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream)
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
{
if (bodySize > settingsMaxBodySizeToStore)
{
return Task.CompletedTask;
}

return bulkInsert.AttachmentsFor(bodyId)
.StoreAsync("body", bodyStream, contentType);
.StoreAsync("body", bodyStream, contentType, cancellationToken);
}

public async Task<StreamResult> TryFetch(string bodyId)
public async Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken)
{
using var session = await sessionProvider.OpenSession();
var result = await session.Advanced.Attachments.GetAsync($"MessageBodies/{bodyId}", "body");
using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken);
var result = await session.Advanced.Attachments.GetAsync($"MessageBodies/{bodyId}", "body", cancellationToken);

if (result == null)
{
Expand Down
Loading