From 2dad37f7a8b27398122fbe0616d85ff32e25fe65 Mon Sep 17 00:00:00 2001 From: Strands Agent <217235299+strands-agent@users.noreply.github.com> Date: Mon, 16 Feb 2026 18:23:05 +0000 Subject: [PATCH 1/6] feat(agent): add concurrent_invocation_mode parameter Add a new parameter to Agent.__init__ that controls concurrent invocation behavior: - 'throw' (default): Raises ConcurrencyException if concurrent invocation is attempted, maintaining existing behavior - 'unsafe_reentrant': Skips lock acquisition entirely, allowing concurrent invocations (restores pre-locking behavior) This enables power users to re-invoke the same agent without exceptions when needed, while preserving safe defaults for typical usage. Resolves #1702 --- src/strands/agent/agent.py | 25 ++++--- src/strands/types/agent.py | 10 ++- tests/strands/agent/test_agent.py | 109 ++++++++++++++++++++++++++++++ 3 files changed, 135 insertions(+), 9 deletions(-) diff --git a/src/strands/agent/agent.py b/src/strands/agent/agent.py index 567a92b4a..9746f7fbf 100644 --- a/src/strands/agent/agent.py +++ b/src/strands/agent/agent.py @@ -54,7 +54,7 @@ from ..tools.structured_output._structured_output_context import StructuredOutputContext from ..tools.watcher import ToolWatcher from ..types._events import AgentResultEvent, EventLoopStopEvent, InitEventLoopEvent, ModelStreamChunkEvent, TypedEvent -from ..types.agent import AgentInput +from ..types.agent import AgentInput, ConcurrentInvocationMode from ..types.content import ContentBlock, Message, Messages, SystemContentBlock from ..types.exceptions import ConcurrencyException, ContextWindowOverflowException from ..types.traces import AttributeValue @@ -129,6 +129,7 @@ def __init__( structured_output_prompt: str | None = None, tool_executor: ToolExecutor | None = None, retry_strategy: ModelRetryStrategy | _DefaultRetryStrategySentinel | None = _DEFAULT_RETRY_STRATEGY, + concurrent_invocation_mode: ConcurrentInvocationMode = "throw", ): """Initialize the Agent with the specified configuration. @@ -186,6 +187,10 @@ def __init__( retry_strategy: Strategy for retrying model calls on throttling or other transient errors. Defaults to ModelRetryStrategy with max_attempts=6, initial_delay=4s, max_delay=240s. Implement a custom HookProvider for custom retry logic, or pass None to disable retries. + concurrent_invocation_mode: Mode controlling concurrent invocation behavior. + Defaults to "throw" which raises ConcurrencyException if concurrent invocation is attempted. + Set to "unsafe_reentrant" to skip lock acquisition entirely, allowing concurrent invocations + (restores pre-locking behavior, use with caution). Raises: ValueError: If agent id contains path separators. @@ -263,6 +268,7 @@ def __init__( # Using threading.Lock instead of asyncio.Lock because run_async() creates # separate event loops in different threads, so asyncio.Lock wouldn't work self._invocation_lock = threading.Lock() + self._concurrent_invocation_mode = concurrent_invocation_mode # In the future, we'll have a RetryStrategy base class but until # that API is determined we only allow ModelRetryStrategy @@ -622,14 +628,16 @@ async def stream_async( yield event["data"] ``` """ - # Acquire lock to prevent concurrent invocations + # Conditionally acquire lock based on concurrent_invocation_mode # Using threading.Lock instead of asyncio.Lock because run_async() creates # separate event loops in different threads - acquired = self._invocation_lock.acquire(blocking=False) - if not acquired: - raise ConcurrencyException( - "Agent is already processing a request. Concurrent invocations are not supported." - ) + lock_acquired = False + if self._concurrent_invocation_mode == "throw": + lock_acquired = self._invocation_lock.acquire(blocking=False) + if not lock_acquired: + raise ConcurrencyException( + "Agent is already processing a request. Concurrent invocations are not supported." + ) try: self._interrupt_state.resume(prompt) @@ -678,7 +686,8 @@ async def stream_async( raise finally: - self._invocation_lock.release() + if lock_acquired: + self._invocation_lock.release() async def _run_loop( self, diff --git a/src/strands/types/agent.py b/src/strands/types/agent.py index aa69149a6..b240f4a7f 100644 --- a/src/strands/types/agent.py +++ b/src/strands/types/agent.py @@ -3,9 +3,17 @@ This module defines the types used for an Agent. """ -from typing import TypeAlias +from typing import Literal, TypeAlias from .content import ContentBlock, Messages from .interrupt import InterruptResponseContent AgentInput: TypeAlias = str | list[ContentBlock] | list[InterruptResponseContent] | Messages | None + +ConcurrentInvocationMode = Literal["throw", "unsafe_reentrant"] +"""Mode controlling concurrent invocation behavior. + +Values: + throw: Raises ConcurrencyException if concurrent invocation is attempted (default). + unsafe_reentrant: Allows concurrent invocations without locking (unsafe, restores pre-lock behavior). +""" diff --git a/tests/strands/agent/test_agent.py b/tests/strands/agent/test_agent.py index eb039185c..0117dba17 100644 --- a/tests/strands/agent/test_agent.py +++ b/tests/strands/agent/test_agent.py @@ -2320,6 +2320,115 @@ def invoke(): assert "concurrent" in str(errors[0]).lower() and "invocation" in str(errors[0]).lower() +def test_agent_concurrent_call_succeeds_with_unsafe_reentrant_mode(): + """Test that concurrent __call__() calls succeed when concurrent_invocation_mode is 'unsafe_reentrant'.""" + model = SyncEventMockedModel( + [ + {"role": "assistant", "content": [{"text": "hello"}]}, + {"role": "assistant", "content": [{"text": "world"}]}, + ] + ) + agent = Agent(model=model, concurrent_invocation_mode="unsafe_reentrant") + + results = [] + errors = [] + lock = threading.Lock() + + def invoke(): + try: + result = agent("test") + with lock: + results.append(result) + except ConcurrencyException as e: + with lock: + errors.append(e) + + # Start first thread and wait for it to begin streaming + t1 = threading.Thread(target=invoke) + t1.start() + model.started_event.wait() # Wait until first thread is in the model.stream() + + # Start second thread while first is still running + t2 = threading.Thread(target=invoke) + t2.start() + + # Let both threads proceed + model.proceed_event.set() + t1.join() + t2.join() + + # Both should succeed, no ConcurrencyException raised + assert len(errors) == 0, f"Expected 0 errors, got {len(errors)}: {errors}" + assert len(results) == 2, f"Expected 2 successes, got {len(results)}" + + +def test_agent_concurrent_invocation_mode_throw_raises_exception(): + """Test that concurrent_invocation_mode='throw' maintains the default behavior.""" + model = SyncEventMockedModel( + [ + {"role": "assistant", "content": [{"text": "hello"}]}, + {"role": "assistant", "content": [{"text": "world"}]}, + ] + ) + # Explicit throw mode should behave same as default + agent = Agent(model=model, concurrent_invocation_mode="throw") + + results = [] + errors = [] + lock = threading.Lock() + + def invoke(): + try: + result = agent("test") + with lock: + results.append(result) + except ConcurrencyException as e: + with lock: + errors.append(e) + + # Start first thread and wait for it to begin streaming + t1 = threading.Thread(target=invoke) + t1.start() + model.started_event.wait() # Wait until first thread is in the model.stream() + + # Start second thread while first is still running + t2 = threading.Thread(target=invoke) + t2.start() + + # Give second thread time to attempt invocation and fail + t2.join(timeout=1.0) + + # Now let first thread complete + model.proceed_event.set() + t1.join() + t2.join() + + # One should succeed, one should raise ConcurrencyException + assert len(results) == 1, f"Expected 1 success, got {len(results)}" + assert len(errors) == 1, f"Expected 1 error, got {len(errors)}" + assert "concurrent" in str(errors[0]).lower() and "invocation" in str(errors[0]).lower() + + +def test_agent_concurrent_invocation_mode_default_is_throw(): + """Test that the default concurrent_invocation_mode is 'throw'.""" + model = MockedModelProvider([{"role": "assistant", "content": [{"text": "hello"}]}]) + agent = Agent(model=model) + + # Verify the default mode + assert agent._concurrent_invocation_mode == "throw" + + +def test_agent_concurrent_invocation_mode_stores_value(): + """Test that concurrent_invocation_mode is stored correctly as instance variable.""" + model = MockedModelProvider([{"role": "assistant", "content": [{"text": "hello"}]}]) + + agent_throw = Agent(model=model, concurrent_invocation_mode="throw") + assert agent_throw._concurrent_invocation_mode == "throw" + + agent_reentrant = Agent(model=model, concurrent_invocation_mode="unsafe_reentrant") + assert agent_reentrant._concurrent_invocation_mode == "unsafe_reentrant" + + @pytest.mark.asyncio async def test_agent_sequential_invocations_work(): """Test that sequential invocations work correctly after lock is released.""" From b3f1a4ccb3dacf93b4ea146e8a344e1dff7101de Mon Sep 17 00:00:00 2001 From: Strands Agent <217235299+strands-agent@users.noreply.github.com> Date: Mon, 16 Feb 2026 21:00:52 +0000 Subject: [PATCH 2/6] refactor(tests): update concurrent tests to explicitly pass mode Address PR feedback: - Update existing exception tests to explicitly pass concurrent_invocation_mode='throw' - Remove redundant test_agent_concurrent_invocation_mode_throw_raises_exception test - Rely on test_agent_concurrent_invocation_mode_default_is_throw for default verification --- tests/strands/agent/test_agent.py | 51 ++----------------------------- 1 file changed, 2 insertions(+), 49 deletions(-) diff --git a/tests/strands/agent/test_agent.py b/tests/strands/agent/test_agent.py index 0117dba17..ad67a6328 100644 --- a/tests/strands/agent/test_agent.py +++ b/tests/strands/agent/test_agent.py @@ -2231,7 +2231,7 @@ def test_agent_concurrent_call_raises_exception(): {"role": "assistant", "content": [{"text": "world"}]}, ] ) - agent = Agent(model=model) + agent = Agent(model=model, concurrent_invocation_mode="throw") results = [] errors = [] @@ -2282,7 +2282,7 @@ def test_agent_concurrent_structured_output_raises_exception(): {"role": "assistant", "content": [{"text": "response2"}]}, ], ) - agent = Agent(model=model) + agent = Agent(model=model, concurrent_invocation_mode="throw") results = [] errors = [] @@ -2362,53 +2362,6 @@ def invoke(): assert len(results) == 2, f"Expected 2 successes, got {len(results)}" -def test_agent_concurrent_invocation_mode_throw_raises_exception(): - """Test that concurrent_invocation_mode='throw' maintains the default behavior.""" - model = SyncEventMockedModel( - [ - {"role": "assistant", "content": [{"text": "hello"}]}, - {"role": "assistant", "content": [{"text": "world"}]}, - ] - ) - # Explicit throw mode should behave same as default - agent = Agent(model=model, concurrent_invocation_mode="throw") - - results = [] - errors = [] - lock = threading.Lock() - - def invoke(): - try: - result = agent("test") - with lock: - results.append(result) - except ConcurrencyException as e: - with lock: - errors.append(e) - - # Start first thread and wait for it to begin streaming - t1 = threading.Thread(target=invoke) - t1.start() - model.started_event.wait() # Wait until first thread is in the model.stream() - - # Start second thread while first is still running - t2 = threading.Thread(target=invoke) - t2.start() - - # Give second thread time to attempt invocation and fail - t2.join(timeout=1.0) - - # Now let first thread complete - model.proceed_event.set() - t1.join() - t2.join() - - # One should succeed, one should raise ConcurrencyException - assert len(results) == 1, f"Expected 1 success, got {len(results)}" - assert len(errors) == 1, f"Expected 1 error, got {len(errors)}" - assert "concurrent" in str(errors[0]).lower() and "invocation" in str(errors[0]).lower() - - def test_agent_concurrent_invocation_mode_default_is_throw(): """Test that the default concurrent_invocation_mode is 'throw'.""" model = MockedModelProvider([{"role": "assistant", "content": [{"text": "hello"}]}]) From 8a2695a74372be7337a539ce1dd44a1915d918e3 Mon Sep 17 00:00:00 2001 From: Strands Agent <217235299+strands-agent@users.noreply.github.com> Date: Tue, 17 Feb 2026 01:37:59 +0000 Subject: [PATCH 3/6] docs: improve unsafe_reentrant warning in docstrings Address PR feedback: - Add explicit warning about unsafe_reentrant making no guarantees - Remove parenthetical about restoring pre-lock behavior - Update both type definition and Agent.__init__ docstrings for consistency --- src/strands/agent/agent.py | 5 +++-- src/strands/types/agent.py | 6 +++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/strands/agent/agent.py b/src/strands/agent/agent.py index 9746f7fbf..a9eacd3f7 100644 --- a/src/strands/agent/agent.py +++ b/src/strands/agent/agent.py @@ -189,8 +189,9 @@ def __init__( Implement a custom HookProvider for custom retry logic, or pass None to disable retries. concurrent_invocation_mode: Mode controlling concurrent invocation behavior. Defaults to "throw" which raises ConcurrencyException if concurrent invocation is attempted. - Set to "unsafe_reentrant" to skip lock acquisition entirely, allowing concurrent invocations - (restores pre-locking behavior, use with caution). + Set to "unsafe_reentrant" to skip lock acquisition entirely, allowing concurrent invocations. + Warning: "unsafe_reentrant" makes no guarantees about resulting behavior and is provided + only for advanced use cases where the caller understands the risks. Raises: ValueError: If agent id contains path separators. diff --git a/src/strands/types/agent.py b/src/strands/types/agent.py index b240f4a7f..f0d937122 100644 --- a/src/strands/types/agent.py +++ b/src/strands/types/agent.py @@ -15,5 +15,9 @@ Values: throw: Raises ConcurrencyException if concurrent invocation is attempted (default). - unsafe_reentrant: Allows concurrent invocations without locking (unsafe, restores pre-lock behavior). + unsafe_reentrant: Allows concurrent invocations without locking. + +Warning: + The ``unsafe_reentrant`` mode makes no guarantees about resulting behavior and is + provided only for advanced use cases where the caller understands the risks. """ From dac3f31129eca254fe89e2b5c3963ae88597609d Mon Sep 17 00:00:00 2001 From: Mackenzie Zastrow Date: Tue, 17 Feb 2026 09:41:10 -0500 Subject: [PATCH 4/6] fix: switch to enum --- src/strands/agent/agent.py | 2 +- src/strands/types/agent.py | 25 +++++++++++++++---------- tests/strands/agent/test_agent.py | 16 ++++++++++++++++ 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/src/strands/agent/agent.py b/src/strands/agent/agent.py index a9eacd3f7..8ca83e05a 100644 --- a/src/strands/agent/agent.py +++ b/src/strands/agent/agent.py @@ -633,7 +633,7 @@ async def stream_async( # Using threading.Lock instead of asyncio.Lock because run_async() creates # separate event loops in different threads lock_acquired = False - if self._concurrent_invocation_mode == "throw": + if self._concurrent_invocation_mode == ConcurrentInvocationMode.THROW: lock_acquired = self._invocation_lock.acquire(blocking=False) if not lock_acquired: raise ConcurrencyException( diff --git a/src/strands/types/agent.py b/src/strands/types/agent.py index f0d937122..cda01f8aa 100644 --- a/src/strands/types/agent.py +++ b/src/strands/types/agent.py @@ -3,21 +3,26 @@ This module defines the types used for an Agent. """ -from typing import Literal, TypeAlias +from enum import Enum +from typing import TypeAlias from .content import ContentBlock, Messages from .interrupt import InterruptResponseContent AgentInput: TypeAlias = str | list[ContentBlock] | list[InterruptResponseContent] | Messages | None -ConcurrentInvocationMode = Literal["throw", "unsafe_reentrant"] -"""Mode controlling concurrent invocation behavior. -Values: - throw: Raises ConcurrencyException if concurrent invocation is attempted (default). - unsafe_reentrant: Allows concurrent invocations without locking. +class ConcurrentInvocationMode(str, Enum): + """Mode controlling concurrent invocation behavior. -Warning: - The ``unsafe_reentrant`` mode makes no guarantees about resulting behavior and is - provided only for advanced use cases where the caller understands the risks. -""" + Values: + THROW: Raises ConcurrencyException if concurrent invocation is attempted (default). + UNSAFE_REENTRANT: Allows concurrent invocations without locking. + + Warning: + The ``UNSAFE_REENTRANT`` mode makes no guarantees about resulting behavior and is + provided only for advanced use cases where the caller understands the risks. + """ + + THROW = "throw" + UNSAFE_REENTRANT = "unsafe_reentrant" diff --git a/tests/strands/agent/test_agent.py b/tests/strands/agent/test_agent.py index ad67a6328..12241dd84 100644 --- a/tests/strands/agent/test_agent.py +++ b/tests/strands/agent/test_agent.py @@ -2382,6 +2382,22 @@ def test_agent_concurrent_invocation_mode_stores_value(): assert agent_reentrant._concurrent_invocation_mode == "unsafe_reentrant" +def test_agent_concurrent_invocation_mode_accepts_enum(): + """Test that concurrent_invocation_mode accepts enum values as well as strings.""" + from strands.types.agent import ConcurrentInvocationMode + + model = MockedModelProvider([{"role": "assistant", "content": [{"text": "hello"}]}]) + + # Using enum values + agent_throw = Agent(model=model, concurrent_invocation_mode=ConcurrentInvocationMode.THROW) + assert agent_throw._concurrent_invocation_mode == "throw" + assert agent_throw._concurrent_invocation_mode == ConcurrentInvocationMode.THROW + + agent_reentrant = Agent(model=model, concurrent_invocation_mode=ConcurrentInvocationMode.UNSAFE_REENTRANT) + assert agent_reentrant._concurrent_invocation_mode == "unsafe_reentrant" + assert agent_reentrant._concurrent_invocation_mode == ConcurrentInvocationMode.UNSAFE_REENTRANT + + @pytest.mark.asyncio async def test_agent_sequential_invocations_work(): """Test that sequential invocations work correctly after lock is released.""" From 701dd2982d55b75bae0ea899d22ae365733cc88e Mon Sep 17 00:00:00 2001 From: Mackenzie Zastrow Date: Tue, 17 Feb 2026 09:44:19 -0500 Subject: [PATCH 5/6] fix: Linting error --- src/strands/agent/agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/strands/agent/agent.py b/src/strands/agent/agent.py index 8ca83e05a..05a35d1a4 100644 --- a/src/strands/agent/agent.py +++ b/src/strands/agent/agent.py @@ -129,7 +129,7 @@ def __init__( structured_output_prompt: str | None = None, tool_executor: ToolExecutor | None = None, retry_strategy: ModelRetryStrategy | _DefaultRetryStrategySentinel | None = _DEFAULT_RETRY_STRATEGY, - concurrent_invocation_mode: ConcurrentInvocationMode = "throw", + concurrent_invocation_mode: ConcurrentInvocationMode = ConcurrentInvocationMode.THROW, ): """Initialize the Agent with the specified configuration. From eb03e212e11e7e3a1069b20c4867fccb0fa61332 Mon Sep 17 00:00:00 2001 From: Mackenzie Zastrow Date: Tue, 17 Feb 2026 10:03:38 -0500 Subject: [PATCH 6/6] fix: address feedback --- src/strands/agent/agent.py | 3 +-- tests/strands/agent/test_agent.py | 9 +++------ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/strands/agent/agent.py b/src/strands/agent/agent.py index 05a35d1a4..e9739f473 100644 --- a/src/strands/agent/agent.py +++ b/src/strands/agent/agent.py @@ -632,7 +632,6 @@ async def stream_async( # Conditionally acquire lock based on concurrent_invocation_mode # Using threading.Lock instead of asyncio.Lock because run_async() creates # separate event loops in different threads - lock_acquired = False if self._concurrent_invocation_mode == ConcurrentInvocationMode.THROW: lock_acquired = self._invocation_lock.acquire(blocking=False) if not lock_acquired: @@ -687,7 +686,7 @@ async def stream_async( raise finally: - if lock_acquired: + if self._invocation_lock.locked(): self._invocation_lock.release() async def _run_loop( diff --git a/tests/strands/agent/test_agent.py b/tests/strands/agent/test_agent.py index 12241dd84..d95d26f92 100644 --- a/tests/strands/agent/test_agent.py +++ b/tests/strands/agent/test_agent.py @@ -26,6 +26,7 @@ from strands.session.repository_session_manager import RepositorySessionManager from strands.telemetry.tracer import serialize from strands.types._events import EventLoopStopEvent, ModelStreamEvent +from strands.types.agent import ConcurrentInvocationMode from strands.types.content import Messages from strands.types.exceptions import ConcurrencyException, ContextWindowOverflowException, EventLoopException from strands.types.session import Session, SessionAgent, SessionMessage, SessionType @@ -2235,16 +2236,13 @@ def test_agent_concurrent_call_raises_exception(): results = [] errors = [] - lock = threading.Lock() def invoke(): try: result = agent("test") - with lock: - results.append(result) + results.append(result) except ConcurrencyException as e: - with lock: - errors.append(e) + errors.append(e) # Start first thread and wait for it to begin streaming t1 = threading.Thread(target=invoke) @@ -2384,7 +2382,6 @@ def test_agent_concurrent_invocation_mode_stores_value(): def test_agent_concurrent_invocation_mode_accepts_enum(): """Test that concurrent_invocation_mode accepts enum values as well as strings.""" - from strands.types.agent import ConcurrentInvocationMode model = MockedModelProvider([{"role": "assistant", "content": [{"text": "hello"}]}])