Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
</PropertyGroup>
<ItemGroup Label="Versions for direct package references">
<PackageVersion Include="Autofac" Version="8.2.0" />
<PackageVersion Include="AWSSDK.CloudWatch" Version="3.7.402.63" />
<PackageVersion Include="AWSSDK.SecurityToken" Version="3.7.401.78" />
<PackageVersion Include="AWSSDK.CloudWatch" Version="4.0.1" />
<PackageVersion Include="AWSSDK.SecurityToken" Version="4.0.0.4" />
<PackageVersion Include="Azure.Identity" Version="1.13.2" />
<PackageVersion Include="Azure.Monitor.Query" Version="1.6.0" />
<PackageVersion Include="Azure.ResourceManager.ServiceBus" Version="1.1.0" />
Expand All @@ -31,7 +31,7 @@
<PackageVersion Include="NLog.Extensions.Logging" Version="5.4.0" />
<PackageVersion Include="NServiceBus" Version="9.2.7" />
<PackageVersion Include="NServiceBus.AcceptanceTesting" Version="9.2.7" />
<PackageVersion Include="NServiceBus.AmazonSQS" Version="7.3.0" />
<PackageVersion Include="NServiceBus.AmazonSQS" Version="7.4.0" />
<PackageVersion Include="NServiceBus.CustomChecks" Version="5.0.1" />
<PackageVersion Include="NServiceBus.Extensions.Hosting" Version="3.0.1" />
<PackageVersion Include="NServiceBus.Extensions.Logging" Version="3.0.1" />
Expand Down Expand Up @@ -91,4 +91,4 @@
<GlobalPackageReference Include="Microsoft.Build.CopyOnWrite" Version="1.0.334" />
<GlobalPackageReference Include="Particular.Packaging" Version="4.2.2" />
</ItemGroup>
</Project>
</Project>
26 changes: 20 additions & 6 deletions src/ServiceControl.Transports.SQS/AmazonSQSQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace ServiceControl.Transports.SQS;
using Amazon.CloudWatch.Model;
using Amazon.Runtime;
using Amazon.Runtime.CredentialManagement;
using Amazon.Runtime.Credentials;
using Amazon.SQS;
using Amazon.SQS.Model;
using BrokerThroughput;
Expand All @@ -29,7 +30,7 @@ public class AmazonSQSQuery(ILogger<AmazonSQSQuery> logger, TimeProvider timePro
protected override void InitializeCore(ReadOnlyDictionary<string, string> settings)
{
var sqsConnectionString = new SQSTransportConnectionString(transportSettings.ConnectionString);
AWSCredentials credentials = FallbackCredentialsFactory.GetCredentials();
AWSCredentials credentials = DefaultAWSCredentialsIdentityResolver.GetCredentials();
RegionEndpoint? regionEndpoint = null;
if (settings.TryGetValue(AmazonSQSSettings.Profile, out string? profile))
{
Expand Down Expand Up @@ -91,9 +92,15 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
}
}

bool IsValidAwsRegion(string region) => RegionEndpoint.EnumerableAllRegions.Any(r => r.SystemName.Equals(region, StringComparison.OrdinalIgnoreCase));

if (settings.TryGetValue(AmazonSQSSettings.Region, out string? region))
{
string? previousSetSystemName = regionEndpoint?.SystemName;
if (!IsValidAwsRegion(region))
{
throw new ArgumentException("Invalid region endpoint provided");
}
regionEndpoint = RegionEndpoint.GetBySystemName(region);

Diagnostics.Append($"Region set to \"{regionEndpoint.SystemName}\"");
Expand All @@ -108,6 +115,10 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin
{
if (sqsConnectionString.Region != null)
{
if (!IsValidAwsRegion(sqsConnectionString.Region))
{
throw new ArgumentException("Invalid region endpoint provided");
}
Comment on lines +118 to +121
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am unsure if we need this validation here, because i assume the connection string region would already be validated, @danielmarbach ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is validated by the SDK but not by us. It would throw a different exception though indicating an unknown name or host or something like that. This would at least keep the behavior consistent

regionEndpoint = RegionEndpoint.GetBySystemName(sqsConnectionString.Region);
Diagnostics.AppendLine(
$"Region not set, defaulted to using \"{regionEndpoint.SystemName}\" from the ConnectionString used by instance");
Expand Down Expand Up @@ -197,8 +208,8 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
{
Namespace = "AWS/SQS",
MetricName = "NumberOfMessagesDeleted",
StartTimeUtc = startDate.ToDateTime(TimeOnly.MinValue),
EndTimeUtc = endDate.ToDateTime(TimeOnly.MaxValue),
StartTime = startDate.ToDateTime(TimeOnly.MinValue),
EndTime = endDate.ToDateTime(TimeOnly.MaxValue),
Period = 24 * 60 * 60, // 1 day
Statistics = ["Sum"],
Dimensions = [
Expand All @@ -217,12 +228,15 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
currentDate = currentDate.AddDays(1);
}

foreach (var datapoint in resp.Datapoints)
foreach (var datapoint in resp.Datapoints ?? [])
{
// There is a bug in the AWS SDK. The timestamp is actually UTC time, eventhough the DateTime returned type says Local
// See https://github.com/aws/aws-sdk-net/issues/167
// So do not convert the timestamp to UTC time!
data[DateOnly.FromDateTime(datapoint.Timestamp)].TotalThroughput = (long)datapoint.Sum;
if (datapoint.Timestamp.HasValue)
{
data[DateOnly.FromDateTime(datapoint.Timestamp.Value)].TotalThroughput = (long)datapoint.Sum.GetValueOrDefault(0);
}
}

foreach (QueueThroughput queueThroughput in data.Values)
Expand All @@ -244,7 +258,7 @@ public override async IAsyncEnumerable<IBrokerQueue> GetQueueNames(
{
var response = await sqs!.ListQueuesAsync(request, cancellationToken);

foreach (var queue in response.QueueUrls.Select(url => url.Split('/')[4]))
foreach (var queue in (response.QueueUrls ?? []).Select(url => url.Split('/')[4]))
{
if (!queue.EndsWith("-delay.fifo", StringComparison.OrdinalIgnoreCase))
{
Expand Down
18 changes: 7 additions & 11 deletions src/ServiceControl.Transports.SQS/QueueAttributesRequestCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,8 @@
using Amazon.SQS;
using Amazon.SQS.Model;

class QueueAttributesRequestCache
class QueueAttributesRequestCache(IAmazonSQS sqsClient)
{
public QueueAttributesRequestCache(IAmazonSQS sqsClient)
{
cache = new ConcurrentDictionary<string, GetQueueAttributesRequest>();
this.sqsClient = sqsClient;
}

public async Task<GetQueueAttributesRequest> GetQueueAttributesRequest(string queueName, CancellationToken cancellationToken = default)
{
if (cache.TryGetValue(queueName, out var attReq))
Expand All @@ -23,8 +17,11 @@ public async Task<GetQueueAttributesRequest> GetQueueAttributesRequest(string qu

var queueUrl = await GetQueueUrl(queueName, cancellationToken);

attReq = new GetQueueAttributesRequest { QueueUrl = queueUrl };
attReq.AttributeNames.Add("ApproximateNumberOfMessages");
attReq = new GetQueueAttributesRequest
{
QueueUrl = queueUrl,
AttributeNames = ["ApproximateNumberOfMessages"]
};

cache[queueName] = attReq;

Expand All @@ -37,7 +34,6 @@ async Task<string> GetQueueUrl(string queueName, CancellationToken cancellationT
return response.QueueUrl;
}

ConcurrentDictionary<string, GetQueueAttributesRequest> cache;
IAmazonSQS sqsClient;
readonly ConcurrentDictionary<string, GetQueueAttributesRequest> cache = new();
}
}