diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index 736499ad24..df1811d297 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -8,7 +8,7 @@ - + @@ -17,8 +17,8 @@ - - + + @@ -42,7 +42,7 @@ - + diff --git a/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditIngestionUnitOfWork.cs b/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditIngestionUnitOfWork.cs index abc554152a..eb19562dbc 100644 --- a/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditIngestionUnitOfWork.cs +++ b/src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditIngestionUnitOfWork.cs @@ -35,7 +35,7 @@ public async Task RecordProcessedMessage(ProcessedMessage processedMessage, Read if (!body.IsEmpty) { await using var stream = new ReadOnlyStream(body); - var contentType = processedMessage.Headers.GetValueOrDefault(Headers.ContentType, "text/xml"); + var contentType = processedMessage.Headers.GetValueOrDefault(Headers.ContentType, "text/plain"); await bodyStorage.Store(processedMessage.Id, contentType, body.Length, stream, cancellationToken); } diff --git a/src/ServiceControl.Audit.Persistence.Tests/AuditTests.cs b/src/ServiceControl.Audit.Persistence.Tests/AuditTests.cs index 4d89044420..9dcffaee85 100644 --- a/src/ServiceControl.Audit.Persistence.Tests/AuditTests.cs +++ b/src/ServiceControl.Audit.Persistence.Tests/AuditTests.cs @@ -80,7 +80,7 @@ await IngestProcessedMessagesAudits( [Test] public async Task Can_roundtrip_message_body() { - string expectedContentType = "text/xml"; + string expectedContentType = "text/plain"; var unitOfWork = await StartAuditUnitOfWork(1); var body = new byte[100]; diff --git a/src/ServiceControl.Audit.Persistence/BodyStorageEnricher.cs b/src/ServiceControl.Audit.Persistence/BodyStorageEnricher.cs index c7794e30ba..be9e133bf8 100644 --- a/src/ServiceControl.Audit.Persistence/BodyStorageEnricher.cs +++ b/src/ServiceControl.Audit.Persistence/BodyStorageEnricher.cs @@ -21,7 +21,7 @@ public async ValueTask StoreAuditMessageBody(ReadOnlyMemory body, Processe return; } - var contentType = GetContentType(processedMessage.Headers, "text/xml"); + var contentType = GetContentType(processedMessage.Headers, "text/plain"); processedMessage.MessageMetadata.Add("ContentType", contentType); var stored = await TryStoreBody(body, processedMessage, bodySize, contentType, cancellationToken); diff --git a/src/ServiceControl.Audit.UnitTests/BodyStorage/BodyStorageEnricherTests.cs b/src/ServiceControl.Audit.UnitTests/BodyStorage/BodyStorageEnricherTests.cs index 87e1bbe9db..273b4f80fb 100644 --- a/src/ServiceControl.Audit.UnitTests/BodyStorage/BodyStorageEnricherTests.cs +++ b/src/ServiceControl.Audit.UnitTests/BodyStorage/BodyStorageEnricherTests.cs @@ -92,7 +92,7 @@ public async Task Should_store_body_in_metadata_when_below_large_object_heap_and [Headers.MessageId] = "someid", ["ServiceControl.Retry.UniqueMessageId"] = "someid", [Headers.ProcessingEndpoint] = "someendpoint", - [Headers.ContentType] = "text/xml" + [Headers.ContentType] = "text/plain" }; var message = new ProcessedMessage(headers, metadata); diff --git a/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenRecoverabilityIngestionUnitOfWork.cs b/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenRecoverabilityIngestionUnitOfWork.cs index cf6ee57a13..2351e78e3e 100644 --- a/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenRecoverabilityIngestionUnitOfWork.cs +++ b/src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenRecoverabilityIngestionUnitOfWork.cs @@ -33,7 +33,7 @@ public Task RecordFailedProcessingAttempt( List groups) { var uniqueMessageId = context.Headers.UniqueId(); - var contentType = GetContentType(context.Headers, "text/xml"); + var contentType = GetContentType(context.Headers, "text/plain"); var bodySize = context.Body.Length; processingAttempt.MessageMetadata.Add("ContentType", contentType); diff --git a/src/ServiceControl.Transports.ASBS/AzureQuery.cs b/src/ServiceControl.Transports.ASBS/AzureQuery.cs index 041b2bc246..3007ed7374 100644 --- a/src/ServiceControl.Transports.ASBS/AzureQuery.cs +++ b/src/ServiceControl.Transports.ASBS/AzureQuery.cs @@ -12,8 +12,8 @@ namespace ServiceControl.Transports.ASBS; using Azure.Core; using Azure.Core.Pipeline; using Azure.Identity; -using Azure.Monitor.Query; -using Azure.Monitor.Query.Models; +using Azure.Monitor.Query.Metrics; +using Azure.Monitor.Query.Metrics.Models; using Azure.ResourceManager; using Azure.ResourceManager.Resources; using Azure.ResourceManager.ServiceBus; @@ -26,10 +26,12 @@ public class AzureQuery(ILogger logger, TimeProvider timeProvider, T : BrokerThroughputQuery(logger, "AzureServiceBus") { string serviceBusName = string.Empty; - MetricsQueryClient? client; ArmClient? armClient; - string? resourceId; + TokenCredential? credential; + ResourceIdentifier? resourceId; ArmEnvironment armEnvironment; + MetricsClientAudience metricsClientAudience; + MetricsClient? metricsClient; protected override void InitializeCore(ReadOnlyDictionary settings) { @@ -102,7 +104,7 @@ protected override void InitializeCore(ReadOnlyDictionary settin Diagnostics.AppendLine("Client secret set"); } - (armEnvironment, var metricsQueryAudience) = GetEnvironment(); + (armEnvironment, metricsClientAudience) = GetEnvironment(); if (managementUrl == null) { @@ -118,28 +120,17 @@ protected override void InitializeCore(ReadOnlyDictionary settin return; } - TokenCredential clientCredentials; if (connectionSettings.AuthenticationMethod is TokenCredentialAuthentication tokenCredentialAuthentication) { Diagnostics.AppendLine("Attempting to use managed identity"); - clientCredentials = tokenCredentialAuthentication.Credential; + credential = tokenCredentialAuthentication.Credential; } else { - clientCredentials = new ClientSecretCredential(tenantId, clientId, clientSecret); + credential = new ClientSecretCredential(tenantId, clientId, clientSecret); } - client = new MetricsQueryClient(armEnvironment.Endpoint, clientCredentials, - new MetricsQueryClientOptions - { - Audience = metricsQueryAudience, - Transport = new HttpClientTransport( - new HttpClient(new SocketsHttpHandler - { - PooledConnectionIdleTimeout = TimeSpan.FromMinutes(2) - })) - }); - armClient = new ArmClient(clientCredentials, subscriptionId, + armClient = new ArmClient(credential, subscriptionId, new ArmClientOptions { Environment = armEnvironment, @@ -152,31 +143,31 @@ protected override void InitializeCore(ReadOnlyDictionary settin return; - (ArmEnvironment armEnvironment, MetricsQueryAudience metricsQueryAudience) GetEnvironment() + (ArmEnvironment armEnvironment, MetricsClientAudience metricsClientAudience) GetEnvironment() { if (managementUrlParsed == null) { - return (ArmEnvironment.AzurePublicCloud, MetricsQueryAudience.AzurePublicCloud); + return (ArmEnvironment.AzurePublicCloud, MetricsClientAudience.AzurePublicCloud); } if (managementUrlParsed == ArmEnvironment.AzurePublicCloud.Endpoint) { - return (ArmEnvironment.AzurePublicCloud, MetricsQueryAudience.AzurePublicCloud); + return (ArmEnvironment.AzurePublicCloud, MetricsClientAudience.AzurePublicCloud); } if (managementUrlParsed == ArmEnvironment.AzureChina.Endpoint) { - return (ArmEnvironment.AzureChina, MetricsQueryAudience.AzureChina); + return (ArmEnvironment.AzureChina, MetricsClientAudience.AzureChina); } if (managementUrlParsed == ArmEnvironment.AzureGermany.Endpoint) { - return (ArmEnvironment.AzureGermany, MetricsQueryAudience.AzurePublicCloud); + return (ArmEnvironment.AzureGermany, MetricsClientAudience.AzurePublicCloud); } if (managementUrlParsed == ArmEnvironment.AzureGovernment.Endpoint) { - return (ArmEnvironment.AzureGovernment, MetricsQueryAudience.AzureGovernment); + return (ArmEnvironment.AzureGovernment, MetricsClientAudience.AzureGovernment); } string options = string.Join(", ", @@ -187,7 +178,7 @@ protected override void InitializeCore(ReadOnlyDictionary settin }.Select(armEnvironment => $"\"{armEnvironment.Endpoint}\"")); InitialiseErrors.Add($"Management url configuration is invalid, available options are {options}"); - return (ArmEnvironment.AzurePublicCloud, MetricsQueryAudience.AzurePublicCloud); + return (ArmEnvironment.AzurePublicCloud, MetricsClientAudience.AzurePublicCloud); } } @@ -229,7 +220,6 @@ public override async IAsyncEnumerable GetThroughputPerDay(IBro while (currentDate <= endDate) { data.Add(currentDate, new QueueThroughput { TotalThroughput = 0, DateUTC = currentDate }); - currentDate = currentDate.AddDays(1); } @@ -244,23 +234,67 @@ public override async IAsyncEnumerable GetThroughputPerDay(IBro } } + async Task InitializeMetricsClient(CancellationToken cancellationToken = default) + { + if (resourceId is null || armClient is null || credential is null) + { + throw new InvalidOperationException("AzureQuery has not been initialized correctly."); + } + + var serviceBusNamespaceResource = await armClient + .GetServiceBusNamespaceResource(resourceId).GetAsync(cancellationToken) + ?? throw new Exception($"Could not find ServiceBus with resource Id: \"{resourceId}\""); + + // Determine the region of the namespace + var regionName = serviceBusNamespaceResource.Value.Data.Location.Name; + + // Build the regional Azure Monitor Metrics endpoint from the audience + var metricsEndpoint = BuildMetricsEndpoint(metricsClientAudience, regionName); + + // CreateNewOnMetadataUpdateAttribute the MetricsClient for this namespace + return new MetricsClient( + metricsEndpoint, + credential!, + new MetricsClientOptions + { + Audience = metricsClientAudience, + Transport = new HttpClientTransport( + new HttpClient(new SocketsHttpHandler + { + PooledConnectionIdleTimeout = TimeSpan.FromMinutes(2) + })) + }); + } + + static Uri BuildMetricsEndpoint(MetricsClientAudience audience, string regionName) + { + var region = regionName.ToLowerInvariant(); + var builder = new UriBuilder(audience.ToString()); + builder.Host = $"{region}.{builder.Host}"; + return builder.Uri; + } + async Task> GetMetrics(string queueName, DateOnly startTime, DateOnly endTime, CancellationToken cancellationToken = default) { - var response = await client!.QueryResourceAsync(resourceId, - new[] { "CompleteMessage" }, - new MetricsQueryOptions + metricsClient ??= await InitializeMetricsClient(cancellationToken); + + var response = await metricsClient.QueryResourcesAsync( + [resourceId!], + ["CompleteMessage"], + "Microsoft.ServiceBus/namespaces", + new MetricsQueryResourcesOptions { Filter = $"EntityName eq '{queueName}'", - TimeRange = new QueryTimeRange(startTime.ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc), endTime.ToDateTime(TimeOnly.MaxValue, DateTimeKind.Utc)), + TimeRange = new MetricsQueryTimeRange(startTime.ToDateTime(TimeOnly.MinValue, DateTimeKind.Utc), endTime.ToDateTime(TimeOnly.MaxValue, DateTimeKind.Utc)), Granularity = TimeSpan.FromDays(1) }, cancellationToken); var metricValues = - response.Value.Metrics.FirstOrDefault()?.TimeSeries.FirstOrDefault()?.Values ?? []; + response.Value.Values.FirstOrDefault()?.Metrics.FirstOrDefault()?.TimeSeries.FirstOrDefault()?.Values ?? []; - return metricValues; + return metricValues.AsReadOnly(); } public override async IAsyncEnumerable GetQueueNames( @@ -269,15 +303,14 @@ public override async IAsyncEnumerable GetQueueNames( var validNamespaces = await GetValidNamespaceNames(cancellationToken); SubscriptionResource? subscription = await armClient!.GetDefaultSubscriptionAsync(cancellationToken); - var namespaces = - subscription.GetServiceBusNamespacesAsync(cancellationToken); + var namespaces = subscription.GetServiceBusNamespacesAsync(cancellationToken); - await foreach (var serviceBusNamespaceResource in namespaces.WithCancellation( - cancellationToken)) + await foreach (var serviceBusNamespaceResource in namespaces.WithCancellation(cancellationToken)) { if (validNamespaces.Contains(serviceBusNamespaceResource.Data.Name)) { resourceId = serviceBusNamespaceResource.Id; + await foreach (var queue in serviceBusNamespaceResource.GetServiceBusQueues() .WithCancellation(cancellationToken)) { @@ -372,4 +405,4 @@ public static class AzureServiceBusSettings public static readonly string ManagementUrl = "ASB/ManagementUrl"; public static readonly string ManagementUrlDescription = "Azure management URL"; } -} \ No newline at end of file +} diff --git a/src/ServiceControl.Transports.ASBS/ServiceControl.Transports.ASBS.csproj b/src/ServiceControl.Transports.ASBS/ServiceControl.Transports.ASBS.csproj index ef7f0fd2f5..a338339ece 100644 --- a/src/ServiceControl.Transports.ASBS/ServiceControl.Transports.ASBS.csproj +++ b/src/ServiceControl.Transports.ASBS/ServiceControl.Transports.ASBS.csproj @@ -12,7 +12,7 @@ - +