Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 37 additions & 26 deletions src/AppCommon/Commands/AzureServiceBusCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static Command CreateCommand()
string[] queueNames;

public AzureServiceBusCommand(SharedOptions shared, string resourceId, string serviceBusDomain, string region, string metricsDomain)
: base(shared)
: base(shared)
{
azure = new AzureClient(resourceId, serviceBusDomain, region, metricsDomain, Out.WriteLine);
RunInfo.Add("AzureServiceBusNamespace", azure.FullyQualifiedNamespace);
Expand All @@ -84,8 +84,8 @@ protected override async Task<QueueDetails> GetData(CancellationToken cancellati
{
try
{
var endTime = DateOnly.FromDateTime(DateTime.UtcNow).AddDays(-1);
var startTime = endTime.AddDays(-90);
var endTime = DateOnly.FromDateTime(DateTime.UtcNow);
var startTime = endTime.AddDays(-90); // Azure Monitor only gives a data for a month back, but we ask for more just in case
var results = new List<QueueThroughput>();

azure.ResetConnectionQueue();
Expand All @@ -95,40 +95,51 @@ protected override async Task<QueueDetails> GetData(CancellationToken cancellati
{
var queueName = queueNames[i];

Out.WriteLine($"Gathering metrics for queue {i + 1}/{queueNames.Length}: {queueName}");
Out.Write($"Gathering metrics for queue {i + 1}/{queueNames.Length}: {queueName}");

var metricValues = (await azure.GetMetrics(queueName, startTime, endTime, cancellationToken)).OrderBy(m => m.TimeStamp).ToArray();

if (metricValues is not null)
{
var maxThroughput = metricValues.Select(timeEntry => timeEntry.Total).Max();
var maxThroughput = metricValues.Select(timeEntry => timeEntry.Total).Max();
var start = DateOnly.FromDateTime(metricValues.First().TimeStamp.UtcDateTime);
var end = DateOnly.FromDateTime(metricValues.Last().TimeStamp.UtcDateTime);

// Since we get 90 days of data, if there's no throughput in that amount of time, hard to legitimately call it an endpoint
if (maxThroughput is not null and not 0)
// If there's no throughput in that amount of time, hard to legitimately call it an endpoint
if (maxThroughput is not null and not 0)
{
var currentDate = start;
var data = new Dictionary<DateOnly, DailyThroughput>();
while (currentDate <= end)
{
var start = DateOnly.FromDateTime(metricValues.First().TimeStamp.UtcDateTime);
var end = DateOnly.FromDateTime(metricValues.Last().TimeStamp.UtcDateTime);
var currentDate = start;
var data = new Dictionary<DateOnly, DailyThroughput>();
while (currentDate <= end)
data.Add(currentDate, new DailyThroughput
{
data.Add(currentDate, new DailyThroughput { MessageCount = 0, DateUTC = currentDate });
MessageCount = 0,
DateUTC = currentDate
});

currentDate = currentDate.AddDays(1);
}
currentDate = currentDate.AddDays(1);
}

foreach (var metricValue in metricValues)
foreach (var metricValue in metricValues)
{
currentDate = DateOnly.FromDateTime(metricValue.TimeStamp.UtcDateTime);
data[currentDate] = new DailyThroughput
{
currentDate = DateOnly.FromDateTime(metricValue.TimeStamp.UtcDateTime);
data[currentDate] = new DailyThroughput { MessageCount = (long)(metricValue.Total ?? 0), DateUTC = currentDate };
}

results.Add(new QueueThroughput { QueueName = queueName, Throughput = (long?)maxThroughput, DailyThroughputFromBroker = [.. data.Values] });
MessageCount = (long)(metricValue.Total ?? 0),
DateUTC = currentDate
};
}
else

results.Add(new QueueThroughput
{
Out.WriteLine(" - No throughput detected in 90 days, ignoring");
}
QueueName = queueName,
Throughput = (long?)maxThroughput,
DailyThroughputFromBroker = [.. data.Values]
});
Out.WriteLine($" - Max daily throughput: {maxThroughput} ({start.ToShortDateString()} - {end.ToShortDateString()})");
}
else
{
Out.WriteLine($" - No throughput detected for the period {start.ToShortDateString()} - {end.ToShortDateString()}, ignoring");
}
}

Expand Down
46 changes: 37 additions & 9 deletions src/Query/AzureServiceBus/AzureClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ public class AzureClient

Queue<AuthenticatedClientSet> connectionQueue;
AuthenticatedClientSet currentClients;
const string CompleteMessageMetricName = "CompleteMessage";
const string MicrosoftServicebusNamespacesMetricsNamespace = "Microsoft.ServiceBus/Namespaces";

public string FullyQualifiedNamespace { get; }

public AzureClient(string resourceId, string serviceBusDomain, string region, string metricsDomain, Action<string> log = null)
{
this.resourceId = ResourceIdentifier.Parse(resourceId);

this.log = log ?? new(msg => { });
this.log = log ?? (_ => { });

FullyQualifiedNamespace = $"{this.resourceId.Name}.{serviceBusDomain}";

Expand All @@ -43,9 +46,14 @@ IEnumerable<TokenCredential> CreateCredentials()
yield return new VisualStudioCredential();

// Don't really need this one to take 100s * 4 tries to finally time out
var opts = new TokenCredentialOptions();
opts.Retry.MaxRetries = 1;
opts.Retry.NetworkTimeout = TimeSpan.FromSeconds(10);
var opts = new TokenCredentialOptions
{
Retry =
{
MaxRetries = 1,
NetworkTimeout = TimeSpan.FromSeconds(10)
}
};
yield return new ManagedIdentityCredential(FullyQualifiedNamespace, opts);
}

Expand Down Expand Up @@ -108,8 +116,8 @@ public Task<IList<MetricValue>> GetMetrics(string queueName, DateOnly startTime,
{
var response = await currentClients.Metrics.QueryResourcesAsync(
[resourceId],
["CompleteMessage"],
"Microsoft.ServiceBus/Namespaces",
[CompleteMessageMetricName],
MicrosoftServicebusNamespacesMetricsNamespace,
new MetricsQueryResourcesOptions
{
Filter = $"EntityName eq '{queueName}'",
Expand All @@ -119,8 +127,28 @@ public Task<IList<MetricValue>> GetMetrics(string queueName, DateOnly startTime,
},
token).ConfigureAwait(false);

// Yeah, it's buried deep
return response.Value.Values.FirstOrDefault()?.Metrics.FirstOrDefault()?.TimeSeries.FirstOrDefault()?.Values ?? [];
var metricQueryResult = response.Value.Values.SingleOrDefault(mr => mr.Namespace == MicrosoftServicebusNamespacesMetricsNamespace);

if (metricQueryResult is null)
{
throw new Exception("No metrics query results returned for Microsoft.ServiceBus/Namespace");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new Exception("No metrics query results returned for Microsoft.ServiceBus/Namespace");
throw new Exception($"No metrics query results returned for {MicrosoftServicebusNamespacesMetricsNamespace}");

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops, I had it to automerge, will apply it to master

}

var metricResult = metricQueryResult.GetMetricByName(CompleteMessageMetricName);

if (metricResult.Error.Message is not null)
{
throw new Exception($"Metrics query result for '{metricResult.Name}' failed: {metricResult.Error.Message}");
}

var timeSeries = metricResult.TimeSeries.SingleOrDefault();

if (timeSeries is null)
{
throw new Exception($"Metrics query result for '{metricResult.Name}' contained no time series");
}

return timeSeries.Values;
}
catch (Azure.RequestFailedException reqFailed) when (reqFailed.Message.Contains("ResourceGroupNotFound"))
{
Expand All @@ -136,7 +164,7 @@ public Task<string[]> GetQueueNames(CancellationToken cancellationToken = defaul
return GetDataWithCurrentCredentials(async token =>
{
var queueList = new List<string>();
await foreach (var queue in currentClients.ServiceBus.GetQueuesAsync(cancellationToken).WithCancellation(cancellationToken))
await foreach (var queue in currentClients.ServiceBus.GetQueuesAsync(token))
{
queueList.Add(queue.Name);
}
Expand Down