2 changes: 2 additions & 0 deletions src/strands/agent/agent.py
@@ -562,6 +562,8 @@ async def stream_async(
"""
self._interrupt_state.resume(prompt)

self.event_loop_metrics.reset_usage_metrics()

merged_state = {}
if kwargs:
warnings.warn("`**kwargs` parameter is deprecating, use `invocation_state` instead.", stacklevel=2)
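A minimal sketch of what this change means for callers, assuming the usual Agent construction and call pattern (the Agent wiring below is illustrative and not part of this diff): each request into stream_async now appends a fresh AgentInvocation, so per-request token usage can be read back from event_loop_metrics afterwards.

# Illustrative only: model/tool setup is assumed, not taken from this PR.
agent = Agent(model=model, tools=[my_tool])

agent("first question")   # stream_async() calls reset_usage_metrics() -> AgentInvocation #1
agent("second question")  # a second AgentInvocation is appended

# Per-invocation usage, newest last; accumulated_usage still spans all requests.
for invocation in agent.event_loop_metrics.agent_invocations:
    print(invocation.usage["totalTokens"], [c.event_loop_cycle_id for c in invocation.cycles])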
120 changes: 99 additions & 21 deletions src/strands/telemetry/metrics.py
@@ -1,3 +1,3 @@
"""Utilities for collecting and reporting performance metrics in the SDK."""

import logging
@@ -151,6 +151,34 @@
metrics_client.tool_error_count.add(1, attributes=attributes)


@dataclass
class EventLoopCycleMetric:
"""Aggregated metrics for a single event loop cycle.

Attributes:
event_loop_cycle_id: Identifier of the current event loop cycle.
usage: Total token usage for the cycle (successful model invocations only, excluding tool invocations).
"""

event_loop_cycle_id: str
usage: Usage


@dataclass
class AgentInvocation:
"""Metrics for a single agent invocation.

AgentInvocation contains all the event loop cycles and accumulated token usage for that invocation.

Attributes:
cycles: List of event loop cycles that occurred during this invocation.
usage: Accumulated token usage for this invocation across all cycles.
"""

cycles: list[EventLoopCycleMetric] = field(default_factory=list)
usage: Usage = field(default_factory=lambda: Usage(inputTokens=0, outputTokens=0, totalTokens=0))


@dataclass
class EventLoopMetrics:
"""Aggregated metrics for an event loop's execution.
@@ -159,15 +187,17 @@
cycle_count: Number of event loop cycles executed.
tool_metrics: Metrics for each tool used, keyed by tool name.
cycle_durations: List of durations for each cycle in seconds.
agent_invocations: Agent invocation metrics containing cycles and usage data.
traces: List of execution traces.
accumulated_usage: Accumulated token usage across all model invocations.
accumulated_usage: Accumulated token usage across all model invocations (across all requests).
accumulated_metrics: Accumulated performance metrics across all model invocations.
"""

cycle_count: int = 0
tool_metrics: Dict[str, ToolMetrics] = field(default_factory=dict)
cycle_durations: List[float] = field(default_factory=list)
traces: List[Trace] = field(default_factory=list)
tool_metrics: dict[str, ToolMetrics] = field(default_factory=dict)
cycle_durations: list[float] = field(default_factory=list)
agent_invocations: list[AgentInvocation] = field(default_factory=list)
traces: list[Trace] = field(default_factory=list)
accumulated_usage: Usage = field(default_factory=lambda: Usage(inputTokens=0, outputTokens=0, totalTokens=0))
accumulated_metrics: Metrics = field(default_factory=lambda: Metrics(latencyMs=0))

@@ -176,14 +206,23 @@
"""Get the singleton MetricsClient instance."""
return MetricsClient()

@property
def latest_agent_invocation(self) -> Optional[AgentInvocation]:
"""Get the most recent agent invocation.

Returns:
The most recent AgentInvocation, or None if no invocations exist.
"""
return self.agent_invocations[-1] if self.agent_invocations else None

def start_cycle(

Check warning on line 218 in src/strands/telemetry/metrics.py (GitHub Actions / check-api): EventLoopMetrics.start_cycle(attributes): parameter is now required.
self,
attributes: Optional[Dict[str, Any]] = None,
attributes: Dict[str, Any],
) -> Tuple[float, Trace]:
"""Start a new event loop cycle and create a trace for it.

Args:
attributes: attributes of the metrics.
attributes: attributes of the metrics, including event_loop_cycle_id.

Returns:
A tuple containing the start time and the cycle trace object.
@@ -194,6 +233,14 @@
start_time = time.time()
cycle_trace = Trace(f"Cycle {self.cycle_count}", start_time=start_time)
self.traces.append(cycle_trace)

self.agent_invocations[-1].cycles.append(
EventLoopCycleMetric(
event_loop_cycle_id=attributes["event_loop_cycle_id"],
usage=Usage(inputTokens=0, outputTokens=0, totalTokens=0),
)
)

return start_time, cycle_trace

def end_cycle(self, start_time: float, cycle_trace: Trace, attributes: Optional[Dict[str, Any]] = None) -> None:
@@ -252,32 +299,53 @@
)
tool_trace.end()

def _accumulate_usage(self, target: Usage, source: Usage) -> None:
"""Helper method to accumulate usage from source to target.

Args:
target: The Usage object to accumulate into.
source: The Usage object to accumulate from.
"""
target["inputTokens"] += source["inputTokens"]
target["outputTokens"] += source["outputTokens"]
target["totalTokens"] += source["totalTokens"]

if "cacheReadInputTokens" in source:
target["cacheReadInputTokens"] = target.get("cacheReadInputTokens", 0) + source["cacheReadInputTokens"]

if "cacheWriteInputTokens" in source:
target["cacheWriteInputTokens"] = target.get("cacheWriteInputTokens", 0) + source["cacheWriteInputTokens"]

def update_usage(self, usage: Usage) -> None:
"""Update the accumulated token usage with new usage data.

Args:
usage: The usage data to add to the accumulated totals.
"""
# Record metrics to OpenTelemetry
self._metrics_client.event_loop_input_tokens.record(usage["inputTokens"])
self._metrics_client.event_loop_output_tokens.record(usage["outputTokens"])
self.accumulated_usage["inputTokens"] += usage["inputTokens"]
self.accumulated_usage["outputTokens"] += usage["outputTokens"]
self.accumulated_usage["totalTokens"] += usage["totalTokens"]

# Handle optional cached token metrics
# Handle optional cached token metrics for OpenTelemetry
if "cacheReadInputTokens" in usage:
cache_read_tokens = usage["cacheReadInputTokens"]
self._metrics_client.event_loop_cache_read_input_tokens.record(cache_read_tokens)
self.accumulated_usage["cacheReadInputTokens"] = (
self.accumulated_usage.get("cacheReadInputTokens", 0) + cache_read_tokens
)

self._metrics_client.event_loop_cache_read_input_tokens.record(usage["cacheReadInputTokens"])
if "cacheWriteInputTokens" in usage:
cache_write_tokens = usage["cacheWriteInputTokens"]
self._metrics_client.event_loop_cache_write_input_tokens.record(cache_write_tokens)
self.accumulated_usage["cacheWriteInputTokens"] = (
self.accumulated_usage.get("cacheWriteInputTokens", 0) + cache_write_tokens
)
self._metrics_client.event_loop_cache_write_input_tokens.record(usage["cacheWriteInputTokens"])

self._accumulate_usage(self.accumulated_usage, usage)
self._accumulate_usage(self.agent_invocations[-1].usage, usage)

if self.agent_invocations[-1].cycles:
current_cycle = self.agent_invocations[-1].cycles[-1]
self._accumulate_usage(current_cycle.usage, usage)

def reset_usage_metrics(self) -> None:
"""Start a new agent invocation by creating a new AgentInvocation.

This should be called at the start of a new request to begin tracking
a new agent invocation with fresh usage and cycle data.
"""
self.agent_invocations.append(AgentInvocation())

def update_metrics(self, metrics: Metrics) -> None:
"""Update the accumulated performance metrics with new metrics data.
@@ -322,6 +390,16 @@
"traces": [trace.to_dict() for trace in self.traces],
"accumulated_usage": self.accumulated_usage,
"accumulated_metrics": self.accumulated_metrics,
"agent_invocations": [
{
"usage": invocation.usage,
"cycles": [
{"event_loop_cycle_id": cycle.event_loop_cycle_id, "usage": cycle.usage}
for cycle in invocation.cycles
],
}
for invocation in self.agent_invocations
],
}
return summary

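For reference, a minimal sketch of how the new pieces above fit together when driving EventLoopMetrics directly, mirroring the tests further down; the import path for Usage is assumed and the token counts are made up.

from strands.telemetry.metrics import EventLoopMetrics
from strands.types.event_loop import Usage  # assumed import path

metrics = EventLoopMetrics()

# reset_usage_metrics() must run before start_cycle(): it appends the
# AgentInvocation that start_cycle() and update_usage() index with [-1].
metrics.reset_usage_metrics()
start_time, cycle_trace = metrics.start_cycle(attributes={"event_loop_cycle_id": "cycle-1"})
metrics.update_usage(Usage(inputTokens=5, outputTokens=7, totalTokens=12))

invocation = metrics.latest_agent_invocation
assert invocation.usage["totalTokens"] == 12
assert invocation.cycles[0].event_loop_cycle_id == "cycle-1"

# get_summary() now also includes the per-invocation breakdown.
print(metrics.get_summary()["agent_invocations"])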
1 change: 1 addition & 0 deletions tests/strands/event_loop/test_event_loop.py
@@ -142,6 +142,7 @@ def agent(model, system_prompt, messages, tool_registry, thread_pool, hook_regis
mock.tool_registry = tool_registry
mock.thread_pool = thread_pool
mock.event_loop_metrics = EventLoopMetrics()
mock.event_loop_metrics.reset_usage_metrics()
mock.hooks = hook_registry
mock.tool_executor = tool_executor
mock._interrupt_state = _InterruptState()
@@ -37,6 +37,7 @@ def mock_agent():
agent.messages = []
agent.tool_registry = ToolRegistry()
agent.event_loop_metrics = EventLoopMetrics()
agent.event_loop_metrics.reset_usage_metrics()
agent.hooks = Mock()
agent.hooks.invoke_callbacks_async = AsyncMock()
agent.trace_span = None
94 changes: 92 additions & 2 deletions tests/strands/telemetry/test_metrics.py
@@ -240,9 +240,15 @@ def test_tool_metrics_add_call(success, tool, tool_metrics, mock_get_meter_provi
@unittest.mock.patch.object(strands.telemetry.metrics.uuid, "uuid4")
def test_event_loop_metrics_start_cycle(mock_uuid4, mock_time, event_loop_metrics, mock_get_meter_provider):
mock_time.return_value = 1
mock_uuid4.return_value = "i1"
mock_event_loop_cycle_id = "i1"
mock_uuid4.return_value = mock_event_loop_cycle_id

tru_start_time, tru_cycle_trace = event_loop_metrics.start_cycle()
# reset_usage_metrics must be called first so start_cycle has an AgentInvocation to append to
event_loop_metrics.reset_usage_metrics()

tru_start_time, tru_cycle_trace = event_loop_metrics.start_cycle(
attributes={"event_loop_cycle_id": mock_event_loop_cycle_id}
)
exp_start_time, exp_cycle_trace = 1, strands.telemetry.metrics.Trace("Cycle 1")

tru_attrs = {"cycle_count": event_loop_metrics.cycle_count, "traces": event_loop_metrics.traces}
@@ -256,6 +262,13 @@ def test_event_loop_metrics_start_cycle(mock_uuid4, mock_time, event_loop_metric
and tru_attrs == exp_attrs
)

assert len(event_loop_metrics.agent_invocations) == 1
assert len(event_loop_metrics.agent_invocations[0].cycles) == 1
assert event_loop_metrics.agent_invocations[0].cycles[0].event_loop_cycle_id == "i1"
assert event_loop_metrics.agent_invocations[0].cycles[0].usage["inputTokens"] == 0
assert event_loop_metrics.agent_invocations[0].cycles[0].usage["outputTokens"] == 0
assert event_loop_metrics.agent_invocations[0].cycles[0].usage["totalTokens"] == 0


@unittest.mock.patch.object(strands.telemetry.metrics.time, "time")
def test_event_loop_metrics_end_cycle(mock_time, trace, event_loop_metrics, mock_get_meter_provider):
@@ -324,13 +337,24 @@ def test_event_loop_metrics_add_tool_usage(mock_time, trace, tool, event_loop_me


def test_event_loop_metrics_update_usage(usage, event_loop_metrics, mock_get_meter_provider):
event_loop_metrics.reset_usage_metrics()
event_loop_metrics.start_cycle(attributes={"event_loop_cycle_id": "test-cycle"})

for _ in range(3):
event_loop_metrics.update_usage(usage)

tru_usage = event_loop_metrics.accumulated_usage
exp_usage = Usage(inputTokens=3, outputTokens=6, totalTokens=9, cacheWriteInputTokens=6)

assert tru_usage == exp_usage

assert event_loop_metrics.latest_agent_invocation.usage == exp_usage

assert len(event_loop_metrics.agent_invocations) == 1
assert len(event_loop_metrics.agent_invocations[0].cycles) == 1
assert event_loop_metrics.agent_invocations[0].cycles[0].event_loop_cycle_id == "test-cycle"
assert event_loop_metrics.agent_invocations[0].cycles[0].usage == exp_usage

mock_get_meter_provider.return_value.get_meter.assert_called()
metrics_client = event_loop_metrics._metrics_client
metrics_client.event_loop_input_tokens.record.assert_called()
@@ -370,6 +394,7 @@ def test_event_loop_metrics_get_summary(trace, tool, event_loop_metrics, mock_ge
"outputTokens": 0,
"totalTokens": 0,
},
"agent_invocations": [],
"average_cycle_time": 0,
"tool_usage": {
"tool1": {
@@ -476,3 +501,68 @@ def test_use_ProxyMeter_if_no_global_meter_provider():

# Verify it's using a _ProxyMeter
assert isinstance(metrics_client.meter, _ProxyMeter)


def test_latest_agent_invocation_property(usage, event_loop_metrics, mock_get_meter_provider):
"""Test the latest_agent_invocation property getter"""
# Initially, no invocations exist
assert event_loop_metrics.latest_agent_invocation is None

event_loop_metrics.reset_usage_metrics()
event_loop_metrics.start_cycle(attributes={"event_loop_cycle_id": "cycle-1"})
event_loop_metrics.update_usage(usage)

# latest_agent_invocation should return the first invocation
current = event_loop_metrics.latest_agent_invocation
assert current is not None
assert current.usage["inputTokens"] == 1
assert len(current.cycles) == 1

event_loop_metrics.reset_usage_metrics()
event_loop_metrics.start_cycle(attributes={"event_loop_cycle_id": "cycle-2"})
usage2 = Usage(inputTokens=10, outputTokens=20, totalTokens=30)
event_loop_metrics.update_usage(usage2)

# Should return the second invocation
current = event_loop_metrics.latest_agent_invocation
assert current is not None
assert current.usage["inputTokens"] == 10
assert len(current.cycles) == 1

assert len(event_loop_metrics.agent_invocations) == 2

assert current is event_loop_metrics.agent_invocations[-1]


def test_reset_usage_metrics(usage, event_loop_metrics, mock_get_meter_provider):
"""Test that reset_usage_metrics creates a new agent invocation but preserves accumulated_usage"""
# Add some usage across multiple cycles in first invocation
event_loop_metrics.reset_usage_metrics()
event_loop_metrics.start_cycle(attributes={"event_loop_cycle_id": "cycle-1"})
event_loop_metrics.update_usage(usage)

event_loop_metrics.start_cycle(attributes={"event_loop_cycle_id": "cycle-2"})
usage2 = Usage(inputTokens=10, outputTokens=20, totalTokens=30)
event_loop_metrics.update_usage(usage2)

assert len(event_loop_metrics.agent_invocations) == 1
assert event_loop_metrics.latest_agent_invocation.usage["inputTokens"] == 11
assert len(event_loop_metrics.latest_agent_invocation.cycles) == 2
assert event_loop_metrics.accumulated_usage["inputTokens"] == 11

# Reset - creates a new invocation
event_loop_metrics.reset_usage_metrics()

assert len(event_loop_metrics.agent_invocations) == 2

assert event_loop_metrics.latest_agent_invocation.usage["inputTokens"] == 0
assert event_loop_metrics.latest_agent_invocation.usage["outputTokens"] == 0
assert event_loop_metrics.latest_agent_invocation.usage["totalTokens"] == 0
assert len(event_loop_metrics.latest_agent_invocation.cycles) == 0

# Verify first invocation data is preserved
assert event_loop_metrics.agent_invocations[0].usage["inputTokens"] == 11
assert len(event_loop_metrics.agent_invocations[0].cycles) == 2

# Verify accumulated_usage is NOT cleared
assert event_loop_metrics.accumulated_usage["inputTokens"] == 11