[Bug] Parallel invocations to the same Agent instance corrupt message history #22

@zastrowm

Description

Problem

Currently, if an invocation starts while another one is still being processed, the agent's internal state becomes corrupted. The concurrent invocations fail, and so do all subsequent invocations on the same agent instance.

A common scenario affected by this behavior is when a retry occurs due to client timeout, which triggers another invocation with the same session ID while the previous request is still being processed.
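To make the failure mode concrete, here is a minimal sketch of how two overlapping invocations can interleave writes to a shared message history. `UnguardedAgent` is a hypothetical stand-in, not the real `Agent` class, and the message shape is simplified; the actual corruption in the SDK may look different.

```python
import asyncio


class UnguardedAgent:
    """Hypothetical stand-in for an agent with no concurrency guard."""

    def __init__(self) -> None:
        self.messages: list[dict[str, str]] = []

    async def invoke_async(self, prompt: str) -> None:
        # The user turn is recorded before the (simulated) model call...
        self.messages.append({"role": "user", "text": prompt})
        await asyncio.sleep(0.01)  # simulated model latency
        # ...and the assistant turn is recorded after it.
        self.messages.append({"role": "assistant", "text": f"echo: {prompt}"})


async def interleave() -> list[str]:
    agent = UnguardedAgent()
    # The original request and a client retry overlap on one instance.
    await asyncio.gather(agent.invoke_async("original"), agent.invoke_async("retry"))
    return [message["role"] for message in agent.messages]
```

Running `asyncio.run(interleave())` shows both user turns recorded before either assistant turn, so the history no longer alternates between user and assistant.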

Proposed Solution

Raise a concurrency exception for the overlapping request instead of processing it. Processing it corrupts the agent's state and causes all subsequent calls to fail.

Implementation Requirements

Based on the clarification discussion and repository analysis, this fix requires the following changes:

1. Add ConcurrencyException

File: src/strands/types/exceptions.py

Add a new exception class:

class ConcurrencyException(Exception):
    """Exception raised when concurrent invocations are attempted on an agent instance.
    
    Agent instances maintain internal state that cannot be safely accessed concurrently.
    This exception is raised when an invocation is attempted while another invocation
    is already in progress on the same agent instance.
    """
    pass

2. Guard Agent Invocation Methods

File: src/strands/agent/agent.py

Changes required:

  1. Add import asyncio at the top

  2. Add lock instance variable in __init__():

    self._invocation_lock = asyncio.Lock()

  3. Guard stream_async() method (line ~521):

    • Check if lock is already acquired BEFORE modifying any state
    • Raise ConcurrencyException if locked
    • Acquire lock using async context manager

    Pattern:

    async def stream_async(self, ...):
        # Fail fast before any agent state is modified.
        if self._invocation_lock.locked():
            raise ConcurrencyException("Agent is already processing a request. Concurrent invocations are not supported.")

        async with self._invocation_lock:
            # existing implementation
            ...

Rationale:

  • Guarding stream_async() protects all invocation paths:
    • __call__() → invoke_async() → stream_async()
    • invoke_async() → stream_async()
    • structured_output() → structured_output_async() → invoke_async() → stream_async()
    • Direct calls to stream_async()
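The delegation argument above can be sketched with a toy class. `ToyAgent` is hypothetical and models only the guard, not the real `Agent`; it assumes `invoke_async()` consumes `stream_async()`, as the call chains above describe.

```python
import asyncio


class ConcurrencyException(Exception):
    """Raised when an invocation starts while another is in progress."""


class ToyAgent:
    """Hypothetical stand-in for Agent; only the guard logic is modeled."""

    def __init__(self) -> None:
        self.messages: list[str] = []
        self._invocation_lock = asyncio.Lock()

    async def stream_async(self, prompt: str):
        # Fail fast before touching any state.
        if self._invocation_lock.locked():
            raise ConcurrencyException(
                "Agent is already processing a request. Concurrent invocations are not supported."
            )
        async with self._invocation_lock:
            self.messages.append(prompt)
            await asyncio.sleep(0.01)  # simulated model latency
            yield f"echo: {prompt}"

    async def invoke_async(self, prompt: str) -> str:
        # Delegates to stream_async(), so it inherits the guard.
        result = ""
        async for event in self.stream_async(prompt):
            result = event
        return result


async def demo() -> tuple[list, list[str]]:
    agent = ToyAgent()
    # Two overlapping invocations on the same instance.
    results = await asyncio.gather(
        agent.invoke_async("first"),
        agent.invoke_async("second"),
        return_exceptions=True,
    )
    return results, agent.messages
```

In this sketch the first call completes, the second is rejected, and only the first prompt reaches the message history. One caveat of guarding an async generator this way: the lock stays held while the generator is suspended at `yield`, so a caller that abandons the stream keeps the agent locked until the generator is closed.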

Important: Consider the event loop concern raised in PR #1205 comments: __call__() uses run_async() which creates an isolated event loop. The asyncio.Lock instance created in __init__ may need special handling to work across different event loops. Consider lazy initialization or thread-local storage if issues arise.
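One possible way to sidestep the event loop concern, offered as an assumption rather than the approach chosen in PR #1205, is a guard built on `threading.Lock`, which is not bound to any event loop. `asyncio.Lock` is documented as not thread-safe, whereas a non-blocking `threading.Lock.acquire()` checks and takes the lock in one atomic step. `InvocationGuard` and `hold()` are hypothetical names.

```python
import threading
from contextlib import contextmanager


class ConcurrencyException(Exception):
    """Raised when an invocation starts while another is in progress."""


class InvocationGuard:
    """Hypothetical fail-fast guard that is not tied to an event loop."""

    def __init__(self) -> None:
        self._lock = threading.Lock()

    @contextmanager
    def hold(self):
        # Non-blocking acquire: either we get the lock now or we fail fast.
        if not self._lock.acquire(blocking=False):
            raise ConcurrencyException(
                "Agent is already processing a request. Concurrent invocations are not supported."
            )
        try:
            yield
        finally:
            # Always release, even if the guarded body raised.
            self._lock.release()
```

Because the acquire never blocks, using `with guard.hold():` inside a coroutine would not stall the event loop. The trade-off is that the guard no longer participates in asyncio scheduling, which is acceptable here because the requirement is fail-fast only.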

3. Guard Direct Tool Invocations

File: src/strands/tools/_caller.py

Changes required:
Guard the caller() function inside __getattr__() (around line 54-116):

  • Check if agent's invocation lock is acquired
  • Raise ConcurrencyException if locked
  • Acquire lock during tool execution

Pattern:

def caller(user_message_override: str | None = None, ...):
    if self._agent._interrupt_state.activated:
        raise RuntimeError("cannot directly call tool during interrupt")
    
    # Add concurrency check
    async def check_and_execute():
        if self._agent._invocation_lock.locked():
            raise ConcurrencyException("Agent is already processing a request. Concurrent invocations are not supported.")
        
        async with self._agent._invocation_lock:
            # existing acall() implementation
            ...
    
    tool_result = run_async(check_and_execute)
    ...

4. Testing Requirements

Unit Tests

File: tests/strands/agent/test_agent.py

Add tests for concurrent invocations on all methods:

  1. Test invoke_async() concurrent calls (can reference the test in PR strands-agents/sdk-python#1205, "Concurrent invocations protection")
  2. Test stream_async() concurrent calls
  3. Test structured_output_async() concurrent calls
  4. Test __call__() concurrent calls (sync interface)
  5. Test structured_output() concurrent calls (sync interface)

Pattern from PR strands-agents/sdk-python#1205:

@pytest.mark.asyncio
async def test_agent_parallel_invocations():
    model = MockedModelProvider([{"role": "assistant", "content": [{"text": "hello!"}]}])
    agent = Agent(model=model)

    async with agent._invocation_lock:
        with pytest.raises(ConcurrencyException, match="Concurrent invocations are not supported"):
            await agent.invoke_async("test")

Direct Tool Call Tests

File: tests/strands/tools/test_caller.py (create if needed) or add to existing caller tests

Add test for concurrent direct tool calls:

def test_direct_tool_call_concurrent_invocations():
    # Similar pattern to agent tests but for agent.tool.tool_name()
    ...

Integration Test

File: tests_integ/test_stream_agent.py

Add integration test with real threading (can reference the integration test in PR strands-agents/sdk-python#1205):

  • Use threading to invoke agent concurrently
  • First thread should succeed
  • Second thread should raise ConcurrencyException
  • Includes retry scenario mentioned in issue

5. Files to Modify Summary

  1. src/strands/types/exceptions.py - Add ConcurrencyException
  2. src/strands/agent/agent.py - Add lock and guard stream_async()
  3. src/strands/tools/_caller.py - Guard direct tool calls
  4. tests/strands/agent/test_agent.py - Unit tests for all invocation methods
  5. tests/strands/tools/test_caller.py - Unit tests for direct tool calls (may need to create)
  6. tests_integ/test_stream_agent.py - Integration test with threading

Acceptance Criteria

  • ConcurrencyException defined in src/strands/types/exceptions.py
  • stream_async() guarded with lock check and acquisition
  • Direct tool calls guarded with lock check and acquisition
  • Unit tests pass for all invocation methods (5 methods)
  • Unit tests pass for direct tool calls
  • Integration test passes with real threading
  • Exception is raised BEFORE any state modification
  • Lock is released properly even on exceptions
  • All existing tests continue to pass
  • Code follows repository style guidelines (type hints, docstrings, logging)
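Two of the criteria above (the exception is raised before any state modification, and the lock is released even on exceptions) can be checked with a sketch like the following. `ToyAgent` and the `"boom"` trigger are hypothetical and stand in for the real agent and a failing model call.

```python
import asyncio


class ConcurrencyException(Exception):
    """Raised when an invocation starts while another is in progress."""


class ToyAgent:
    """Hypothetical stand-in for Agent; only the guard logic is modeled."""

    def __init__(self) -> None:
        self.messages: list[str] = []
        self._invocation_lock = asyncio.Lock()

    async def invoke_async(self, prompt: str) -> str:
        if self._invocation_lock.locked():
            raise ConcurrencyException("Concurrent invocations are not supported.")
        async with self._invocation_lock:
            if prompt == "boom":
                raise ValueError("simulated model failure")
            self.messages.append(prompt)
            return f"echo: {prompt}"


async def check() -> tuple[bool, bool, str]:
    agent = ToyAgent()

    # A rejected call must leave the message history untouched.
    async with agent._invocation_lock:
        try:
            await agent.invoke_async("blocked")
        except ConcurrencyException:
            pass
    state_untouched = agent.messages == []

    # A failure inside the guarded block must still release the lock.
    try:
        await agent.invoke_async("boom")
    except ValueError:
        pass
    lock_released = not agent._invocation_lock.locked()

    reply = await agent.invoke_async("hello")
    return state_untouched, lock_released, reply
```

The `async with` block is what guarantees the release: its exit handler runs whether the body returns or raises.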

Reference Material

Notes

  • This is a bug fix; no backwards compatibility is needed
  • Exception should be thrown BEFORE modifying any internal state, so no cleanup is necessary
  • The guard is per-agent instance, not per-session ID
  • No need for waiting/locking behavior now - fail-fast only
  • Consider event loop safety when using asyncio.Lock across run_async() calls
