diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index 736499ad24..df1811d297 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -8,7 +8,7 @@
-
+
@@ -17,8 +17,8 @@
-
-
+
+
@@ -42,7 +42,7 @@
-
+
diff --git a/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditIngestionUnitOfWork.cs b/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditIngestionUnitOfWork.cs
index abc554152a..eb19562dbc 100644
--- a/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditIngestionUnitOfWork.cs
+++ b/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditIngestionUnitOfWork.cs
@@ -35,7 +35,7 @@ public async Task RecordProcessedMessage(ProcessedMessage processedMessage, Read
if (!body.IsEmpty)
{
await using var stream = new ReadOnlyStream(body);
- var contentType = processedMessage.Headers.GetValueOrDefault(Headers.ContentType, "text/xml");
+ var contentType = processedMessage.Headers.GetValueOrDefault(Headers.ContentType, "text/plain");
await bodyStorage.Store(processedMessage.Id, contentType, body.Length, stream, cancellationToken);
}
diff --git a/src/ServiceControl.Audit.Persistence.Tests/AuditTests.cs b/src/ServiceControl.Audit.Persistence.Tests/AuditTests.cs
index 4d89044420..9dcffaee85 100644
--- a/src/ServiceControl.Audit.Persistence.Tests/AuditTests.cs
+++ b/src/ServiceControl.Audit.Persistence.Tests/AuditTests.cs
@@ -80,7 +80,7 @@ await IngestProcessedMessagesAudits(
[Test]
public async Task Can_roundtrip_message_body()
{
- string expectedContentType = "text/xml";
+ string expectedContentType = "text/plain";
var unitOfWork = await StartAuditUnitOfWork(1);
var body = new byte[100];
diff --git a/src/ServiceControl.Audit.Persistence/BodyStorageEnricher.cs b/src/ServiceControl.Audit.Persistence/BodyStorageEnricher.cs
index c7794e30ba..be9e133bf8 100644
--- a/src/ServiceControl.Audit.Persistence/BodyStorageEnricher.cs
+++ b/src/ServiceControl.Audit.Persistence/BodyStorageEnricher.cs
@@ -21,7 +21,7 @@ public async ValueTask StoreAuditMessageBody(ReadOnlyMemory body, Processe
return;
}
- var contentType = GetContentType(processedMessage.Headers, "text/xml");
+ var contentType = GetContentType(processedMessage.Headers, "text/plain");
processedMessage.MessageMetadata.Add("ContentType", contentType);
var stored = await TryStoreBody(body, processedMessage, bodySize, contentType, cancellationToken);
diff --git a/src/ServiceControl.Audit.UnitTests/BodyStorage/BodyStorageEnricherTests.cs b/src/ServiceControl.Audit.UnitTests/BodyStorage/BodyStorageEnricherTests.cs
index 87e1bbe9db..273b4f80fb 100644
--- a/src/ServiceControl.Audit.UnitTests/BodyStorage/BodyStorageEnricherTests.cs
+++ b/src/ServiceControl.Audit.UnitTests/BodyStorage/BodyStorageEnricherTests.cs
@@ -92,7 +92,7 @@ public async Task Should_store_body_in_metadata_when_below_large_object_heap_and
[Headers.MessageId] = "someid",
["ServiceControl.Retry.UniqueMessageId"] = "someid",
[Headers.ProcessingEndpoint] = "someendpoint",
- [Headers.ContentType] = "text/xml"
+ [Headers.ContentType] = "text/plain"
};
var message = new ProcessedMessage(headers, metadata);
diff --git a/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenRecoverabilityIngestionUnitOfWork.cs b/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenRecoverabilityIngestionUnitOfWork.cs
index cf6ee57a13..2351e78e3e 100644
--- a/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenRecoverabilityIngestionUnitOfWork.cs
+++ b/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenRecoverabilityIngestionUnitOfWork.cs
@@ -33,7 +33,7 @@ public Task RecordFailedProcessingAttempt(
List groups)
{
var uniqueMessageId = context.Headers.UniqueId();
- var contentType = GetContentType(context.Headers, "text/xml");
+ var contentType = GetContentType(context.Headers, "text/plain");
var bodySize = context.Body.Length;
processingAttempt.MessageMetadata.Add("ContentType", contentType);
diff --git a/src/ServiceControl.Transports.ASBS/AzureQuery.cs b/src/ServiceControl.Transports.ASBS/AzureQuery.cs
index 041b2bc246..3007ed7374 100644
--- a/src/ServiceControl.Transports.ASBS/AzureQuery.cs
+++ b/src/ServiceControl.Transports.ASBS/AzureQuery.cs
@@ -12,8 +12,8 @@ namespace ServiceControl.Transports.ASBS;
using Azure.Core;
using Azure.Core.Pipeline;
using Azure.Identity;
-using Azure.Monitor.Query;
-using Azure.Monitor.Query.Models;
+using Azure.Monitor.Query.Metrics;
+using Azure.Monitor.Query.Metrics.Models;
using Azure.ResourceManager;
using Azure.ResourceManager.Resources;
using Azure.ResourceManager.ServiceBus;
@@ -26,10 +26,12 @@ public class AzureQuery(ILogger logger, TimeProvider timeProvider, T
: BrokerThroughputQuery(logger, "AzureServiceBus")
{
string serviceBusName = string.Empty;
- MetricsQueryClient? client;
ArmClient? armClient;
- string? resourceId;
+ TokenCredential? credential;
+ ResourceIdentifier? resourceId;
ArmEnvironment armEnvironment;
+ MetricsClientAudience metricsClientAudience;
+ MetricsClient? metricsClient;
protected override void InitializeCore(ReadOnlyDictionary settings)
{
@@ -102,7 +104,7 @@ protected override void InitializeCore(ReadOnlyDictionary settin
Diagnostics.AppendLine("Client secret set");
}
- (armEnvironment, var metricsQueryAudience) = GetEnvironment();
+ (armEnvironment, metricsClientAudience) = GetEnvironment();
if (managementUrl == null)
{
@@ -118,28 +120,17 @@ protected override void InitializeCore(ReadOnlyDictionary settin
return;
}
- TokenCredential clientCredentials;
if (connectionSettings.AuthenticationMethod is TokenCredentialAuthentication tokenCredentialAuthentication)
{
Diagnostics.AppendLine("Attempting to use managed identity");
- clientCredentials = tokenCredentialAuthentication.Credential;
+ credential = tokenCredentialAuthentication.Credential;
}
else
{
- clientCredentials = new ClientSecretCredential(tenantId, clientId, clientSecret);
+ credential = new ClientSecretCredential(tenantId, clientId, clientSecret);
}
- client = new MetricsQueryClient(armEnvironment.Endpoint, clientCredentials,
- new MetricsQueryClientOptions
- {
- Audience = metricsQueryAudience,
- Transport = new HttpClientTransport(
- new HttpClient(new SocketsHttpHandler
- {
- PooledConnectionIdleTimeout = TimeSpan.FromMinutes(2)
- }))
- });
- armClient = new ArmClient(clientCredentials, subscriptionId,
+ armClient = new ArmClient(credential, subscriptionId,
new ArmClientOptions
{
Environment = armEnvironment,
@@ -152,31 +143,31 @@ protected override void InitializeCore(ReadOnlyDictionary settin
return;
- (ArmEnvironment armEnvironment, MetricsQueryAudience metricsQueryAudience) GetEnvironment()
+ (ArmEnvironment armEnvironment, MetricsClientAudience metricsClientAudience) GetEnvironment()
{
if (managementUrlParsed == null)
{
- return (ArmEnvironment.AzurePublicCloud, MetricsQueryAudience.AzurePublicCloud);
+ return (ArmEnvironment.AzurePublicCloud, MetricsClientAudience.AzurePublicCloud);
}
if (managementUrlParsed == ArmEnvironment.AzurePublicCloud.Endpoint)
{
- return (ArmEnvironment.AzurePublicCloud, MetricsQueryAudience.AzurePublicCloud);
+ return (ArmEnvironment.AzurePublicCloud, MetricsClientAudience.AzurePublicCloud);
}
if (managementUrlParsed == ArmEnvironment.AzureChina.Endpoint)
{
- return (ArmEnvironment.AzureChina, MetricsQueryAudience.AzureChina);
+ return (ArmEnvironment.AzureChina, MetricsClientAudience.AzureChina);
}
if (managementUrlParsed == ArmEnvironment.AzureGermany.Endpoint)
{
- return (ArmEnvironment.AzureGermany, MetricsQueryAudience.AzurePublicCloud);
+ return (ArmEnvironment.AzureGermany, MetricsClientAudience.AzurePublicCloud);
}
if (managementUrlParsed == ArmEnvironment.AzureGovernment.Endpoint)
{
- return (ArmEnvironment.AzureGovernment, MetricsQueryAudience.AzureGovernment);
+ return (ArmEnvironment.AzureGovernment, MetricsClientAudience.AzureGovernment);
}
string options = string.Join(", ",
@@ -187,7 +178,7 @@ protected override void InitializeCore(ReadOnlyDictionary settin
}.Select(armEnvironment => $"\"{armEnvironment.Endpoint}\""));
InitialiseErrors.Add($"Management url configuration is invalid, available options are {options}");
- return (ArmEnvironment.AzurePublicCloud, MetricsQueryAudience.AzurePublicCloud);
+ return (ArmEnvironment.AzurePublicCloud, MetricsClientAudience.AzurePublicCloud);
}
}
@@ -229,7 +220,6 @@ public override async IAsyncEnumerable GetThroughputPerDay(IBro
while (currentDate <= endDate)
{
data.Add(currentDate, new QueueThroughput { TotalThroughput = 0, DateUTC = currentDate });
-
currentDate = currentDate.AddDays(1);
}
@@ -244,23 +234,67 @@ public override async IAsyncEnumerable GetThroughputPerDay(IBro
}
}
+ async Task InitializeMetricsClient(CancellationToken cancellationToken = default)
+ {
+ if (resourceId is null || armClient is null || credential is null)
+ {
+ throw new InvalidOperationException("AzureQuery has not been initialized correctly.");
+ }
+
+ var serviceBusNamespaceResource = await armClient
+ .GetServiceBusNamespaceResource(resourceId).GetAsync(cancellationToken)
+ ?? throw new Exception($"Could not find ServiceBus with resource Id: \"{resourceId}\"");
+
+ // Determine the region of the namespace
+ var regionName = serviceBusNamespaceResource.Value.Data.Location.Name;
+
+ // Build the regional Azure Monitor Metrics endpoint from the audience
+ var metricsEndpoint = BuildMetricsEndpoint(metricsClientAudience, regionName);
+
+ // CreateNewOnMetadataUpdateAttribute the MetricsClient for this namespace
+ return new MetricsClient(
+ metricsEndpoint,
+ credential!,
+ new MetricsClientOptions
+ {
+ Audience = metricsClientAudience,
+ Transport = new HttpClientTransport(
+ new HttpClient(new SocketsHttpHandler
+ {
+ PooledConnectionIdleTimeout = TimeSpan.FromMinutes(2)
+ }))
+ });
+ }
+
+ static Uri BuildMetricsEndpoint(MetricsClientAudience audience, string regionName)
+ {
+ var region = regionName.ToLowerInvariant();
+ var builder = new UriBuilder(audience.ToString());
+ builder.Host = $"{region}.{builder.Host}";
+ return builder.Uri;
+ }
+
async Task> GetMetrics(string queueName, DateOnly startTime, DateOnly endTime,
CancellationToken cancellationToken = default)
{
- var response = await client!.QueryResourceAsync(resourceId,
- new[] { "CompleteMessage" },
- new MetricsQueryOptions
+ metricsClient ??= await InitializeMetricsClient(cancellationToken);
+
+ var response = await metricsClient.QueryResourcesAsync(
+ [resourceId!],
+ ["CompleteMessage"],
+ "Microsoft.ServiceBus/namespaces",
+ new MetricsQueryResourcesOptions
{
Filter = $"EntityName eq '{queueName}'",
- TimeRange = new QueryTimeRange(startTime.ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc), endTime.ToDateTime(TimeOnly.MaxValue, DateTimeKind.Utc)),
+ TimeRange = new MetricsQueryTimeRange(startTime.ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc), endTime.ToDateTime(TimeOnly.MaxValue, DateTimeKind.Utc)),
Granularity = TimeSpan.FromDays(1)
},
cancellationToken);
var metricValues =
- response.Value.Metrics.FirstOrDefault()?.TimeSeries.FirstOrDefault()?.Values ?? [];
+ response.Value.Values.FirstOrDefault()?.Metrics.FirstOrDefault()?.TimeSeries.FirstOrDefault()?.Values ?? [];
- return metricValues;
+ return metricValues.AsReadOnly();
}
public override async IAsyncEnumerable GetQueueNames(
@@ -269,15 +303,14 @@ public override async IAsyncEnumerable GetQueueNames(
var validNamespaces = await GetValidNamespaceNames(cancellationToken);
SubscriptionResource? subscription = await armClient!.GetDefaultSubscriptionAsync(cancellationToken);
- var namespaces =
- subscription.GetServiceBusNamespacesAsync(cancellationToken);
+ var namespaces = subscription.GetServiceBusNamespacesAsync(cancellationToken);
- await foreach (var serviceBusNamespaceResource in namespaces.WithCancellation(
- cancellationToken))
+ await foreach (var serviceBusNamespaceResource in namespaces.WithCancellation(cancellationToken))
{
if (validNamespaces.Contains(serviceBusNamespaceResource.Data.Name))
{
resourceId = serviceBusNamespaceResource.Id;
+
await foreach (var queue in serviceBusNamespaceResource.GetServiceBusQueues()
.WithCancellation(cancellationToken))
{
@@ -372,4 +405,4 @@ public static class AzureServiceBusSettings
public static readonly string ManagementUrl = "ASB/ManagementUrl";
public static readonly string ManagementUrlDescription = "Azure management URL";
}
-}
\ No newline at end of file
+}
diff --git a/src/ServiceControl.Transports.ASBS/ServiceControl.Transports.ASBS.csproj b/src/ServiceControl.Transports.ASBS/ServiceControl.Transports.ASBS.csproj
index ef7f0fd2f5..a338339ece 100644
--- a/src/ServiceControl.Transports.ASBS/ServiceControl.Transports.ASBS.csproj
+++ b/src/ServiceControl.Transports.ASBS/ServiceControl.Transports.ASBS.csproj
@@ -12,7 +12,7 @@
-
+