|
| 1 | +import os |
1 | 2 | import json |
2 | 3 | import asyncio |
3 | 4 | from typing import Any, Dict, List, override |
|
19 | 20 | ShipmentArrivedSiteEvent, |
20 | 21 | ShipmentDepartedFactoryEvent, |
21 | 22 | ) |
| 23 | +from agentex.lib.types.tracing import SGPTracingProcessorConfig |
22 | 24 | from agentex.lib.utils.logging import make_logger |
23 | 25 | from agentex.types.data_content import DataContent |
24 | 26 | from agentex.types.text_content import TextContent |
|
36 | 38 | from project.agents.extract_learnings_agent import new_extract_learnings_agent |
37 | 39 | from agentex.lib.core.temporal.types.workflow import SignalName |
38 | 40 | from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow |
| 41 | +from agentex.lib.core.tracing.tracing_processor_manager import ( |
| 42 | + add_tracing_processor_config, |
| 43 | +) |
39 | 44 | from agentex.lib.core.temporal.plugins.openai_agents.hooks.hooks import TemporalStreamingHooks |
40 | 45 |
|
41 | 46 | environment_variables = EnvironmentVariables.refresh() |
|
48 | 53 |
|
49 | 54 | logger = make_logger(__name__) |
50 | 55 |
|
# Register the SGP (Scale GenAI Platform) tracing processor at import time.
# This gives visibility into the agent's execution in the SGP dashboard.
# NOTE(review): api key / account id fall back to "" while base_url falls back
# to None — presumably the config treats None as "use default endpoint"; confirm.
_sgp_tracing_config = SGPTracingProcessorConfig(
    sgp_api_key=os.environ.get("SGP_API_KEY", ""),
    sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""),
    sgp_base_url=os.environ.get("SGP_BASE_URL"),
)
add_tracing_processor_config(_sgp_tracing_config)
| 65 | + |
| 66 | + |
class TurnInput(BaseModel):
    """Span-input payload: the conversation state at the start of a turn."""

    # Conversation history in OpenAI message format (dumped into the span).
    input_list: List[Dict[str, Any]]
| 71 | + |
class TurnOutput(BaseModel):
    """Span-output payload: the result produced by a completed turn."""

    # The agent run's final output; shape depends on the agent, hence Any.
    final_output: Any
| 76 | + |
class StateModel(BaseModel):
    """
    State model for preserving conversation history.

    This allows the agent to maintain context throughout the conversation,
    making it possible to reference previous messages and build on the discussion.

    Attributes:
        input_list: The conversation history in OpenAI message format.
        turn_number: Counter for tracking conversation turns (useful for tracing).
    """
    input_list: List[Dict[str, Any]]
    # Default of 0 keeps this backward-compatible: state persisted before
    # turn_number existed (e.g. a workflow replayed/resumed by Temporal)
    # would otherwise fail validation on the missing required field.
    turn_number: int = 0
59 | 90 |
|
60 | 91 |
|
61 | 92 | @workflow.defn(name=environment_variables.WORKFLOW_NAME) |
@@ -119,7 +150,7 @@ async def on_task_event_send(self, params: SendEventParams) -> None: |
119 | 150 | async def on_task_create(self, params: CreateTaskParams) -> str: |
120 | 151 | logger.info(f"Received task create params: {params}") |
121 | 152 |
|
122 | | - self._state = StateModel(input_list=[]) |
| 153 | + self._state = StateModel(input_list=[], turn_number=0) |
123 | 154 |
|
124 | 155 | self._task_id = params.task.id |
125 | 156 | self._trace_id = params.task.id |
@@ -225,21 +256,41 @@ async def on_task_create(self, params: CreateTaskParams) -> str: |
225 | 256 | ) |
226 | 257 | continue # Skip this event, wait for next one |
227 | 258 |
|
| 259 | + # Increment turn number for tracing |
| 260 | + self._state.turn_number += 1 |
| 261 | + |
| 262 | + # Create a span to track this turn of the conversation |
| 263 | + turn_input = TurnInput( |
| 264 | + input_list=self._state.input_list, |
| 265 | + ) |
| 266 | + |
228 | 267 | # Create agent and execute with error handling |
229 | 268 | try: |
230 | | - procurement_agent = new_procurement_agent( |
231 | | - master_construction_schedule=master_construction_schedule, |
232 | | - human_input_learnings=self.human_input_learnings |
233 | | - ) |
| 269 | + async with adk.tracing.span( |
| 270 | + trace_id=params.task.id, |
| 271 | + name=f"Turn {self._state.turn_number}", |
| 272 | + input=turn_input.model_dump(), |
| 273 | + ) as span: |
| 274 | + self._parent_span_id = span.id if span else None |
| 275 | + |
| 276 | + procurement_agent = new_procurement_agent( |
| 277 | + master_construction_schedule=master_construction_schedule, |
| 278 | + human_input_learnings=self.human_input_learnings |
| 279 | + ) |
| 280 | + |
| 281 | + hooks = TemporalStreamingHooks(task_id=params.task.id) |
234 | 282 |
|
235 | | - hooks = TemporalStreamingHooks(task_id=params.task.id) |
| 283 | + # Execute agent with graceful degradation pattern (from temporal-community demos) |
| 284 | + result = await Runner.run(procurement_agent, self._state.input_list, hooks=hooks) # type: ignore[arg-type] |
236 | 285 |
|
237 | | - # Execute agent with graceful degradation pattern (from temporal-community demos) |
238 | | - result = await Runner.run(procurement_agent, self._state.input_list, hooks=hooks) # type: ignore[arg-type] |
| 286 | + # Update state with result |
| 287 | + self._state.input_list = result.to_input_list() # type: ignore[assignment] |
| 288 | + logger.info("Successfully processed event") |
239 | 289 |
|
240 | | - # Update state with result |
241 | | - self._state.input_list = result.to_input_list() # type: ignore[assignment] |
242 | | - logger.info("Successfully processed event") |
| 290 | + # Set span output for tracing |
| 291 | + if span: |
| 292 | + turn_output = TurnOutput(final_output=result.final_output) |
| 293 | + span.output = turn_output.model_dump() |
243 | 294 | # Extract learnings from NEW wait_for_human calls only (using going backwards approach) |
244 | 295 | try: |
245 | 296 | result_context = get_new_wait_for_human_context( |
|
0 commit comments