From b15bca83abf3154bdd4e15947379a7537abcb17f Mon Sep 17 00:00:00 2001
From: WilliamBZA
Date: Thu, 12 Jun 2025 11:17:06 +0200
Subject: [PATCH] Fix race condition in RepeatedFailuresOverTimeCircuitBreaker

---
 .../RepeatedFailuresOverTimeCircuitBreaker.cs | 211 ++++++++++++++++
 .../ExternalIntegrationRequestsDataStore.cs   |   4 +-
 .../RepeatedFailuresOverTimeCircuitBreaker.cs |  76 ------
 ...atedFailuresOverTimeCircuitBreakerTests.cs | 222 ++++++++++++++++++
 .../RepeatedFailuresOverTimeCircuitBreaker.cs |  75 ------
 5 files changed, 435 insertions(+), 153 deletions(-)
 create mode 100644 src/ServiceControl.Infrastructure/RepeatedFailuresOverTimeCircuitBreaker.cs
 delete mode 100644 src/ServiceControl.Persistence.RavenDB/RepeatedFailuresOverTimeCircuitBreaker.cs
 create mode 100644 src/ServiceControl.UnitTests/RepeatedFailuresOverTimeCircuitBreakerTests.cs
 delete mode 100644 src/ServiceControl/ExternalIntegrations/RepeatedFailuresOverTimeCircuitBreaker.cs

diff --git a/src/ServiceControl.Infrastructure/RepeatedFailuresOverTimeCircuitBreaker.cs b/src/ServiceControl.Infrastructure/RepeatedFailuresOverTimeCircuitBreaker.cs
new file mode 100644
index 0000000000..abd6c9a354
--- /dev/null
+++ b/src/ServiceControl.Infrastructure/RepeatedFailuresOverTimeCircuitBreaker.cs
@@ -0,0 +1,211 @@
+#nullable enable
+
+namespace NServiceBus;
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Logging;
+
+/// <summary>
+/// A circuit breaker that is armed on a failure and disarmed on success. After <see cref="timeToWaitBeforeTriggering"/> in the
+/// armed state, the <see cref="triggerAction"/> will fire. The <see cref="armedAction"/> and <see cref="disarmedAction"/> allow
+/// changing other state when the circuit breaker is armed or disarmed.
+/// </summary>
+public sealed class RepeatedFailuresOverTimeCircuitBreaker : IDisposable
+{
+    /// <summary>
+    /// A circuit breaker that is armed on a failure and disarmed on success. After <paramref name="timeToWaitBeforeTriggering"/> in the
+    /// armed state, the <paramref name="triggerAction"/> will fire. The <paramref name="armedAction"/> and <paramref name="disarmedAction"/> allow
+    /// changing other state when the circuit breaker is armed or disarmed.
+    /// </summary>
+    /// <param name="name">A name that is output in log messages when the circuit breaker changes states.</param>
+    /// <param name="timeToWaitBeforeTriggering">The time to wait after the first failure before triggering.</param>
+    /// <param name="triggerAction">The action to take when the circuit breaker is triggered.</param>
+    /// <param name="armedAction">The action to execute on the first failure.
+    /// <b>Warning:</b> This action is also invoked from within a lock. Any long-running, blocking, or I/O-bound code should be avoided
+    /// within this action, as it can prevent other threads from proceeding, potentially leading to contention or performance bottlenecks.
+    /// </param>
+    /// <param name="disarmedAction">The action to execute when a success disarms the circuit breaker.
+    /// <b>Warning:</b> This action is also invoked from within a lock. Any long-running, blocking, or I/O-bound code should be avoided
+    /// within this action, as it can prevent other threads from proceeding, potentially leading to contention or performance bottlenecks.
+    /// </param>
+    /// <param name="timeToWaitWhenTriggered">How long to delay on each failure when in the Triggered state. Defaults to 10 seconds.</param>
+    /// <param name="timeToWaitWhenArmed">How long to delay on each failure when in the Armed state. Defaults to 1 second.</param>
+    /// <remarks>
+    /// The <paramref name="armedAction"/> and <paramref name="disarmedAction"/> are invoked from within a lock to ensure that arming and disarming
+    /// actions are serialized and do not execute concurrently. As a result, care must be taken to ensure that these actions do not
+    /// introduce delays or deadlocks by performing lengthy operations or synchronously waiting on external resources.
+    ///
+    /// <b>Best practice:</b> If the logic inside these actions involves blocking or long-running tasks, consider offloading
+    /// the work to a background task or thread that doesn't hold the lock.
+    /// </remarks>
+    public RepeatedFailuresOverTimeCircuitBreaker(
+        string name,
+        TimeSpan timeToWaitBeforeTriggering,
+        Action<Exception> triggerAction,
+        Action? armedAction = null,
+        Action? disarmedAction = null,
+        TimeSpan? timeToWaitWhenTriggered = default,
+        TimeSpan? timeToWaitWhenArmed = default)
+    {
+        this.name = name;
+        this.triggerAction = triggerAction;
+        this.armedAction = armedAction ?? (static () => { });
+        this.disarmedAction = disarmedAction ?? (static () => { });
+        this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering;
+        this.timeToWaitWhenTriggered = timeToWaitWhenTriggered ?? TimeSpan.FromSeconds(10);
+        this.timeToWaitWhenArmed = timeToWaitWhenArmed ?? TimeSpan.FromSeconds(1);
+
+        timer = new Timer(CircuitBreakerTriggered);
+    }
+
+    /// <summary>
+    /// Log a success, disarming the circuit breaker if it was previously armed.
+    /// </summary>
+    public void Success()
+    {
+        // Check the status of the circuit breaker, exiting early outside the lock if already disarmed
+        if (Volatile.Read(ref circuitBreakerState) == Disarmed)
+        {
+            return;
+        }
+
+        lock (stateLock)
+        {
+            // Recheck state after obtaining the lock
+            if (circuitBreakerState == Disarmed)
+            {
+                return;
+            }
+
+            circuitBreakerState = Disarmed;
+
+            _ = timer.Change(Timeout.Infinite, Timeout.Infinite);
+            Logger.InfoFormat("The circuit breaker for '{0}' is now disarmed.", name);
+            try
+            {
+                disarmedAction();
+            }
+            catch (Exception ex)
+            {
+                Logger.Error($"The circuit breaker for '{name}' was unable to execute the disarm action.", ex);
+                throw;
+            }
+        }
+    }
+
+    /// <summary>
+    /// Log a failure, arming the circuit breaker if it was previously disarmed.
+    /// </summary>
+    /// <param name="exception">The exception that caused the failure.</param>
+    /// <param name="cancellationToken">A cancellation token.</param>
+    /// <returns>A task that completes after the delay for the state observed by this call (Triggered or Armed), or is cancelled via <paramref name="cancellationToken"/>.</returns>
+    public Task Failure(Exception exception, CancellationToken cancellationToken = default)
+    {
+        // Atomically store the exception that caused the circuit breaker to trip
+        _ = Interlocked.Exchange(ref lastException, exception);
+
+        var previousState = Volatile.Read(ref circuitBreakerState);
+        if (previousState is Armed or Triggered)
+        {
+            return Delay();
+        }
+
+        lock (stateLock)
+        {
+            // Recheck state after obtaining the lock
+            previousState = circuitBreakerState;
+            if (previousState is Armed or Triggered)
+            {
+                return Delay();
+            }
+
+            circuitBreakerState = Armed;
+
+            try
+            {
+                // Executing the action first before starting the timer to ensure that the action is executed before the timer fires
+                // and the time of the action is not included in the time to wait before triggering.
+                armedAction();
+            }
+            catch (Exception ex)
+            {
+                Logger.Error($"The circuit breaker for '{name}' was unable to execute the arm action.", new AggregateException(ex, exception));
+                // The timer was never started, so fall back to Disarmed; staying Armed would mean the breaker can never trigger.
+                circuitBreakerState = Disarmed;
+                throw;
+            }
+
+            _ = timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
+            Logger.WarnFormat("The circuit breaker for '{0}' is now in the armed state due to '{1}' and might trigger in '{2}' when not disarmed.", name, exception, timeToWaitBeforeTriggering);
+        }
+
+        return Delay();
+
+        Task Delay()
+        {
+            var timeToWait = previousState == Triggered ? timeToWaitWhenTriggered : timeToWaitWhenArmed;
+            if (Logger.IsDebugEnabled)
+            {
+                Logger.DebugFormat("The circuit breaker for '{0}' is delaying the operation by '{1}'.", name, timeToWait);
+            }
+            return Task.Delay(timeToWait, cancellationToken);
+        }
+    }
+
+    /// <summary>
+    /// Disposes the resources associated with the circuit breaker.
+    /// </summary>
+    public void Dispose() => timer.Dispose();
+
+    void CircuitBreakerTriggered(object? state)
+    {
+        var previousState = Volatile.Read(ref circuitBreakerState);
+        if (previousState == Disarmed)
+        {
+            return;
+        }
+
+        lock (stateLock)
+        {
+            // Recheck state after obtaining the lock
+            if (circuitBreakerState == Disarmed)
+            {
+                return;
+            }
+
+            circuitBreakerState = Triggered;
+            Logger.WarnFormat("The circuit breaker for '{0}' will now be triggered with exception '{1}'.", name, lastException);
+
+            try
+            {
+                triggerAction(lastException!);
+            }
+            catch (Exception ex)
+            {
+                Logger.Fatal($"The circuit breaker for '{name}' was unable to execute the trigger action.", new AggregateException(ex, lastException!));
+            }
+        }
+    }
+
+    int circuitBreakerState = Disarmed;
+    Exception? lastException;
+
+    readonly string name;
+    readonly Timer timer;
+    readonly TimeSpan timeToWaitBeforeTriggering;
+    readonly Action<Exception> triggerAction;
+    readonly Action armedAction;
+    readonly Action disarmedAction;
+    readonly TimeSpan timeToWaitWhenTriggered;
+    readonly TimeSpan timeToWaitWhenArmed;
+    readonly object stateLock = new();
+
+    const int Disarmed = 0;
+    const int Armed = 1;
+    const int Triggered = 2;
+
+    static readonly TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1);
+    static readonly ILog Logger = LogManager.GetLogger<RepeatedFailuresOverTimeCircuitBreaker>();
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Persistence.RavenDB/ExternalIntegrationRequestsDataStore.cs b/src/ServiceControl.Persistence.RavenDB/ExternalIntegrationRequestsDataStore.cs
index ba8f8ec646..27762fc546 100644
--- a/src/ServiceControl.Persistence.RavenDB/ExternalIntegrationRequestsDataStore.cs
+++ b/src/ServiceControl.Persistence.RavenDB/ExternalIntegrationRequestsDataStore.cs
@@ -33,7 +33,7 @@ public ExternalIntegrationRequestsDataStore(RavenPersisterSettings settings, IRa
                 "EventDispatcher",
                 timeToWait,
                 ex => criticalError.Raise("Repeated failures when dispatching external integration events.", ex),
-                delayAfterFailure
+                timeToWaitWhenArmed: delayAfterFailure
             );
         }
 
@@ -98,7 +98,7 @@ async Task StartDispatcherTask(CancellationToken cancellationToken)
                 catch (Exception ex)
                 {
                     Logger.Error("An exception occurred when dispatching external integration events", ex);
-                    await circuitBreaker.Failure(ex);
+                    await circuitBreaker.Failure(ex, cancellationToken);
 
                     if (!tokenSource.IsCancellationRequested)
                     {
diff --git a/src/ServiceControl.Persistence.RavenDB/RepeatedFailuresOverTimeCircuitBreaker.cs b/src/ServiceControl.Persistence.RavenDB/RepeatedFailuresOverTimeCircuitBreaker.cs
deleted file mode 100644
index 288bcebf01..0000000000
--- a/src/ServiceControl.Persistence.RavenDB/RepeatedFailuresOverTimeCircuitBreaker.cs
+++ /dev/null
@@ -1,76 +0,0 @@
-namespace NServiceBus
-{
-    using System;
-    using System.Threading;
-    using System.Threading.Tasks;
-    using Logging;
-
-    class RepeatedFailuresOverTimeCircuitBreaker : IDisposable
-    {
-        public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBeforeTriggering, Action<Exception> triggerAction, TimeSpan delayAfterFailure)
-        {
-            this.delayAfterFailure = delayAfterFailure;
-            this.name = name;
-            this.triggerAction = triggerAction;
-            this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering;
-
-            Logger.DebugFormat("RepeatedFailuresOverTime circuit breaker {0} that will trigger after {1} ", name, timeToWaitBeforeTriggering, delayAfterFailure);
-            timer = new Timer(CircuitBreakerTriggered);
-        }
-
-        public void Dispose()
-        {
-            timer.Dispose();
-            GC.SuppressFinalize(this);
-        }
-
-        public void Success()
-        {
-            var oldValue = Interlocked.Exchange(ref failureCount, 0);
-
-            if (oldValue == 0)
-            {
-                return;
-            }
-
-            timer.Change(System.Threading.Timeout.Infinite, System.Threading.Timeout.Infinite);
-            Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name);
-        }
-
-        public Task Failure(Exception exception)
-        {
-            lastException = exception;
-            var newValue = Interlocked.Increment(ref failureCount);
-
-            if (newValue == 1)
-            {
-                timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
-                Logger.WarnFormat("The circuit breaker for {0} is now in the armed state", name);
-            }
-
-            return Task.Delay(delayAfterFailure);
-        }
-
-        void CircuitBreakerTriggered(object state)
-        {
-            if (Interlocked.Read(ref failureCount) > 0)
-            {
-                Logger.WarnFormat("The circuit breaker for {0} will now be triggered", name);
-                triggerAction(lastException);
-            }
-        }
-
-        readonly TimeSpan delayAfterFailure;
-
-        long failureCount;
-        Exception lastException;
-
-        readonly string name;
-        readonly Timer timer;
-        readonly TimeSpan timeToWaitBeforeTriggering;
-        readonly Action<Exception> triggerAction;
-
-        static readonly TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1);
-        static readonly ILog Logger = LogManager.GetLogger<RepeatedFailuresOverTimeCircuitBreaker>();
-    }
-}
\ No newline at end of file
diff --git a/src/ServiceControl.UnitTests/RepeatedFailuresOverTimeCircuitBreakerTests.cs b/src/ServiceControl.UnitTests/RepeatedFailuresOverTimeCircuitBreakerTests.cs
new file mode 100644
index 0000000000..67e71198cc
--- /dev/null
+++ b/src/ServiceControl.UnitTests/RepeatedFailuresOverTimeCircuitBreakerTests.cs
@@ -0,0 +1,222 @@
+namespace NServiceBus
+{
+    using System;
+    using System.Diagnostics;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using NUnit.Framework;
+
+    // Ideally the circuit breaker would use a time provider to allow for easier testing but that would require a significant refactor
+    // and we want to keep the changes to a minimum for now to allow backporting to older versions.
+    [TestFixture]
+    public class RepeatedFailuresOverTimeCircuitBreakerTests
+    {
+        [Test]
+        public async Task Should_disarm_on_success()
+        {
+            var armedActionCalled = false;
+            var disarmedActionCalled = false;
+
+            var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+                "TestCircuitBreaker",
+                TimeSpan.FromMilliseconds(100),
+                ex => { },
+                () => armedActionCalled = true,
+                () => disarmedActionCalled = true,
+                TimeSpan.Zero,
+                TimeSpan.Zero
+            );
+
+            await circuitBreaker.Failure(new Exception("Test Exception"));
+            circuitBreaker.Success();
+
+            Assert.That(armedActionCalled, Is.True, "The armed action should be called.");
+            Assert.That(disarmedActionCalled, Is.True, "The disarmed action should be called.");
+        }
+
+        [Test]
+        public async Task Should_rethrow_exception_on_success()
+        {
+            var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+                "TestCircuitBreaker",
+                TimeSpan.FromMilliseconds(100),
+                ex => { },
+                () => { },
+                () => throw new Exception("Exception from disarmed action"),
+                timeToWaitWhenTriggered: TimeSpan.Zero,
+                timeToWaitWhenArmed: TimeSpan.Zero
+            );
+
+            await circuitBreaker.Failure(new Exception("Test Exception"));
+
+            var ex = Assert.Throws<Exception>(() => circuitBreaker.Success());
+            Assert.That(ex.Message, Is.EqualTo("Exception from disarmed action"));
+        }
+
+        [Test]
+        public async Task Should_trigger_after_failure_timeout()
+        {
+            var triggerActionCalled = false;
+            Exception lastTriggerException = null;
+
+            var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+                "TestCircuitBreaker",
+                TimeSpan.Zero,
+                ex => { triggerActionCalled = true; lastTriggerException = ex; },
+                timeToWaitWhenTriggered: TimeSpan.Zero,
+                timeToWaitWhenArmed: TimeSpan.FromMilliseconds(100)
+            );
+
+            await circuitBreaker.Failure(new Exception("Test Exception"));
+
+            Assert.That(triggerActionCalled, Is.True, "The trigger action should be called after timeout.");
+            Assert.That(lastTriggerException, Is.Not.Null, "The exception passed to the trigger action should not be null.");
+        }
+
+        [Test]
+        public void Should_rethrow_exception_on_failure()
+        {
+            var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+                "TestCircuitBreaker",
+                TimeSpan.FromMilliseconds(100),
+                ex => { },
+                () => throw new Exception("Exception from armed action"),
+                () => { },
+                timeToWaitWhenTriggered: TimeSpan.Zero,
+                timeToWaitWhenArmed: TimeSpan.Zero
+            );
+
+            var ex = Assert.ThrowsAsync<Exception>(async () => await circuitBreaker.Failure(new Exception("Test Exception")));
+            Assert.That(ex.Message, Is.EqualTo("Exception from armed action"));
+        }
+
+        [Test]
+        public async Task Should_delay_after_trigger_failure()
+        {
+            var timeToWaitWhenTriggered = TimeSpan.FromMilliseconds(50);
+            var timeToWaitWhenArmed = TimeSpan.FromMilliseconds(100);
+
+            var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+                "TestCircuitBreaker",
+                TimeSpan.Zero,
+                _ => { },
+                timeToWaitWhenTriggered: timeToWaitWhenTriggered,
+                timeToWaitWhenArmed: timeToWaitWhenArmed
+            );
+
+            var stopWatch = Stopwatch.StartNew();
+
+            await circuitBreaker.Failure(new Exception("Test Exception"));
+            await circuitBreaker.Failure(new Exception("Test Exception After Trigger"));
+
+            stopWatch.Stop();
+
+            Assert.That(stopWatch.ElapsedMilliseconds, Is.GreaterThanOrEqualTo(timeToWaitWhenTriggered.Add(timeToWaitWhenArmed).TotalMilliseconds).Within(20), "The circuit breaker should delay after a triggered failure.");
+        }
+
+        [Test]
+        public async Task Should_not_trigger_if_disarmed_before_timeout()
+        {
+            var triggerActionCalled = false;
+
+            var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+                "TestCircuitBreaker",
+                TimeSpan.FromMilliseconds(100),
+                ex => triggerActionCalled = true,
+                timeToWaitWhenTriggered: TimeSpan.Zero,
+                timeToWaitWhenArmed: TimeSpan.Zero
+            );
+
+            await circuitBreaker.Failure(new Exception("Test Exception"));
+            circuitBreaker.Success();
+
+            Assert.That(triggerActionCalled, Is.False, "The trigger action should not be called if the circuit breaker was disarmed.");
+        }
+
+        [Test]
+        public async Task Should_handle_concurrent_failure_and_success()
+        {
+            var armedActionCalled = false;
+            var disarmedActionCalled = false;
+            var triggerActionCalled = false;
+
+            var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+                "TestCircuitBreaker",
+                TimeSpan.FromMilliseconds(100),
+                ex => triggerActionCalled = true,
+                () => armedActionCalled = true,
+                () => disarmedActionCalled = true,
+                TimeSpan.Zero,
+                TimeSpan.Zero
+            );
+
+            var failureTask = circuitBreaker.Failure(new Exception("Test Exception"));
+            var successTask = Task.Run(() =>
+            {
+                Thread.Sleep(50); // Simulate some delay before success
+                circuitBreaker.Success();
+            });
+
+            await Task.WhenAll(failureTask, successTask);
+
+            Assert.That(armedActionCalled, Is.True, "The armed action should be called.");
+            Assert.That(disarmedActionCalled, Is.True, "The disarmed action should be called.");
+            Assert.That(triggerActionCalled, Is.False, "The trigger action should not be called if success occurred before timeout.");
+        }
+
+        [Test]
+        public async Task Should_handle_high_concurrent_failure_and_success()
+        {
+            var armedActionCalled = 0;
+            var disarmedActionCalled = 0;
+            var triggerActionCalled = 0;
+
+            var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+                "TestCircuitBreaker",
+                TimeSpan.FromSeconds(5),
+                ex => Interlocked.Increment(ref triggerActionCalled),
+                () => Interlocked.Increment(ref armedActionCalled),
+                () => Interlocked.Increment(ref disarmedActionCalled),
+                TimeSpan.Zero,
+                TimeSpan.FromMilliseconds(25)
+            );
+
+            var tasks = Enumerable.Range(0, 1000)
+                .Select(
+                    i => i % 2 == 0 ?
+                        circuitBreaker.Failure(new Exception($"Test Exception {i}")) :
+                        Task.Run(() =>
+                        {
+                            Thread.Sleep(25); // Simulate some delay before success
+                            circuitBreaker.Success();
+                        })
+                ).ToArray();
+
+            await Task.WhenAll(tasks);
+
+            Assert.That(armedActionCalled, Is.EqualTo(1), "The armed action should be called.");
+            Assert.That(disarmedActionCalled, Is.EqualTo(1), "The disarmed action should be called.");
+            Assert.That(triggerActionCalled, Is.Zero, "The trigger action should not be called if success occurred before timeout.");
+        }
+
+        [Test]
+        public async Task Should_trigger_after_multiple_failures_and_timeout()
+        {
+            var triggerActionCalled = false;
+
+            var circuitBreaker = new RepeatedFailuresOverTimeCircuitBreaker(
+                "TestCircuitBreaker",
+                TimeSpan.FromMilliseconds(50),
+                ex => triggerActionCalled = true,
+                timeToWaitWhenTriggered: TimeSpan.FromMilliseconds(50),
+                timeToWaitWhenArmed: TimeSpan.FromMilliseconds(50)
+            );
+
+            await circuitBreaker.Failure(new Exception("Test Exception"));
+            await circuitBreaker.Failure(new Exception("Another Exception After Trigger"));
+
+            Assert.That(triggerActionCalled, Is.True, "The trigger action should be called after repeated failures and timeout.");
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/ServiceControl/ExternalIntegrations/RepeatedFailuresOverTimeCircuitBreaker.cs b/src/ServiceControl/ExternalIntegrations/RepeatedFailuresOverTimeCircuitBreaker.cs
deleted file mode 100644
index ad14c5c97e..0000000000
--- a/src/ServiceControl/ExternalIntegrations/RepeatedFailuresOverTimeCircuitBreaker.cs
+++ /dev/null
@@ -1,75 +0,0 @@
-namespace NServiceBus
-{
-    using System;
-    using System.Threading;
-    using System.Threading.Tasks;
-    using Logging;
-
-    class RepeatedFailuresOverTimeCircuitBreaker : IDisposable
-    {
-        public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBeforeTriggering, Action<Exception> triggerAction, TimeSpan delayAfterFailure)
-        {
-            this.delayAfterFailure = delayAfterFailure;
-            this.name = name;
-            this.triggerAction = triggerAction;
-            this.timeToWaitBeforeTriggering = timeToWaitBeforeTriggering;
-
-            timer = new Timer(CircuitBreakerTriggered);
-        }
-
-        public void Dispose()
-        {
-            timer.Dispose();
-            GC.SuppressFinalize(this);
-        }
-
-        public void Success()
-        {
-            var oldValue = Interlocked.Exchange(ref failureCount, 0);
-
-            if (oldValue == 0)
-            {
-                return;
-            }
-
-            timer.Change(System.Threading.Timeout.Infinite, System.Threading.Timeout.Infinite);
-            Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name);
-        }
-
-        public Task Failure(Exception exception)
-        {
-            lastException = exception;
-            var newValue = Interlocked.Increment(ref failureCount);
-
-            if (newValue == 1)
-            {
-                timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
-                Logger.WarnFormat("The circuit breaker for {0} is now in the armed state", name);
-            }
-
-            return Task.Delay(delayAfterFailure);
-        }
-
-        void CircuitBreakerTriggered(object state)
-        {
-            if (Interlocked.Read(ref failureCount) > 0)
-            {
-                Logger.WarnFormat("The circuit breaker for {0} will now be triggered", name);
-                triggerAction(lastException);
-            }
-        }
-
-        readonly TimeSpan delayAfterFailure;
-
-        long failureCount;
-        Exception lastException;
-
-        readonly string name;
-        readonly Timer timer;
-        readonly TimeSpan timeToWaitBeforeTriggering;
-        readonly Action<Exception> triggerAction;
-
-        static readonly TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1);
-        static readonly ILog Logger = LogManager.GetLogger<RepeatedFailuresOverTimeCircuitBreaker>();
-    }
-}
\ No newline at end of file