Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pip_install.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
strands-agents-tools 0.2.19 requires strands-agents>=1.0.0, but you have strands-agents 0.1.dev1+g252f896b4 which is incompatible.
85 changes: 51 additions & 34 deletions src/strands/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"""

import logging
import threading
import warnings
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -59,7 +60,7 @@
from ..types._events import AgentResultEvent, EventLoopStopEvent, InitEventLoopEvent, ModelStreamChunkEvent, TypedEvent
from ..types.agent import AgentInput
from ..types.content import ContentBlock, Message, Messages, SystemContentBlock
from ..types.exceptions import ContextWindowOverflowException
from ..types.exceptions import ConcurrencyException, ContextWindowOverflowException
from ..types.traces import AttributeValue
from .agent_result import AgentResult
from .conversation_manager import (
Expand Down Expand Up @@ -245,6 +246,11 @@ def __init__(

self._interrupt_state = _InterruptState()

# Initialize lock for guarding concurrent invocations
# Using threading.Lock instead of asyncio.Lock because run_async() creates
# separate event loops in different threads, so asyncio.Lock wouldn't work
self._invocation_lock = threading.Lock()

# Initialize session management functionality
self._session_manager = session_manager
if self._session_manager:
Expand Down Expand Up @@ -554,6 +560,7 @@ async def stream_async(
- And other event data provided by the callback handler

Raises:
ConcurrencyException: If another invocation is already in progress on this agent instance.
Exception: Any exceptions from the agent invocation will be propagated to the caller.

Example:
Expand All @@ -563,50 +570,60 @@ async def stream_async(
yield event["data"]
```
"""
self._interrupt_state.resume(prompt)
# Check if lock is already acquired to fail fast on concurrent invocations
if self._invocation_lock.locked():
Copy link

@pgrayy pgrayy Jan 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line here can still lead to a race condition. 1 task can pass this check and then Python can context switch over to another before hitting with self._invocation_lock. That other task then can come in and pass this check. This means 2 tasks can find there way to the with self._invocation_lock line. This could further lead to a deadlock because one of the tasks will block at with self._invocation_lock without returning control back to the asyncio event loop (no await support with threading.Lock).

Chance of this happening is probably small but it is a chance nonetheless.

raise ConcurrencyException(
"Agent is already processing a request. Concurrent invocations are not supported."
)

self.event_loop_metrics.reset_usage_metrics()
# Acquire lock to prevent concurrent invocations
# Using threading.Lock instead of asyncio.Lock because run_async() creates
# separate event loops in different threads
with self._invocation_lock:
self._interrupt_state.resume(prompt)

merged_state = {}
if kwargs:
warnings.warn("`**kwargs` parameter is deprecating, use `invocation_state` instead.", stacklevel=2)
merged_state.update(kwargs)
if invocation_state is not None:
merged_state["invocation_state"] = invocation_state
else:
if invocation_state is not None:
merged_state = invocation_state
self.event_loop_metrics.reset_usage_metrics()

merged_state = {}
if kwargs:
warnings.warn("`**kwargs` parameter is deprecating, use `invocation_state` instead.", stacklevel=2)
merged_state.update(kwargs)
if invocation_state is not None:
merged_state["invocation_state"] = invocation_state
else:
if invocation_state is not None:
merged_state = invocation_state

callback_handler = self.callback_handler
if kwargs:
callback_handler = kwargs.get("callback_handler", self.callback_handler)
callback_handler = self.callback_handler
if kwargs:
callback_handler = kwargs.get("callback_handler", self.callback_handler)

# Process input and get message to add (if any)
messages = await self._convert_prompt_to_messages(prompt)
# Process input and get message to add (if any)
messages = await self._convert_prompt_to_messages(prompt)

self.trace_span = self._start_agent_trace_span(messages)
self.trace_span = self._start_agent_trace_span(messages)

with trace_api.use_span(self.trace_span):
try:
events = self._run_loop(messages, merged_state, structured_output_model)
with trace_api.use_span(self.trace_span):
try:
events = self._run_loop(messages, merged_state, structured_output_model)

async for event in events:
event.prepare(invocation_state=merged_state)
async for event in events:
event.prepare(invocation_state=merged_state)

if event.is_callback_event:
as_dict = event.as_dict()
callback_handler(**as_dict)
yield as_dict
if event.is_callback_event:
as_dict = event.as_dict()
callback_handler(**as_dict)
yield as_dict

result = AgentResult(*event["stop"])
callback_handler(result=result)
yield AgentResultEvent(result=result).as_dict()
result = AgentResult(*event["stop"])
callback_handler(result=result)
yield AgentResultEvent(result=result).as_dict()

self._end_agent_trace_span(response=result)
self._end_agent_trace_span(response=result)

except Exception as e:
self._end_agent_trace_span(error=e)
raise
except Exception as e:
self._end_agent_trace_span(error=e)
raise

async def _run_loop(
self,
Expand Down
7 changes: 7 additions & 0 deletions src/strands/tools/_caller.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from ..tools.executors._executor import ToolExecutor
from ..types._events import ToolInterruptEvent
from ..types.content import ContentBlock, Message
from ..types.exceptions import ConcurrencyException
from ..types.tools import ToolResult, ToolUse

if TYPE_CHECKING:
Expand Down Expand Up @@ -73,6 +74,12 @@ def caller(
if self._agent._interrupt_state.activated:
raise RuntimeError("cannot directly call tool during interrupt")

# Check if agent is already processing an invocation
if self._agent._invocation_lock.locked():
raise ConcurrencyException(
"Agent is already processing a request. Concurrent invocations are not supported."
)

normalized_name = self._find_normalized_tool_name(name)

# Create unique tool ID and set up the tool request
Expand Down
11 changes: 11 additions & 0 deletions src/strands/types/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,14 @@ def __init__(self, message: str):
"""
self.message = message
super().__init__(message)


class ConcurrencyException(Exception):
"""Exception raised when concurrent invocations are attempted on an agent instance.

Agent instances maintain internal state that cannot be safely accessed concurrently.
This exception is raised when an invocation is attempted while another invocation
is already in progress on the same agent instance.
"""

pass
1 change: 1 addition & 0 deletions test_output.log
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/bin/sh: 1: hatch: not found
Loading