Skip to content

[FEATURE] Add option to allow concurrent invocations #1702

@zastrowm

Description

@zastrowm

Problem Statement

We implemented throwing in the case of detecting concurrent invocations to the same agent in #1453 because it was resulting in corruption of state without it being obvious that it's unsupported/unsafe in most cases.

We've had a report from a team that was using concurrent invocations as a way to re-trigger the agent in the middle of invocation. They requested that we add an option to allow the concurrent invocation so that they can maintain previous behavior.

Proposed Solution

Add a new parameter concurrent_invocation_mode to Agent.__init__ that is a string/enum with values:

  • throw - which would be the existing behavior and would be the default
  • unsafe_reentrant - which would not throw an exception and would be the previous/slightly unsafe behavior

In the future this enables us to add a new mode interrupt which would interrupt the agent's invocation with a new user message, which I know is another common use for agents when you want to provide feedback out of bound

Use Case

For power users of the Agent interface, allow them to re-invoke the same agent without getting an exception.

Alternatives Solutions

  • Move the locking into a specific place in the agent so that subclasses could disable the throwing behavior
    • Pro: Not a top level option that few will need
    • Con: is the need to create a custom Agent class
  • Add a new parameter unsafe_allow_conccurent_invocations that when set to True would not throw the exception
    • Pro: Solves the problem with a clear naming towards the problem
    • Con: Very ugly parameter that only a subset of users would ever need

Additional Context

No response


Implementation Requirements

Technical Approach

Based on repository analysis and clarification discussion:

Type Definition

Add a Literal type alias (consistent with existing patterns in the codebase):

from typing import Literal

ConcurrentInvocationMode = Literal["throw", "unsafe_reentrant"]

Parameter Addition

Add concurrent_invocation_mode parameter to Agent.__init__:

Lock Logic Modification

In stream_async() method (around line 628), conditionally skip lock handling:

# Only acquire lock if mode is "throw"
if self._concurrent_invocation_mode == "throw":
    acquired = self._invocation_lock.acquire(blocking=False)
    if not acquired:
        raise ConcurrencyException(...)

Files to Modify

  1. src/strands/agent/agent.py

    • Add concurrent_invocation_mode parameter to __init__
    • Store as instance variable self._concurrent_invocation_mode
    • Modify stream_async() to conditionally skip lock based on mode
    • Update docstring for new parameter
  2. src/strands/types/agent.py (optional)

    • Add ConcurrentInvocationMode type alias for reuse
  3. tests/strands/agent/test_agent.py

    • Add tests for unsafe_reentrant mode covering all existing concurrent test scenarios:
      • test_agent_concurrent_call_raises_exception → add parallel test for unsafe_reentrant
      • test_agent_concurrent_structured_output_raises_exception → add parallel test for unsafe_reentrant
    • Verify concurrent invocations succeed without exception when using unsafe_reentrant

Acceptance Criteria

  • concurrent_invocation_mode parameter added to Agent.__init__ with default "throw"
  • "throw" mode maintains existing behavior (raises ConcurrencyException)
  • "unsafe_reentrant" mode skips lock acquisition entirely
  • No runtime warning emitted for unsafe_reentrant mode
  • Unit tests cover both modes for all concurrent invocation scenarios
  • Docstring updated with parameter documentation
  • All existing tests continue to pass
  • Type annotations complete and mypy passes

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions