diff --git a/src/ServiceControl.Infrastructure/RepeatedFailuresOverTimeCircuitBreaker.cs b/src/ServiceControl.Infrastructure/RepeatedFailuresOverTimeCircuitBreaker.cs
new file mode 100644
index 0000000000..abd6c9a354
--- /dev/null
+++ b/src/ServiceControl.Infrastructure/RepeatedFailuresOverTimeCircuitBreaker.cs
@@ -0,0 +1,217 @@
+#nullable enable
+
+namespace NServiceBus;
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Logging;
+
+/// <summary>
+/// A circuit breaker that is armed on a failure and disarmed on success. After <see cref="timeToWaitBeforeTriggering"/> in the
+/// armed state, the <see cref="triggerAction"/> will fire. The <see cref="armedAction"/> and <see cref="disarmedAction"/> allow
+/// changing other state when the circuit breaker is armed or disarmed.
+/// </summary>
+public sealed class RepeatedFailuresOverTimeCircuitBreaker : IDisposable
+{
+    /// <summary>
+    /// A circuit breaker that is armed on a failure and disarmed on success. After <paramref name="timeToWaitBeforeTriggering"/> in the
+    /// armed state, the <paramref name="triggerAction"/> will fire. The <paramref name="armedAction"/> and <paramref name="disarmedAction"/> allow
+    /// changing other state when the circuit breaker is armed or disarmed.
+    /// </summary>
+    /// <param name="name">A name that is output in log messages when the circuit breaker changes states.</param>
+    /// <param name="timeToWaitBeforeTriggering">The time to wait after the first failure before triggering.</param>
+    /// <param name="triggerAction">The action to take when the circuit breaker is triggered.</param>
+    /// <param name="armedAction">The action to execute on the first failure.
+    /// <b>Warning:</b> This action is also invoked from within a lock. Any long-running, blocking, or I/O-bound code should be avoided
+    /// within this action, as it can prevent other threads from proceeding, potentially leading to contention or performance bottlenecks.
+    /// </param>
+    /// <param name="disarmedAction">The action to execute when a success disarms the circuit breaker.
+    /// <b>Warning:</b> This action is also invoked from within a lock. Any long-running, blocking, or I/O-bound code should be avoided
+    /// within this action, as it can prevent other threads from proceeding, potentially leading to contention or performance bottlenecks.
+    /// </param>
+    /// <param name="timeToWaitWhenTriggered">How long to delay on each failure when in the Triggered state. Defaults to 10 seconds.</param>
+    /// <param name="timeToWaitWhenArmed">How long to delay on each failure when in the Armed state. Defaults to 1 second.</param>
+    /// <remarks>
+    /// The <paramref name="armedAction"/> and <paramref name="disarmedAction"/> are invoked from within a lock to ensure that arming and disarming
+    /// actions are serialized and do not execute concurrently. As a result, care must be taken to ensure that these actions do not
+    /// introduce delays or deadlocks by performing lengthy operations or synchronously waiting on external resources.
+    /// <br/>
+    /// Best practice: If the logic inside these actions involves blocking or long-running tasks, consider offloading
+    /// the work to a background task or thread that doesn't hold the lock.
+    /// </remarks>
+    public RepeatedFailuresOverTimeCircuitBreaker(
+        string name,
+        TimeSpan timeToWaitBeforeTriggering,
+        Action<Exception> triggerAction,
+        Action? armedAction = null,
+        Action? disarmedAction = null,
+        TimeSpan? timeToWaitWhenTriggered = default,
+        TimeSpan? timeToWaitWhenArmed = default)
+    {
+        this.name = name;
+        this.triggerAction = triggerAction;
+        this.armedAction = armedAction ?? (static () => { });
+        this.disarmedAction = disarmedAction ?? (static () => { });
+        this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering;
+        this.timeToWaitWhenTriggered = timeToWaitWhenTriggered ?? TimeSpan.FromSeconds(10);
+        this.timeToWaitWhenArmed = timeToWaitWhenArmed ?? TimeSpan.FromSeconds(1);
+
+        timer = new Timer(CircuitBreakerTriggered);
+    }
+
+    /// <summary>
+    /// Log a success, disarming the circuit breaker if it was previously armed.
+    /// </summary>
+    public void Success()
+    {
+        // Check the status of the circuit breaker, exiting early outside the lock if already disarmed
+        if (Volatile.Read(ref circuitBreakerState) == Disarmed)
+        {
+            return;
+        }
+
+        lock (stateLock)
+        {
+            // Recheck state after obtaining the lock
+            if (circuitBreakerState == Disarmed)
+            {
+                return;
+            }
+
+            circuitBreakerState = Disarmed;
+
+            _ = timer.Change(Timeout.Infinite, Timeout.Infinite);
+            Logger.InfoFormat("The circuit breaker for '{0}' is now disarmed.", name);
+            try
+            {
+                disarmedAction();
+            }
+            catch (Exception ex)
+            {
+                Logger.Error($"The circuit breaker for '{name}' was unable to execute the disarm action.", ex);
+                throw;
+            }
+        }
+    }
+
+    /// <summary>
+    /// Log a failure, arming the circuit breaker if it was previously disarmed.
+    /// </summary>
+    /// <param name="exception">The exception that caused the failure.</param>
+    /// <param name="cancellationToken">A cancellation token.</param>
+    /// <returns>A task that completes after <c>timeToWaitWhenTriggered</c> when the breaker was already triggered, otherwise after <c>timeToWaitWhenArmed</c>.</returns>
+    public Task Failure(Exception exception, CancellationToken cancellationToken = default)
+    {
+        // Atomically store the exception that caused the circuit breaker to trip
+        _ = Interlocked.Exchange(ref lastException, exception);
+
+        var previousState = Volatile.Read(ref circuitBreakerState);
+        if (previousState is Armed or Triggered)
+        {
+            return Delay();
+        }
+
+        lock (stateLock)
+        {
+            // Recheck state after obtaining the lock
+            previousState = circuitBreakerState;
+            if (previousState is Armed or Triggered)
+            {
+                return Delay();
+            }
+
+            circuitBreakerState = Armed;
+
+            try
+            {
+                // Executing the action first before starting the timer to ensure that the action is executed before the timer fires
+                // and the time of the action is not included in the time to wait before triggering.
+                armedAction();
+            }
+            catch (Exception ex)
+            {
+                Logger.Error($"The circuit breaker for '{name}' was unable to execute the arm action.", new AggregateException(ex, exception));
+                throw;
+            }
+            finally
+            {
+                // The state is already Armed at this point, so the timer has to be started even when the armed action throws;
+                // otherwise the breaker would stay armed without a running timer and could never trigger.
+                _ = timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
+            }
+
+            Logger.WarnFormat("The circuit breaker for '{0}' is now in the armed state due to '{1}' and might trigger in '{2}' when not disarmed.", name, exception, timeToWaitBeforeTriggering);
+        }
+
+        return Delay();
+
+        Task Delay()
+        {
+            var timeToWait = previousState == Triggered ? timeToWaitWhenTriggered : timeToWaitWhenArmed;
+            if (Logger.IsDebugEnabled)
+            {
+                Logger.DebugFormat("The circuit breaker for '{0}' is delaying the operation by '{1}'.", name, timeToWait);
+            }
+            return Task.Delay(timeToWait, cancellationToken);
+        }
+    }
+
+    /// <summary>
+    /// Disposes the resources associated with the circuit breaker.
+    /// </summary>
+    public void Dispose() => timer.Dispose();
+
+    // Timer callback. Exceptions thrown by the trigger action are logged as fatal and are not rethrown.
+    void CircuitBreakerTriggered(object? state)
+    {
+        var previousState = Volatile.Read(ref circuitBreakerState);
+        if (previousState == Disarmed)
+        {
+            return;
+        }
+
+        lock (stateLock)
+        {
+            // Recheck state after obtaining the lock
+            if (circuitBreakerState == Disarmed)
+            {
+                return;
+            }
+
+            circuitBreakerState = Triggered;
+            Logger.WarnFormat("The circuit breaker for '{0}' will now be triggered with exception '{1}'.", name, lastException);
+
+            try
+            {
+                triggerAction(lastException!);
+            }
+            catch (Exception ex)
+            {
+                Logger.Fatal($"The circuit breaker for '{name}' was unable to execute the trigger action.", new AggregateException(ex, lastException!));
+            }
+        }
+    }
+
+    // One of Disarmed, Armed or Triggered. Written only while holding stateLock; the fast paths read it without the lock.
+    int circuitBreakerState = Disarmed;
+    // The most recent exception passed to Failure; it is handed to the trigger action when the timer fires.
+    Exception? lastException;
+
+    readonly string name;
+    readonly Timer timer;
+    readonly TimeSpan timeToWaitBeforeTriggering;
+    readonly Action<Exception> triggerAction;
+    readonly Action armedAction;
+    readonly Action disarmedAction;
+    readonly TimeSpan timeToWaitWhenTriggered;
+    readonly TimeSpan timeToWaitWhenArmed;
+    readonly object stateLock = new();
+
+    const int Disarmed = 0;
+    const int Armed = 1;
+    const int Triggered = 2;
+
+    static readonly TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1);
+    static readonly ILog Logger = LogManager.GetLogger<RepeatedFailuresOverTimeCircuitBreaker>();
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.RavenDB/ExternalIntegrationRequestsDataStore.cs b/src/ServiceControl.Persistence.RavenDB/ExternalIntegrationRequestsDataStore.cs
index ba8f8ec646..27762fc546 100644
--- a/src/ServiceControl.Persistence.RavenDB/ExternalIntegrationRequestsDataStore.cs
+++ b/src/ServiceControl.Persistence.RavenDB/ExternalIntegrationRequestsDataStore.cs
@@ -33,7 +33,7 @@ public ExternalIntegrationRequestsDataStore(RavenPersisterSettings settings, IRa
"EventDispatcher",
timeToWait,
ex => criticalError.Raise("Repeated failures when dispatching external integration events.", ex),
- delayAfterFailure
+ timeToWaitWhenArmed: delayAfterFailure
);
}
@@ -98,7 +98,7 @@ async Task StartDispatcherTask(CancellationToken cancellationToken)
catch (Exception ex)
{
Logger.Error("An exception occurred when dispatching external integration events", ex);
- await circuitBreaker.Failure(ex);
+ await circuitBreaker.Failure(ex, cancellationToken);
if (!tokenSource.IsCancellationRequested)
{
@@ -192,7 +192,6 @@ public async ValueTask DisposeAsync()
}
tokenSource?.Dispose();
- circuitBreaker?.Dispose();
}
readonly RavenPersisterSettings settings;
diff --git a/src/ServiceControl.Persistence.RavenDB/RepeatedFailuresOverTimeCircuitBreaker.cs b/src/ServiceControl.Persistence.RavenDB/RepeatedFailuresOverTimeCircuitBreaker.cs
deleted file mode 100644
index 288bcebf01..0000000000
--- a/src/ServiceControl.Persistence.RavenDB/RepeatedFailuresOverTimeCircuitBreaker.cs
+++ /dev/null
@@ -1,76 +0,0 @@
-namespace NServiceBus
-{
- using System;
- using System.Threading;
- using System.Threading.Tasks;
- using Logging;
-
- class RepeatedFailuresOverTimeCircuitBreaker : IDisposable
- {
- public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBeforeTriggering, Action triggerAction, TimeSpan delayAfterFailure)
- {
- this.delayAfterFailure = delayAfterFailure;
- this.name = name;
- this.triggerAction = triggerAction;
- this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering;
-
- Logger.DebugFormat("RepeatedFailuresOverTime circuit breaker {0} that will trigger after {1} ", name, timeToWaitBeforeTriggering, delayAfterFailure);
- timer = new Timer(CircuitBreakerTriggered);
- }
-
- public void Dispose()
- {
- timer.Dispose();
- GC.SuppressFinalize(this);
- }
-
- public void Success()
- {
- var oldValue = Interlocked.Exchange(ref failureCount, 0);
-
- if (oldValue == 0)
- {
- return;
- }
-
- timer.Change(System.Threading.Timeout.Infinite, System.Threading.Timeout.Infinite);
- Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name);
- }
-
- public Task Failure(Exception exception)
- {
- lastException = exception;
- var newValue = Interlocked.Increment(ref failureCount);
-
- if (newValue == 1)
- {
- timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
- Logger.WarnFormat("The circuit breaker for {0} is now in the armed state", name);
- }
-
- return Task.Delay(delayAfterFailure);
- }
-
- void CircuitBreakerTriggered(object state)
- {
- if (Interlocked.Read(ref failureCount) > 0)
- {
- Logger.WarnFormat("The circuit breaker for {0} will now be triggered", name);
- triggerAction(lastException);
- }
- }
-
- readonly TimeSpan delayAfterFailure;
-
- long failureCount;
- Exception lastException;
-
- readonly string name;
- readonly Timer timer;
- readonly TimeSpan timeToWaitBeforeTriggering;
- readonly Action triggerAction;
-
- static readonly TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1);
- static readonly ILog Logger = LogManager.GetLogger();
- }
-}
\ No newline at end of file
diff --git a/src/ServiceControl.UnitTests/RepeatedFailuresOverTimeCircuitBreakerTests.cs b/src/ServiceControl.UnitTests/RepeatedFailuresOverTimeCircuitBreakerTests.cs
new file mode 100644
index 0000000000..67e71198cc
--- /dev/null
+++ b/src/ServiceControl.UnitTests/RepeatedFailuresOverTimeCircuitBreakerTests.cs
@@ -0,0 +1,228 @@
+namespace NServiceBus
+{
+    using System;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using NUnit.Framework;
+
+    // Ideally the circuit breaker would use a time provider to allow for easier testing but that would require a significant refactor
+    // and we want to keep the changes to a minimum for now to allow backporting to older versions.
+    [TestFixture]
+    public class RepeatedFailuresOverTimeCircuitBreakerTests
+    {
+        [Test]
+        public async Task Should_disarm_on_success()
+        {
+            var armedActionCalled = false;
+            var disarmedActionCalled = false;
+
+            var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+                "TestCircuitBreaker",
+                TimeSpan.FromMilliseconds(100),
+                ex => { },
+                () => armedActionCalled = true,
+                () => disarmedActionCalled = true,
+                TimeSpan.Zero,
+                TimeSpan.Zero
+            );
+
+            await circuitBreaker.Failure(new Exception("Test Exception"));
+            circuitBreaker.Success();
+
+            Assert.That(armedActionCalled, Is.True, "The armed action should be called.");
+            Assert.That(disarmedActionCalled, Is.True, "The disarmed action should be called.");
+        }
+
+        [Test]
+        public async Task Should_rethrow_exception_on_success()
+        {
+            var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+                "TestCircuitBreaker",
+                TimeSpan.FromMilliseconds(100),
+                ex => { },
+                () => { },
+                () => throw new Exception("Exception from disarmed action"),
+                timeToWaitWhenTriggered: TimeSpan.Zero,
+                timeToWaitWhenArmed: TimeSpan.Zero
+            );
+
+            await circuitBreaker.Failure(new Exception("Test Exception"));
+
+            var ex = Assert.Throws<Exception>(() => circuitBreaker.Success());
+            Assert.That(ex.Message, Is.EqualTo("Exception from disarmed action"));
+        }
+
+        [Test]
+        public async Task Should_trigger_after_failure_timeout()
+        {
+            var triggerActionCalled = false;
+            Exception lastTriggerException = null;
+
+            var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+                "TestCircuitBreaker",
+                TimeSpan.Zero,
+                ex => { triggerActionCalled = true; lastTriggerException = ex; },
+                timeToWaitWhenTriggered: TimeSpan.Zero,
+                timeToWaitWhenArmed: TimeSpan.FromMilliseconds(100)
+            );
+
+            // The first failure arms the breaker and then waits timeToWaitWhenArmed, which should give the zero-delay trigger timer time to fire.
+            await circuitBreaker.Failure(new Exception("Test Exception"));
+
+            Assert.That(triggerActionCalled, Is.True, "The trigger action should be called after timeout.");
+            Assert.That(lastTriggerException, Is.Not.Null, "The exception passed to the trigger action should not be null.");
+        }
+
+        [Test]
+        public void Should_rethrow_exception_on_failure()
+        {
+            var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+                "TestCircuitBreaker",
+                TimeSpan.FromMilliseconds(100),
+                ex => { },
+                () => throw new Exception("Exception from armed action"),
+                () => { },
+                timeToWaitWhenTriggered: TimeSpan.Zero,
+                timeToWaitWhenArmed: TimeSpan.Zero
+            );
+
+            // Failure is not an async method, so the armed action's exception is thrown by the call itself; the async lambda surfaces it to ThrowsAsync.
+            var ex = Assert.ThrowsAsync<Exception>(async () => await circuitBreaker.Failure(new Exception("Test Exception")));
+            Assert.That(ex.Message, Is.EqualTo("Exception from armed action"));
+        }
+
+        [Test]
+        public async Task Should_delay_after_trigger_failure()
+        {
+            var timeToWaitWhenTriggered = TimeSpan.FromMilliseconds(50);
+            var timeToWaitWhenArmed = TimeSpan.FromMilliseconds(100);
+
+            var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+                "TestCircuitBreaker",
+                TimeSpan.Zero,
+                _ => { },
+                timeToWaitWhenTriggered: timeToWaitWhenTriggered,
+                timeToWaitWhenArmed: timeToWaitWhenArmed
+            );
+
+            var stopWatch = Stopwatch.StartNew();
+
+            // The first failure waits timeToWaitWhenArmed. The zero-delay timer is expected to have moved the breaker to the
+            // Triggered state by the time the second failure is reported, so that one waits timeToWaitWhenTriggered.
+            await circuitBreaker.Failure(new Exception("Test Exception"));
+            await circuitBreaker.Failure(new Exception("Test Exception After Trigger"));
+
+            stopWatch.Stop();
+
+            Assert.That(stopWatch.ElapsedMilliseconds, Is.GreaterThanOrEqualTo(timeToWaitWhenTriggered.Add(timeToWaitWhenArmed).TotalMilliseconds).Within(20), "The circuit breaker should delay after a triggered failure.");
+        }
+
+        [Test]
+        public async Task Should_not_trigger_if_disarmed_before_timeout()
+        {
+            var triggerActionCalled = false;
+
+            var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+                "TestCircuitBreaker",
+                TimeSpan.FromMilliseconds(100),
+                ex => triggerActionCalled = true,
+                timeToWaitWhenTriggered: TimeSpan.Zero,
+                timeToWaitWhenArmed: TimeSpan.Zero
+            );
+
+            await circuitBreaker.Failure(new Exception("Test Exception"));
+            circuitBreaker.Success();
+
+            Assert.That(triggerActionCalled, Is.False, "The trigger action should not be called if the circuit breaker was disarmed.");
+        }
+
+        [Test]
+        public async Task Should_handle_concurrent_failure_and_success()
+        {
+            var armedActionCalled = false;
+            var disarmedActionCalled = false;
+            var triggerActionCalled = false;
+
+            var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+                "TestCircuitBreaker",
+                TimeSpan.FromMilliseconds(100),
+                ex => triggerActionCalled = true,
+                () => armedActionCalled = true,
+                () => disarmedActionCalled = true,
+                TimeSpan.Zero,
+                TimeSpan.Zero
+            );
+
+            var failureTask = circuitBreaker.Failure(new Exception("Test Exception"));
+            var successTask = Task.Run(() =>
+            {
+                Thread.Sleep(50); // Simulate some delay before success
+                circuitBreaker.Success();
+            });
+
+            await Task.WhenAll(failureTask, successTask);
+
+            Assert.That(armedActionCalled, Is.True, "The armed action should be called.");
+            Assert.That(disarmedActionCalled, Is.True, "The disarmed action should be called.");
+            Assert.That(triggerActionCalled, Is.False, "The trigger action should not be called if success occurred before timeout.");
+        }
+
+        [Test]
+        public async Task Should_handle_high_concurrent_failure_and_success()
+        {
+            var armedActionCalled = 0;
+            var disarmedActionCalled = 0;
+            var triggerActionCalled = 0;
+
+            var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+                "TestCircuitBreaker",
+                TimeSpan.FromSeconds(5),
+                ex => Interlocked.Increment(ref triggerActionCalled),
+                () => Interlocked.Increment(ref armedActionCalled),
+                () => Interlocked.Increment(ref disarmedActionCalled),
+                TimeSpan.Zero,
+                TimeSpan.FromMilliseconds(25)
+            );
+
+            // Even indexes report a failure inline while the sequence is materialized; odd indexes report a success from the thread pool after a short sleep.
+            var tasks = Enumerable.Range(0, 1000)
+                .Select(
+                    i => i % 2 == 0 ?
+                        circuitBreaker.Failure(new Exception($"Test Exception {i}")) :
+                        Task.Run(() =>
+                        {
+                            Thread.Sleep(25); // Simulate some delay before success
+                            circuitBreaker.Success();
+                        })
+                ).ToArray();
+
+            await Task.WhenAll(tasks);
+
+            Assert.That(armedActionCalled, Is.EqualTo(1), "The armed action should be called.");
+            Assert.That(disarmedActionCalled, Is.EqualTo(1), "The disarmed action should be called.");
+            Assert.That(triggerActionCalled, Is.Zero, "The trigger action should not be called if success occurred before timeout.");
+        }
+
+        [Test]
+        public async Task Should_trigger_after_multiple_failures_and_timeout()
+        {
+            var triggerActionCalled = false;
+
+            var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+                "TestCircuitBreaker",
+                TimeSpan.FromMilliseconds(50),
+                ex => triggerActionCalled = true,
+                timeToWaitWhenTriggered: TimeSpan.FromMilliseconds(50),
+                timeToWaitWhenArmed: TimeSpan.FromMilliseconds(50)
+            );
+
+            // Each failure waits 50 ms, so the 50 ms trigger timer started by the first failure is expected to have fired once both awaits complete.
+            await circuitBreaker.Failure(new Exception("Test Exception"));
+            await circuitBreaker.Failure(new Exception("Another Exception After Trigger"));
+
+            Assert.That(triggerActionCalled, Is.True, "The trigger action should be called after repeated failures and timeout.");
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/ServiceControl/ExternalIntegrations/RepeatedFailuresOverTimeCircuitBreaker.cs b/src/ServiceControl/ExternalIntegrations/RepeatedFailuresOverTimeCircuitBreaker.cs
deleted file mode 100644
index ad14c5c97e..0000000000
--- a/src/ServiceControl/ExternalIntegrations/RepeatedFailuresOverTimeCircuitBreaker.cs
+++ /dev/null
@@ -1,75 +0,0 @@
-namespace NServiceBus
-{
- using System;
- using System.Threading;
- using System.Threading.Tasks;
- using Logging;
-
- class RepeatedFailuresOverTimeCircuitBreaker : IDisposable
- {
- public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBeforeTriggering, Action triggerAction, TimeSpan delayAfterFailure)
- {
- this.delayAfterFailure = delayAfterFailure;
- this.name = name;
- this.triggerAction = triggerAction;
- this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering;
-
- timer = new Timer(CircuitBreakerTriggered);
- }
-
- public void Dispose()
- {
- timer.Dispose();
- GC.SuppressFinalize(this);
- }
-
- public void Success()
- {
- var oldValue = Interlocked.Exchange(ref failureCount, 0);
-
- if (oldValue == 0)
- {
- return;
- }
-
- timer.Change(System.Threading.Timeout.Infinite, System.Threading.Timeout.Infinite);
- Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name);
- }
-
- public Task Failure(Exception exception)
- {
- lastException = exception;
- var newValue = Interlocked.Increment(ref failureCount);
-
- if (newValue == 1)
- {
- timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
- Logger.WarnFormat("The circuit breaker for {0} is now in the armed state", name);
- }
-
- return Task.Delay(delayAfterFailure);
- }
-
- void CircuitBreakerTriggered(object state)
- {
- if (Interlocked.Read(ref failureCount) > 0)
- {
- Logger.WarnFormat("The circuit breaker for {0} will now be triggered", name);
- triggerAction(lastException);
- }
- }
-
- readonly TimeSpan delayAfterFailure;
-
- long failureCount;
- Exception lastException;
-
- readonly string name;
- readonly Timer timer;
- readonly TimeSpan timeToWaitBeforeTriggering;
- readonly Action triggerAction;
-
- static readonly TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1);
- static readonly ILog Logger = LogManager.GetLogger();
- }
-}
\ No newline at end of file