From 03bb7fde9ffee65a0138217dc882d89bacb0cbc9 Mon Sep 17 00:00:00 2001 From: Dwij Patel Date: Sat, 15 Mar 2025 04:07:57 +0530 Subject: [PATCH 1/4] Add safe JSON serializa for Agents SDK inputs and outputs --- .../agents/agentops_agents_instrumentor.py | 35 +++++++++++-------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py b/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py index 2f1e75ef5..c0e4a3ef5 100644 --- a/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py +++ b/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py @@ -119,6 +119,13 @@ def get_model_info(agent: Any, run_config: Any = None) -> Dict[str, Any]: return result +def safe_json_serialize(obj): + """Helper function to safely serialize objects to JSON string.""" + if isinstance(obj, (list, dict)): + return json.dumps(obj) + return str(obj) + + class AgentsDetailedExporter: """ A detailed exporter for Agents SDK traces and spans that forwards them to AgentOps. @@ -182,10 +189,10 @@ def _export_span(self, span: AgentsSpan[Any]) -> None: attributes[AgentAttributes.AGENT_NAME] = span_data.name if hasattr(span_data, "input") and span_data.input: - attributes[SpanAttributes.LLM_PROMPTS] = str(span_data.input)[:1000] # Truncate long inputs + attributes[SpanAttributes.LLM_PROMPTS] = safe_json_serialize(span_data.input) if hasattr(span_data, "output") and span_data.output: - attributes[SpanAttributes.LLM_COMPLETIONS] = str(span_data.output)[:1000] # Truncate long outputs + attributes[SpanAttributes.LLM_COMPLETIONS] = safe_json_serialize(span_data.output) # Extract model information - check for GenerationSpanData specifically if span_type == "Generation" and hasattr(span_data, "model") and span_data.model: @@ -411,7 +418,7 @@ def instrumented_run_streamed( attributes = { "span.kind": WorkflowAttributes.WORKFLOW_STEP, "agent.name": starting_agent.name, - WorkflowAttributes.WORKFLOW_INPUT: str(input)[:1000], + WorkflowAttributes.WORKFLOW_INPUT: safe_json_serialize(input), WorkflowAttributes.MAX_TURNS: max_turns, "service.name": "agentops.agents", WorkflowAttributes.WORKFLOW_TYPE: "agents.run_streamed", @@ -447,7 +454,7 @@ def instrumented_run_streamed( instruction_type = "unknown" if isinstance(starting_agent.instructions, str): instruction_type = "string" - span.set_attribute("agent.instructions", starting_agent.instructions[:1000]) + span.set_attribute("agent.instructions", starting_agent.instructions) elif callable(starting_agent.instructions): instruction_type = "function" # Store the function name or representation @@ -456,7 +463,7 @@ def instrumented_run_streamed( ) span.set_attribute("agent.instruction_function", func_name) else: - span.set_attribute("agent.instructions", str(starting_agent.instructions)[:1000]) + span.set_attribute("agent.instructions", str(starting_agent.instructions)) span.set_attribute("agent.instruction_type", instruction_type) @@ -609,7 +616,7 @@ async def instrumented_stream_events(): # Add result attributes to the span if hasattr(result, "final_output"): usage_span.set_attribute( - WorkflowAttributes.FINAL_OUTPUT, str(result.final_output)[:1000] + WorkflowAttributes.FINAL_OUTPUT, safe_json_serialize(result.final_output) ) # Extract model and response information @@ -866,7 +873,7 @@ async def instrumented_method( attributes = { "span.kind": WorkflowAttributes.WORKFLOW_STEP, "agent.name": starting_agent.name, - WorkflowAttributes.WORKFLOW_INPUT: str(input)[:1000], + WorkflowAttributes.WORKFLOW_INPUT: safe_json_serialize(input), WorkflowAttributes.MAX_TURNS: max_turns, "service.name": "agentops.agents", WorkflowAttributes.WORKFLOW_TYPE: f"agents.{_method_name}", @@ -902,7 +909,7 @@ async def instrumented_method( instruction_type = "unknown" if isinstance(starting_agent.instructions, str): instruction_type = "string" - span.set_attribute("agent.instructions", starting_agent.instructions[:1000]) + span.set_attribute("agent.instructions", starting_agent.instructions) elif callable(starting_agent.instructions): instruction_type = "function" # Store the function name or representation @@ -911,7 +918,7 @@ async def instrumented_method( ) span.set_attribute("agent.instruction_function", func_name) else: - span.set_attribute("agent.instructions", str(starting_agent.instructions)[:1000]) + span.set_attribute("agent.instructions", str(starting_agent.instructions)) span.set_attribute("agent.instruction_type", instruction_type) @@ -971,7 +978,7 @@ async def instrumented_method( # Add result attributes to the span if hasattr(result, "final_output"): - span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, str(result.final_output)[:1000]) + span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, safe_json_serialize(result.final_output)) # Extract model and response information response_id = None @@ -1136,7 +1143,7 @@ def instrumented_method( attributes = { "span.kind": WorkflowAttributes.WORKFLOW_STEP, "agent.name": starting_agent.name, - WorkflowAttributes.WORKFLOW_INPUT: str(input)[:1000], + WorkflowAttributes.WORKFLOW_INPUT: safe_json_serialize(input), WorkflowAttributes.MAX_TURNS: max_turns, "service.name": "agentops.agents", WorkflowAttributes.WORKFLOW_TYPE: f"agents.{_method_name}", @@ -1172,7 +1179,7 @@ def instrumented_method( instruction_type = "unknown" if isinstance(starting_agent.instructions, str): instruction_type = "string" - span.set_attribute("agent.instructions", starting_agent.instructions[:1000]) + span.set_attribute("agent.instructions", starting_agent.instructions) elif callable(starting_agent.instructions): instruction_type = "function" # Store the function name or representation @@ -1181,7 +1188,7 @@ def instrumented_method( ) span.set_attribute("agent.instruction_function", func_name) else: - span.set_attribute("agent.instructions", str(starting_agent.instructions)[:1000]) + span.set_attribute("agent.instructions", str(starting_agent.instructions)) span.set_attribute("agent.instruction_type", instruction_type) @@ -1241,7 +1248,7 @@ def instrumented_method( # Add result attributes to the span if hasattr(result, "final_output"): - span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, str(result.final_output)[:1000]) + span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, safe_json_serialize(result.final_output)) # Extract model and response information response_id = None From e9ece2b9d272e2dcc970abec8577afd37c6b7012 Mon Sep 17 00:00:00 2001 From: Dwij Patel Date: Sat, 15 Mar 2025 04:14:10 +0530 Subject: [PATCH 2/4] Refactor serialization in Agents SDK to use safe_serialize function from helpers --- .../agents/agentops_agents_instrumentor.py | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py b/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py index c0e4a3ef5..a709390dc 100644 --- a/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py +++ b/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py @@ -28,6 +28,7 @@ SpanAttributes, Meters, ) +from agentops.helpers.serialization import safe_serialize # Agents SDK imports from agents.tracing.processor_interface import TracingProcessor as AgentsTracingProcessor @@ -119,13 +120,6 @@ def get_model_info(agent: Any, run_config: Any = None) -> Dict[str, Any]: return result -def safe_json_serialize(obj): - """Helper function to safely serialize objects to JSON string.""" - if isinstance(obj, (list, dict)): - return json.dumps(obj) - return str(obj) - - class AgentsDetailedExporter: """ A detailed exporter for Agents SDK traces and spans that forwards them to AgentOps. @@ -189,10 +183,10 @@ def _export_span(self, span: AgentsSpan[Any]) -> None: attributes[AgentAttributes.AGENT_NAME] = span_data.name if hasattr(span_data, "input") and span_data.input: - attributes[SpanAttributes.LLM_PROMPTS] = safe_json_serialize(span_data.input) + attributes[SpanAttributes.LLM_PROMPTS] = safe_serialize(span_data.input) if hasattr(span_data, "output") and span_data.output: - attributes[SpanAttributes.LLM_COMPLETIONS] = safe_json_serialize(span_data.output) + attributes[SpanAttributes.LLM_COMPLETIONS] = safe_serialize(span_data.output) # Extract model information - check for GenerationSpanData specifically if span_type == "Generation" and hasattr(span_data, "model") and span_data.model: @@ -418,7 +412,7 @@ def instrumented_run_streamed( attributes = { "span.kind": WorkflowAttributes.WORKFLOW_STEP, "agent.name": starting_agent.name, - WorkflowAttributes.WORKFLOW_INPUT: safe_json_serialize(input), + WorkflowAttributes.WORKFLOW_INPUT: safe_serialize(input), WorkflowAttributes.MAX_TURNS: max_turns, "service.name": "agentops.agents", WorkflowAttributes.WORKFLOW_TYPE: "agents.run_streamed", @@ -616,7 +610,7 @@ async def instrumented_stream_events(): # Add result attributes to the span if hasattr(result, "final_output"): usage_span.set_attribute( - WorkflowAttributes.FINAL_OUTPUT, safe_json_serialize(result.final_output) + WorkflowAttributes.FINAL_OUTPUT, safe_serialize(result.final_output) ) # Extract model and response information @@ -873,7 +867,7 @@ async def instrumented_method( attributes = { "span.kind": WorkflowAttributes.WORKFLOW_STEP, "agent.name": starting_agent.name, - WorkflowAttributes.WORKFLOW_INPUT: safe_json_serialize(input), + WorkflowAttributes.WORKFLOW_INPUT: safe_serialize(input), WorkflowAttributes.MAX_TURNS: max_turns, "service.name": "agentops.agents", WorkflowAttributes.WORKFLOW_TYPE: f"agents.{_method_name}", @@ -978,7 +972,7 @@ async def instrumented_method( # Add result attributes to the span if hasattr(result, "final_output"): - span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, safe_json_serialize(result.final_output)) + span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, safe_serialize(result.final_output)) # Extract model and response information response_id = None @@ -1143,7 +1137,7 @@ def instrumented_method( attributes = { "span.kind": WorkflowAttributes.WORKFLOW_STEP, "agent.name": starting_agent.name, - WorkflowAttributes.WORKFLOW_INPUT: safe_json_serialize(input), + WorkflowAttributes.WORKFLOW_INPUT: safe_serialize(input), WorkflowAttributes.MAX_TURNS: max_turns, "service.name": "agentops.agents", WorkflowAttributes.WORKFLOW_TYPE: f"agents.{_method_name}", @@ -1248,7 +1242,7 @@ def instrumented_method( # Add result attributes to the span if hasattr(result, "final_output"): - span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, safe_json_serialize(result.final_output)) + span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, safe_serialize(result.final_output)) # Extract model and response information response_id = None From cda53c5af0c1762447a9073a46457ad68b9c1b46 Mon Sep 17 00:00:00 2001 From: Dwij Patel Date: Sat, 15 Mar 2025 04:24:23 +0530 Subject: [PATCH 3/4] Implement serialize_if_complex function for improved object serialization in Agents SDK. Update references in agentops_agents_instrumentor to utilize the new function for handling inputs and outputs. --- agentops/helpers/serialization.py | 7 +++++++ .../agents/agentops_agents_instrumentor.py | 18 +++++++++--------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/agentops/helpers/serialization.py b/agentops/helpers/serialization.py index 5420bde60..8f63b0c83 100644 --- a/agentops/helpers/serialization.py +++ b/agentops/helpers/serialization.py @@ -79,3 +79,10 @@ def safe_serialize(obj: Any) -> Any: except (TypeError, ValueError) as e: logger.warning(f"Failed to serialize object: {e}") return str(obj) + +def serialize_if_complex(obj: Any) -> Any: + """Serialize an object if it's a list or dictionary, otherwise return the object as is""" + if isinstance(obj, (list, dict)): + return safe_serialize(obj) + return str(obj) + diff --git a/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py b/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py index a709390dc..60b7ffab5 100644 --- a/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py +++ b/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py @@ -28,7 +28,7 @@ SpanAttributes, Meters, ) -from agentops.helpers.serialization import safe_serialize +from agentops.helpers.serialization import serialize_if_complex # Agents SDK imports from agents.tracing.processor_interface import TracingProcessor as AgentsTracingProcessor @@ -183,10 +183,10 @@ def _export_span(self, span: AgentsSpan[Any]) -> None: attributes[AgentAttributes.AGENT_NAME] = span_data.name if hasattr(span_data, "input") and span_data.input: - attributes[SpanAttributes.LLM_PROMPTS] = safe_serialize(span_data.input) + attributes[SpanAttributes.LLM_PROMPTS] = serialize_if_complex(span_data.input) if hasattr(span_data, "output") and span_data.output: - attributes[SpanAttributes.LLM_COMPLETIONS] = safe_serialize(span_data.output) + attributes[SpanAttributes.LLM_COMPLETIONS] = serialize_if_complex(span_data.output) # Extract model information - check for GenerationSpanData specifically if span_type == "Generation" and hasattr(span_data, "model") and span_data.model: @@ -412,7 +412,7 @@ def instrumented_run_streamed( attributes = { "span.kind": WorkflowAttributes.WORKFLOW_STEP, "agent.name": starting_agent.name, - WorkflowAttributes.WORKFLOW_INPUT: safe_serialize(input), + WorkflowAttributes.WORKFLOW_INPUT: serialize_if_complex(input), WorkflowAttributes.MAX_TURNS: max_turns, "service.name": "agentops.agents", WorkflowAttributes.WORKFLOW_TYPE: "agents.run_streamed", @@ -610,7 +610,7 @@ async def instrumented_stream_events(): # Add result attributes to the span if hasattr(result, "final_output"): usage_span.set_attribute( - WorkflowAttributes.FINAL_OUTPUT, safe_serialize(result.final_output) + WorkflowAttributes.FINAL_OUTPUT, serialize_if_complex(result.final_output) ) # Extract model and response information @@ -867,7 +867,7 @@ async def instrumented_method( attributes = { "span.kind": WorkflowAttributes.WORKFLOW_STEP, "agent.name": starting_agent.name, - WorkflowAttributes.WORKFLOW_INPUT: safe_serialize(input), + WorkflowAttributes.WORKFLOW_INPUT: serialize_if_complex(input), WorkflowAttributes.MAX_TURNS: max_turns, "service.name": "agentops.agents", WorkflowAttributes.WORKFLOW_TYPE: f"agents.{_method_name}", @@ -972,7 +972,7 @@ async def instrumented_method( # Add result attributes to the span if hasattr(result, "final_output"): - span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, safe_serialize(result.final_output)) + span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, serialize_if_complex(result.final_output)) # Extract model and response information response_id = None @@ -1137,7 +1137,7 @@ def instrumented_method( attributes = { "span.kind": WorkflowAttributes.WORKFLOW_STEP, "agent.name": starting_agent.name, - WorkflowAttributes.WORKFLOW_INPUT: safe_serialize(input), + WorkflowAttributes.WORKFLOW_INPUT: serialize_if_complex(input), WorkflowAttributes.MAX_TURNS: max_turns, "service.name": "agentops.agents", WorkflowAttributes.WORKFLOW_TYPE: f"agents.{_method_name}", @@ -1242,7 +1242,7 @@ def instrumented_method( # Add result attributes to the span if hasattr(result, "final_output"): - span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, safe_serialize(result.final_output)) + span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, serialize_if_complex(result.final_output)) # Extract model and response information response_id = None From 540d0c84edc1a992bbac9042e4b836f4aad66e13 Mon Sep 17 00:00:00 2001 From: Dwij Patel Date: Sat, 15 Mar 2025 04:59:36 +0530 Subject: [PATCH 4/4] Refactor serialization in Agents SDK to remove serialize_if_complex and replace with safe_serialize --- agentops/helpers/serialization.py | 7 ------- .../agents/agentops_agents_instrumentor.py | 19 +++++++++---------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/agentops/helpers/serialization.py b/agentops/helpers/serialization.py index 8f63b0c83..5420bde60 100644 --- a/agentops/helpers/serialization.py +++ b/agentops/helpers/serialization.py @@ -79,10 +79,3 @@ def safe_serialize(obj: Any) -> Any: except (TypeError, ValueError) as e: logger.warning(f"Failed to serialize object: {e}") return str(obj) - -def serialize_if_complex(obj: Any) -> Any: - """Serialize an object if it's a list or dictionary, otherwise return the object as is""" - if isinstance(obj, (list, dict)): - return safe_serialize(obj) - return str(obj) - diff --git a/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py b/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py index 60b7ffab5..150a51fae 100644 --- a/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py +++ b/third_party/opentelemetry/instrumentation/agents/agentops_agents_instrumentor.py @@ -28,7 +28,7 @@ SpanAttributes, Meters, ) -from agentops.helpers.serialization import serialize_if_complex +from agentops.helpers.serialization import safe_serialize # Agents SDK imports from agents.tracing.processor_interface import TracingProcessor as AgentsTracingProcessor @@ -52,7 +52,6 @@ # Keep track of active streaming operations to prevent premature shutdown _active_streaming_operations = set() - def safe_execute(func): """Decorator to safely execute a function and log any exceptions.""" @@ -183,10 +182,10 @@ def _export_span(self, span: AgentsSpan[Any]) -> None: attributes[AgentAttributes.AGENT_NAME] = span_data.name if hasattr(span_data, "input") and span_data.input: - attributes[SpanAttributes.LLM_PROMPTS] = serialize_if_complex(span_data.input) + attributes[SpanAttributes.LLM_PROMPTS] = safe_serialize(span_data.input) if hasattr(span_data, "output") and span_data.output: - attributes[SpanAttributes.LLM_COMPLETIONS] = serialize_if_complex(span_data.output) + attributes[SpanAttributes.LLM_COMPLETIONS] = safe_serialize(span_data.output) # Extract model information - check for GenerationSpanData specifically if span_type == "Generation" and hasattr(span_data, "model") and span_data.model: @@ -412,7 +411,7 @@ def instrumented_run_streamed( attributes = { "span.kind": WorkflowAttributes.WORKFLOW_STEP, "agent.name": starting_agent.name, - WorkflowAttributes.WORKFLOW_INPUT: serialize_if_complex(input), + WorkflowAttributes.WORKFLOW_INPUT: safe_serialize(input), WorkflowAttributes.MAX_TURNS: max_turns, "service.name": "agentops.agents", WorkflowAttributes.WORKFLOW_TYPE: "agents.run_streamed", @@ -610,7 +609,7 @@ async def instrumented_stream_events(): # Add result attributes to the span if hasattr(result, "final_output"): usage_span.set_attribute( - WorkflowAttributes.FINAL_OUTPUT, serialize_if_complex(result.final_output) + WorkflowAttributes.FINAL_OUTPUT, safe_serialize(result.final_output) ) # Extract model and response information @@ -867,7 +866,7 @@ async def instrumented_method( attributes = { "span.kind": WorkflowAttributes.WORKFLOW_STEP, "agent.name": starting_agent.name, - WorkflowAttributes.WORKFLOW_INPUT: serialize_if_complex(input), + WorkflowAttributes.WORKFLOW_INPUT: safe_serialize(input), WorkflowAttributes.MAX_TURNS: max_turns, "service.name": "agentops.agents", WorkflowAttributes.WORKFLOW_TYPE: f"agents.{_method_name}", @@ -972,7 +971,7 @@ async def instrumented_method( # Add result attributes to the span if hasattr(result, "final_output"): - span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, serialize_if_complex(result.final_output)) + span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, safe_serialize(result.final_output)) # Extract model and response information response_id = None @@ -1137,7 +1136,7 @@ def instrumented_method( attributes = { "span.kind": WorkflowAttributes.WORKFLOW_STEP, "agent.name": starting_agent.name, - WorkflowAttributes.WORKFLOW_INPUT: serialize_if_complex(input), + WorkflowAttributes.WORKFLOW_INPUT: safe_serialize(input), WorkflowAttributes.MAX_TURNS: max_turns, "service.name": "agentops.agents", WorkflowAttributes.WORKFLOW_TYPE: f"agents.{_method_name}", @@ -1242,7 +1241,7 @@ def instrumented_method( # Add result attributes to the span if hasattr(result, "final_output"): - span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, serialize_if_complex(result.final_output)) + span.set_attribute(WorkflowAttributes.FINAL_OUTPUT, safe_serialize(result.final_output)) # Extract model and response information response_id = None