fix: add concurrency protection to prevent parallel invocations from corrupting agent state #23
…corrupting agent state
- Add ConcurrencyException to types.exceptions for concurrent invocation detection
- Guard Agent.stream_async() with threading.Lock to prevent concurrent access
- Guard direct tool calls in _ToolCaller to enforce single-invocation constraint
- Use threading.Lock instead of asyncio.Lock to handle cross-thread concurrency from run_async()
- Add comprehensive unit and integration tests for all invocation paths
Resolves #22
tests/strands/agent/test_agent.py
Outdated
@pytest.mark.asyncio
async def test_agent_concurrent_structured_output_async_raises_exception():
    """Test that concurrent structured_output_async() calls raise ConcurrencyException."""
    from strands.types.exceptions import ConcurrencyException
Make this a top-level import.
✅ Done - moved to top-level imports
src/strands/agent/agent.py
Outdated
# Acquire lock to prevent concurrent invocations
# Using threading.Lock instead of asyncio.Lock because run_async() creates
# separate event loops in different threads
acquired = self._invocation_lock.acquire(blocking=False)
Is there any way we can switch this to a with block instead of an explicit try/catch?
✅ Done - refactored to use with self._invocation_lock: context manager
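For context on the refactor discussed above: a plain with self._invocation_lock: waits until the lock is free, whereas acquire(blocking=False) returns immediately. One way to get both a with block and fail-fast behaviour is a small helper context manager. This is a minimal sketch only; the names exclusive and ConcurrencyError are hypothetical stand-ins and are not taken from this PR.

```python
import threading
from contextlib import contextmanager


class ConcurrencyError(Exception):
    """Hypothetical stand-in for the PR's ConcurrencyException."""


@contextmanager
def exclusive(lock: threading.Lock):
    """Hold `lock` for the duration of the with-body, failing fast if busy."""
    # acquire(blocking=False) returns at once: True if we now own the lock,
    # False if someone else already holds it.
    if not lock.acquire(blocking=False):
        raise ConcurrencyError("an invocation is already in progress")
    try:
        yield
    finally:
        # Runs even when the with-body raises, so the lock is never leaked.
        lock.release()
```

Used as with exclusive(self._invocation_lock): ..., this keeps the release in one place while still rejecting a second caller instead of making it wait.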
tests/strands/agent/test_agent.py
Outdated
def test_agent_concurrent_call_raises_exception():
    """Test that concurrent __call__() calls raise ConcurrencyException."""
    import threading
Use top-level imports for everything unless there's a reason not to.
✅ Done - moved all test imports to top-level
tests/strands/agent/test_agent.py
Outdated
return "slow result"

# Create a slow model to ensure agent invocation takes time
class SlowMockedModel(MockedModelProvider):
Can we have one implementation in a fixture that all of these new tests pull in, rather than redefining this class/model every time?
✅ Done - created slow_mocked_model fixture and updated all tests to use it
tests_integ/test_stream_agent.py
Outdated
print("Concurrent invocation test passed")


def test_retry_scenario_with_timeout():
Get rid of this test; it's unnecessary here, since this is a client concern.
✅ Done - removed retry scenario test
/strands
- Move ConcurrencyException import to top-level in tests
- Refactor lock acquisition to use context manager (with block)
- Create SlowMockedModel fixture to eliminate code duplication
- Remove local imports from test functions
- Remove retry scenario integration test (client concern)
All 229 agent tests passing
Review Feedback Addressed ✅
I've successfully addressed all 5 review comments from zastrowm:
Changes Made:
Test Results:
Commits:
All review comment replies have been posted (deferred). The PR is ready for re-review.
"""
self._interrupt_state.resume(prompt)
# Check if lock is already acquired to fail fast on concurrent invocations
if self._invocation_lock.locked():
This line can still lead to a race condition. One task can pass this check, and then Python can context switch to another task before the first reaches with self._invocation_lock. That other task can then come in and pass the check as well. This means two tasks can find their way to the with self._invocation_lock line. This could further lead to a deadlock, because one of the tasks will block at with self._invocation_lock without returning control to the asyncio event loop (threading.Lock has no await support).
The chance of this happening is probably small, but it exists nonetheless.
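To illustrate the concern, the pattern "check locked(), then enter with" is a check-then-act sequence with a gap between the two steps, while acquire(blocking=False) tests and takes the lock in a single atomic step. The following is a minimal sketch under that assumption; count_winners is a hypothetical helper written for this illustration and does not appear in the PR.

```python
import threading


def count_winners(num_threads: int = 8) -> int:
    """Return how many threads obtained the lock when all try at once."""
    lock = threading.Lock()
    start = threading.Barrier(num_threads)
    winners = []

    def attempt() -> None:
        start.wait()  # line the threads up so they contend at the same time
        # Test-and-take in one atomic call: there is no window between
        # "is it free?" and "take it", unlike locked() followed by with.
        if lock.acquire(blocking=False):
            winners.append(threading.current_thread().name)
            # Deliberately never released: the winner "stays busy", so every
            # other attempt must observe the lock as held.

    threads = [threading.Thread(target=attempt) for _ in range(num_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return len(winners)
```

Because the losing threads get False back instead of blocking, none of them can stall an event loop while waiting for the lock.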
Motivation
When multiple invocations occur concurrently on the same Agent instance (such as retry attempts after client timeouts), the internal agent state becomes corrupted, causing both the concurrent invocations and all subsequent invocations to fail. This happens because agent state (message history, metrics, interrupt state) is modified without synchronization, leading to race conditions when accessed from multiple threads or overlapping async contexts.
Resolves #22
Public API Changes
New Exception
Added ConcurrencyException to strands.types.exceptions. The exception is raised immediately when a concurrent invocation is attempted, before any state modification occurs.
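The snippet that originally accompanied this section is not reproduced here. As a rough illustration of how a caller might handle the new exception, the sketch below defines a local placeholder class with the same name; the real class lives in strands.types.exceptions and its base class and message may differ. The helper invoke_or_report is hypothetical.

```python
class ConcurrencyException(Exception):
    """Placeholder for illustration only.

    In real code this would be imported instead:
        from strands.types.exceptions import ConcurrencyException
    """


def invoke_or_report(invoke, prompt: str) -> str:
    """Call `invoke(prompt)` and report a rejected concurrent invocation.

    `invoke` is any callable taking a prompt; it is an assumption of this
    sketch, standing in for an agent invocation.
    """
    try:
        return invoke(prompt)
    except ConcurrencyException as exc:
        # The agent was busy; no agent state was modified by this attempt.
        return f"rejected: {exc}"
```

Whether a rejected call should be retried later or surfaced to the user is left to the client.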
Behavior Change
All agent invocation methods now enforce a single-invocation-at-a-time constraint:
- agent() (sync call)
- agent.invoke_async()
- agent.stream_async()
- agent.structured_output()
- agent.structured_output_async()
- agent.tool.tool_name() (direct tool calls)
When an invocation is in progress, any concurrent invocation attempt raises ConcurrencyException immediately. Sequential invocations continue to work normally.
Use Cases
- Concurrent invocations raise ConcurrencyException instead of corrupting state
- Direct tool calls (agent.tool.tool_name()) are protected from executing during agent invocations
Implementation Notes
Uses threading.Lock instead of asyncio.Lock because run_async() creates isolated event loops in separate threads. The lock is acquired at the start of stream_async() (the common entry point for all invocation paths) and released upon completion, even if exceptions occur.
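The locking pattern described in these notes can be sketched with a toy class. Everything below is an assumption made for illustration (GuardedAgent, ConcurrencyError, collect, demo, and the two Event objects used to make the timing deterministic); it is not the Agent implementation. It shows a threading.Lock guarding an async generator so that a second caller running its own event loop in another thread is rejected, which an asyncio.Lock is not designed for since asyncio primitives are not thread-safe.

```python
import asyncio
import threading


class ConcurrencyError(Exception):
    """Hypothetical stand-in for ConcurrencyException."""


class GuardedAgent:
    """Toy agent; only the locking pattern is of interest."""

    def __init__(self) -> None:
        self._invocation_lock = threading.Lock()
        self.started = threading.Event()  # set once a stream holds the lock
        self.finish = threading.Event()   # set to let a running stream end

    async def stream_async(self, prompt: str):
        # Non-blocking acquire: a busy agent rejects the caller immediately
        # instead of blocking the caller's event loop.
        if not self._invocation_lock.acquire(blocking=False):
            raise ConcurrencyError("agent is already processing a request")
        try:
            self.started.set()
            yield f"start:{prompt}"
            # Poll with short sleeps so this event loop is never blocked.
            while not self.finish.is_set():
                await asyncio.sleep(0.01)
            yield f"end:{prompt}"
        finally:
            # Released on normal completion and on exceptions alike.
            self._invocation_lock.release()


async def collect(agent: GuardedAgent, prompt: str) -> list:
    return [event async for event in agent.stream_async(prompt)]


def demo() -> dict:
    agent = GuardedAgent()
    results = {}

    def worker() -> None:
        # Separate thread with its own event loop.
        results["first"] = asyncio.run(collect(agent, "a"))

    thread = threading.Thread(target=worker)
    thread.start()
    agent.started.wait(timeout=5)  # first invocation now holds the lock
    try:
        asyncio.run(collect(agent, "b"))  # concurrent attempt, other loop
        results["second"] = "ran"
    except ConcurrencyError:
        results["second"] = "rejected"
    agent.finish.set()
    thread.join()
    # A sequential invocation after the first one finishes works normally.
    results["third"] = asyncio.run(collect(agent, "c"))
    return results
```

In this sketch the first and third invocations complete, while the overlapping second one is rejected before it touches any state.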