Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public async Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input, Can
return await Task.FromResult(new QueryResult<SagaHistory>(sagaHistory, new QueryStatsInfo(string.Empty, 1)));
}

public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange, CancellationToken cancellationToken)
{
var matched = messageViews
.Where(w => !w.IsSystemMessage || includeSystemMessages)
Expand All @@ -51,7 +51,7 @@ public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSyst
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count)));
}

public async Task<QueryResult<IList<MessagesView>>> QueryMessages(string keyword, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
public async Task<QueryResult<IList<MessagesView>>> QueryMessages(string keyword, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange, CancellationToken cancellationToken)
{
var messages = GetMessageIdsMatchingQuery(keyword);

Expand All @@ -61,15 +61,15 @@ public async Task<QueryResult<IList<MessagesView>>> QueryMessages(string keyword
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count())));
}

public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange, CancellationToken cancellationToken)
{
var messages = GetMessageIdsMatchingQuery(keyword);

var matched = messageViews.Where(w => w.ReceivingEndpoint.Name == endpoint && messages.Contains(w.MessageId)).ToList();
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count)));
}

public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange, CancellationToken cancellationToken)
{
var matched = messageViews.Where(w => w.ReceivingEndpoint.Name == endpointName).ToList();
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,26 @@ static class RavenQueryExtensions
return source;
}

public static IQueryable<MessagesViewIndex.SortAndFilterOptions> FilterBySentTimeRange(this IQueryable<MessagesViewIndex.SortAndFilterOptions> source, DateTimeRange range)
{
if (range == null)
{
return source;
}

if (range.From.HasValue)
{
source = source.Where(m => m.TimeSent >= range.From);
}

if (range.To.HasValue)
{
source = source.Where(m => m.TimeSent <= range.To);
}

return source;
}

public static IQueryable<TSource> Paging<TSource>(this IQueryable<TSource> source, PagingInfo pagingInfo)
=> source.Skip(pagingInfo.Offset).Take(pagingInfo.PageSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ public async Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input, Can
return sagaHistory == null ? QueryResult<SagaHistory>.Empty() : new QueryResult<SagaHistory>(sagaHistory, new QueryStatsInfo($"{stats.ResultEtag}", stats.TotalResults));
}

public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange, CancellationToken cancellationToken)
{
using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken);
var results = await session.Query<MessagesViewIndex.SortAndFilterOptions>(GetIndexName(isFullTextSearchEnabled))
.Statistics(out var stats)
.FilterBySentTimeRange(timeSentRange)
.IncludeSystemMessagesWhere(includeSystemMessages)
.Sort(sortInfo)
.Paging(pagingInfo)
Expand All @@ -45,12 +46,13 @@ public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSyst
return new QueryResult<IList<MessagesView>>(results, stats.ToQueryStatsInfo());
}

public async Task<QueryResult<IList<MessagesView>>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
public async Task<QueryResult<IList<MessagesView>>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange, CancellationToken cancellationToken)
{
using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken);
var results = await session.Query<MessagesViewIndex.SortAndFilterOptions>(GetIndexName(isFullTextSearchEnabled))
.Statistics(out var stats)
.Search(x => x.Query, searchParam)
.FilterBySentTimeRange(timeSentRange)
.Sort(sortInfo)
.Paging(pagingInfo)
.ToMessagesView()
Expand All @@ -59,13 +61,14 @@ public async Task<QueryResult<IList<MessagesView>>> QueryMessages(string searchP
return new QueryResult<IList<MessagesView>>(results, stats.ToQueryStatsInfo());
}

public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange, CancellationToken cancellationToken)
{
using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken);
var results = await session.Query<MessagesViewIndex.SortAndFilterOptions>(GetIndexName(isFullTextSearchEnabled))
.Statistics(out var stats)
.Search(x => x.Query, keyword)
.Where(m => m.ReceivingEndpointName == endpoint)
.FilterBySentTimeRange(timeSentRange)
.Sort(sortInfo)
.Paging(pagingInfo)
.ToMessagesView()
Expand All @@ -74,13 +77,14 @@ public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndp
return new QueryResult<IList<MessagesView>>(results, stats.ToQueryStatsInfo());
}

public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange, CancellationToken cancellationToken)
{
using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken);
var results = await session.Query<MessagesViewIndex.SortAndFilterOptions>(GetIndexName(isFullTextSearchEnabled))
.Statistics(out var stats)
.IncludeSystemMessagesWhere(includeSystemMessages)
.Where(m => m.ReceivingEndpointName == endpointName)
.FilterBySentTimeRange(timeSentRange)
.Sort(sortInfo)
.Paging(pagingInfo)
.ToMessagesView()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,13 @@ public async Task AuditMessageRetention()
{
var message = MakeMessage("MyMessageId");

await IngestProcessedMessagesAudits(
message
);
await IngestProcessedMessagesAudits(message);

var queryResultBeforeExpiration = await DataStore.QueryMessages("MyMessageId", new PagingInfo(), new SortInfo("Id", "asc"), TestContext.CurrentContext.CancellationToken);
var queryResultBeforeExpiration = await DataStore.QueryMessages("MyMessageId", new PagingInfo(), new SortInfo("Id", "asc"), cancellationToken: TestContext.CurrentContext.CancellationToken);

await Task.Delay(4000);

var queryResultAfterExpiration = await DataStore.QueryMessages("MyMessageId", new PagingInfo(), new SortInfo("Id", "asc"), TestContext.CurrentContext.CancellationToken);
var queryResultAfterExpiration = await DataStore.QueryMessages("MyMessageId", new PagingInfo(), new SortInfo("Id", "asc"), cancellationToken: TestContext.CurrentContext.CancellationToken);

Assert.That(queryResultBeforeExpiration.Results, Has.Count.EqualTo(1));
Assert.Multiple(() =>
Expand Down
12 changes: 6 additions & 6 deletions src/ServiceControl.Audit.Persistence.Tests/AuditTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ await IngestProcessedMessagesAudits(
message
);

var queryResult = await DataStore.QueryMessages("MyMessageId", new PagingInfo(), new SortInfo("Id", "asc"), TestContext.CurrentContext.CancellationToken);
var queryResult = await DataStore.QueryMessages("MyMessageId", new PagingInfo(), new SortInfo("Id", "asc"), cancellationToken: TestContext.CurrentContext.CancellationToken);

Assert.That(queryResult.Results, Has.Count.EqualTo(1));
Assert.That(queryResult.Results[0].MessageId, Is.EqualTo("MyMessageId"));
Expand All @@ -40,7 +40,7 @@ await IngestProcessedMessagesAudits(
public async Task Handles_no_results_gracefully()
{
var nonExistingMessage = Guid.NewGuid().ToString();
var queryResult = await DataStore.QueryMessages(nonExistingMessage, new PagingInfo(), new SortInfo("Id", "asc"), TestContext.CurrentContext.CancellationToken);
var queryResult = await DataStore.QueryMessages(nonExistingMessage, new PagingInfo(), new SortInfo("Id", "asc"), cancellationToken: TestContext.CurrentContext.CancellationToken);

Assert.That(queryResult.Results, Is.Empty);
}
Expand Down Expand Up @@ -73,7 +73,7 @@ await IngestProcessedMessagesAudits(
);

var queryResult = await DataStore.QueryMessages("MyMessageType", new PagingInfo(),
new SortInfo("message_id", "asc"), TestContext.CurrentContext.CancellationToken);
new SortInfo("message_id", "asc"), cancellationToken: TestContext.CurrentContext.CancellationToken);

Assert.That(queryResult.Results, Has.Count.EqualTo(2));
}
Expand Down Expand Up @@ -158,7 +158,7 @@ public async Task Deduplicates_messages_in_same_batch()

await configuration.CompleteDBOperation();

var queryResult = await DataStore.GetMessages(false, new PagingInfo(), new SortInfo("message_id", "asc"), TestContext.CurrentContext.CancellationToken);
var queryResult = await DataStore.GetMessages(false, new PagingInfo(), new SortInfo("message_id", "asc"), cancellationToken: TestContext.CurrentContext.CancellationToken);

Assert.That(queryResult.QueryStats.TotalCount, Is.EqualTo(1));
}
Expand All @@ -182,7 +182,7 @@ public async Task Deduplicates_messages_in_different_batches()

await configuration.CompleteDBOperation();

var queryResult = await DataStore.GetMessages(false, new PagingInfo(), new SortInfo("message_id", "asc"), TestContext.CurrentContext.CancellationToken);
var queryResult = await DataStore.GetMessages(false, new PagingInfo(), new SortInfo("message_id", "asc"), cancellationToken: TestContext.CurrentContext.CancellationToken);

Assert.That(queryResult.QueryStats.TotalCount, Is.EqualTo(1));
}
Expand All @@ -205,7 +205,7 @@ public async Task Does_not_deduplicate_with_different_processing_started_header(

await configuration.CompleteDBOperation();

var queryResult = await DataStore.GetMessages(false, new PagingInfo(), new SortInfo("message_id", "asc"), TestContext.CurrentContext.CancellationToken);
var queryResult = await DataStore.GetMessages(false, new PagingInfo(), new SortInfo("message_id", "asc"), cancellationToken: TestContext.CurrentContext.CancellationToken);

Assert.That(queryResult.QueryStats.TotalCount, Is.EqualTo(2));
}
Expand Down
8 changes: 4 additions & 4 deletions src/ServiceControl.Audit.Persistence/IAuditDataStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ public interface IAuditDataStore
{
Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints(CancellationToken cancellationToken);
Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken);
Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken);
Task<QueryResult<IList<MessagesView>>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken);
Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken);
Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken);
Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange = null, CancellationToken cancellationToken = default);
Task<QueryResult<IList<MessagesView>>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange = null, CancellationToken cancellationToken = default);
Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange = null, CancellationToken cancellationToken = default);
Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange = null, CancellationToken cancellationToken = default);
Task<QueryResult<IList<MessagesView>>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken);
Task<MessageBodyView> GetMessageBody(string messageId, CancellationToken cancellationToken);
Task<QueryResult<IList<AuditCount>>> QueryAuditCounts(string endpointName, CancellationToken cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace ServiceControl.Audit.Infrastructure;

using System;
using System.Globalization;

public class DateTimeRange
{
public DateTime? From { get; }
public DateTime? To { get; }

public DateTimeRange(string from = null, string to = null)
{
if (from != null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exception handling for invalid date string?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exceptions in REST apis are hard, they don't really bubble to the UI,
So not sure what can we do better?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point is that this is currently throwing an exception. At least ignore the value if it's not valid so that the client still receives data

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then they don't know they made a mistake, and they would get unexpected results.
This is a tricky topic that, unfortunately, our REST API does not handle well.
So, I am cautious about introducing a new pattern that is not well thought out.

{
From = DateTime.Parse(from, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind);
}
if (to != null)
{
To = DateTime.Parse(to, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind);
}
}

public DateTimeRange(DateTime? from = null, DateTime? to = null)
{
From = from;
To = to;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ GET /messages => ServiceControl.Audit.Auditing.MessagesView.GetMessagesControlle
GET /messages/{id}/body => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:Get(String id, CancellationToken cancellationToken)
GET /messages/search => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:Search(PagingInfo pagingInfo, SortInfo sortInfo, String q, CancellationToken cancellationToken)
GET /messages/search/{keyword} => ServiceControl.Audit.Auditing.MessagesView.GetMessagesController:SearchByKeyWord(PagingInfo pagingInfo, SortInfo sortInfo, String keyword, CancellationToken cancellationToken)
GET /messages2 => ServiceControl.Audit.Auditing.MessagesView.GetMessages2Controller:GetAllMessages(SortInfo sortInfo, Int32 pageSize, String endpointName, String from, String to, String q, CancellationToken cancellationToken)
GET /sagas/{id} => ServiceControl.Audit.SagaAudit.SagasController:Sagas(PagingInfo pagingInfo, Guid id, CancellationToken cancellationToken)
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
namespace ServiceControl.Audit.Auditing.MessagesView;

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Infrastructure;
using Infrastructure.WebApi;
using Microsoft.AspNetCore.Mvc;
using Persistence;

[ApiController]
[Route("api")]
public class GetMessages2Controller(IAuditDataStore dataStore) : ControllerBase
Comment on lines +11 to +13
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be possible to use ApiVersionAttribute by using the Microsoft.AspNetCore.Mvc.Versioning package?

This is just pseudo typing but it would then become something like:

Suggested change
[ApiController]
[Route("api")]
public class GetMessages2Controller(IAuditDataStore dataStore) : ControllerBase
[ApiController]
[Route("api")]
[ApiVersion("2.0")]
public class GetMessages2Controller(IAuditDataStore dataStore) : ControllerBase

This also allows for a more clean usage of the deprecated API later on.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not in this PR

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ramonsmits I am not too concerned with api versioning given that we control the clients and this is not a publicly documented api so we can evolve it anyway we want as long our clients do not break.
So if you feel strongly about api versioning we can have that discussion separately from this work.
But otherwise can you unblock the PR so we can proceed with the port of SI to SP.

{
[Route("messages2")]
[HttpGet]
public async Task<IList<MessagesView>> GetAllMessages(
[FromQuery] SortInfo sortInfo,
[FromQuery(Name = "page_size")] int pageSize,
[FromQuery(Name = "endpoint_name")] string endpointName,
[FromQuery(Name = "from")] string from,
[FromQuery(Name = "to")] string to,
string q,
CancellationToken cancellationToken)
{
QueryResult<IList<MessagesView>> result;
var pagingInfo = new PagingInfo(pageSize: pageSize);
if (string.IsNullOrWhiteSpace(endpointName))
{
if (string.IsNullOrWhiteSpace(q))
{
result = await dataStore.GetMessages(false, pagingInfo, sortInfo, new DateTimeRange(from, to), cancellationToken);
}
else
{
result = await dataStore.QueryMessages(q, pagingInfo, sortInfo, new DateTimeRange(from, to), cancellationToken);
}
}
else
{
if (string.IsNullOrWhiteSpace(q))
{
result = await dataStore.QueryMessagesByReceivingEndpoint(false, endpointName, pagingInfo, sortInfo,
new DateTimeRange(from, to), cancellationToken);
}
else
{
result = await dataStore.QueryMessagesByReceivingEndpointAndKeyword(endpointName, q, pagingInfo,
sortInfo, new DateTimeRange(from, to), cancellationToken);
}
}

Response.WithTotalCount(result.QueryStats.TotalCount);

return result.Results;
}
}
Loading