diff --git a/.reposync.yml b/.reposync.yml
index 05ec9e3185..8b13789179 100644
--- a/.reposync.yml
+++ b/.reposync.yml
@@ -1,2 +1 @@
-exclusions:
-- src/NServiceBus.snk
+
diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index cfe0a35952..bc50b29325 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -38,7 +38,7 @@
-
+
diff --git a/src/NServiceBus.snk b/src/NServiceBus.snk
new file mode 100644
index 0000000000..6fa7ddec10
Binary files /dev/null and b/src/NServiceBus.snk differ
diff --git a/src/ServiceControl.Config/Commands/ScmuCommandChecks.cs b/src/ServiceControl.Config/Commands/ScmuCommandChecks.cs
index b89291e01c..b21e0b37fd 100644
--- a/src/ServiceControl.Config/Commands/ScmuCommandChecks.cs
+++ b/src/ServiceControl.Config/Commands/ScmuCommandChecks.cs
@@ -2,6 +2,7 @@
{
using System;
using System.Diagnostics;
+ using System.Text;
using System.Threading.Tasks;
using ServiceControl.Config.Framework;
using ServiceControlInstaller.Engine;
@@ -21,12 +22,19 @@ protected override async Task PromptForRabbitMqCheck(bool isUpgrade)
{
var title = isUpgrade ? "UPGRADE WARNING" : "INSTALL WARNING";
var beforeWhat = isUpgrade ? "upgrading" : "installing";
- var message = $"ServiceControl version {Constants.CurrentVersion} requires RabbitMQ broker version 3.10.0 or higher. Also, the stream_queue and quorum_queue feature flags must be enabled on the broker. Please confirm your broker meets the minimum requirements before {beforeWhat}.";
var question = "Do you want to proceed?";
var yes = "Yes, my RabbitMQ broker meets the minimum requirements";
var no = "No, cancel the install";
- var continueInstall = await windowManager.ShowYesNoDialog(title, message, question, yes, no);
+ var message = new StringBuilder();
+ message.AppendLine($"ServiceControl version {Constants.CurrentVersion} requires:");
+ message.AppendLine("• RabbitMQ broker version 3.10.0 or higher");
+ message.AppendLine("• The stream_queue and quorum_queue feature flags must be enabled");
+ message.AppendLine($"• The management plugin API must be enabled and accessible. This might require custom settings to be added to the connection string before {beforeWhat}. See the ServiceControl documentation for details.");
+ message.AppendLine();
+ message.AppendLine($"Please confirm your broker meets the minimum requirements before {beforeWhat}.");
+
+ var continueInstall = await windowManager.ShowYesNoDialog(title, message.ToString(), question, yes, no);
return continueInstall;
}
diff --git a/src/ServiceControl.Management.PowerShell/Validation/AcknowledgementValues.cs b/src/ServiceControl.Management.PowerShell/Validation/AcknowledgementValues.cs
index 4b0b88a9c0..60de4a497d 100644
--- a/src/ServiceControl.Management.PowerShell/Validation/AcknowledgementValues.cs
+++ b/src/ServiceControl.Management.PowerShell/Validation/AcknowledgementValues.cs
@@ -3,5 +3,6 @@
static class AcknowledgementValues
{
public const string RabbitMQBrokerVersion310 = "RabbitMQBrokerVersion310";
+ public const string RabbitMQManagementApi = "RabbitMQManagementApi";
}
}
diff --git a/src/ServiceControl.Management.PowerShell/Validation/PowerShellCommandChecks.cs b/src/ServiceControl.Management.PowerShell/Validation/PowerShellCommandChecks.cs
index a53cb711a9..9d9ffd27bc 100644
--- a/src/ServiceControl.Management.PowerShell/Validation/PowerShellCommandChecks.cs
+++ b/src/ServiceControl.Management.PowerShell/Validation/PowerShellCommandChecks.cs
@@ -71,15 +71,23 @@ protected override Task NotifyForMissingSystemPrerequisites(string missingPrereq
protected override Task PromptForRabbitMqCheck(bool isUpgrade)
{
- if (acknowledgements.Any(ack => ack.Equals(AcknowledgementValues.RabbitMQBrokerVersion310, StringComparison.OrdinalIgnoreCase)))
+ if (!acknowledgements.Any(ack => ack.Equals(AcknowledgementValues.RabbitMQBrokerVersion310, StringComparison.OrdinalIgnoreCase)))
{
- return Task.FromResult(true);
+ var terminateMsg = $"ServiceControl version {Constants.CurrentVersion} requires RabbitMQ broker version 3.10.0 or higher. Also, the stream_queue and quorum_queue feature flags must be enabled on the broker. Use -Acknowledgements {AcknowledgementValues.RabbitMQBrokerVersion310} if you are sure your broker meets these requirements.";
+
+ Terminate(terminateMsg, "Install Error", ErrorCategory.InvalidArgument);
+ return Task.FromResult(false);
}
- var terminateMsg = $"ServiceControl version {Constants.CurrentVersion} requires RabbitMQ broker version 3.10.0 or higher. Also, the stream_queue and quorum_queue feature flags must be enabled on the broker. Use -Acknowledgements {AcknowledgementValues.RabbitMQBrokerVersion310} if you are sure your broker meets these requirements.";
+ if (!acknowledgements.Any(ack => ack.Equals(AcknowledgementValues.RabbitMQManagementApi, StringComparison.OrdinalIgnoreCase)))
+ {
+ var terminateMsg = $"ServiceControl version {Constants.CurrentVersion} requires that the management plugin API must be enabled and accessible. This might require custom settings to be added to the connection string. See the ServiceControl documentation for details. Use -Acknowledgements {AcknowledgementValues.RabbitMQManagementApi} if you are sure your broker meets these requirements.";
- Terminate(terminateMsg, "Install Error", ErrorCategory.InvalidArgument);
- return Task.FromResult(false);
+ Terminate(terminateMsg, "Install Error", ErrorCategory.InvalidArgument);
+ return Task.FromResult(false);
+ }
+
+ return Task.FromResult(true);
}
protected override Task PromptToStopRunningInstance(BaseService instance)
diff --git a/src/ServiceControl.Transports.RabbitMQ/ConnectionConfiguration.cs b/src/ServiceControl.Transports.RabbitMQ/ConnectionConfiguration.cs
deleted file mode 100644
index 0f2cb882b5..0000000000
--- a/src/ServiceControl.Transports.RabbitMQ/ConnectionConfiguration.cs
+++ /dev/null
@@ -1,286 +0,0 @@
-namespace ServiceControl.Transports.RabbitMQ
-{
- using System;
- using System.Collections.Generic;
- using System.Data.Common;
- using System.Diagnostics;
- using System.IO;
- using System.Linq;
- using System.Text;
- using NServiceBus;
- using NServiceBus.Support;
-
- class ConnectionConfiguration
- {
- const bool defaultUseTls = false;
- const int defaultPort = 5672;
- const int defaultTlsPort = 5671;
- const string defaultVirtualHost = "/";
- const string defaultUserName = "guest";
- const string defaultPassword = "guest";
- const ushort defaultRequestedHeartbeat = 60;
- static readonly TimeSpan defaultRetryDelay = TimeSpan.FromSeconds(10);
- const string defaultCertPath = "";
- const string defaultCertPassphrase = null;
-
- public string Host { get; }
-
- public int Port { get; }
-
- public string VirtualHost { get; }
-
- public string UserName { get; }
-
- public string Password { get; }
-
- public TimeSpan RequestedHeartbeat { get; }
-
- public TimeSpan RetryDelay { get; }
-
- public bool UseTls { get; }
-
- public string CertPath { get; }
-
- public string CertPassphrase { get; }
-
- public Dictionary ClientProperties { get; }
-
- ConnectionConfiguration(
- string host,
- int port,
- string virtualHost,
- string userName,
- string password,
- TimeSpan requestedHeartbeat,
- TimeSpan retryDelay,
- bool useTls,
- string certPath,
- string certPassphrase,
- Dictionary clientProperties)
- {
- Host = host;
- Port = port;
- VirtualHost = virtualHost;
- UserName = userName;
- Password = password;
- RequestedHeartbeat = requestedHeartbeat;
- RetryDelay = retryDelay;
- UseTls = useTls;
- CertPath = certPath;
- CertPassphrase = certPassphrase;
- ClientProperties = clientProperties;
- }
-
- public static ConnectionConfiguration Create(string connectionString, string endpointName)
- {
- Dictionary dictionary;
- var invalidOptionsMessage = new StringBuilder();
-
- if (connectionString.StartsWith("amqp", StringComparison.OrdinalIgnoreCase))
- {
- dictionary = ParseAmqpConnectionString(connectionString, invalidOptionsMessage);
- }
- else
- {
- dictionary = ParseNServiceBusConnectionString(connectionString, invalidOptionsMessage);
- }
-
- var host = GetValue(dictionary, "host", default);
- var useTls = GetValue(dictionary, "useTls", bool.TryParse, defaultUseTls, invalidOptionsMessage);
- var port = GetValue(dictionary, "port", int.TryParse, useTls ? defaultTlsPort : defaultPort, invalidOptionsMessage);
- var virtualHost = GetValue(dictionary, "virtualHost", defaultVirtualHost);
- var userName = GetValue(dictionary, "userName", defaultUserName);
- var password = GetValue(dictionary, "password", defaultPassword);
-
- var requestedHeartbeatSeconds = GetValue(dictionary, "requestedHeartbeat", ushort.TryParse, defaultRequestedHeartbeat, invalidOptionsMessage);
- var requestedHeartbeat = TimeSpan.FromSeconds(requestedHeartbeatSeconds);
-
- var retryDelay = GetValue(dictionary, "retryDelay", TimeSpan.TryParse, defaultRetryDelay, invalidOptionsMessage);
- var certPath = GetValue(dictionary, "certPath", defaultCertPath);
- var certPassPhrase = GetValue(dictionary, "certPassphrase", defaultCertPassphrase);
-
- if (invalidOptionsMessage.Length > 0)
- {
- throw new NotSupportedException(invalidOptionsMessage.ToString().TrimEnd('\r', '\n'));
- }
-
- var nsbVersion = FileVersionInfo.GetVersionInfo(typeof(Endpoint).Assembly.Location);
- var nsbFileVersion = $"{nsbVersion.FileMajorPart}.{nsbVersion.FileMinorPart}.{nsbVersion.FileBuildPart}";
-
- var rabbitMQVersion =
- FileVersionInfo.GetVersionInfo(typeof(ConnectionConfiguration).Assembly.Location);
- var rabbitMQFileVersion = $"{rabbitMQVersion.FileMajorPart}.{rabbitMQVersion.FileMinorPart}.{rabbitMQVersion.FileBuildPart}";
-
- var applicationNameAndPath = Environment.GetCommandLineArgs()[0];
- var applicationName = Path.GetFileName(applicationNameAndPath);
- var applicationPath = Path.GetDirectoryName(applicationNameAndPath);
-
- var hostname = RuntimeEnvironment.MachineName;
-
- var clientProperties = new Dictionary
- {
- { "client_api", "NServiceBus" },
- { "nservicebus_version", nsbFileVersion },
- { "nservicebus.rabbitmq_version", rabbitMQFileVersion },
- { "application", applicationName },
- { "application_location", applicationPath },
- { "machine_name", hostname },
- { "user", userName },
- { "endpoint_name", endpointName },
- };
-
- return new ConnectionConfiguration(
- host, port, virtualHost, userName, password, requestedHeartbeat, retryDelay, useTls, certPath, certPassPhrase, clientProperties);
- }
-
- static Dictionary ParseAmqpConnectionString(string connectionString, StringBuilder invalidOptionsMessage)
- {
- var dictionary = new Dictionary();
- var uri = new Uri(connectionString);
-
- var usingTls = string.Equals("amqps", uri.Scheme, StringComparison.OrdinalIgnoreCase) ? bool.TrueString : bool.FalseString;
- dictionary.Add("useTls", usingTls);
-
- dictionary.Add("host", uri.Host);
-
- if (!uri.IsDefaultPort)
- {
- dictionary.Add("port", uri.Port.ToString());
- }
-
- if (!string.IsNullOrEmpty(uri.UserInfo))
- {
- var userPass = uri.UserInfo.Split(':');
-
- if (userPass.Length > 2)
- {
- invalidOptionsMessage.AppendLine($"Bad user info in AMQP URI: {uri.UserInfo}");
- }
- else
- {
- dictionary.Add("userName", UriDecode(userPass[0]));
-
- if (userPass.Length == 2)
- {
- dictionary.Add("password", UriDecode(userPass[1]));
- }
- }
- }
-
- if (uri.Segments.Length > 2)
- {
- invalidOptionsMessage.AppendLine($"Multiple segments in path of AMQP URI: {string.Join(", ", uri.Segments)}");
- }
- else if (uri.Segments.Length == 2)
- {
- dictionary.Add("virtualHost", UriDecode(uri.Segments[1]));
- }
-
- return dictionary;
- }
-
- static Dictionary ParseNServiceBusConnectionString(string connectionString, StringBuilder invalidOptionsMessage)
- {
- var dictionary = new DbConnectionStringBuilder { ConnectionString = connectionString }
- .OfType>()
- .ToDictionary(pair => pair.Key, pair => pair.Value.ToString(), StringComparer.OrdinalIgnoreCase);
-
- RegisterDeprecatedSettingsAsInvalidOptions(dictionary, invalidOptionsMessage);
-
- if (dictionary.TryGetValue("port", out var portValue) && !int.TryParse(portValue, out var port))
- {
- invalidOptionsMessage.AppendLine($"'{portValue}' is not a valid Int32 value for the 'port' connection string option.");
- }
-
- if (dictionary.TryGetValue("host", out var value))
- {
- var firstHostAndPort = value.Split(',')[0];
- var parts = firstHostAndPort.Split(':');
- var host = parts.ElementAt(0);
-
- if (host.Length == 0)
- {
- invalidOptionsMessage.AppendLine("Empty host name in 'host' connection string option.");
- }
-
- dictionary["host"] = host;
-
- if (parts.Length > 1)
- {
- if (!int.TryParse(parts[1], out port))
- {
- invalidOptionsMessage.AppendLine($"'{parts[1]}' is not a valid Int32 value for the port in the 'host' connection string option.");
- }
- else
- {
- dictionary["port"] = port.ToString();
- }
- }
- }
- else
- {
- invalidOptionsMessage.AppendLine("Invalid connection string. 'host' value must be supplied. e.g: \"host=myServer\"");
- }
-
- return dictionary;
- }
-
- static void RegisterDeprecatedSettingsAsInvalidOptions(Dictionary dictionary, StringBuilder invalidOptionsMessage)
- {
- if (dictionary.TryGetValue("host", out var value))
- {
- var hostsAndPorts = value.Split(',');
-
- if (hostsAndPorts.Length > 1)
- {
- invalidOptionsMessage.AppendLine("Multiple hosts are no longer supported. If using RabbitMQ in a cluster, consider using a load balancer to represent the nodes as a single host.");
- }
- }
-
- if (dictionary.ContainsKey("dequeuetimeout"))
- {
- invalidOptionsMessage.AppendLine("The 'DequeueTimeout' connection string option has been removed. Consult the documentation for further information.");
- }
-
- if (dictionary.ContainsKey("maxwaittimeforconfirms"))
- {
- invalidOptionsMessage.AppendLine("The 'MaxWaitTimeForConfirms' connection string option has been removed. Consult the documentation for further information.");
- }
-
- if (dictionary.ContainsKey("prefetchcount"))
- {
- invalidOptionsMessage.AppendLine("The 'PrefetchCount' connection string option has been removed. Use 'EndpointConfiguration.UseTransport().PrefetchCount' instead.");
- }
-
- if (dictionary.ContainsKey("usepublisherconfirms"))
- {
- invalidOptionsMessage.AppendLine("The 'UsePublisherConfirms' connection string option has been removed. Consult the documentation for further information.");
- }
- }
-
- static string GetValue(Dictionary dictionary, string key, string defaultValue)
- {
- return dictionary.TryGetValue(key, out var value) ? value : defaultValue;
- }
-
- static string UriDecode(string value)
- {
- return Uri.UnescapeDataString(value);
- }
-
- static T GetValue(Dictionary dictionary, string key, Convert convert, T defaultValue, StringBuilder invalidOptionsMessage)
- {
- if (dictionary.TryGetValue(key, out var value))
- {
- if (!convert(value, out defaultValue))
- {
- invalidOptionsMessage.AppendLine($"'{value}' is not a valid {typeof(T).Name} value for the '{key}' connection string option.");
- }
- }
-
- return defaultValue;
- }
-
- delegate bool Convert(string input, out T output);
- }
-}
diff --git a/src/ServiceControl.Transports.RabbitMQ/ConnectionFactory.cs b/src/ServiceControl.Transports.RabbitMQ/ConnectionFactory.cs
deleted file mode 100644
index e52f7e1de3..0000000000
--- a/src/ServiceControl.Transports.RabbitMQ/ConnectionFactory.cs
+++ /dev/null
@@ -1,96 +0,0 @@
-namespace ServiceControl.Transports.RabbitMQ
-{
- using System;
- using System.Net.Security;
- using System.Security.Authentication;
- using System.Security.Cryptography.X509Certificates;
- using global::RabbitMQ.Client;
-
- class ConnectionFactory
- {
- readonly string endpointName;
- readonly global::RabbitMQ.Client.ConnectionFactory connectionFactory;
- readonly object lockObject = new object();
-
- public ConnectionFactory(string endpointName, ConnectionConfiguration connectionConfiguration,
- X509Certificate2Collection clientCertificateCollection, bool disableRemoteCertificateValidation,
- bool useExternalAuthMechanism, TimeSpan? heartbeatInterval, TimeSpan? networkRecoveryInterval)
- {
- if (endpointName is null)
- {
- throw new ArgumentNullException(nameof(endpointName));
- }
-
- if (endpointName == string.Empty)
- {
- throw new ArgumentException("The endpoint name cannot be empty.", nameof(endpointName));
- }
-
- this.endpointName = endpointName;
-
- if (connectionConfiguration == null)
- {
- throw new ArgumentNullException(nameof(connectionConfiguration));
- }
-
- if (connectionConfiguration.Host == null)
- {
- throw new ArgumentException("The connectionConfiguration has a null Host.", nameof(connectionConfiguration));
- }
-
- connectionFactory = new global::RabbitMQ.Client.ConnectionFactory
- {
- HostName = connectionConfiguration.Host,
- Port = connectionConfiguration.Port,
- VirtualHost = connectionConfiguration.VirtualHost,
- UserName = connectionConfiguration.UserName,
- Password = connectionConfiguration.Password,
- RequestedHeartbeat = heartbeatInterval ?? connectionConfiguration.RequestedHeartbeat,
- NetworkRecoveryInterval = networkRecoveryInterval ?? connectionConfiguration.RetryDelay,
- };
-
- connectionFactory.Ssl.ServerName = connectionConfiguration.Host;
- connectionFactory.Ssl.Certs = clientCertificateCollection;
- connectionFactory.Ssl.CertPath = connectionConfiguration.CertPath;
- connectionFactory.Ssl.CertPassphrase = connectionConfiguration.CertPassphrase;
- connectionFactory.Ssl.Version = SslProtocols.Tls12;
- connectionFactory.Ssl.Enabled = connectionConfiguration.UseTls;
-
- if (disableRemoteCertificateValidation)
- {
- connectionFactory.Ssl.AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateChainErrors |
- SslPolicyErrors.RemoteCertificateNameMismatch |
- SslPolicyErrors.RemoteCertificateNotAvailable;
- }
-
- if (useExternalAuthMechanism)
- {
- connectionFactory.AuthMechanisms = new[] { new ExternalMechanismFactory() };
- }
-
- connectionFactory.ClientProperties.Clear();
-
- foreach (var item in connectionConfiguration.ClientProperties)
- {
- connectionFactory.ClientProperties.Add(item.Key, item.Value);
- }
- }
-
- public IConnection CreatePublishConnection() => CreateConnection($"{endpointName} Publish", false);
-
- public IConnection CreateAdministrationConnection() => CreateConnection($"{endpointName} Administration", false);
-
- public IConnection CreateConnection(string connectionName, bool automaticRecoveryEnabled = true)
- {
- lock (lockObject)
- {
- connectionFactory.AutomaticRecoveryEnabled = automaticRecoveryEnabled;
- connectionFactory.ClientProperties["connected"] = DateTime.UtcNow.ToString("G");
-
- var connection = connectionFactory.CreateConnection(connectionName);
-
- return connection;
- }
- }
- }
-}
diff --git a/src/ServiceControl.Transports.RabbitMQ/IManagementClientProvider.cs b/src/ServiceControl.Transports.RabbitMQ/IManagementClientProvider.cs
new file mode 100644
index 0000000000..ab0bd30b3b
--- /dev/null
+++ b/src/ServiceControl.Transports.RabbitMQ/IManagementClientProvider.cs
@@ -0,0 +1,9 @@
+namespace ServiceControl.Transports.RabbitMQ;
+
+using System;
+using NServiceBus.Transport.RabbitMQ.ManagementApi;
+
+interface IManagementClientProvider
+{
+ Lazy GetManagementClient();
+}
diff --git a/src/ServiceControl.Transports.RabbitMQ/QueueLengthProvider.cs b/src/ServiceControl.Transports.RabbitMQ/QueueLengthProvider.cs
index e7e97025c7..3c60019abe 100644
--- a/src/ServiceControl.Transports.RabbitMQ/QueueLengthProvider.cs
+++ b/src/ServiceControl.Transports.RabbitMQ/QueueLengthProvider.cs
@@ -2,18 +2,23 @@
{
using System;
using System.Collections.Concurrent;
- using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
- using global::RabbitMQ.Client;
using NServiceBus.Logging;
+ using NServiceBus.Transport.RabbitMQ.ManagementApi;
class QueueLengthProvider : AbstractQueueLengthProvider
{
- public QueueLengthProvider(TransportSettings settings, Action store) : base(settings, store)
+ public QueueLengthProvider(TransportSettings settings, Action store, ITransportCustomization transportCustomization) : base(settings, store)
{
- queryExecutor = new QueryExecutor(ConnectionString);
- queryExecutor.Initialize();
+ if (transportCustomization is IManagementClientProvider provider)
+ {
+ managementClient = provider.GetManagementClient();
+ }
+ else
+ {
+ throw new ArgumentException($"Transport customization does not implement {nameof(IManagementClientProvider)}. Type: {transportCustomization.GetType().Name}", nameof(transportCustomization));
+ }
}
public override void TrackEndpointInputQueue(EndpointToQueueMapping queueToTrack) =>
@@ -75,89 +80,29 @@ async Task FetchQueueLengths(CancellationToken cancellationToken)
{
foreach (var endpointQueuePair in endpointQueues)
{
- await queryExecutor.Execute(m =>
- {
- var queueName = endpointQueuePair.Value;
-
- try
- {
- var size = (int)m.MessageCount(queueName);
-
- sizes.AddOrUpdate(queueName, _ => size, (_, __) => size);
- }
- catch (Exception e)
- {
- Logger.Warn($"Error querying queue length for {queueName}", e);
- }
- }, cancellationToken);
- }
- }
+ var queueName = endpointQueuePair.Value;
- readonly QueryExecutor queryExecutor;
- static readonly TimeSpan QueryDelayInterval = TimeSpan.FromMilliseconds(200);
-
- readonly ConcurrentDictionary endpointQueues = new ConcurrentDictionary();
- readonly ConcurrentDictionary sizes = new ConcurrentDictionary();
-
- static readonly ILog Logger = LogManager.GetLogger();
-
- class QueryExecutor(string connectionString) : IDisposable
- {
-
- public void Initialize()
- {
- var connectionConfiguration =
- ConnectionConfiguration.Create(connectionString, "ServiceControl.Monitoring");
-
- var dbConnectionStringBuilder = new DbConnectionStringBuilder { ConnectionString = connectionString };
-
- connectionFactory = new ConnectionFactory("ServiceControl.Monitoring",
- connectionConfiguration,
- null, //providing certificates is not supported yet
- dbConnectionStringBuilder.GetBooleanValue("DisableRemoteCertificateValidation"),
- dbConnectionStringBuilder.GetBooleanValue("UseExternalAuthMechanism"),
- null, // value would come from config API in actual transport
- null); // value would come from config API in actual transport
- }
-
- public async Task Execute(Action action, CancellationToken cancellationToken = default)
- {
try
{
- connection ??= connectionFactory.CreateConnection("queue length monitor");
-
- //Connection implements reconnection logic
- while (!connection.IsOpen)
- {
- await Task.Delay(ReconnectionDelay, cancellationToken);
- }
-
- if (model == null || model.IsClosed)
- {
- model?.Dispose();
-
- model = connection.CreateModel();
- }
+ var queue = await managementClient.Value.GetQueue(queueName, cancellationToken);
- action(model);
- }
- catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
- {
- // no-op
+ var size = queue.MessageCount;
+ sizes.AddOrUpdate(queueName, _ => size, (_, _) => size);
}
catch (Exception e)
{
- Logger.Warn("Error querying queue length.", e);
+ Logger.Warn($"Error querying queue length for {queueName}", e);
}
}
+ }
- public void Dispose() => connection?.Dispose();
+ static readonly TimeSpan QueryDelayInterval = TimeSpan.FromMilliseconds(200);
- IConnection connection;
- IModel model;
- ConnectionFactory connectionFactory;
+ readonly ConcurrentDictionary endpointQueues = new();
+ readonly ConcurrentDictionary sizes = new();
- static readonly TimeSpan ReconnectionDelay = TimeSpan.FromSeconds(5);
- }
+ static readonly ILog Logger = LogManager.GetLogger();
+
+ readonly Lazy managementClient;
}
}
diff --git a/src/ServiceControl.Transports.RabbitMQ/RabbitMQBrokerQueueDetails.cs b/src/ServiceControl.Transports.RabbitMQ/RabbitMQBrokerQueue.cs
similarity index 50%
rename from src/ServiceControl.Transports.RabbitMQ/RabbitMQBrokerQueueDetails.cs
rename to src/ServiceControl.Transports.RabbitMQ/RabbitMQBrokerQueue.cs
index 742efa89a1..542e94ef7e 100644
--- a/src/ServiceControl.Transports.RabbitMQ/RabbitMQBrokerQueueDetails.cs
+++ b/src/ServiceControl.Transports.RabbitMQ/RabbitMQBrokerQueue.cs
@@ -2,22 +2,27 @@
namespace ServiceControl.Transports.RabbitMQ;
using System.Collections.Generic;
-using System.Text.Json;
+using NServiceBus.Transport.RabbitMQ.ManagementApi;
using ServiceControl.Transports.BrokerThroughput;
-public class RabbitMQBrokerQueueDetails(JsonElement token) : IBrokerQueue
+class RabbitMQBrokerQueue(Queue queue) : IBrokerQueue
{
- public string QueueName { get; } = token.GetProperty("name").GetString()!;
+ public string QueueName { get; } = queue.Name;
+
public string SanitizedName => QueueName;
- public string Scope => VHost;
- public string VHost { get; } = token.GetProperty("vhost").GetString()!;
+
+ public string? Scope => null;
+
public List EndpointIndicators { get; } = [];
- long? AckedMessages { get; set; } = FromToken(token);
- long Baseline { get; set; } = FromToken(token) ?? 0;
- public long CalculateThroughputFrom(RabbitMQBrokerQueueDetails newReading)
+ long? AckedMessages { get; set; } = queue.MessageStats?.Ack;
+
+ long Baseline { get; set; } = queue.MessageStats?.Ack ?? 0;
+
+ public long CalculateThroughputFrom(RabbitMQBrokerQueue newReading)
{
var newlyAckedMessages = 0L;
+
if (newReading.AckedMessages is null)
{
return newlyAckedMessages;
@@ -28,13 +33,9 @@ public long CalculateThroughputFrom(RabbitMQBrokerQueueDetails newReading)
newlyAckedMessages = newReading.AckedMessages.Value - Baseline;
AckedMessages += newlyAckedMessages;
}
+
Baseline = newReading.AckedMessages.Value;
return newlyAckedMessages;
}
-
- static long? FromToken(JsonElement jsonElement) =>
- jsonElement.TryGetProperty("message_stats", out var stats) && stats.TryGetProperty("ack", out var val)
- ? val.GetInt64()
- : null;
}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.RabbitMQ/RabbitMQConventionalRoutingTransportCustomization.cs b/src/ServiceControl.Transports.RabbitMQ/RabbitMQConventionalRoutingTransportCustomization.cs
index bc8db946f3..0a096c5222 100644
--- a/src/ServiceControl.Transports.RabbitMQ/RabbitMQConventionalRoutingTransportCustomization.cs
+++ b/src/ServiceControl.Transports.RabbitMQ/RabbitMQConventionalRoutingTransportCustomization.cs
@@ -5,34 +5,55 @@
using BrokerThroughput;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus;
+ using NServiceBus.Transport.RabbitMQ.ManagementApi;
- public abstract class RabbitMQConventionalRoutingTransportCustomization(QueueType queueType)
- : TransportCustomization
+ public abstract class RabbitMQConventionalRoutingTransportCustomization(NServiceBus.QueueType queueType) : TransportCustomization, IManagementClientProvider
{
- protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { }
+ RabbitMQTransport transport;
- protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { }
+ Lazy IManagementClientProvider.GetManagementClient()
+ {
+ return new(() => Get());
+
+ ManagementClient Get()
+ {
+ if (transport is null)
+ {
+ throw new InvalidOperationException("Management client not available because a CustomizeTransport method has not been called first.");
+ }
- protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { }
+ // Since some tests don't actually start an endpoint, this is needed to ensure a management client is available
+ if (transport.ManagementClient is null)
+ {
+ return new ManagementClient(transport.ConnectionConfiguration, transport.ManagementApiConfiguration);
+ }
+
+ return transport.ManagementClient;
+ }
+ }
+
+ protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => transport = transportDefinition;
+
+ protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => transport = transportDefinition;
+
+ protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => transport = transportDefinition;
protected override RabbitMQTransport CreateTransport(TransportSettings transportSettings, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly)
{
- if (transportSettings.ConnectionString == null)
+ if (transportSettings.ConnectionString is null)
{
throw new InvalidOperationException("Connection string not configured");
}
var transport = new RabbitMQTransport(RoutingTopology.Conventional(queueType), transportSettings.ConnectionString, enableDelayedDelivery: false);
transport.TransportTransactionMode = transport.GetSupportedTransactionModes().Contains(preferredTransactionMode) ? preferredTransactionMode : TransportTransactionMode.ReceiveOnly;
+ transport.ApplySettingsFromConnectionString(transportSettings.ConnectionString);
return transport;
}
- protected override void AddTransportForPrimaryCore(IServiceCollection services,
- TransportSettings transportSettings)
- {
- services.AddSingleton();
- }
+ protected override void AddTransportForPrimaryCore(IServiceCollection services, TransportSettings transportSettings)
+ => services.AddSingleton();
protected sealed override void AddTransportForMonitoringCore(IServiceCollection services, TransportSettings transportSettings)
{
diff --git a/src/ServiceControl.Transports.RabbitMQ/RabbitMQDirectRoutingTransportCustomization.cs b/src/ServiceControl.Transports.RabbitMQ/RabbitMQDirectRoutingTransportCustomization.cs
index c4e09725dd..2d497877d0 100644
--- a/src/ServiceControl.Transports.RabbitMQ/RabbitMQDirectRoutingTransportCustomization.cs
+++ b/src/ServiceControl.Transports.RabbitMQ/RabbitMQDirectRoutingTransportCustomization.cs
@@ -5,36 +5,55 @@
using BrokerThroughput;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus;
+ using NServiceBus.Transport.RabbitMQ.ManagementApi;
- public abstract class RabbitMQDirectRoutingTransportCustomization : TransportCustomization
+ public abstract class RabbitMQDirectRoutingTransportCustomization(NServiceBus.QueueType queueType) : TransportCustomization, IManagementClientProvider
{
- readonly QueueType queueType;
+ RabbitMQTransport transport;
- protected RabbitMQDirectRoutingTransportCustomization(QueueType queueType) => this.queueType = queueType;
+ Lazy IManagementClientProvider.GetManagementClient()
+ {
+ return new(() => Get());
+
+ ManagementClient Get()
+ {
+ if (transport is null)
+ {
+ throw new InvalidOperationException("Management client not available because a CustomizeTransport method has not been called first.");
+ }
+
+ // Since some tests don't actually start an endpoint, this is needed to ensure a management client is available
+ if (transport.ManagementClient is null)
+ {
+ return new ManagementClient(transport.ConnectionConfiguration, transport.ManagementApiConfiguration);
+ }
+
+ return transport.ManagementClient;
+ }
+ }
+
+ protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => transport = transportDefinition;
- protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { }
+ protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => transport = transportDefinition;
- protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { }
+ protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) => transport = transportDefinition;
- protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, RabbitMQTransport transportDefinition, TransportSettings transportSettings) { }
protected override RabbitMQTransport CreateTransport(TransportSettings transportSettings, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly)
{
- if (transportSettings.ConnectionString == null)
+ if (transportSettings.ConnectionString is null)
{
throw new InvalidOperationException("Connection string not configured");
}
var transport = new RabbitMQTransport(RoutingTopology.Direct(queueType, routingKeyConvention: type => type.FullName.Replace(".", "-")), transportSettings.ConnectionString, enableDelayedDelivery: false);
transport.TransportTransactionMode = transport.GetSupportedTransactionModes().Contains(preferredTransactionMode) ? preferredTransactionMode : TransportTransactionMode.ReceiveOnly;
+ transport.ApplySettingsFromConnectionString(transportSettings.ConnectionString);
return transport;
}
- protected override void AddTransportForPrimaryCore(IServiceCollection services,
- TransportSettings transportSettings)
- {
- services.AddSingleton();
- }
+ protected override void AddTransportForPrimaryCore(IServiceCollection services, TransportSettings transportSettings)
+ => services.AddSingleton();
protected sealed override void AddTransportForMonitoringCore(IServiceCollection services, TransportSettings transportSettings)
{
diff --git a/src/ServiceControl.Transports.RabbitMQ/RabbitMQQuery.cs b/src/ServiceControl.Transports.RabbitMQ/RabbitMQQuery.cs
index 9bacec74c3..8367d46bcb 100644
--- a/src/ServiceControl.Transports.RabbitMQ/RabbitMQQuery.cs
+++ b/src/ServiceControl.Transports.RabbitMQ/RabbitMQQuery.cs
@@ -7,122 +7,57 @@ namespace ServiceControl.Transports.RabbitMQ;
using System.Linq;
using System.Net;
using System.Net.Http;
-using System.Net.Http.Json;
using System.Runtime.CompilerServices;
-using System.Text.Json;
-using System.Text.Json.Nodes;
using System.Threading;
using System.Threading.Tasks;
-using System.Web;
using Microsoft.Extensions.Logging;
+using NServiceBus.Transport.RabbitMQ.ManagementApi;
using Polly;
using Polly.Retry;
using ServiceControl.Transports.BrokerThroughput;
public class RabbitMQQuery : BrokerThroughputQuery
{
- HttpClient? httpClient;
- readonly ResiliencePipeline pipeline = new ResiliencePipelineBuilder()
- .AddRetry(new RetryStrategyOptions()) // Add retry using the default options
- .AddTimeout(TimeSpan.FromMinutes(2)) // Add timeout if it keeps failing
- .Build();
- readonly ILogger logger;
readonly TimeProvider timeProvider;
- readonly ConnectionConfiguration connectionConfiguration;
+ readonly Lazy managementClient;
- public RabbitMQQuery(ILogger logger,
- TimeProvider timeProvider,
- TransportSettings transportSettings) : base(logger, "RabbitMQ")
- {
- this.logger = logger;
- this.timeProvider = timeProvider;
-
- connectionConfiguration = ConnectionConfiguration.Create(transportSettings.ConnectionString, string.Empty);
- }
+ readonly ResiliencePipeline pipeline = new ResiliencePipelineBuilder()
+ .AddRetry(new RetryStrategyOptions()) // Add retry using the default options
+ .AddTimeout(TimeSpan.FromMinutes(2)) // Add timeout if it keeps failing
+ .Build();
- protected override void InitializeCore(ReadOnlyDictionary settings)
+ public RabbitMQQuery(ILogger logger, TimeProvider timeProvider, ITransportCustomization transportCustomization) : base(logger, "RabbitMQ")
{
- if (!settings.TryGetValue(RabbitMQSettings.UserName, out string? username) ||
- string.IsNullOrEmpty(username))
- {
- logger.LogInformation("Using username from connectionstring");
- username = connectionConfiguration.UserName;
- Diagnostics.AppendLine(
- $"Username not set, defaulted to using \"{username}\" username from the ConnectionString used by instance");
- }
- else
- {
- Diagnostics.AppendLine($"Username set to \"{username}\"");
- }
-
- if (!settings.TryGetValue(RabbitMQSettings.Password, out string? password) ||
- string.IsNullOrEmpty(password))
- {
- logger.LogInformation("Using password from connectionstring");
- password = connectionConfiguration.Password;
- Diagnostics.AppendLine(
- "Password not set, defaulted to using password from the ConnectionString used by instance");
- }
- else
- {
- Diagnostics.AppendLine("Password set");
- }
-
- var defaultCredential = new NetworkCredential(username, password);
+ this.timeProvider = timeProvider;
- if (!settings.TryGetValue(RabbitMQSettings.API, out string? apiUrl) ||
- string.IsNullOrEmpty(apiUrl))
+ if (transportCustomization is IManagementClientProvider provider)
{
- apiUrl =
- $"{(connectionConfiguration.UseTls ? $"https://{connectionConfiguration.Host}:15671" : $"http://{connectionConfiguration.Host}:15672")}";
- Diagnostics.AppendLine(
- $"RabbitMQ API Url not set, defaulted to using \"{apiUrl}\" from the ConnectionString used by instance");
+ managementClient = provider.GetManagementClient();
}
else
{
- Diagnostics.AppendLine($"RabbitMQ API Url set to \"{apiUrl}\"");
- if (!Uri.TryCreate(apiUrl, UriKind.Absolute, out _))
- {
- InitialiseErrors.Add("API url configured is invalid");
- }
- }
-
- if (InitialiseErrors.Count == 0)
- {
- // ideally we would use the HttpClientFactory, but it would be a bit more involved to set that up
- // so for now we are using a virtual method that can be overriden in tests
- // https://github.com/Particular/ServiceControl/issues/4493
- httpClient = CreateHttpClient(defaultCredential, apiUrl);
+ throw new ArgumentException($"Transport customization does not implement {nameof(IManagementClientProvider)}. Type: {transportCustomization.GetType().Name}", nameof(transportCustomization));
}
}
- protected virtual HttpClient CreateHttpClient(NetworkCredential defaultCredential, string apiUrl) =>
- new(new SocketsHttpHandler
- {
- Credentials = defaultCredential,
- PooledConnectionLifetime = TimeSpan.FromMinutes(2)
- })
- { BaseAddress = new Uri(apiUrl) };
-
- public override async IAsyncEnumerable GetThroughputPerDay(IBrokerQueue brokerQueue,
- DateOnly startDate,
- [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ public override async IAsyncEnumerable GetThroughputPerDay(IBrokerQueue brokerQueue, DateOnly startDate, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
- var queue = (RabbitMQBrokerQueueDetails)brokerQueue;
- var url = $"/api/queues/{HttpUtility.UrlEncode(queue.VHost)}/{HttpUtility.UrlEncode(queue.QueueName)}";
+ var rabbitBrokerQueue = (RabbitMQBrokerQueue)brokerQueue;
+ var queue = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetQueue(rabbitBrokerQueue.QueueName, token), cancellationToken);
+ var newReading = new RabbitMQBrokerQueue(queue);
- logger.LogDebug($"Querying {url}");
- var newReading = await pipeline.ExecuteAsync(async token => new RabbitMQBrokerQueueDetails(await httpClient!.GetFromJsonAsync(url, token)), cancellationToken);
- _ = queue.CalculateThroughputFrom(newReading);
+ _ = rabbitBrokerQueue.CalculateThroughputFrom(newReading);
// looping for 24hrs, in 4 increments of 15 minutes
for (var i = 0; i < 24 * 4; i++)
{
await Task.Delay(TimeSpan.FromMinutes(15), timeProvider, cancellationToken);
- logger.LogDebug($"Querying {url}");
- newReading = await pipeline.ExecuteAsync(async token => new RabbitMQBrokerQueueDetails(await httpClient!.GetFromJsonAsync(url, token)), cancellationToken);
- var newTotalThroughput = queue.CalculateThroughputFrom(newReading);
+ queue = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetQueue(rabbitBrokerQueue.QueueName, token), cancellationToken);
+ newReading = new RabbitMQBrokerQueue(queue);
+
+ var newTotalThroughput = rabbitBrokerQueue.CalculateThroughputFrom(newReading);
+
yield return new QueueThroughput
{
DateUTC = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime),
@@ -131,86 +66,59 @@ public override async IAsyncEnumerable GetThroughputPerDay(IBro
}
}
- async Task<(string rabbitVersion, string managementVersion)> GetRabbitDetails(bool skipResiliencePipeline, CancellationToken cancellationToken)
- {
- var overviewUrl = "/api/overview";
-
- JsonObject obj;
-
- if (skipResiliencePipeline)
- {
- obj = (await httpClient!.GetFromJsonAsync(overviewUrl, cancellationToken))!;
- }
- else
- {
- obj = (await pipeline.ExecuteAsync(async token =>
- await httpClient!.GetFromJsonAsync(overviewUrl, token), cancellationToken))!;
- }
-
- var statsDisabled = obj["disable_stats"]?.GetValue() ?? false;
-
- if (statsDisabled)
- {
- throw new Exception("The RabbitMQ broker is configured with 'management.disable_stats = true' or 'management_agent.disable_metrics_collector = true' and as a result queue statistics cannot be collected using this tool. Consider changing the configuration of the RabbitMQ broker.");
- }
-
- var rabbitVersion = obj["rabbitmq_version"] ?? obj["product_version"];
- var mgmtVersion = obj["management_version"];
-
- return (rabbitVersion?.GetValue() ?? "Unknown", mgmtVersion?.GetValue() ?? "Unknown");
- }
-
- public override async IAsyncEnumerable GetQueueNames(
- [EnumeratorCancellation] CancellationToken cancellationToken)
+ public override async IAsyncEnumerable GetQueueNames([EnumeratorCancellation] CancellationToken cancellationToken)
{
var page = 1;
bool morePages;
- var vHosts = new HashSet(StringComparer.CurrentCultureIgnoreCase);
- (string rabbitVersion, string managementVersion) = await GetRabbitDetails(false, cancellationToken);
- Data["RabbitMQVersion"] = rabbitVersion;
- Data["RabbitMQManagementVersionVersion"] = managementVersion;
+ await GetBrokerDetails(cancellationToken);
do
{
- (var queues, morePages) = await GetPage(page, cancellationToken);
+ (var queues, morePages) = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetQueues(page, 500, token), cancellationToken);
- if (queues != null)
+ foreach (var queue in queues)
{
- foreach (var rabbitMQQueueDetails in queues)
+ if (queue.Name.StartsWith("nsb.delay-level-") ||
+ queue.Name.StartsWith("nsb.v2.delay-level-") ||
+ queue.Name.StartsWith("nsb.v2.verify-"))
{
- if (rabbitMQQueueDetails.QueueName.StartsWith("nsb.delay-level-") ||
- rabbitMQQueueDetails.QueueName.StartsWith("nsb.v2.delay-level-") ||
- rabbitMQQueueDetails.QueueName.StartsWith("nsb.v2.verify-"))
- {
- continue;
- }
- vHosts.Add(rabbitMQQueueDetails.VHost);
- await AddAdditionalQueueDetails(rabbitMQQueueDetails, cancellationToken);
- yield return rabbitMQQueueDetails;
+ continue;
}
+
+ var brokerQueue = new RabbitMQBrokerQueue(queue);
+ await AddEndpointIndicators(brokerQueue, cancellationToken);
+ yield return brokerQueue;
}
page++;
} while (morePages);
+ }
+
+ async Task GetBrokerDetails(CancellationToken cancellationToken)
+ {
+ var overview = await pipeline.ExecuteAsync(async async => await managementClient.Value.GetOverview(cancellationToken), cancellationToken);
- ScopeType = vHosts.Count > 1 ? "VirtualHost" : null;
+ if (overview.DisableStats)
+ {
+ throw new Exception(disableStatsErrorMessage);
+ }
+
+ Data["RabbitMQVersion"] = overview.BrokerVersion ?? "Unknown";
}
- async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, CancellationToken cancellationToken)
+ async Task AddEndpointIndicators(RabbitMQBrokerQueue brokerQueue, CancellationToken cancellationToken)
{
try
{
- var bindingsUrl = $"/api/queues/{HttpUtility.UrlEncode(brokerQueue.VHost)}/{HttpUtility.UrlEncode(brokerQueue.QueueName)}/bindings";
- var bindings = await pipeline.ExecuteAsync(async token => await httpClient!.GetFromJsonAsync(bindingsUrl, token), cancellationToken);
- var conventionalBindingFound = bindings?.Any(binding => binding!["source"]?.GetValue() == brokerQueue.QueueName
- && binding["vhost"]?.GetValue() == brokerQueue.VHost
- && binding["destination"]?.GetValue() == brokerQueue.QueueName
- && binding["destination_type"]?.GetValue() == "queue"
- && binding["routing_key"]?.GetValue() == string.Empty
- && binding["properties_key"]?.GetValue() == "~") ?? false;
+ var bindings = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetBindingsForQueue(brokerQueue.QueueName, token), cancellationToken);
- if (conventionalBindingFound)
+ // Check if conventional binding is found
+ if (bindings.Any(binding => binding.Source == brokerQueue.QueueName
+ && binding.Destination == brokerQueue.QueueName
+ && binding.DestinationType == "queue"
+ && binding.RoutingKey == string.Empty
+ && binding.PropertiesKey == "~"))
{
brokerQueue.EndpointIndicators.Add("ConventionalTopologyBinding");
}
@@ -222,20 +130,13 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
try
{
- var exchangeUrl = $"/api/exchanges/{HttpUtility.UrlEncode(brokerQueue.VHost)}/{HttpUtility.UrlEncode(brokerQueue.QueueName)}/bindings/destination";
- var bindings = await pipeline.ExecuteAsync(async token => await httpClient!.GetFromJsonAsync(exchangeUrl, token), cancellationToken);
- var delayBindingFound = bindings?.Any(binding =>
- {
- var source = binding!["source"]?.GetValue();
-
- return source is "nsb.v2.delay-delivery" or "nsb.delay-delivery"
- && binding["vhost"]?.GetValue() == brokerQueue.VHost
- && binding["destination"]?.GetValue() == brokerQueue.QueueName
- && binding["destination_type"]?.GetValue() == "exchange"
- && binding["routing_key"]?.GetValue() == $"#.{brokerQueue.QueueName}";
- }) ?? false;
+ var bindings = await pipeline.ExecuteAsync(async token => await managementClient.Value.GetBindingsForExchange(brokerQueue.QueueName, token), cancellationToken);
- if (delayBindingFound)
+ // Check if delayed binding is found
+ if (bindings.Any(binding => binding.Source is "nsb.v2.delay-delivery" or "nsb.delay-delivery"
+ && binding.Destination == brokerQueue.QueueName
+ && binding.DestinationType == "exchange"
+ && binding.RoutingKey == $"#.{brokerQueue.QueueName}"))
{
brokerQueue.EndpointIndicators.Add("DelayBinding");
}
@@ -246,74 +147,29 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
}
}
- public async Task<(RabbitMQBrokerQueueDetails[]?, bool morePages)> GetPage(int page, CancellationToken cancellationToken)
- {
- var url = $"/api/queues/{HttpUtility.UrlEncode(connectionConfiguration.VirtualHost)}?page={page}&page_size=500&name=&use_regex=false&pagination=true";
-
- var container = await pipeline.ExecuteAsync(async token => await httpClient!.GetFromJsonAsync(url, token), cancellationToken);
- switch (container)
- {
- case JsonObject obj:
- {
- var pageCount = obj["page_count"]!.GetValue();
- var pageReturned = obj["page"]!.GetValue();
-
- if (obj["items"] is not JsonArray items)
- {
- return (null, false);
- }
-
- return (MaterializeQueueDetails(items), pageCount > pageReturned);
- }
- // Older versions of RabbitMQ API did not have paging and returned the array of items directly
- case JsonArray arr:
- {
- return (MaterializeQueueDetails(arr), false);
- }
- default:
- throw new Exception("Was not able to get list of queues from RabbitMQ broker.");
- }
- }
-
- static RabbitMQBrokerQueueDetails[] MaterializeQueueDetails(JsonArray items)
- {
- // It is not possible to directly operated on the JsonNode. When the JsonNode is a JObject
- // and the indexer is access the internal dictionary is initialized which can cause key not found exceptions
- // when the payload contains the same key multiple times (which happened in the past).
- var queues = items.Select(item => new RabbitMQBrokerQueueDetails(item!.Deserialize())).ToArray();
- return queues;
- }
+ public override KeyDescriptionPair[] Settings => [];
- public override KeyDescriptionPair[] Settings =>
- [
- new KeyDescriptionPair(RabbitMQSettings.API, RabbitMQSettings.APIDescription),
- new KeyDescriptionPair(RabbitMQSettings.UserName, RabbitMQSettings.UserNameDescription),
- new KeyDescriptionPair(RabbitMQSettings.Password, RabbitMQSettings.PasswordDescription)
- ];
-
- protected override async Task<(bool Success, List Errors)> TestConnectionCore(
- CancellationToken cancellationToken)
+ protected override async Task<(bool Success, List Errors)> TestConnectionCore(CancellationToken cancellationToken)
{
try
{
- await GetRabbitDetails(true, cancellationToken);
+ var overview = await managementClient.Value.GetOverview(cancellationToken);
+
+ if (overview.DisableStats)
+ {
+ return (false, [disableStatsErrorMessage]);
+ }
+
+ return (true, []);
}
- catch (HttpRequestException e)
+ catch (HttpRequestException ex)
{
- throw new Exception($"Failed to connect to '{httpClient!.BaseAddress}'", e);
+ throw new Exception($"Failed to connect to RabbitMQ management API", ex);
}
-
- return (true, []);
}
- public static class RabbitMQSettings
- {
- public static readonly string API = "RabbitMQ/ApiUrl";
- public static readonly string APIDescription = "RabbitMQ management URL";
- public static readonly string UserName = "RabbitMQ/UserName";
- public static readonly string UserNameDescription = "Username to access the RabbitMQ management interface";
- public static readonly string Password = "RabbitMQ/Password";
- public static readonly string PasswordDescription = "Password to access the RabbitMQ management interface";
- }
+ protected override void InitializeCore(ReadOnlyDictionary settings) => Diagnostics.AppendLine("Using settings from connection string");
+
+ const string disableStatsErrorMessage = "The RabbitMQ broker is configured with 'management.disable_stats = true' or 'management_agent.disable_metrics_collector = true' and as a result queue statistics cannot be collected using this tool. Consider changing the configuration of the RabbitMQ broker.";
}
diff --git a/src/ServiceControl.Transports.RabbitMQ/RabbitMQTransportExtensions.cs b/src/ServiceControl.Transports.RabbitMQ/RabbitMQTransportExtensions.cs
new file mode 100644
index 0000000000..ebf285839c
--- /dev/null
+++ b/src/ServiceControl.Transports.RabbitMQ/RabbitMQTransportExtensions.cs
@@ -0,0 +1,46 @@
+namespace ServiceControl.Transports.RabbitMQ;
+
+using System;
+using System.Collections.Generic;
+using System.Data.Common;
+using System.Linq;
+using NServiceBus;
+
+static class RabbitMQTransportExtensions
+{
+ public static void ApplySettingsFromConnectionString(this RabbitMQTransport transport, string connectionString)
+ {
+ if (connectionString.StartsWith("amqp", StringComparison.OrdinalIgnoreCase))
+ {
+ return;
+ }
+
+ var dictionary = new DbConnectionStringBuilder { ConnectionString = connectionString }
+ .OfType>()
+ .ToDictionary(pair => pair.Key, pair => pair.Value.ToString(), StringComparer.OrdinalIgnoreCase);
+
+ if (dictionary.TryGetValue("ValidateDeliveryLimits", out var validateDeliveryLimitsString))
+ {
+ _ = bool.TryParse(validateDeliveryLimitsString, out var validateDeliveryLimits);
+ transport.ValidateDeliveryLimits = validateDeliveryLimits;
+ }
+
+ dictionary.TryGetValue("ManagementApiUrl", out var url);
+ dictionary.TryGetValue("ManagementApiUserName", out var userName);
+ dictionary.TryGetValue("ManagementApiPassword", out var password);
+
+ transport.ManagementApiConfiguration = ManagementApiConfiguration.Create(url, userName, password);
+
+ if (dictionary.TryGetValue("DisableRemoteCertificateValidation", out var disableRemoteCertificateValidationString))
+ {
+ _ = bool.TryParse(disableRemoteCertificateValidationString, out var disableRemoteCertificateValidation);
+ transport.ValidateRemoteCertificate = !disableRemoteCertificateValidation;
+ }
+
+ if (dictionary.TryGetValue("UseExternalAuthMechanism", out var useExternalAuthMechanismString))
+ {
+ _ = bool.TryParse(useExternalAuthMechanismString, out var useExternalAuthMechanism);
+ transport.UseExternalAuthMechanism = useExternalAuthMechanism;
+ }
+ }
+}
diff --git a/src/ServiceControl.Transports.RabbitMQ/ServiceControl.Transports.RabbitMQ.csproj b/src/ServiceControl.Transports.RabbitMQ/ServiceControl.Transports.RabbitMQ.csproj
index b04877eea0..3450935934 100644
--- a/src/ServiceControl.Transports.RabbitMQ/ServiceControl.Transports.RabbitMQ.csproj
+++ b/src/ServiceControl.Transports.RabbitMQ/ServiceControl.Transports.RabbitMQ.csproj
@@ -2,6 +2,8 @@
net8.0
+ true
+ ..\NServiceBus.snk
true
diff --git a/src/ServiceControl.Transports.RabbitMQ/TransportConfigurationExtensions.cs b/src/ServiceControl.Transports.RabbitMQ/TransportConfigurationExtensions.cs
deleted file mode 100644
index ee10f15afb..0000000000
--- a/src/ServiceControl.Transports.RabbitMQ/TransportConfigurationExtensions.cs
+++ /dev/null
@@ -1,44 +0,0 @@
-namespace ServiceControl.Transports.RabbitMQ
-{
- using NServiceBus;
- using System;
- using System.Data.Common;
-
- static class TransportConfigurationExtensions
- {
- public static void ApplyConnectionString(this TransportExtensions transport, string connectionString)
- {
- if (!connectionString.StartsWith("amqp", StringComparison.InvariantCultureIgnoreCase))
- {
- var builder = new DbConnectionStringBuilder { ConnectionString = connectionString };
-
- if (builder.GetBooleanValue("DisableRemoteCertificateValidation"))
- {
- transport.DisableRemoteCertificateValidation();
- }
-
- if (builder.GetBooleanValue("UseExternalAuthMechanism"))
- {
- transport.UseExternalAuthMechanism();
- }
- }
-
- transport.ConnectionString(connectionString);
- }
-
- public static bool GetBooleanValue(this DbConnectionStringBuilder dbConnectionStringBuilder, string key)
- {
- if (!dbConnectionStringBuilder.TryGetValue(key, out var rawValue))
- {
- return false;
- }
-
- if (!bool.TryParse(rawValue.ToString(), out var value))
- {
- throw new Exception($"Can't parse key '{key}'. '{rawValue}' is not a valid boolean value.");
- }
-
- return value;
- }
- }
-}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.RabbitMQ/transport.manifest b/src/ServiceControl.Transports.RabbitMQ/transport.manifest
index 2442343847..7203eb3f81 100644
--- a/src/ServiceControl.Transports.RabbitMQ/transport.manifest
+++ b/src/ServiceControl.Transports.RabbitMQ/transport.manifest
@@ -27,7 +27,7 @@
"DisplayName": "RabbitMQ - Conventional routing topology (quorum queues)",
"AssemblyName": "ServiceControl.Transports.RabbitMQ",
"TypeName": "ServiceControl.Transports.RabbitMQ.RabbitMQQuorumConventionalRoutingTransportCustomization, ServiceControl.Transports.RabbitMQ",
- "SampleConnectionString": "host=;username=;password=;DisableRemoteCertificateValidation=;UseExternalAuthMechanism=",
+ "SampleConnectionString": "host=;username=;password=;DisableRemoteCertificateValidation=;UseExternalAuthMechanism=;ValidateDeliveryLimits=;ManagementApiUrl=;ManagementApiUserName=;ManagementApiPassword=",
"AvailableInSCMU": true,
"Aliases": [
"ServiceControl.Transports.RabbitMQ.QuorumConventialRoutingTopologyRabbitMQTransport, ServiceControl.Transports.RabbitMQ"
@@ -38,7 +38,7 @@
"DisplayName": "RabbitMQ - Direct routing topology (quorum queues)",
"AssemblyName": "ServiceControl.Transports.RabbitMQ",
"TypeName": "ServiceControl.Transports.RabbitMQ.RabbitMQQuorumDirectRoutingTransportCustomization, ServiceControl.Transports.RabbitMQ",
- "SampleConnectionString": "host=;username=;password=;DisableRemoteCertificateValidation=;UseExternalAuthMechanism=",
+ "SampleConnectionString": "host=;username=;password=;DisableRemoteCertificateValidation=;UseExternalAuthMechanism=;ValidateDeliveryLimits=;ManagementApiUrl=;ManagementApiUserName=;ManagementApiPassword=",
"AvailableInSCMU": true,
"Aliases": [
"ServiceControl.Transports.RabbitMQ.QuorumDirectRoutingTopologyRabbitMQTransport, ServiceControl.Transports.RabbitMQ"
diff --git a/src/ServiceControl.Transports.RabbitMQClassicConventionalRouting.Tests/RabbitMQQueryTests.cs b/src/ServiceControl.Transports.RabbitMQClassicConventionalRouting.Tests/RabbitMQQueryTests.cs
index 062d498524..25116c8a47 100644
--- a/src/ServiceControl.Transports.RabbitMQClassicConventionalRouting.Tests/RabbitMQQueryTests.cs
+++ b/src/ServiceControl.Transports.RabbitMQClassicConventionalRouting.Tests/RabbitMQQueryTests.cs
@@ -8,10 +8,11 @@ namespace ServiceControl.Transport.Tests;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Time.Testing;
+using NServiceBus;
using NUnit.Framework;
+using ServiceControl.Transports.BrokerThroughput;
using Transports;
using Transports.RabbitMQ;
-using ServiceControl.Transports.BrokerThroughput;
[TestFixture]
class RabbitMQQueryTests : TransportTestFixture
@@ -20,25 +21,30 @@ class RabbitMQQueryTests : TransportTestFixture
public async Task GetQueueNames_FindsQueues()
{
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
- CancellationToken token = cancellationTokenSource.Token;
+ var token = cancellationTokenSource.Token;
+
var provider = new FakeTimeProvider();
+
var transportSettings = new TransportSettings
{
ConnectionString = configuration.ConnectionString,
- MaxConcurrency = 1,
EndpointName = Guid.NewGuid().ToString("N")
};
- var query = new RabbitMQQuery(NullLogger.Instance, provider, transportSettings);
- string[] additionalQueues = Enumerable.Range(1, 10).Select(i => $"myqueue{i}").ToArray();
+
+ configuration.TransportCustomization.CustomizePrimaryEndpoint(new EndpointConfiguration(transportSettings.EndpointName), transportSettings);
+
+ var additionalQueues = Enumerable.Range(1, 10).Select(i => $"myqueue{i}").ToArray();
await configuration.TransportCustomization.ProvisionQueues(transportSettings, additionalQueues);
+ var query = new RabbitMQQuery(NullLogger.Instance, provider, configuration.TransportCustomization);
query.Initialize(ReadOnlyDictionary.Empty);
var queueNames = new List();
+
await foreach (IBrokerQueue queueName in query.GetQueueNames(token))
{
queueNames.Add(queueName);
- Assert.That(queueName.Scope, Is.EqualTo("/"));
+
if (queueName.QueueName == transportSettings.EndpointName)
{
Assert.That(queueName.EndpointIndicators, Has.Member("ConventionalTopologyBinding"));
diff --git a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQueryTests.TestConnectionWithInvalidSettings.approved.txt b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQueryTests.TestConnectionWithInvalidSettings.approved.txt
deleted file mode 100644
index 5790b9e0da..0000000000
--- a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQueryTests.TestConnectionWithInvalidSettings.approved.txt
+++ /dev/null
@@ -1,7 +0,0 @@
-Connection test to RabbitMQ failed:
-Failed to connect to 'http://localhost:12345/'
-
-Connection attempted with the following settings:
-Username not set, defaulted to using "xxxxx" username from the ConnectionString used by instance
-Password not set, defaulted to using password from the ConnectionString used by instance
-RabbitMQ API Url set to "http://localhost:12345"
diff --git a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQueryTests.TestConnectionWithValidSettings.approved.txt b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQueryTests.TestConnectionWithValidSettings.approved.txt
deleted file mode 100644
index f64695447c..0000000000
--- a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQueryTests.TestConnectionWithValidSettings.approved.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-Connection test to RabbitMQ was successful
-
-Connection settings used:
-Username not set, defaulted to using "xxxxx" username from the ConnectionString used by instance
-Password not set, defaulted to using password from the ConnectionString used by instance
-RabbitMQ API Url not set, defaulted to using "xxxx" from the ConnectionString used by instance
diff --git a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQuery_ResponseParsing_Tests.Should_fetch_queue_details_in_old_format.approved.txt b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQuery_ResponseParsing_Tests.Should_fetch_queue_details_in_old_format.approved.txt
deleted file mode 100644
index 063ba3dd64..0000000000
--- a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQuery_ResponseParsing_Tests.Should_fetch_queue_details_in_old_format.approved.txt
+++ /dev/null
@@ -1,23 +0,0 @@
-[
- {
- "QueueName": "queue1",
- "SanitizedName": "queue1",
- "Scope": "vhost1",
- "VHost": "vhost1",
- "EndpointIndicators": []
- },
- {
- "QueueName": "queue2",
- "SanitizedName": "queue2",
- "Scope": "vhost2",
- "VHost": "vhost2",
- "EndpointIndicators": []
- },
- {
- "QueueName": "queue3",
- "SanitizedName": "queue3",
- "Scope": "vhost1",
- "VHost": "vhost1",
- "EndpointIndicators": []
- }
-]
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQuery_ResponseParsing_Tests.Should_handle_duplicated_json_data.approved.txt b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQuery_ResponseParsing_Tests.Should_handle_duplicated_json_data.approved.txt
deleted file mode 100644
index 016fd18d12..0000000000
--- a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQuery_ResponseParsing_Tests.Should_handle_duplicated_json_data.approved.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-[
- {
- "QueueName": "queue1",
- "SanitizedName": "queue1",
- "Scope": "vhost1",
- "VHost": "vhost1",
- "EndpointIndicators": []
- },
- {
- "QueueName": "queue2",
- "SanitizedName": "queue2",
- "Scope": "vhost2",
- "VHost": "vhost2",
- "EndpointIndicators": []
- }
-]
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQueryTests.cs b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQueryTests.cs
index 91b3a029ce..38a8b6f6ca 100644
--- a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQueryTests.cs
+++ b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQueryTests.cs
@@ -3,75 +3,65 @@ namespace ServiceControl.Transport.Tests;
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
-using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Time.Testing;
+using NServiceBus;
using NUnit.Framework;
-using Particular.Approvals;
+using ServiceControl.Transports.BrokerThroughput;
using Transports;
using Transports.RabbitMQ;
-using ServiceControl.Transports.BrokerThroughput;
[TestFixture]
class RabbitMQQueryTests : TransportTestFixture
{
- FakeTimeProvider provider;
- TransportSettings transportSettings;
- RabbitMQQuery query;
-
- [SetUp]
- public void Initialise()
- {
- provider = new();
- provider.SetUtcNow(DateTimeOffset.UtcNow);
- transportSettings = new TransportSettings
- {
- ConnectionString = configuration.ConnectionString,
- MaxConcurrency = 1,
- EndpointName = Guid.NewGuid().ToString("N")
- };
- query = new RabbitMQQuery(NullLogger.Instance, provider, transportSettings);
- }
-
[Test]
public async Task TestConnectionWithInvalidSettings()
{
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+ var token = cancellationTokenSource.Token;
- var dictionary = new Dictionary
+ var provider = new FakeTimeProvider(DateTimeOffset.UtcNow);
+
+ var transportSettings = new TransportSettings
{
- { RabbitMQQuery.RabbitMQSettings.API, "http://localhost:12345" }
+ ConnectionString = configuration.ConnectionString + ";ManagementApiUrl=http://localhost:12345",
+ EndpointName = Guid.NewGuid().ToString("N")
};
- query.Initialize(new ReadOnlyDictionary(dictionary));
+
+ configuration.TransportCustomization.CustomizePrimaryEndpoint(new EndpointConfiguration(transportSettings.EndpointName), transportSettings);
+
+ var query = new RabbitMQQuery(NullLogger.Instance, provider, configuration.TransportCustomization);
+ query.Initialize(ReadOnlyDictionary.Empty);
+
(bool success, _, string diagnostics) = await query.TestConnection(cancellationTokenSource.Token);
Assert.That(success, Is.False);
- Approver.Verify(diagnostics,
- s => Regex.Replace(s, "defaulted to using \"\\w*\" username", "defaulted to using \"xxxxx\" username",
- RegexOptions.Multiline));
}
[Test]
public async Task TestConnectionWithValidSettings()
{
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+ var token = cancellationTokenSource.Token;
+ var provider = new FakeTimeProvider(DateTimeOffset.UtcNow);
+
+ var transportSettings = new TransportSettings
+ {
+ ConnectionString = configuration.ConnectionString,
+ EndpointName = Guid.NewGuid().ToString("N")
+ };
+
+ configuration.TransportCustomization.CustomizePrimaryEndpoint(new EndpointConfiguration(transportSettings.EndpointName), transportSettings);
+
+ var query = new RabbitMQQuery(NullLogger.Instance, provider, configuration.TransportCustomization);
query.Initialize(ReadOnlyDictionary.Empty);
+
(bool success, _, string diagnostics) = await query.TestConnection(cancellationTokenSource.Token);
Assert.That(success, Is.True);
- Approver.Verify(diagnostics,
- s =>
- {
- s = Regex.Replace(s,
- "RabbitMQ API Url not set, defaulted to using \"http://[\\w.]*:15672\" from the ConnectionString used by instance",
- "RabbitMQ API Url not set, defaulted to using \"xxxx\" from the ConnectionString used by instance",
- RegexOptions.Multiline);
- return Regex.Replace(s, "defaulted to using \"\\w*\" username", "defaulted to using \"xxxxx\" username",
- RegexOptions.Multiline);
- });
}
[Test]
@@ -79,10 +69,21 @@ public async Task RunScenario()
{
// We need to wait a bit of time, because the scenario running takes on average 1 sec per run.
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromMinutes(3));
- CancellationToken token = cancellationTokenSource.Token;
+ var token = cancellationTokenSource.Token;
+
+ var provider = new FakeTimeProvider(DateTimeOffset.UtcNow);
+
+ var transportSettings = new TransportSettings
+ {
+ ConnectionString = configuration.ConnectionString,
+ EndpointName = Guid.NewGuid().ToString("N")
+ };
+
+ configuration.TransportCustomization.CustomizePrimaryEndpoint(new EndpointConfiguration(transportSettings.EndpointName), transportSettings);
await CreateTestQueue(transportSettings.EndpointName);
+ var query = new RabbitMQQuery(NullLogger.Instance, provider, configuration.TransportCustomization);
query.Initialize(ReadOnlyDictionary.Empty);
var queueNames = new List();
@@ -123,7 +124,7 @@ public async Task RunScenario()
reset.Set();
await runScenarioAndAdvanceTime.WaitAsync(token);
- // Asserting that we have one message per hour during 24 hours, the first snapshot is not counted hence the 23 assertion.
+ // Asserting that we have one message per hour during 24 hours, the first snapshot is not counted hence the 23 assertion.
Assert.That(total, Is.GreaterThan(numMessagesToIngest));
}
}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQuery_ResponseParsing_Tests.cs b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQuery_ResponseParsing_Tests.cs
deleted file mode 100644
index 9e706832b9..0000000000
--- a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQuery_ResponseParsing_Tests.cs
+++ /dev/null
@@ -1,148 +0,0 @@
-namespace ServiceControl.Transport.Tests;
-
-using System;
-using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.Extensions.Logging.Abstractions;
-using Microsoft.Extensions.Time.Testing;
-using NUnit.Framework;
-using Transports;
-using Transports.RabbitMQ;
-using System.Net.Http;
-using Particular.Approvals;
-using System.Collections.ObjectModel;
-using System.Net;
-
-[TestFixture]
-class RabbitMQQuery_ResponseParsing_Tests
-{
- FakeTimeProvider provider;
- TransportSettings transportSettings;
- FakeHttpHandler httpHandler;
- RabbitMQQuery rabbitMQQuery;
-
- [SetUp]
- public void SetUp()
- {
- provider = new();
- provider.SetUtcNow(DateTimeOffset.UtcNow);
- transportSettings = new TransportSettings
- {
- ConnectionString = "host=localhost;username=rabbitmq;password=rabbitmq",
- MaxConcurrency = 1,
- EndpointName = Guid.NewGuid().ToString("N")
- };
- httpHandler = new FakeHttpHandler();
- var httpClient = new HttpClient(httpHandler) { BaseAddress = new Uri("http://localhost:15672") };
-
- rabbitMQQuery = new TestableRabbitMQQuery(provider, transportSettings, httpClient);
- rabbitMQQuery.Initialize(ReadOnlyDictionary.Empty);
- }
-
- [TearDown]
- public void TearDown() => httpHandler.Dispose();
-
- public Func SendCallback
- {
- get => httpHandler.SendCallback;
- set => httpHandler.SendCallback = value;
- }
-
- [Test]
- public async Task Should_handle_duplicated_json_data()
- {
- SendCallback = _ =>
- {
- var response = new HttpResponseMessage
- {
- Content = new StringContent("""
- {
- "items": [
- {
- "name": "queue1",
- "vhost": "vhost1",
- "memory": 1024,
- "memory": 1024,
- "message_stats": {
- "ack": 1
- }
- },
- {
- "name": "queue2",
- "vhost": "vhost2",
- "vhost": "vhost2",
- "message_stats": {
- "ack": 2
- }
- }
- ],
- "page": 1,
- "page_count": 1,
- "page_size": 500,
- "total_count": 2
- }
- """)
- };
- return response;
- };
-
- var queues = (await rabbitMQQuery.GetPage(1, default)).Item1;
- Approver.Verify(queues);
- }
-
- [Test]
- public async Task Should_fetch_queue_details_in_old_format()
- {
- SendCallback = _ =>
- {
- var response = new HttpResponseMessage
- {
- Content = new StringContent("""
- [
- {
- "name": "queue1",
- "vhost": "vhost1",
- "memory": 1024,
- "message_stats": {
- "ack": 1
- }
- },
- {
- "name": "queue2",
- "vhost": "vhost2",
- "message_stats": {
- "ack": 2
- }
- },
- {
- "name": "queue3",
- "vhost": "vhost1"
- }
- ]
- """)
- };
- return response;
- };
-
- var queues = (await rabbitMQQuery.GetPage(1, default)).Item1;
- Approver.Verify(queues);
- }
-
- sealed class TestableRabbitMQQuery(
- TimeProvider timeProvider,
- TransportSettings transportSettings,
- HttpClient customHttpClient)
- : RabbitMQQuery(NullLogger.Instance, timeProvider, transportSettings)
- {
- protected override HttpClient CreateHttpClient(NetworkCredential defaultCredential, string apiUrl) => customHttpClient;
- }
-
- sealed class FakeHttpHandler : HttpClientHandler
- {
- public Func SendCallback { get; set; }
-
- protected override HttpResponseMessage Send(HttpRequestMessage request, CancellationToken cancellationToken) => SendCallback(request);
-
- protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) => Task.FromResult(SendCallback(request));
- }
-}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests.csproj b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests.csproj
index 6cb4aacd7d..32f68e736f 100644
--- a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests.csproj
+++ b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests.csproj
@@ -20,7 +20,6 @@
-
diff --git a/src/ServiceControl.Transports.Tests/QueueLengthMonitoringTests.cs b/src/ServiceControl.Transports.Tests/QueueLengthMonitoringTests.cs
index 5504962847..c121bcae6d 100644
--- a/src/ServiceControl.Transports.Tests/QueueLengthMonitoringTests.cs
+++ b/src/ServiceControl.Transports.Tests/QueueLengthMonitoringTests.cs
@@ -10,7 +10,7 @@ class QueueLengthMonitoringTests : TransportTestFixture
[Test]
public async Task Should_report_queue_length()
{
- var queueName = GetTestQueueName("queuelenght");
+ var queueName = GetTestQueueName("queuelength");
await CreateTestQueue(queueName);
diff --git a/src/ServiceControl.Transports.Tests/TransportTestFixture.cs b/src/ServiceControl.Transports.Tests/TransportTestFixture.cs
index c6eefe193b..0f6d9027e1 100644
--- a/src/ServiceControl.Transports.Tests/TransportTestFixture.cs
+++ b/src/ServiceControl.Transports.Tests/TransportTestFixture.cs
@@ -8,6 +8,7 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
+ using NServiceBus;
using NServiceBus.Logging;
using NServiceBus.Transport;
using NUnit.Framework;
@@ -87,21 +88,24 @@ protected async Task StartQueueLengthProvider(string queueName
// currently working around by creating a service collection per start call and then disposing the provider
// as part of the method scope. This could lead to potential problems later once we add disposable resources
// but this code probably requires a major overhaul anyway.
- var serviceCollection = new ServiceCollection();
+
var transportSettings = new TransportSettings
{
ConnectionString = configuration.ConnectionString,
EndpointName = queueName,
MaxConcurrency = 1
};
+
+ var serviceCollection = new ServiceCollection();
+
configuration.TransportCustomization.AddTransportForMonitoring(serviceCollection, transportSettings);
- serviceCollection.AddSingleton>((qlt, _) =>
- onQueueLengthReported(qlt.First()));
+ configuration.TransportCustomization.CustomizeMonitoringEndpoint(new EndpointConfiguration("queueName"), transportSettings);
+
+ serviceCollection.AddSingleton>((qlt, _) => onQueueLengthReported(qlt.First()));
var serviceProvider = serviceCollection.BuildServiceProvider();
- queueLengthProvider = serviceProvider.GetRequiredService();
+ queueLengthProvider = serviceProvider.GetRequiredService();
await queueLengthProvider.StartAsync(CancellationToken.None);
-
queueLengthProvider.TrackEndpointInputQueue(new EndpointToQueueMapping(queueName, queueName));
return new QueueLengthProviderScope(serviceProvider);
diff --git a/src/ServiceControl.Transports/BrokerThroughput/IBrokerQueue.cs b/src/ServiceControl.Transports/BrokerThroughput/IBrokerQueue.cs
index 01fb7cb5b5..6cd791267e 100644
--- a/src/ServiceControl.Transports/BrokerThroughput/IBrokerQueue.cs
+++ b/src/ServiceControl.Transports/BrokerThroughput/IBrokerQueue.cs
@@ -7,8 +7,11 @@ namespace ServiceControl.Transports.BrokerThroughput;
public interface IBrokerQueue
#pragma warning restore CA1711
{
- public string QueueName { get; }
- public string SanitizedName { get; }
- public string? Scope { get; }
- public List EndpointIndicators { get; }
+ string QueueName { get; }
+
+ string SanitizedName { get; }
+
+ string? Scope { get; }
+
+ List EndpointIndicators { get; }
}
\ No newline at end of file
diff --git a/src/ServiceControl.Transports/ServiceControl.Transports.csproj b/src/ServiceControl.Transports/ServiceControl.Transports.csproj
index 6bd8eeea61..435491c160 100644
--- a/src/ServiceControl.Transports/ServiceControl.Transports.csproj
+++ b/src/ServiceControl.Transports/ServiceControl.Transports.csproj
@@ -2,6 +2,8 @@
net8.0
+ true
+ ..\NServiceBus.snk
diff --git a/src/ServiceControlInstaller.Engine/Configuration/ServiceControl/ServiceControlAppConfig.cs b/src/ServiceControlInstaller.Engine/Configuration/ServiceControl/ServiceControlAppConfig.cs
index c00b1d8ee5..c3c79743bf 100644
--- a/src/ServiceControlInstaller.Engine/Configuration/ServiceControl/ServiceControlAppConfig.cs
+++ b/src/ServiceControlInstaller.Engine/Configuration/ServiceControl/ServiceControlAppConfig.cs
@@ -1,7 +1,11 @@
namespace ServiceControlInstaller.Engine.Configuration.ServiceControl
{
+ using System;
+ using System.Configuration;
+ using System.Data.Common;
using System.IO;
using Instances;
+ using NuGet.Versioning;
public class ServiceControlAppConfig : AppConfig
{
@@ -12,9 +16,12 @@ public ServiceControlAppConfig(IServiceControlInstance instance) : base(Path.Com
protected override void UpdateSettings()
{
+ UpdateConnectionString();
Config.ConnectionStrings.ConnectionStrings.Set("NServiceBus/Transport", details.ConnectionString);
+
var settings = Config.AppSettings.Settings;
var version = details.Version;
+
settings.Set(ServiceControlSettings.InstanceName, details.InstanceName, version);
settings.Set(ServiceControlSettings.VirtualDirectory, details.VirtualDirectory);
settings.Set(ServiceControlSettings.Port, details.Port.ToString());
@@ -43,6 +50,9 @@ protected override void UpdateSettings()
settings.RemoveIfRetired(ServiceControlSettings.AuditLogQueue, version);
settings.RemoveIfRetired(ServiceControlSettings.ForwardAuditMessages, version);
settings.RemoveIfRetired(ServiceControlSettings.InternalQueueName, version);
+ settings.RemoveIfRetired(ServiceControlSettings.LicensingComponentRabbitMqManagementApiUrl, version);
+ settings.RemoveIfRetired(ServiceControlSettings.LicensingComponentRabbitMqManagementApiUsername, version);
+ settings.RemoveIfRetired(ServiceControlSettings.LicensingComponentRabbitMqManagementApiPassword, version);
RemoveRavenDB35Settings(settings, version);
}
@@ -68,6 +78,44 @@ public override void SetTransportType(string transportTypeName)
settings.Set(ServiceControlSettings.TransportType, transportTypeName, version);
}
- IServiceControlInstance details;
+ void UpdateConnectionString()
+ {
+ if (details.TransportPackage.Name.Contains("rabbitmq", StringComparison.OrdinalIgnoreCase))
+ {
+ MigrateLicensingComponentRabbitMqManagementApiSettings();
+ }
+ }
+
+ void MigrateLicensingComponentRabbitMqManagementApiSettings()
+ {
+ if (details.ConnectionString.StartsWith("amqp", StringComparison.OrdinalIgnoreCase))
+ {
+ return;
+ }
+
+ var shouldMigrate = VersionComparer.Version.Compare(details.Version, new SemanticVersion(6, 5, 0)) >= 0;
+
+ if (shouldMigrate)
+ {
+ var connectionStringBuilder = new DbConnectionStringBuilder { ConnectionString = details.ConnectionString };
+ var settings = Config.AppSettings.Settings;
+
+ MigrateSetting(connectionStringBuilder, settings[ServiceControlSettings.LicensingComponentRabbitMqManagementApiUrl.Name], "ManagementApiUrl");
+ MigrateSetting(connectionStringBuilder, settings[ServiceControlSettings.LicensingComponentRabbitMqManagementApiUsername.Name], "ManagementApiUserName");
+ MigrateSetting(connectionStringBuilder, settings[ServiceControlSettings.LicensingComponentRabbitMqManagementApiPassword.Name], "ManagementApiPassword");
+
+ details.ConnectionString = connectionStringBuilder.ConnectionString;
+ }
+
+ static void MigrateSetting(DbConnectionStringBuilder connectionStringBuilder, KeyValueConfigurationElement setting, string connectionStringSettingName)
+ {
+ if (setting is not null && !connectionStringBuilder.ContainsKey(connectionStringSettingName))
+ {
+ connectionStringBuilder.Add(connectionStringSettingName, setting.Value);
+ }
+ }
+ }
+
+ readonly IServiceControlInstance details;
}
}
\ No newline at end of file
diff --git a/src/ServiceControlInstaller.Engine/Configuration/ServiceControl/SettingsList.cs b/src/ServiceControlInstaller.Engine/Configuration/ServiceControl/SettingsList.cs
index f982b8bc34..9dc2166000 100644
--- a/src/ServiceControlInstaller.Engine/Configuration/ServiceControl/SettingsList.cs
+++ b/src/ServiceControlInstaller.Engine/Configuration/ServiceControl/SettingsList.cs
@@ -95,5 +95,23 @@ public static class ServiceControlSettings
Name = "ServiceControl/ShutdownTimeout",
SupportedFrom = new SemanticVersion(6, 4, 1)
};
+
+ public static readonly SettingInfo LicensingComponentRabbitMqManagementApiUrl = new()
+ {
+ Name = "LicensingComponent/RabbitMQ/ApiUrl",
+ RemovedFrom = new SemanticVersion(6, 5, 0)
+ };
+
+ public static readonly SettingInfo LicensingComponentRabbitMqManagementApiUsername = new()
+ {
+ Name = "LicensingComponent/RabbitMQ/UserName",
+ RemovedFrom = new SemanticVersion(6, 5, 0)
+ };
+
+ public static readonly SettingInfo LicensingComponentRabbitMqManagementApiPassword = new()
+ {
+ Name = "LicensingComponent/RabbitMQ/Password",
+ RemovedFrom = new SemanticVersion(6, 5, 0)
+ };
}
-}
\ No newline at end of file
+}
diff --git a/src/ServiceControlInstaller.Engine/Interfaces.cs b/src/ServiceControlInstaller.Engine/Interfaces.cs
index 54a1c8728f..914be2cc43 100644
--- a/src/ServiceControlInstaller.Engine/Interfaces.cs
+++ b/src/ServiceControlInstaller.Engine/Interfaces.cs
@@ -16,7 +16,7 @@ public interface ILogging
public interface ITransportConfig
{
TransportInfo TransportPackage { get; }
- string ConnectionString { get; }
+ string ConnectionString { get; set; }
}
public interface IPersistenceConfig
diff --git a/src/ServiceControlInstaller.Engine/Validation/AbstractCommandChecks.cs b/src/ServiceControlInstaller.Engine/Validation/AbstractCommandChecks.cs
index 79bb2edfa7..8400dec2b2 100644
--- a/src/ServiceControlInstaller.Engine/Validation/AbstractCommandChecks.cs
+++ b/src/ServiceControlInstaller.Engine/Validation/AbstractCommandChecks.cs
@@ -4,6 +4,7 @@
using System.Linq;
using System.ServiceProcess;
using System.Threading.Tasks;
+ using NuGet.Versioning;
using ServiceControl.LicenseManagement;
using ServiceControlInstaller.Engine.Configuration.ServiceControl;
using ServiceControlInstaller.Engine.Instances;
@@ -46,7 +47,7 @@ public async Task ValidateNewInstance(params IServiceInstance[] instances)
.Select(i => i.TransportPackage)
.First(t => t is not null);
- var continueInstall = await RabbitMqCheckIsOK(transport, false).ConfigureAwait(false);
+ var continueInstall = await RabbitMqCheckIsOK(transport, Constants.CurrentVersion, false).ConfigureAwait(false);
return continueInstall;
}
@@ -81,12 +82,9 @@ async Task CanEditOrDelete(BaseService instance, bool isDelete)
return true;
}
- async Task RabbitMqCheckIsOK(TransportInfo transport, bool isUpgrade)
+ async Task RabbitMqCheckIsOK(TransportInfo transport, SemanticVersion instanceVersion, bool isUpgrade)
{
- if (transport is null)
- {
- throw new ArgumentNullException(nameof(transport));
- }
+ ArgumentNullException.ThrowIfNull(transport);
if (transport.ZipName != "RabbitMQ")
{
@@ -94,9 +92,9 @@ async Task RabbitMqCheckIsOK(TransportInfo transport, bool isUpgrade)
return true;
}
- // Only way we DON'T need to warn is if we're updating an instance that's already on a "new" (AvailableInSCMU) Rabbit transport
- var needToWarn = !(isUpgrade && transport.AvailableInSCMU);
- if (!needToWarn)
+ var newerThan650 = VersionComparer.Version.Compare(instanceVersion, new SemanticVersion(6, 5, 0)) > 0;
+
+ if (isUpgrade && newerThan650)
{
return true;
}
@@ -166,7 +164,7 @@ public async Task CanUpgradeInstance(BaseService instance, bool forceUpgra
}
}
- if (!await RabbitMqCheckIsOK(instance.TransportPackage, isUpgrade: true).ConfigureAwait(false))
+ if (!await RabbitMqCheckIsOK(instance.TransportPackage, instance.Version, isUpgrade: true).ConfigureAwait(false))
{
return false;
}
diff --git a/src/TestHelper/TestHelper.csproj b/src/TestHelper/TestHelper.csproj
index a310aef090..077bb6b110 100644
--- a/src/TestHelper/TestHelper.csproj
+++ b/src/TestHelper/TestHelper.csproj
@@ -14,5 +14,5 @@
-
+