Description
Problem
Currently, if an agent is invoked while another invocation is still being processed, the agent's internal state becomes corrupted. As a result, the concurrent invocation fails, and so does every subsequent invocation on that agent.
A common trigger is a client-side timeout: the client retries and sends another invocation with the same session ID while the original request is still being processed.
Proposed Solution
Raise a `ConcurrencyException` instead of processing the concurrent request. Processing it would corrupt the agent's state and cause all subsequent calls to fail.
Implementation Requirements
Based on the clarification discussion and repository analysis, this fix requires the following changes:
1. Add ConcurrencyException
File: src/strands/types/exceptions.py
Add a new exception class:
```python
class ConcurrencyException(Exception):
    """Exception raised when concurrent invocations are attempted on an agent instance.

    Agent instances maintain internal state that cannot be safely accessed concurrently.
    This exception is raised when an invocation is attempted while another invocation
    is already in progress on the same agent instance.
    """

    pass
```

2. Guard Agent Invocation Methods
File: src/strands/agent/agent.py
Changes required:
- Add `import asyncio` at the top.
- Add a lock instance variable in `__init__()`: `self._invocation_lock = asyncio.Lock()`
- Guard the `stream_async()` method (line ~521):
  - Check whether the lock is already acquired BEFORE modifying any state.
  - Raise `ConcurrencyException` if it is locked.
  - Acquire the lock using an async context manager.

Pattern:

```python
async def stream_async(self, ...):
    # Fail fast before touching any state. There is no await between this check
    # and the acquire below, so no other task on the same event loop can slip in.
    if self._invocation_lock.locked():
        raise ConcurrencyException("Agent is already processing a request. Concurrent invocations are not supported.")
    async with self._invocation_lock:
        # existing implementation
        ...
```
Rationale:
- Guarding `stream_async()` protects all invocation paths:
  - `__call__()` → `invoke_async()` → `stream_async()`
  - `invoke_async()` → `stream_async()`
  - `structured_output()` → `structured_output_async()` → `invoke_async()` → `stream_async()`
  - Direct calls to `stream_async()`
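The following runnable sketch illustrates the fail-fast guard and why placing it in `stream_async()` is enough. `ToyAgent` is a hypothetical stand-in that models only the locking pattern, not the real `Agent`. Its `invoke_async()` delegates to `stream_async()` (an async generator), mirroring the call chain above. Two overlapping calls are started with `asyncio.gather`: the first holds the lock, and the second is rejected before it touches `messages`.

```python
import asyncio
from collections.abc import AsyncIterator


class ConcurrencyException(Exception):
    """Raised when an agent instance is invoked while another invocation is running."""


class ToyAgent:
    """Hypothetical stand-in for Agent: only the proposed locking pattern is modeled."""

    def __init__(self) -> None:
        self._invocation_lock = asyncio.Lock()
        self.messages: list[str] = []  # stands in for the agent's internal state

    async def stream_async(self, prompt: str) -> AsyncIterator[str]:
        # Fail fast BEFORE touching any state.
        if self._invocation_lock.locked():
            raise ConcurrencyException(
                "Agent is already processing a request. Concurrent invocations are not supported."
            )
        async with self._invocation_lock:
            self.messages.append(prompt)
            await asyncio.sleep(0.05)  # simulate a slow model call
            yield f"echo: {prompt}"

    async def invoke_async(self, prompt: str) -> str:
        # Mirrors invoke_async() -> stream_async(): the guard lives in one place.
        result = ""
        async for event in self.stream_async(prompt):
            result = event
        return result


async def main() -> tuple[ToyAgent, list]:
    agent = ToyAgent()  # created inside the running loop
    results = await asyncio.gather(
        agent.invoke_async("first"),
        agent.invoke_async("retry"),
        return_exceptions=True,
    )
    return agent, results


agent, results = asyncio.run(main())
print(results[0])                  # echo: first
print(type(results[1]).__name__)   # ConcurrencyException
print(agent.messages)              # ['first']
```

Because the check runs on the first iteration of the generator, the rejected call never reaches `self.messages.append(...)`, which is what the "no cleanup necessary" note below relies on.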
Important: Consider the event loop concern raised in PR #1205 comments: __call__() uses run_async() which creates an isolated event loop. The asyncio.Lock instance created in __init__ may need special handling to work across different event loops. Consider lazy initialization or thread-local storage if issues arise.
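One possible way to sidestep the event-loop concern, offered as an alternative to the proposal rather than part of it, is a `threading.Lock` acquired with `blocking=False`. It is not bound to any event loop, and the non-blocking acquire is an atomic check-and-set, so two threads cannot both pass the check. In the sketch below, `ThreadSafeToyAgent`, its `started` event, and `call_or_reject` are hypothetical names for illustration. Each sync call runs on a fresh loop via `asyncio.run`, approximating the isolated loop the issue describes for `run_async()`.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyException(Exception):
    """Raised when an agent instance is invoked while another invocation is running."""


class ThreadSafeToyAgent:
    """Hypothetical stand-in for Agent, guarded with threading.Lock instead of asyncio.Lock."""

    def __init__(self) -> None:
        # Not tied to any event loop, so it behaves the same on every loop/thread.
        self._invocation_lock = threading.Lock()
        self.started = threading.Event()  # test hook: set once a call holds the lock

    async def invoke_async(self, prompt: str) -> str:
        # Non-blocking acquire: atomic check-and-set, never stalls the event loop.
        if not self._invocation_lock.acquire(blocking=False):
            raise ConcurrencyException(
                "Agent is already processing a request. Concurrent invocations are not supported."
            )
        try:
            self.started.set()
            await asyncio.sleep(0.5)  # simulate a slow model call
            return f"echo: {prompt}"
        finally:
            self._invocation_lock.release()  # released even if the body raises

    def __call__(self, prompt: str) -> str:
        # Each sync call gets its own fresh event loop.
        return asyncio.run(self.invoke_async(prompt))


def call_or_reject(agent: ThreadSafeToyAgent, prompt: str) -> str:
    try:
        return agent(prompt)
    except ConcurrencyException:
        return "rejected"


agent = ThreadSafeToyAgent()
with ThreadPoolExecutor(max_workers=2) as pool:
    first = pool.submit(call_or_reject, agent, "first")
    agent.started.wait()  # make sure the first call holds the lock
    second = pool.submit(call_or_reject, agent, "retry")
    results = [first.result(), second.result()]

print(results)  # ['echo: first', 'rejected']
```

If this route were taken, tests that simulate an in-flight call with `async with agent._invocation_lock` would switch to `with agent._invocation_lock:`. A blocking `acquire()` should be avoided inside coroutines, since it would stall the event loop; fail-fast only needs the non-blocking form.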
3. Guard Direct Tool Invocations
File: src/strands/tools/_caller.py
Changes required:
Guard the `caller()` function inside `__getattr__()` (around lines 54-116):
- Check whether the agent's invocation lock is acquired
- Raise `ConcurrencyException` if it is locked
- Acquire the lock during tool execution
Pattern:
```python
def caller(user_message_override: str | None = None, ...):
    if self._agent._interrupt_state.activated:
        raise RuntimeError("cannot directly call tool during interrupt")

    # Add concurrency check
    async def check_and_execute():
        if self._agent._invocation_lock.locked():
            raise ConcurrencyException("Agent is already processing a request. Concurrent invocations are not supported.")
        async with self._agent._invocation_lock:
            # existing acall() implementation
            ...

    tool_result = run_async(check_and_execute)
    ...
```

4. Testing Requirements
Unit Tests
File: tests/strands/agent/test_agent.py
Add tests for concurrent invocations on all methods:
- Test `invoke_async()` concurrent calls (can reference the test in PR strands-agents/sdk-python#1205)
- Test `stream_async()` concurrent calls
- Test `structured_output_async()` concurrent calls
- Test `__call__()` concurrent calls (sync interface)
- Test `structured_output()` concurrent calls (sync interface)
Pattern from PR strands-agents#1205:
```python
@pytest.mark.asyncio
async def test_agent_parallel_invocations():
    model = MockedModelProvider([{"role": "assistant", "content": [{"text": "hello!"}]}])
    agent = Agent(model=model)

    # Hold the lock to simulate an invocation that is already in progress.
    async with agent._invocation_lock:
        with pytest.raises(ConcurrencyException, match="Concurrent invocations are not supported"):
            await agent.invoke_async("test")
```

Direct Tool Call Tests
File: tests/strands/tools/test_caller.py (create if needed) or add to existing caller tests
Add test for concurrent direct tool calls:
```python
def test_direct_tool_call_concurrent_invocations():
    # Similar pattern to the agent tests, but for agent.tool.tool_name()
    ...
```

Integration Test
File: tests_integ/test_stream_agent.py
Add integration test with real threading (can reference PR strands-agents#1205's integration test):
- Use threading to invoke the agent concurrently
- The first thread should succeed
- The second thread should raise `ConcurrencyException`
- Include the retry scenario described in this issue
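The retry scenario can be modeled without a real model. The following sketch uses a single event loop (as an async server handling both requests would) rather than the real threads the integration test calls for. `ToyAgent` and `retry_scenario` are hypothetical names, and the guard is the fail-fast `asyncio.Lock` pattern from section 2. The client stops waiting after 50 ms without cancelling the server-side work, retries, and is rejected. Once the original call finishes, a new call succeeds, so the agent is not left in a broken state.

```python
import asyncio


class ConcurrencyException(Exception):
    """Raised when an agent instance is invoked while another invocation is running."""


class ToyAgent:
    """Hypothetical stand-in for Agent using the proposed fail-fast asyncio.Lock guard."""

    def __init__(self) -> None:
        self._invocation_lock = asyncio.Lock()
        self.messages: list[str] = []

    async def invoke_async(self, prompt: str) -> str:
        if self._invocation_lock.locked():
            raise ConcurrencyException(
                "Agent is already processing a request. Concurrent invocations are not supported."
            )
        async with self._invocation_lock:
            self.messages.append(prompt)
            await asyncio.sleep(0.2)  # simulate a slow model call
            return f"echo: {prompt}"


async def retry_scenario() -> tuple[ToyAgent, list[str]]:
    agent = ToyAgent()
    log: list[str] = []

    # The client sends a request and gives up after 50 ms; the server keeps working.
    original = asyncio.create_task(agent.invoke_async("hello"))
    done, _ = await asyncio.wait({original}, timeout=0.05)
    log.append("completed" if done else "client timed out")

    # The client retries with the same session while the original is still running.
    try:
        await agent.invoke_async("hello")
    except ConcurrencyException:
        log.append("retry rejected")

    # The original finishes normally, and later calls still work.
    log.append(await original)
    log.append(await agent.invoke_async("next"))
    return agent, log


agent, log = asyncio.run(retry_scenario())
print(log)             # ['client timed out', 'retry rejected', 'echo: hello', 'echo: next']
print(agent.messages)  # ['hello', 'next']
```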
5. Files to Modify Summary
- `src/strands/types/exceptions.py` - Add `ConcurrencyException`
- `src/strands/agent/agent.py` - Add the lock and guard `stream_async()`
- `src/strands/tools/_caller.py` - Guard direct tool calls
- `tests/strands/agent/test_agent.py` - Unit tests for all invocation methods
- `tests/strands/tools/test_caller.py` - Unit tests for direct tool calls (may need to be created)
- `tests_integ/test_stream_agent.py` - Integration test with threading
Acceptance Criteria
- `ConcurrencyException` defined in `src/strands/types/exceptions.py`
- `stream_async()` guarded with a lock check and acquisition
- Direct tool calls guarded with a lock check and acquisition
- Unit tests pass for all invocation methods (5 methods)
- Unit tests pass for direct tool calls
- Integration test passes with real threading
- Exception is raised BEFORE any state modification
- Lock is released properly even on exceptions
- All existing tests continue to pass
- Code follows repository style guidelines (type hints, docstrings, logging)
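The criterion "Lock is released properly even on exceptions" can be checked with a small sketch like the one below. `ToyAgent` and `check_release_on_error` are hypothetical, and the guard follows the proposed `asyncio.Lock` pattern. Because `async with` releases the lock when its body raises, a failed invocation does not block every later call.

```python
import asyncio


class ConcurrencyException(Exception):
    """Raised when an agent instance is invoked while another invocation is running."""


class ToyAgent:
    """Hypothetical stand-in for Agent using the proposed fail-fast asyncio.Lock guard."""

    def __init__(self) -> None:
        self._invocation_lock = asyncio.Lock()

    async def invoke_async(self, fail: bool) -> str:
        if self._invocation_lock.locked():
            raise ConcurrencyException(
                "Agent is already processing a request. Concurrent invocations are not supported."
            )
        async with self._invocation_lock:
            await asyncio.sleep(0)  # simulate work
            if fail:
                raise ValueError("simulated model error")
            return "ok"


async def check_release_on_error() -> tuple[bool, str]:
    agent = ToyAgent()
    try:
        await agent.invoke_async(fail=True)
    except ValueError:
        pass
    # `async with` released the lock even though the body raised...
    locked_after_error = agent._invocation_lock.locked()
    # ...so the next call is accepted instead of being rejected forever.
    next_result = await agent.invoke_async(fail=False)
    return locked_after_error, next_result


print(asyncio.run(check_release_on_error()))  # (False, 'ok')
```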
Reference Material
- PR strands-agents/sdk-python#1205: Concurrent invocations protection
  - Shows the initial implementation approach (`asyncio.Lock` in `invoke_async`)
  - Includes test patterns
  - Has outstanding review comments about incomplete coverage
- Issue strands-agents/sdk-python#1435: [BUG] Agent permanent stuck in ValidationException with Bedrock Converse : The number of toolResult blocks at messages.{id}.content exceeds the number of toolUse blocks of previous turn
  - Mentions the need to guard direct method invocations
Notes
- This is a bug fix, so no backwards-compatibility handling is needed
- Exception should be thrown BEFORE modifying any internal state, so no cleanup is necessary
- The guard is per-agent instance, not per-session ID
- No need to wait for the lock to become free for now; fail fast only
- Consider event loop safety when using `asyncio.Lock` across `run_async()` calls