diff --git a/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py b/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py index 2f1e75ef5..150a51fae 100644 --- a/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py +++ b/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py @@ -28,6 +28,7 @@ SpanAttributes, Meters, ) +from agentops.helpers.serialization import safe_serialize # Agents SDK imports from agents.tracing.processor_interface import TracingProcessor as AgentsTracingProcessor @@ -51,7 +52,6 @@ # Keep track of active streaming operations to prevent premature shutdown _active_streaming_operations = set() - def safe_execute(func): """Decorator to safely execute a function and log any exceptions.""" @@ -182,10 +182,10 @@ def _export_span(self, span: AgentsSpan[Any]) -> None: attributes[AgentAttributes.AGENT_NAME] = span_data.name if hasattr(span_data, "input") and span_data.input: - attributes[SpanAttributes.LLM_PROMPTS] = str(span_data.input)[:1000] # Truncate long inputs + attributes[SpanAttributes.LLM_PROMPTS] = safe_serialize(span_data.input) if hasattr(span_data, "output") and span_data.output: - attributes[SpanAttributes.LLM_COMPLETIONS] = str(span_data.output)[:1000] # Truncate long outputs + attributes[SpanAttributes.LLM_COMPLETIONS] = safe_serialize(span_data.output) # Extract model information - check for GenerationSpanData specifically if span_type == "Generation" and hasattr(span_data, "model") and span_data.model: @@ -411,7 +411,7 @@ def instrumented_run_streamed( attributes = { "span.kind": WorkflowAttributes.WORKFLOW_STEP, "agent.name": starting_agent.name, - WorkflowAttributes.WORKFLOW_INPUT: str(input)[:1000], + WorkflowAttributes.WORKFLOW_INPUT: safe_serialize(input), WorkflowAttributes.MAX_TURNS: max_turns, "service.name": "agentops.agents", WorkflowAttributes.WORKFLOW_TYPE: "agents.run_streamed", @@ -447,7 +447,7 @@ def instrumented_run_streamed( instruction_type = "unknown" if isinstance(starting_agent.instructions, str): instruction_type = "string" - span.set_attribute("agent.instructions", starting_agent.instructions[:1000]) + span.set_attribute("agent.instructions", starting_agent.instructions) elif callable(starting_agent.instructions): instruction_type = "function" # Store the function name or representation @@ -456,7 +456,7 @@ def instrumented_run_streamed( ) span.set_attribute("agent.instruction_function", func_name) else: - span.set_attribute("agent.instructions", str(starting_agent.instructions)[:1000]) + span.set_attribute("agent.instructions", str(starting_agent.instructions)) span.set_attribute("agent.instruction_type", instruction_type) @@ -609,7 +609,7 @@ async def instrumented_stream_events(): # Add result attributes to the span if hasattr(result, "final_output"): usage_span.set_attribute( - WorkflowAttributes.FINAL_OUTPUT, str(result.final_output)[:1000] + WorkflowAttributes.FINAL_OUTPUT, safe_serialize(result.final_output) ) # Extract model and response information @@ -866,7 +866,7 @@ async def instrumented_method( attributes = { "span.kind": WorkflowAttributes.WORKFLOW_STEP, "agent.name": starting_agent.name, - WorkflowAttributes.WORKFLOW_INPUT: str(input)[:1000], + WorkflowAttributes.WORKFLOW_INPUT: safe_serialize(input), WorkflowAttributes.MAX_TURNS: max_turns, "service.name": "agentops.agents", WorkflowAttributes.WORKFLOW_TYPE: f"agents.{_method_name}", @@ -902,7 +902,7 @@ async def instrumented_method( instruction_type = "unknown" if isinstance(starting_agent.instructions, str): instruction_type = "string" - span.set_attribute("agent.instructions", starting_agent.instructions[:1000]) + span.set_attribute("agent.instructions", starting_agent.instructions) elif callable(starting_agent.instructions): instruction_type = "function" # Store the function name or representation @@ -911,7 +911,7 @@ async def instrumented_method( ) span.set_attribute("agent.instruction_function", func_name) else: - span.set_attribute("agent.instructions", str(starting_agent.instructions)[:1000]) + span.set_attribute("agent.instructions", str(starting_agent.instructions)) span.set_attribute("agent.instruction_type", instruction_type) @@ -971,7 +971,7 @@ async def instrumented_method( # Add result attributes to the span if hasattr(result, "final_output"): - span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, str(result.final_output)[:1000]) + span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, safe_serialize(result.final_output)) # Extract model and response information response_id = None @@ -1136,7 +1136,7 @@ def instrumented_method( attributes = { "span.kind": WorkflowAttributes.WORKFLOW_STEP, "agent.name": starting_agent.name, - WorkflowAttributes.WORKFLOW_INPUT: str(input)[:1000], + WorkflowAttributes.WORKFLOW_INPUT: safe_serialize(input), WorkflowAttributes.MAX_TURNS: max_turns, "service.name": "agentops.agents", WorkflowAttributes.WORKFLOW_TYPE: f"agents.{_method_name}", @@ -1172,7 +1172,7 @@ def instrumented_method( instruction_type = "unknown" if isinstance(starting_agent.instructions, str): instruction_type = "string" - span.set_attribute("agent.instructions", starting_agent.instructions[:1000]) + span.set_attribute("agent.instructions", starting_agent.instructions) elif callable(starting_agent.instructions): instruction_type = "function" # Store the function name or representation @@ -1181,7 +1181,7 @@ def instrumented_method( ) span.set_attribute("agent.instruction_function", func_name) else: - span.set_attribute("agent.instructions", str(starting_agent.instructions)[:1000]) + span.set_attribute("agent.instructions", str(starting_agent.instructions)) span.set_attribute("agent.instruction_type", instruction_type) @@ -1241,7 +1241,7 @@ def instrumented_method( # Add result attributes to the span if hasattr(result, "final_output"): - span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, str(result.final_output)[:1000]) + span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, safe_serialize(result.final_output)) # Extract model and response information response_id = None