Owner

@zastrowm zastrowm commented Jan 9, 2026

Motivation

When multiple invocations occur concurrently on the same Agent instance (such as retry attempts after client timeouts), the internal agent state becomes corrupted, causing both the concurrent invocations and all subsequent invocations to fail. This happens because agent state (message history, metrics, interrupt state) is modified without synchronization, leading to race conditions when accessed from multiple threads or overlapping async contexts.

Resolves #22

Public API Changes

New Exception

Added ConcurrencyException to strands.types.exceptions:

from strands.types.exceptions import ConcurrencyException

try:
    # Start first invocation
    result1 = agent("process this")
except ConcurrencyException as e:
    # Raised if another invocation is already in progress
    print(f"Agent busy: {e}")

The exception is raised immediately when a concurrent invocation is attempted, before any state modification occurs.

Behavior Change

All agent invocation methods now enforce a single-invocation-at-a-time constraint:

  • agent() (sync call)
  • agent.invoke_async()
  • agent.stream_async()
  • agent.structured_output()
  • agent.structured_output_async()
  • agent.tool.tool_name() (direct tool calls)

When an invocation is in progress, any concurrent invocation attempt raises ConcurrencyException immediately. Sequential invocations continue to work normally.

Use Cases

  • Retry handling: Client-side retries after timeouts will now fail fast with ConcurrencyException instead of corrupting state
  • Multi-threaded applications: Applications calling the same agent from multiple threads will get clear error messages instead of silent corruption
  • Direct tool access: Direct tool calls (agent.tool.tool_name()) are protected from executing during agent invocations
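On the caller's side, the retry use case could be handled roughly as in the sketch below. It is an illustration only: `ConcurrencyException` is redefined locally as a stand-in for the class in strands.types.exceptions, and `invoke_when_free`, `attempts`, and `delay` are hypothetical names that do not come from this PR.

```python
import time


class ConcurrencyException(Exception):
    """Local stand-in for strands.types.exceptions.ConcurrencyException."""


def invoke_when_free(agent, prompt, attempts=3, delay=0.5):
    """Call agent(prompt), waiting and retrying while the agent is busy.

    Hypothetical helper: `attempts` and `delay` are illustrative defaults.
    """
    for attempt in range(1, attempts + 1):
        try:
            return agent(prompt)
        except ConcurrencyException:
            if attempt == attempts:
                raise  # still busy after the last attempt, give up
            time.sleep(delay)  # wait for the in-flight invocation to finish
```

Because the exception is raised before any state is modified, a rejected call should leave the agent untouched, so waiting and retrying sequentially is a reasonable client-side policy.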

Implementation Notes

The implementation uses threading.Lock rather than asyncio.Lock because run_async() creates isolated event loops in separate threads, and asyncio.Lock is not thread-safe. The lock is acquired at the start of stream_async() (the common entry point for all invocation paths) and released upon completion, even if exceptions occur.
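The guard described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the code from this PR: `GuardedAgent`, `collect`, and the error message are hypothetical, `ConcurrencyException` is a local stand-in, and the `asyncio.sleep` call stands in for the real model and tool loop.

```python
import asyncio
import threading


class ConcurrencyException(Exception):
    """Local stand-in for strands.types.exceptions.ConcurrencyException."""


class GuardedAgent:
    """Hypothetical stand-in for Agent; only the locking is shown."""

    def __init__(self):
        self._invocation_lock = threading.Lock()

    async def stream_async(self, prompt):
        # Non-blocking acquire: returns False at once if the lock is held,
        # so a concurrent caller fails fast instead of stalling the event loop.
        if not self._invocation_lock.acquire(blocking=False):
            raise ConcurrencyException("Agent is already processing a request")
        try:
            await asyncio.sleep(0.05)  # placeholder for the model/tool loop
            yield f"echo: {prompt}"
        finally:
            # Runs on normal completion and when an exception propagates.
            self._invocation_lock.release()


async def collect(agent, prompt):
    return [event async for event in agent.stream_async(prompt)]


async def main():
    agent = GuardedAgent()
    # Two overlapping invocations on the same instance.
    return await asyncio.gather(
        collect(agent, "first"), collect(agent, "second"), return_exceptions=True
    )


results = asyncio.run(main())
```

In this sketch the first invocation completes and the overlapping one is rejected with `ConcurrencyException`. One caveat of guarding an async generator this way: if a consumer abandons the stream without exhausting or closing it, the `finally` block (and therefore the release) is deferred until the generator is closed.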

…corrupting agent state

- Add ConcurrencyException to types.exceptions for concurrent invocation detection
- Guard Agent.stream_async() with threading.Lock to prevent concurrent access
- Guard direct tool calls in _ToolCaller to enforce single-invocation constraint
- Use threading.Lock instead of asyncio.Lock to handle cross-thread concurrency from run_async()
- Add comprehensive unit and integration tests for all invocation paths

Resolves #22
@pytest.mark.asyncio
async def test_agent_concurrent_structured_output_async_raises_exception():
    """Test that concurrent structured_output_async() calls raise ConcurrencyException."""
    from strands.types.exceptions import ConcurrencyException
Owner Author

Make this a top-level import.

✅ Done - moved to top-level imports

# Acquire lock to prevent concurrent invocations
# Using threading.Lock instead of asyncio.Lock because run_async() creates
# separate event loops in different threads
acquired = self._invocation_lock.acquire(blocking=False)
Owner Author

Is there any way we can switch this to a with block instead of an explicit try/catch?

✅ Done - refactored to use with self._invocation_lock: context manager


def test_agent_concurrent_call_raises_exception():
    """Test that concurrent __call__() calls raise ConcurrencyException."""
    import threading
Owner Author

Use top-level imports for everything unless there's a reason not to.

✅ Done - moved all test imports to top-level

return "slow result"

# Create a slow model to ensure agent invocation takes time
class SlowMockedModel(MockedModelProvider):
Owner Author

Can we have one implementation in a fixture that all of these new tests pull in, rather than redefining this class/model every time?

✅ Done - created slow_mocked_model fixture and updated all tests to use it

print("Concurrent invocation test passed")


def test_retry_scenario_with_timeout():
Owner Author

Get rid of this test. It's unnecessary here; retry handling is a client concern.

✅ Done - removed retry scenario test

Owner Author

zastrowm commented Jan 9, 2026

/strands

- Move ConcurrencyException import to top-level in tests
- Refactor lock acquisition to use context manager (with block)
- Create SlowMockedModel fixture to eliminate code duplication
- Remove local imports from test functions
- Remove retry scenario integration test (client concern)

All 229 agent tests passing

github-actions bot commented Jan 9, 2026

Review Feedback Addressed ✅

I've successfully addressed all 5 review comments from zastrowm:

Changes Made:

  1. Top-level imports - Moved ConcurrencyException, threading, and time imports to the top of test file
  2. Context manager - Refactored lock acquisition to use with self._invocation_lock: instead of explicit acquire/release
  3. Code deduplication - Created slow_mocked_model pytest fixture and updated all tests to use it
  4. Removed retry test - Deleted test_retry_scenario_with_timeout as it's a client concern

Test Results:

  • ✅ All 229 agent tests passing
  • ✅ All 7 concurrency tests passing
  • ✅ Code formatted with ruff
  • ✅ Imports organized correctly

Commits:

  • Initial: 6194f24 - Core implementation
  • Refactor: 20ea822 - Review feedback addressed

All review comment replies have been posted (deferred). The PR is ready for re-review.

"""
self._interrupt_state.resume(prompt)
# Check if lock is already acquired to fail fast on concurrent invocations
if self._invocation_lock.locked():
@pgrayy pgrayy Jan 9, 2026

This line can still lead to a race condition. One task can pass this check, and Python can then context-switch to another task before the first reaches with self._invocation_lock. That other task can then come in and pass the same check, which means two tasks can find their way to the with self._invocation_lock line. This could further lead to a deadlock, because one of the tasks will block at with self._invocation_lock without returning control to the asyncio event loop (threading.Lock has no await support).

The chance of this happening is probably small, but it exists nonetheless.
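One way to close that window is to fold the check and the acquisition into a single non-blocking acquire() call, wrapped in a context manager so the with-block form is kept. The sketch below is illustrative only: `exclusive_invocation` is a hypothetical helper name, and `ConcurrencyException` is a local stand-in for the class in strands.types.exceptions.

```python
import threading
from contextlib import contextmanager


class ConcurrencyException(Exception):
    """Local stand-in for strands.types.exceptions.ConcurrencyException."""


@contextmanager
def exclusive_invocation(lock):
    """Hold `lock` for the with-body, or raise immediately if it is taken."""
    # acquire(blocking=False) tests and takes the lock in one atomic step,
    # so nothing can slip in between a separate check and the acquire.
    if not lock.acquire(blocking=False):
        raise ConcurrencyException("Agent is already processing a request")
    try:
        yield
    finally:
        lock.release()


lock = threading.Lock()
outcomes = []

with exclusive_invocation(lock):
    try:
        # A second entry while the lock is held fails instead of blocking.
        with exclusive_invocation(lock):
            outcomes.append("entered twice")
    except ConcurrencyException:
        outcomes.append("rejected")

# The lock is free again once the outer block exits.
with exclusive_invocation(lock):
    outcomes.append("sequential ok")
```

Because the helper never blocks, a caller that loses the race gets an exception rather than stalling the event loop, which also avoids the deadlock scenario described in the comment.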

@zastrowm zastrowm closed this Jan 15, 2026

Development

Successfully merging this pull request may close these issues.

[Bug] Parallel invocations to the same Agent instance corrupt message history

4 participants