diff --git a/src/strands/agent/agent.py b/src/strands/agent/agent.py
index 8fc5be6ca..82d7543c5 100644
--- a/src/strands/agent/agent.py
+++ b/src/strands/agent/agent.py
@@ -562,6 +562,8 @@ async def stream_async(
         """
         self._interrupt_state.resume(prompt)

+        self.event_loop_metrics.reset_usage_metrics()
+
         merged_state = {}
         if kwargs:
             warnings.warn("`**kwargs` parameter is deprecating, use `invocation_state` instead.", stacklevel=2)
diff --git a/src/strands/telemetry/metrics.py b/src/strands/telemetry/metrics.py
index abfbbffae..8f3ee1ea1 100644
--- a/src/strands/telemetry/metrics.py
+++ b/src/strands/telemetry/metrics.py
@@ -151,6 +151,34 @@ def add_call(
             metrics_client.tool_error_count.add(1, attributes=attributes)


+@dataclass
+class EventLoopCycleMetric:
+    """Aggregated metrics for a single event loop cycle.
+
+    Attributes:
+        event_loop_cycle_id: Id of the current event loop cycle.
+        usage: Total token usage for the entire cycle (successful model invocations, excluding tool invocations).
+    """
+
+    event_loop_cycle_id: str
+    usage: Usage
+
+
+@dataclass
+class AgentInvocation:
+    """Metrics for a single agent invocation.
+
+    AgentInvocation contains all the event loop cycles and accumulated token usage for that invocation.
+
+    Attributes:
+        cycles: List of event loop cycles that occurred during this invocation.
+        usage: Accumulated token usage for this invocation across all cycles.
+    """
+
+    cycles: list[EventLoopCycleMetric] = field(default_factory=list)
+    usage: Usage = field(default_factory=lambda: Usage(inputTokens=0, outputTokens=0, totalTokens=0))
+
+
 @dataclass
 class EventLoopMetrics:
     """Aggregated metrics for an event loop's execution.
@@ -159,15 +187,17 @@ class EventLoopMetrics:
         cycle_count: Number of event loop cycles executed.
         tool_metrics: Metrics for each tool used, keyed by tool name.
         cycle_durations: List of durations for each cycle in seconds.
+        agent_invocations: Agent invocation metrics containing cycles and usage data.
         traces: List of execution traces.
-        accumulated_usage: Accumulated token usage across all model invocations.
+        accumulated_usage: Accumulated token usage across all model invocations (across all requests).
         accumulated_metrics: Accumulated performance metrics across all model invocations.
     """

     cycle_count: int = 0
-    tool_metrics: Dict[str, ToolMetrics] = field(default_factory=dict)
-    cycle_durations: List[float] = field(default_factory=list)
-    traces: List[Trace] = field(default_factory=list)
+    tool_metrics: dict[str, ToolMetrics] = field(default_factory=dict)
+    cycle_durations: list[float] = field(default_factory=list)
+    agent_invocations: list[AgentInvocation] = field(default_factory=list)
+    traces: list[Trace] = field(default_factory=list)
     accumulated_usage: Usage = field(default_factory=lambda: Usage(inputTokens=0, outputTokens=0, totalTokens=0))
     accumulated_metrics: Metrics = field(default_factory=lambda: Metrics(latencyMs=0))

@@ -176,14 +206,23 @@ def _metrics_client(self) -> "MetricsClient":
         """Get the singleton MetricsClient instance."""
         return MetricsClient()

+    @property
+    def latest_agent_invocation(self) -> Optional[AgentInvocation]:
+        """Get the most recent agent invocation.
+
+        Returns:
+            The most recent AgentInvocation, or None if no invocations exist.
+        """
+        return self.agent_invocations[-1] if self.agent_invocations else None
+
     def start_cycle(
         self,
-        attributes: Optional[Dict[str, Any]] = None,
+        attributes: Dict[str, Any],
     ) -> Tuple[float, Trace]:
         """Start a new event loop cycle and create a trace for it.

         Args:
-            attributes: attributes of the metrics.
+            attributes: attributes of the metrics; must include event_loop_cycle_id.

         Returns:
             A tuple containing the start time and the cycle trace object.
@@ -194,6 +233,14 @@ def start_cycle(
         start_time = time.time()
         cycle_trace = Trace(f"Cycle {self.cycle_count}", start_time=start_time)
         self.traces.append(cycle_trace)
+
+        self.agent_invocations[-1].cycles.append(
+            EventLoopCycleMetric(
+                event_loop_cycle_id=attributes["event_loop_cycle_id"],
+                usage=Usage(inputTokens=0, outputTokens=0, totalTokens=0),
+            )
+        )
+
         return start_time, cycle_trace

     def end_cycle(self, start_time: float, cycle_trace: Trace, attributes: Optional[Dict[str, Any]] = None) -> None:
@@ -252,32 +299,53 @@ def add_tool_usage(
             )
             tool_trace.end()

+    def _accumulate_usage(self, target: Usage, source: Usage) -> None:
+        """Helper method to accumulate usage from source to target.
+
+        Args:
+            target: The Usage object to accumulate into.
+            source: The Usage object to accumulate from.
+        """
+        target["inputTokens"] += source["inputTokens"]
+        target["outputTokens"] += source["outputTokens"]
+        target["totalTokens"] += source["totalTokens"]
+
+        if "cacheReadInputTokens" in source:
+            target["cacheReadInputTokens"] = target.get("cacheReadInputTokens", 0) + source["cacheReadInputTokens"]
+
+        if "cacheWriteInputTokens" in source:
+            target["cacheWriteInputTokens"] = target.get("cacheWriteInputTokens", 0) + source["cacheWriteInputTokens"]
+
     def update_usage(self, usage: Usage) -> None:
         """Update the accumulated token usage with new usage data.

         Args:
             usage: The usage data to add to the accumulated totals.
         """
+        # Record metrics to OpenTelemetry
         self._metrics_client.event_loop_input_tokens.record(usage["inputTokens"])
         self._metrics_client.event_loop_output_tokens.record(usage["outputTokens"])
-        self.accumulated_usage["inputTokens"] += usage["inputTokens"]
-        self.accumulated_usage["outputTokens"] += usage["outputTokens"]
-        self.accumulated_usage["totalTokens"] += usage["totalTokens"]

-        # Handle optional cached token metrics
+        # Handle optional cached token metrics for OpenTelemetry
         if "cacheReadInputTokens" in usage:
-            cache_read_tokens = usage["cacheReadInputTokens"]
-            self._metrics_client.event_loop_cache_read_input_tokens.record(cache_read_tokens)
-            self.accumulated_usage["cacheReadInputTokens"] = (
-                self.accumulated_usage.get("cacheReadInputTokens", 0) + cache_read_tokens
-            )
-
+            self._metrics_client.event_loop_cache_read_input_tokens.record(usage["cacheReadInputTokens"])
         if "cacheWriteInputTokens" in usage:
-            cache_write_tokens = usage["cacheWriteInputTokens"]
-            self._metrics_client.event_loop_cache_write_input_tokens.record(cache_write_tokens)
-            self.accumulated_usage["cacheWriteInputTokens"] = (
-                self.accumulated_usage.get("cacheWriteInputTokens", 0) + cache_write_tokens
-            )
+            self._metrics_client.event_loop_cache_write_input_tokens.record(usage["cacheWriteInputTokens"])
+
+        self._accumulate_usage(self.accumulated_usage, usage)
+        self._accumulate_usage(self.agent_invocations[-1].usage, usage)
+
+        if self.agent_invocations[-1].cycles:
+            current_cycle = self.agent_invocations[-1].cycles[-1]
+            self._accumulate_usage(current_cycle.usage, usage)
+
+    def reset_usage_metrics(self) -> None:
+        """Start a new agent invocation by creating a new AgentInvocation.
+
+        This should be called at the start of a new request to begin tracking
+        a new agent invocation with fresh usage and cycle data.
+ """ + self.agent_invocations.append(AgentInvocation()) def update_metrics(self, metrics: Metrics) -> None: """Update the accumulated performance metrics with new metrics data. @@ -322,6 +390,16 @@ def get_summary(self) -> Dict[str, Any]: "traces": [trace.to_dict() for trace in self.traces], "accumulated_usage": self.accumulated_usage, "accumulated_metrics": self.accumulated_metrics, + "agent_invocations": [ + { + "usage": invocation.usage, + "cycles": [ + {"event_loop_cycle_id": cycle.event_loop_cycle_id, "usage": cycle.usage} + for cycle in invocation.cycles + ], + } + for invocation in self.agent_invocations + ], } return summary diff --git a/tests/strands/event_loop/test_event_loop.py b/tests/strands/event_loop/test_event_loop.py index 52980729c..6b23bd592 100644 --- a/tests/strands/event_loop/test_event_loop.py +++ b/tests/strands/event_loop/test_event_loop.py @@ -142,6 +142,7 @@ def agent(model, system_prompt, messages, tool_registry, thread_pool, hook_regis mock.tool_registry = tool_registry mock.thread_pool = thread_pool mock.event_loop_metrics = EventLoopMetrics() + mock.event_loop_metrics.reset_usage_metrics() mock.hooks = hook_registry mock.tool_executor = tool_executor mock._interrupt_state = _InterruptState() diff --git a/tests/strands/event_loop/test_event_loop_structured_output.py b/tests/strands/event_loop/test_event_loop_structured_output.py index 508042af0..23b7f3433 100644 --- a/tests/strands/event_loop/test_event_loop_structured_output.py +++ b/tests/strands/event_loop/test_event_loop_structured_output.py @@ -37,6 +37,7 @@ def mock_agent(): agent.messages = [] agent.tool_registry = ToolRegistry() agent.event_loop_metrics = EventLoopMetrics() + agent.event_loop_metrics.reset_usage_metrics() agent.hooks = Mock() agent.hooks.invoke_callbacks_async = AsyncMock() agent.trace_span = None diff --git a/tests/strands/telemetry/test_metrics.py b/tests/strands/telemetry/test_metrics.py index e87277eed..800bcebc4 100644 --- a/tests/strands/telemetry/test_metrics.py +++ b/tests/strands/telemetry/test_metrics.py @@ -240,9 +240,15 @@ def test_tool_metrics_add_call(success, tool, tool_metrics, mock_get_meter_provi @unittest.mock.patch.object(strands.telemetry.metrics.uuid, "uuid4") def test_event_loop_metrics_start_cycle(mock_uuid4, mock_time, event_loop_metrics, mock_get_meter_provider): mock_time.return_value = 1 - mock_uuid4.return_value = "i1" + mock_event_loop_cycle_id = "i1" + mock_uuid4.return_value = mock_event_loop_cycle_id - tru_start_time, tru_cycle_trace = event_loop_metrics.start_cycle() + # Reset must be called first + event_loop_metrics.reset_usage_metrics() + + tru_start_time, tru_cycle_trace = event_loop_metrics.start_cycle( + attributes={"event_loop_cycle_id": mock_event_loop_cycle_id} + ) exp_start_time, exp_cycle_trace = 1, strands.telemetry.metrics.Trace("Cycle 1") tru_attrs = {"cycle_count": event_loop_metrics.cycle_count, "traces": event_loop_metrics.traces} @@ -256,6 +262,13 @@ def test_event_loop_metrics_start_cycle(mock_uuid4, mock_time, event_loop_metric and tru_attrs == exp_attrs ) + assert len(event_loop_metrics.agent_invocations) == 1 + assert len(event_loop_metrics.agent_invocations[0].cycles) == 1 + assert event_loop_metrics.agent_invocations[0].cycles[0].event_loop_cycle_id == "i1" + assert event_loop_metrics.agent_invocations[0].cycles[0].usage["inputTokens"] == 0 + assert event_loop_metrics.agent_invocations[0].cycles[0].usage["outputTokens"] == 0 + assert event_loop_metrics.agent_invocations[0].cycles[0].usage["totalTokens"] == 0 + 
 @unittest.mock.patch.object(strands.telemetry.metrics.time, "time")
 def test_event_loop_metrics_end_cycle(mock_time, trace, event_loop_metrics, mock_get_meter_provider):
@@ -324,6 +337,9 @@ def test_event_loop_metrics_add_tool_usage(mock_time, trace, tool, event_loop_me


 def test_event_loop_metrics_update_usage(usage, event_loop_metrics, mock_get_meter_provider):
+    event_loop_metrics.reset_usage_metrics()
+    event_loop_metrics.start_cycle(attributes={"event_loop_cycle_id": "test-cycle"})
+
     for _ in range(3):
         event_loop_metrics.update_usage(usage)

@@ -331,6 +347,14 @@ def test_event_loop_metrics_update_usage(usage, event_loop_metrics, mock_get_met
     exp_usage = Usage(inputTokens=3, outputTokens=6, totalTokens=9, cacheWriteInputTokens=6)

     assert tru_usage == exp_usage
+
+    assert event_loop_metrics.latest_agent_invocation.usage == exp_usage
+
+    assert len(event_loop_metrics.agent_invocations) == 1
+    assert len(event_loop_metrics.agent_invocations[0].cycles) == 1
+    assert event_loop_metrics.agent_invocations[0].cycles[0].event_loop_cycle_id == "test-cycle"
+    assert event_loop_metrics.agent_invocations[0].cycles[0].usage == exp_usage
+
     mock_get_meter_provider.return_value.get_meter.assert_called()
     metrics_client = event_loop_metrics._metrics_client
     metrics_client.event_loop_input_tokens.record.assert_called()
@@ -370,6 +394,7 @@ def test_event_loop_metrics_get_summary(trace, tool, event_loop_metrics, mock_ge
             "outputTokens": 0,
             "totalTokens": 0,
         },
+        "agent_invocations": [],
         "average_cycle_time": 0,
         "tool_usage": {
             "tool1": {
@@ -476,3 +501,68 @@ def test_use_ProxyMeter_if_no_global_meter_provider():

     # Verify it's using a _ProxyMeter
     assert isinstance(metrics_client.meter, _ProxyMeter)
+
+
+def test_latest_agent_invocation_property(usage, event_loop_metrics, mock_get_meter_provider):
+    """Test the latest_agent_invocation property getter"""
+    # Initially, no invocations exist
+    assert event_loop_metrics.latest_agent_invocation is None
+
+    event_loop_metrics.reset_usage_metrics()
+    event_loop_metrics.start_cycle(attributes={"event_loop_cycle_id": "cycle-1"})
+    event_loop_metrics.update_usage(usage)
+
+    # latest_agent_invocation should return the first invocation
+    current = event_loop_metrics.latest_agent_invocation
+    assert current is not None
+    assert current.usage["inputTokens"] == 1
+    assert len(current.cycles) == 1
+
+    event_loop_metrics.reset_usage_metrics()
+    event_loop_metrics.start_cycle(attributes={"event_loop_cycle_id": "cycle-2"})
+    usage2 = Usage(inputTokens=10, outputTokens=20, totalTokens=30)
+    event_loop_metrics.update_usage(usage2)
+
+    # Should return the second invocation
+    current = event_loop_metrics.latest_agent_invocation
+    assert current is not None
+    assert current.usage["inputTokens"] == 10
+    assert len(current.cycles) == 1
+
+    assert len(event_loop_metrics.agent_invocations) == 2
+
+    assert current is event_loop_metrics.agent_invocations[-1]
+
+
+def test_reset_usage_metrics(usage, event_loop_metrics, mock_get_meter_provider):
+    """Test that reset_usage_metrics creates a new agent invocation but preserves accumulated_usage"""
+    # Add some usage across multiple cycles in first invocation
+    event_loop_metrics.reset_usage_metrics()
+    event_loop_metrics.start_cycle(attributes={"event_loop_cycle_id": "cycle-1"})
+    event_loop_metrics.update_usage(usage)
+
+    event_loop_metrics.start_cycle(attributes={"event_loop_cycle_id": "cycle-2"})
+    usage2 = Usage(inputTokens=10, outputTokens=20, totalTokens=30)
+    event_loop_metrics.update_usage(usage2)
+
+    assert len(event_loop_metrics.agent_invocations) == 1
+    assert event_loop_metrics.latest_agent_invocation.usage["inputTokens"] == 11
+    assert len(event_loop_metrics.latest_agent_invocation.cycles) == 2
+    assert event_loop_metrics.accumulated_usage["inputTokens"] == 11
+
+    # Reset - creates a new invocation
+    event_loop_metrics.reset_usage_metrics()
+
+    assert len(event_loop_metrics.agent_invocations) == 2
+
+    assert event_loop_metrics.latest_agent_invocation.usage["inputTokens"] == 0
+    assert event_loop_metrics.latest_agent_invocation.usage["outputTokens"] == 0
+    assert event_loop_metrics.latest_agent_invocation.usage["totalTokens"] == 0
+    assert len(event_loop_metrics.latest_agent_invocation.cycles) == 0
+
+    # Verify first invocation data is preserved
+    assert event_loop_metrics.agent_invocations[0].usage["inputTokens"] == 11
+    assert len(event_loop_metrics.agent_invocations[0].cycles) == 2
+
+    # Verify accumulated_usage is NOT cleared
+    assert event_loop_metrics.accumulated_usage["inputTokens"] == 11
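
For reviewers, a minimal sketch of how the new per-invocation tracking is meant to be consumed. It drives `EventLoopMetrics` directly (as the tests above do) so it runs without a model provider; in normal use `Agent.stream_async` calls `reset_usage_metrics()` itself, so application code would simply read `agent.event_loop_metrics.latest_agent_invocation` after each call. The import path for `Usage` is an assumption based on the current repo layout.

```python
from strands.telemetry.metrics import EventLoopMetrics
from strands.types.event_loop import Usage  # assumed location of the Usage TypedDict

metrics = EventLoopMetrics()

# First "invocation": reset_usage_metrics() opens a new AgentInvocation record,
# start_cycle() appends an EventLoopCycleMetric to it, update_usage() fills it in.
metrics.reset_usage_metrics()
metrics.start_cycle(attributes={"event_loop_cycle_id": "cycle-1"})
metrics.update_usage(Usage(inputTokens=100, outputTokens=40, totalTokens=140))

# Second invocation on the same metrics object: accumulated_usage keeps growing,
# while latest_agent_invocation starts counting from zero again.
metrics.reset_usage_metrics()
metrics.start_cycle(attributes={"event_loop_cycle_id": "cycle-2"})
metrics.update_usage(Usage(inputTokens=10, outputTokens=5, totalTokens=15))

print(metrics.accumulated_usage["totalTokens"])              # 155, lifetime of the object
print(metrics.latest_agent_invocation.usage["totalTokens"])  # 15, most recent invocation only
print(len(metrics.agent_invocations))                        # 2
print(metrics.get_summary()["agent_invocations"][1])         # per-cycle breakdown of invocation 2
```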