Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</PropertyGroup>
<ItemGroup Label="Versions for direct package references">
<PackageVersion Include="Autofac" Version="8.2.0" />
<PackageVersion Include="AWSSDK.CloudWatch" Version="4.0.1" />
<PackageVersion Include="AWSSDK.CloudWatch" Version="4.0.6.1" />
<PackageVersion Include="AWSSDK.SecurityToken" Version="4.0.0.4" />
<PackageVersion Include="Azure.Identity" Version="1.13.2" />
<PackageVersion Include="Azure.Monitor.Query.Metrics" Version="1.0.0" />
Expand All @@ -27,6 +27,7 @@
<PackageVersion Include="Microsoft.Extensions.Logging" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.Logging.Configuration" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.3" />
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.TimeProvider.Testing" Version="8.10.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="18.0.1" />
<PackageVersion Include="Microsoft-WindowsAPICodePack-Shell" Version="1.1.5" />
Expand Down
55 changes: 35 additions & 20 deletions src/ServiceControl.Transports.SQS.Tests/AmazonSQSQueryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace ServiceControl.Transport.Tests;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Time.Testing;
using NUnit.Framework;
using Particular.Approvals;
Expand All @@ -32,7 +32,9 @@ public void Initialise()
MaxConcurrency = 1,
EndpointName = Guid.NewGuid().ToString("N")
};
query = new AmazonSQSQuery(NullLogger<AmazonSQSQuery>.Instance, provider, transportSettings);
var loggerFactory = LoggerFactory.Create(builder => builder.AddSimpleConsole().SetMinimumLevel(LogLevel.Trace));
var logger = loggerFactory.CreateLogger<AmazonSQSQuery>();
query = new AmazonSQSQuery(logger, provider, transportSettings);
}

[Test]
Expand Down Expand Up @@ -94,11 +96,9 @@ public async Task TestConnectionWithValidSettings()
}

[Test]
[CancelAfter(2 * 60 * 1000)]
public async Task RunScenario()
{
// We need to wait a bit of time, to ensure AWS metrics are retrievable
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromMinutes(6));
CancellationToken token = cancellationTokenSource.Token;
const int numMessagesToIngest = 15;

await CreateTestQueue(transportSettings.EndpointName);
Expand All @@ -111,37 +111,52 @@ public async Task RunScenario()
{
dictionary.Add(AmazonSQSQuery.AmazonSQSSettings.AccessKey, connectionString.AccessKey);
}

if (!string.IsNullOrEmpty(connectionString.SecretKey))
{
dictionary.Add(AmazonSQSQuery.AmazonSQSSettings.SecretKey, connectionString.SecretKey);
}

if (!string.IsNullOrEmpty(connectionString.Region))
{
dictionary.Add(AmazonSQSQuery.AmazonSQSSettings.Region, connectionString.Region);
}

query.Initialize(new ReadOnlyDictionary<string, string>(dictionary));
query.Initialize(dictionary.AsReadOnly());

await Task.Delay(TimeSpan.FromMinutes(2), token);
var startDate = DateOnly.FromDateTime(provider.GetUtcNow().DateTime);
provider.Advance(TimeSpan.FromDays(1));

var queueNames = new List<IBrokerQueue>();
await foreach (IBrokerQueue queueName in query.GetQueueNames(token))
while (!TestContext.CurrentContext.CancellationToken.IsCancellationRequested)
{
queueNames.Add(queueName);
}
await Task.Delay(TimeSpan.FromSeconds(5), TestContext.CurrentContext.CancellationToken);

IBrokerQueue queue = queueNames.Find(name => name.QueueName == $"{connectionString.QueueNamePrefix}{transportSettings.EndpointName}");
Assert.That(queue, Is.Not.Null);
var queueNames = new List<IBrokerQueue>();
await foreach (IBrokerQueue queueName in query.GetQueueNames(TestContext.CurrentContext.CancellationToken))
{
queueNames.Add(queueName);
}

long total = 0L;
IBrokerQueue queue = queueNames.Find(name => name.QueueName == $"{connectionString.QueueNamePrefix}{transportSettings.EndpointName}");

DateTime startDate = provider.GetUtcNow().DateTime;
provider.Advance(TimeSpan.FromDays(1));
await foreach (QueueThroughput queueThroughput in query.GetThroughputPerDay(queue, DateOnly.FromDateTime(startDate), token))
{
total += queueThroughput.TotalThroughput;
if (queue == null)
{
continue;
}

long total = 0L;

await foreach (QueueThroughput queueThroughput in query.GetThroughputPerDay(queue, startDate, TestContext.CurrentContext.CancellationToken))
{
total += queueThroughput.TotalThroughput;
}

if (total == numMessagesToIngest)
{
return;
}
}

Assert.That(total, Is.EqualTo(numMessagesToIngest));
Assert.Fail("Timeout waiting for expected throughput to be report");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" />
<PackageReference Include="Microsoft.Extensions.TimeProvider.Testing" />
<PackageReference Include="Microsoft.NET.Test.Sdk" />
<PackageReference Include="NServiceBus.AcceptanceTesting" />
Expand Down
63 changes: 36 additions & 27 deletions src/ServiceControl.Transports.SQS/AmazonSQSQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,53 +195,62 @@ class AwsHttpClientFactory : HttpClientFactory

public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBrokerQueue brokerQueue,
DateOnly startDate,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
[EnumeratorCancellation] CancellationToken cancellationToken)
{
var endDate = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime).AddDays(-1);
var utcNow = timeProvider.GetUtcNow();
var endDate = DateOnly.FromDateTime(utcNow.DateTime).AddDays(-1); // Query date up to but not including today

var isBeforeStartDate = endDate < startDate;

if (endDate < startDate)
if (isBeforeStartDate)
{
logger.LogTrace("Skipping {Start} {End} {UtcNow}, ", startDate, endDate, utcNow);
yield break;
}

// Convert DATES that state up to but INCLUDING the TO value to a timestamp from X to Y EXCLUDING
// Example: 2025-01-01 to 2025-01-10 => 2025-01-01T00:00:00 to 2025-01-11T00:00:00

var queryStartUtc = startDate.ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc);
var queryEndUtc = endDate
.AddDays(1) // Convert from INCLUDING to EXCLUDING, thus need to bump one day, using ToDateTime(TimeOnly.MaxValue) would be wrong
.ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc);

logger.LogDebug("GetThroughputPerDay {QueueName} {UtcNow} {StartDate} {EndDate} {QueryStart} {QueryEnd}", brokerQueue.QueueName, utcNow, startDate, endDate, queryStartUtc, queryEndUtc);

const int SecondsInDay = 24 * 60 * 60;

var req = new GetMetricStatisticsRequest
{
Namespace = "AWS/SQS",
MetricName = "NumberOfMessagesDeleted",
StartTime = startDate.ToDateTime(TimeOnly.MinValue),
EndTime = endDate.ToDateTime(TimeOnly.MaxValue),
Period = 24 * 60 * 60, // 1 day
StartTime = queryStartUtc,
EndTime = queryEndUtc, // The value specified is exclusive; results include data points up to the specified time stamp.
Period = SecondsInDay,
Statistics = ["Sum"],
Dimensions = [
new Dimension { Name = "QueueName", Value = brokerQueue.QueueName }
Dimensions =
[
new Dimension
{
Name = "QueueName", Value = brokerQueue.QueueName
}
]
};

var resp = await cloudWatch!.GetMetricStatisticsAsync(req, cancellationToken);
var dataPoints = resp.Datapoints.ToDictionary(x => DateOnly.FromDateTime(x.Timestamp!.Value.Date), x => (long)(x.Sum ?? 0));

DateOnly currentDate = startDate;
var data = new Dictionary<DateOnly, QueueThroughput>();
while (currentDate <= endDate)
for (DateOnly currentDate = startDate; currentDate <= endDate; currentDate = currentDate.AddDays(1))
{
data.Add(currentDate, new QueueThroughput { TotalThroughput = 0, DateUTC = currentDate });
dataPoints.TryGetValue(currentDate, out var sum);

currentDate = currentDate.AddDays(1);
}
logger.LogTrace("Queue throughput {QueueName} {Date} {Total}", brokerQueue.QueueName, currentDate, sum);

foreach (var datapoint in resp.Datapoints ?? [])
{
// There is a bug in the AWS SDK. The timestamp is actually UTC time, eventhough the DateTime returned type says Local
// See https://github.com/aws/aws-sdk-net/issues/167
// So do not convert the timestamp to UTC time!
if (datapoint.Timestamp.HasValue)
yield return new QueueThroughput
{
data[DateOnly.FromDateTime(datapoint.Timestamp.Value)].TotalThroughput = (long)datapoint.Sum.GetValueOrDefault(0);
}
}

foreach (QueueThroughput queueThroughput in data.Values)
{
yield return queueThroughput;
TotalThroughput = sum,
DateUTC = currentDate
};
}
}

Expand Down