diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index dcdb33271a..e9cefcd8da 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -5,7 +5,7 @@ - + @@ -27,6 +27,7 @@ + diff --git a/src/ServiceControl.Transports.SQS.Tests/AmazonSQSQueryTests.cs b/src/ServiceControl.Transports.SQS.Tests/AmazonSQSQueryTests.cs index 0f537814bd..c993b61be8 100644 --- a/src/ServiceControl.Transports.SQS.Tests/AmazonSQSQueryTests.cs +++ b/src/ServiceControl.Transports.SQS.Tests/AmazonSQSQueryTests.cs @@ -6,7 +6,7 @@ namespace ServiceControl.Transport.Tests; using System.Linq; using System.Threading; using System.Threading.Tasks; -using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Time.Testing; using NUnit.Framework; using Particular.Approvals; @@ -32,7 +32,9 @@ public void Initialise() MaxConcurrency = 1, EndpointName = Guid.NewGuid().ToString("N") }; - query = new AmazonSQSQuery(NullLogger.Instance, provider, transportSettings); + var loggerFactory = LoggerFactory.Create(builder => builder.AddSimpleConsole().SetMinimumLevel(LogLevel.Trace)); + var logger = loggerFactory.CreateLogger(); + query = new AmazonSQSQuery(logger, provider, transportSettings); } [Test] @@ -94,11 +96,9 @@ public async Task TestConnectionWithValidSettings() } [Test] + [CancelAfter(2 * 60 * 1000)] public async Task RunScenario() { - // We need to wait a bit of time, to ensure AWS metrics are retrievable - using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromMinutes(6)); - CancellationToken token = cancellationTokenSource.Token; const int numMessagesToIngest = 15; await CreateTestQueue(transportSettings.EndpointName); @@ -111,37 +111,52 @@ public async Task RunScenario() { dictionary.Add(AmazonSQSQuery.AmazonSQSSettings.AccessKey, connectionString.AccessKey); } + if (!string.IsNullOrEmpty(connectionString.SecretKey)) { dictionary.Add(AmazonSQSQuery.AmazonSQSSettings.SecretKey, connectionString.SecretKey); } + if (!string.IsNullOrEmpty(connectionString.Region)) { dictionary.Add(AmazonSQSQuery.AmazonSQSSettings.Region, connectionString.Region); } - query.Initialize(new ReadOnlyDictionary(dictionary)); + query.Initialize(dictionary.AsReadOnly()); - await Task.Delay(TimeSpan.FromMinutes(2), token); + var startDate = DateOnly.FromDateTime(provider.GetUtcNow().DateTime); + provider.Advance(TimeSpan.FromDays(1)); - var queueNames = new List(); - await foreach (IBrokerQueue queueName in query.GetQueueNames(token)) + while (!TestContext.CurrentContext.CancellationToken.IsCancellationRequested) { - queueNames.Add(queueName); - } + await Task.Delay(TimeSpan.FromSeconds(5), TestContext.CurrentContext.CancellationToken); - IBrokerQueue queue = queueNames.Find(name => name.QueueName == $"{connectionString.QueueNamePrefix}{transportSettings.EndpointName}"); - Assert.That(queue, Is.Not.Null); + var queueNames = new List(); + await foreach (IBrokerQueue queueName in query.GetQueueNames(TestContext.CurrentContext.CancellationToken)) + { + queueNames.Add(queueName); + } - long total = 0L; + IBrokerQueue queue = queueNames.Find(name => name.QueueName == $"{connectionString.QueueNamePrefix}{transportSettings.EndpointName}"); - DateTime startDate = provider.GetUtcNow().DateTime; - provider.Advance(TimeSpan.FromDays(1)); - await foreach (QueueThroughput queueThroughput in query.GetThroughputPerDay(queue, DateOnly.FromDateTime(startDate), token)) - { - total += queueThroughput.TotalThroughput; + if (queue == null) + { + continue; + } + + long total = 0L; + + await foreach (QueueThroughput queueThroughput in query.GetThroughputPerDay(queue, startDate, TestContext.CurrentContext.CancellationToken)) + { + total += queueThroughput.TotalThroughput; + } + + if (total == numMessagesToIngest) + { + return; + } } - Assert.That(total, Is.EqualTo(numMessagesToIngest)); + Assert.Fail("Timeout waiting for expected throughput to be report"); } } \ No newline at end of file diff --git a/src/ServiceControl.Transports.SQS.Tests/ServiceControl.Transports.SQS.Tests.csproj b/src/ServiceControl.Transports.SQS.Tests/ServiceControl.Transports.SQS.Tests.csproj index 655018e20c..2f2d4bb0e5 100644 --- a/src/ServiceControl.Transports.SQS.Tests/ServiceControl.Transports.SQS.Tests.csproj +++ b/src/ServiceControl.Transports.SQS.Tests/ServiceControl.Transports.SQS.Tests.csproj @@ -13,6 +13,7 @@ + diff --git a/src/ServiceControl.Transports.SQS/AmazonSQSQuery.cs b/src/ServiceControl.Transports.SQS/AmazonSQSQuery.cs index 0c54ca7f0b..c1d44ad09d 100644 --- a/src/ServiceControl.Transports.SQS/AmazonSQSQuery.cs +++ b/src/ServiceControl.Transports.SQS/AmazonSQSQuery.cs @@ -195,53 +195,62 @@ class AwsHttpClientFactory : HttpClientFactory public override async IAsyncEnumerable GetThroughputPerDay(IBrokerQueue brokerQueue, DateOnly startDate, - [EnumeratorCancellation] CancellationToken cancellationToken = default) + [EnumeratorCancellation] CancellationToken cancellationToken) { - var endDate = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime).AddDays(-1); + var utcNow = timeProvider.GetUtcNow(); + var endDate = DateOnly.FromDateTime(utcNow.DateTime).AddDays(-1); // Query date up to but not including today + + var isBeforeStartDate = endDate < startDate; - if (endDate < startDate) + if (isBeforeStartDate) { + logger.LogTrace("Skipping {Start} {End} {UtcNow}, ", startDate, endDate, utcNow); yield break; } + // Convert DATES that state up to but INCLUDING the TO value to a timestamp from X to Y EXCLUDING + // Example: 2025-01-01 to 2025-01-10 => 2025-01-01T00:00:00 to 2025-01-11T00:00:00 + + var queryStartUtc = startDate.ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc); + var queryEndUtc = endDate + .AddDays(1) // Convert from INCLUDING to EXCLUDING, thus need to bump one day, using ToDateTime(TimeOnly.MaxValue) would be wrong + .ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc); + + logger.LogDebug("GetThroughputPerDay {QueueName} {UtcNow} {StartDate} {EndDate} {QueryStart} {QueryEnd}", brokerQueue.QueueName, utcNow, startDate, endDate, queryStartUtc, queryEndUtc); + + const int SecondsInDay = 24 * 60 * 60; + var req = new GetMetricStatisticsRequest { Namespace = "AWS/SQS", MetricName = "NumberOfMessagesDeleted", - StartTime = startDate.ToDateTime(TimeOnly.MinValue), - EndTime = endDate.ToDateTime(TimeOnly.MaxValue), - Period = 24 * 60 * 60, // 1 day + StartTime = queryStartUtc, + EndTime = queryEndUtc, // The value specified is exclusive; results include data points up to the specified time stamp. + Period = SecondsInDay, Statistics = ["Sum"], - Dimensions = [ - new Dimension { Name = "QueueName", Value = brokerQueue.QueueName } + Dimensions = + [ + new Dimension + { + Name = "QueueName", Value = brokerQueue.QueueName + } ] }; var resp = await cloudWatch!.GetMetricStatisticsAsync(req, cancellationToken); + var dataPoints = resp.Datapoints.ToDictionary(x => DateOnly.FromDateTime(x.Timestamp!.Value.Date), x => (long)(x.Sum ?? 0)); - DateOnly currentDate = startDate; - var data = new Dictionary(); - while (currentDate <= endDate) + for (DateOnly currentDate = startDate; currentDate <= endDate; currentDate = currentDate.AddDays(1)) { - data.Add(currentDate, new QueueThroughput { TotalThroughput = 0, DateUTC = currentDate }); + dataPoints.TryGetValue(currentDate, out var sum); - currentDate = currentDate.AddDays(1); - } + logger.LogTrace("Queue throughput {QueueName} {Date} {Total}", brokerQueue.QueueName, currentDate, sum); - foreach (var datapoint in resp.Datapoints ?? []) - { - // There is a bug in the AWS SDK. The timestamp is actually UTC time, eventhough the DateTime returned type says Local - // See https://github.com/aws/aws-sdk-net/issues/167 - // So do not convert the timestamp to UTC time! - if (datapoint.Timestamp.HasValue) + yield return new QueueThroughput { - data[DateOnly.FromDateTime(datapoint.Timestamp.Value)].TotalThroughput = (long)datapoint.Sum.GetValueOrDefault(0); - } - } - - foreach (QueueThroughput queueThroughput in data.Values) - { - yield return queueThroughput; + TotalThroughput = sum, + DateUTC = currentDate + }; } }