-
Notifications
You must be signed in to change notification settings - Fork 655
Description
Problem Statement
We implemented throwing in the case of detecting concurrent invocations to the same agent in #1453 because it was resulting in corruption of state without it being obvious that it's unsupported/unsafe in most cases.
We've had a report from a team that was using concurrent invocations as a way to re-trigger the agent in the middle of invocation. They requested that we add an option to allow the concurrent invocation so that they can maintain previous behavior.
Proposed Solution
Add a new parameter concurrent_invocation_mode to Agent.__init__ that is a string/enum with values:
throw- which would be the existing behavior and would be the defaultunsafe_reentrant- which would not throw an exception and would be the previous/slightly unsafe behavior
In the future this enables us to add a new mode interrupt which would interrupt the agent's invocation with a new user message, which I know is another common use for agents when you want to provide feedback out of bound
Use Case
For power users of the Agent interface, allow them to re-invoke the same agent without getting an exception.
Alternatives Solutions
- Move the locking into a specific place in the agent so that subclasses could disable the throwing behavior
- Pro: Not a top level option that few will need
- Con: is the need to create a custom Agent class
- Add a new parameter
unsafe_allow_conccurent_invocationsthat when set toTruewould not throw the exception- Pro: Solves the problem with a clear naming towards the problem
- Con: Very ugly parameter that only a subset of users would ever need
Additional Context
No response
Implementation Requirements
Technical Approach
Based on repository analysis and clarification discussion:
Type Definition
Add a Literal type alias (consistent with existing patterns in the codebase):
from typing import Literal
ConcurrentInvocationMode = Literal["throw", "unsafe_reentrant"]Parameter Addition
Add concurrent_invocation_mode parameter to Agent.__init__:
- Type:
ConcurrentInvocationMode(orLiteral["throw", "unsafe_reentrant"]) - Default:
"throw"(maintains backward compatibility) - Behavior:
"throw": Current behavior - acquire lock, raiseConcurrencyExceptionif lock not available"unsafe_reentrant": Skip lock acquisition entirely (restores pre-fix: add concurrency protection to prevent parallel invocations from corrupting agent state #1453 behavior)
Lock Logic Modification
In stream_async() method (around line 628), conditionally skip lock handling:
# Only acquire lock if mode is "throw"
if self._concurrent_invocation_mode == "throw":
acquired = self._invocation_lock.acquire(blocking=False)
if not acquired:
raise ConcurrencyException(...)Files to Modify
-
src/strands/agent/agent.py- Add
concurrent_invocation_modeparameter to__init__ - Store as instance variable
self._concurrent_invocation_mode - Modify
stream_async()to conditionally skip lock based on mode - Update docstring for new parameter
- Add
-
src/strands/types/agent.py(optional)- Add
ConcurrentInvocationModetype alias for reuse
- Add
-
tests/strands/agent/test_agent.py- Add tests for
unsafe_reentrantmode covering all existing concurrent test scenarios:test_agent_concurrent_call_raises_exception→ add parallel test forunsafe_reentranttest_agent_concurrent_structured_output_raises_exception→ add parallel test forunsafe_reentrant
- Verify concurrent invocations succeed without exception when using
unsafe_reentrant
- Add tests for
Acceptance Criteria
-
concurrent_invocation_modeparameter added toAgent.__init__with default"throw" -
"throw"mode maintains existing behavior (raisesConcurrencyException) -
"unsafe_reentrant"mode skips lock acquisition entirely - No runtime warning emitted for
unsafe_reentrantmode - Unit tests cover both modes for all concurrent invocation scenarios
- Docstring updated with parameter documentation
- All existing tests continue to pass
- Type annotations complete and mypy passes