diff --git a/agentops/integration/callbacks/langchain/README.md b/agentops/integration/callbacks/langchain/README.md
new file mode 100644
index 000000000..971e2a9e0
--- /dev/null
+++ b/agentops/integration/callbacks/langchain/README.md
@@ -0,0 +1,59 @@
+# AgentOps LangChain Callback Handler
+
+This callback handler enables seamless integration between LangChain and AgentOps for tracing and monitoring LLM applications.
+
+## Features
+
+- **Complete Coverage**: Supports all LangChain callback methods
+- **Session Tracking**: Creates a session span that serves as the root for all operations
+- **Proper Hierarchy**: Maintains parent-child relationships between operations
+- **Broad Instrumentation**: Tracks LLMs, chains, tools, and agent actions
+- **Error Tracking**: Records errors from LLMs, chains, and tools
+- **Streaming Support**: Handles token streaming for real-time insights
+- **Attribute Capture**: Records inputs, outputs, and metadata for all operations
+- **Error Resilience**: Handles errors gracefully so spans are always properly closed
+
+## Supported Callbacks
+
+The handler implements all LangChain callback methods:
+
+| Method | Description | Span Kind | Attributes |
+|--------|-------------|-----------|------------|
+| `on_llm_start` | Start of an LLM call | `llm` | Model, prompts, parameters |
+| `on_llm_end` | End of an LLM call | `llm` | Completions, token usage |
+| `on_llm_new_token` | Streaming token received | N/A | Token count |
+| `on_llm_error` | LLM call error | `llm` | Error details |
+| `on_chat_model_start` | Start of a chat model call | `llm` | Model, messages, parameters |
+| `on_chain_start` | Start of a chain | `chain` | Chain type, inputs |
+| `on_chain_end` | End of a chain | `chain` | Outputs |
+| `on_chain_error` | Chain execution error | `chain` | Error details |
+| `on_tool_start` | Start of a tool call | `tool` | Tool name, input |
+| `on_tool_end` | End of a tool call | `tool` | Output |
+| `on_tool_error` | Tool execution error | `tool` | Error details |
+| `on_agent_action` | Agent taking an action | `agent_action` | Tool, input, log |
+| `on_agent_finish` | Agent completing a task | `agent_action` | Output, log |
+| `on_text` | Arbitrary text event | `text` | Text content |
+
+All spans carry appropriate attributes, such as:
+- Model information for LLM spans
+- Input/output for all operations
+- Tool names and types
+- Chain types and configurations
+- Error details for failed operations
+
+## Troubleshooting
+
+If you're not seeing data in AgentOps:
+
+1. Check that your API key is correctly configured
+2. Ensure you're passing the handler to all relevant components
+3. Verify that all operations complete, so their spans are properly ended
+
+## How It Works
+
+The callback handler:
+1. Creates a session span when initialized
+2. Intercepts LangChain callbacks for various operations
+3. Creates appropriate spans with meaningful attributes
+4. Maintains proper parent-child relationships
+5. Automatically cleans up and ends spans when operations complete
\ No newline at end of file
diff --git a/agentops/integration/callbacks/langchain/__init__.py b/agentops/integration/callbacks/langchain/__init__.py
new file mode 100644
index 000000000..6bdcf8e6a
--- /dev/null
+++ b/agentops/integration/callbacks/langchain/__init__.py
@@ -0,0 +1,15 @@
+"""
+LangChain integration for AgentOps.
+
+This module provides the AgentOps LangChain integration, including callbacks and utilities.
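+
+A minimal usage sketch (assumptions: an AgentOps API key is configured in the
+environment, and a LangChain chat model such as ``langchain_openai.ChatOpenAI``
+is installed; neither ships with this package):
+
+    from agentops.integration.callbacks.langchain import LangchainCallbackHandler
+    from langchain_openai import ChatOpenAI
+
+    # Constructing the handler starts a session span (auto_session=True)
+    handler = LangchainCallbackHandler(tags=["example"])
+
+    # Pass the handler anywhere LangChain accepts callbacks
+    llm = ChatOpenAI(callbacks=[handler])
+    llm.invoke("Say hello")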
+""" + +from agentops.integration.callbacks.langchain.callback import ( + LangchainCallbackHandler, + AsyncLangchainCallbackHandler, +) + +__all__ = [ + "LangchainCallbackHandler", + "AsyncLangchainCallbackHandler", +] \ No newline at end of file diff --git a/agentops/integration/callbacks/langchain/callback.py b/agentops/integration/callbacks/langchain/callback.py new file mode 100644 index 000000000..4ebb70ef7 --- /dev/null +++ b/agentops/integration/callbacks/langchain/callback.py @@ -0,0 +1,882 @@ +""" +LangChain callback handler for AgentOps. + +This module provides the LangChain callback handler for AgentOps tracing and monitoring. +""" + +from typing import Any, Dict, List, Optional, Union + +from opentelemetry import trace +from opentelemetry.context import attach, detach, get_current +from opentelemetry.trace import SpanContext, set_span_in_context + +from agentops.helpers.serialization import safe_serialize +from agentops.logging import logger +from agentops.sdk.core import TracingCore +from agentops.semconv import SpanKind, SpanAttributes, LangChainAttributes, LangChainAttributeValues, CoreAttributes +from agentops.integration.callbacks.langchain.utils import get_model_info + +from langchain_core.callbacks.base import BaseCallbackHandler, AsyncCallbackHandler +from langchain_core.outputs import LLMResult +from langchain_core.agents import AgentAction, AgentFinish + +class LangchainCallbackHandler(BaseCallbackHandler): + """ + AgentOps sync callback handler for Langchain. + + This handler creates spans for LLM calls and other langchain operations, + maintaining proper parent-child relationships with session as root span. + + Args: + api_key (str, optional): AgentOps API key + tags (List[str], optional): Tags to add to the session + auto_session (bool, optional): Whether to automatically create a session span + """ + + def __init__( + self, + api_key: Optional[str] = None, + tags: Optional[List[str]] = None, + auto_session: bool = True, + ): + """Initialize the callback handler.""" + self.active_spans = {} + self.api_key = api_key + self.tags = tags or [] + self.session_span = None + self.session_token = None + self.context_tokens = {} # Store context tokens by run_id + self.token_counts = {} # Track token counts for streaming + + # Initialize AgentOps + if auto_session: + self._initialize_agentops() + + def _initialize_agentops(self): + """Initialize AgentOps""" + import agentops + + if not TracingCore.get_instance().initialized: + init_kwargs = { + "auto_start_session": False, + "instrument_llm_calls": True, + } + + if self.api_key: + init_kwargs["api_key"] = self.api_key + + agentops.init(**init_kwargs) + logger.debug("AgentOps initialized from LangChain callback handler") + + if not TracingCore.get_instance().initialized: + logger.warning("AgentOps not initialized, session span will not be created") + return + + tracer = TracingCore.get_instance().get_tracer() + + span_name = f"session.{SpanKind.SESSION}" + + attributes = { + SpanAttributes.AGENTOPS_SPAN_KIND: SpanKind.SESSION, + "session.tags": self.tags, + "agentops.operation.name": "session", + "span.kind": SpanKind.SESSION, + } + + # Create a root session span + self.session_span = tracer.start_span(span_name, attributes=attributes) + + # Attach session span to the current context + self.session_token = attach(set_span_in_context(self.session_span)) + + logger.debug("Created session span as root span for LangChain") + + def _create_span( + self, + operation_name: str, + span_kind: str, + run_id: Any = None, + attributes: 
+        parent_run_id: Optional[Any] = None,
+    ):
+        """
+        Create a span for the operation.
+
+        Args:
+            operation_name: Name of the operation
+            span_kind: Type of span
+            run_id: Unique identifier for the operation
+            attributes: Additional attributes for the span
+            parent_run_id: The run_id of the parent span if this is a child span
+
+        Returns:
+            The created span
+        """
+        if not TracingCore.get_instance().initialized:
+            logger.warning("AgentOps not initialized, spans will not be created")
+            return trace.NonRecordingSpan(trace.INVALID_SPAN_CONTEXT)
+
+        tracer = TracingCore.get_instance().get_tracer()
+
+        span_name = f"{operation_name}.{span_kind}"
+
+        if attributes is None:
+            attributes = {}
+
+        attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = span_kind
+        attributes["agentops.operation.name"] = operation_name
+
+        if run_id is None:
+            run_id = id(attributes)
+
+        if parent_run_id is not None and parent_run_id in self.active_spans:
+            # Get the parent span from active spans and start the new span
+            # under its context
+            parent_span = self.active_spans.get(parent_run_id)
+            parent_ctx = set_span_in_context(parent_span)
+            span = tracer.start_span(span_name, context=parent_ctx, attributes=attributes)
+            logger.debug(f"Started span: {span_name} with parent: {parent_run_id}")
+        else:
+            # If no parent_run_id or the parent is not found, use the session
+            # span as parent
+            parent_ctx = set_span_in_context(self.session_span)
+            span = tracer.start_span(span_name, context=parent_ctx, attributes=attributes)
+            logger.debug(f"Started span: {span_name} with session as parent")
+
+        # Store span in active_spans
+        self.active_spans[run_id] = span
+
+        # Store the context token to detach later
+        token = attach(set_span_in_context(span))
+        self.context_tokens[run_id] = token
+
+        return span
+
+    def _end_span(self, run_id: Any):
+        """
+        End the span associated with the run_id.
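+
+        Detaches the context token stored for the run, ends the span, and
+        clears any streaming token count recorded for it.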
+ + Args: + run_id: Unique identifier for the operation + """ + if run_id not in self.active_spans: + logger.warning(f"No span found for call {run_id}") + return + + span = self.active_spans.pop(run_id) + token = self.context_tokens.pop(run_id, None) + + if token is not None: + detach(token) + + try: + span.end() + logger.debug(f"Ended span: {span.name}") + except Exception as e: + logger.warning(f"Error ending span: {e}") + + # Clean up token counts if present + if run_id in self.token_counts: + del self.token_counts[run_id] + + def on_llm_start( + self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any + ) -> None: + """Run when LLM starts running.""" + try: + # Add null check for serialized + if serialized is None: + serialized = {} + + model_info = get_model_info(serialized) + # Ensure default values if model_info returns unknown + model_name = model_info.get("model_name", "unknown") + + attributes = { + # Use both standard and LangChain-specific attributes + SpanAttributes.LLM_REQUEST_MODEL: model_name, + LangChainAttributes.LLM_MODEL: model_name, + SpanAttributes.LLM_PROMPTS: safe_serialize(prompts), + LangChainAttributes.LLM_NAME: serialized.get("id", "unknown_llm"), + } + + if "kwargs" in serialized: + for key, value in serialized["kwargs"].items(): + if key == "temperature": + attributes[SpanAttributes.LLM_REQUEST_TEMPERATURE] = value + elif key == "max_tokens": + attributes[SpanAttributes.LLM_REQUEST_MAX_TOKENS] = value + elif key == "top_p": + attributes[SpanAttributes.LLM_REQUEST_TOP_P] = value + + run_id = kwargs.get("run_id", id(serialized or {})) + parent_run_id = kwargs.get("parent_run_id", None) + + # Initialize token count for streaming if needed + self.token_counts[run_id] = 0 + + # Log parent relationship for debugging + if parent_run_id: + logger.debug(f"LLM span with run_id {run_id} has parent {parent_run_id}") + + self._create_span("llm", SpanKind.LLM, run_id, attributes, parent_run_id) + + logger.debug(f"Started LLM span for {model_name}") + except Exception as e: + logger.warning(f"Error in on_llm_start: {e}") + + def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: + """Run when LLM ends running.""" + try: + run_id = kwargs.get("run_id", id(response)) + + if run_id not in self.active_spans: + logger.warning(f"No span found for LLM call {run_id}") + return + + span = self.active_spans.get(run_id) + + if hasattr(response, "generations") and response.generations: + completions = [] + for gen_list in response.generations: + for gen in gen_list: + if hasattr(gen, "text"): + completions.append(gen.text) + + if completions: + try: + span.set_attribute( + SpanAttributes.LLM_COMPLETIONS, + safe_serialize(completions) + ) + except Exception as e: + logger.warning(f"Failed to set completions: {e}") + + if hasattr(response, "llm_output") and response.llm_output: + token_usage = response.llm_output.get("token_usage", {}) + + if "completion_tokens" in token_usage: + try: + span.set_attribute( + SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, + token_usage["completion_tokens"] + ) + except Exception as e: + logger.warning(f"Failed to set completion tokens: {e}") + + if "prompt_tokens" in token_usage: + try: + span.set_attribute( + SpanAttributes.LLM_USAGE_PROMPT_TOKENS, + token_usage["prompt_tokens"] + ) + except Exception as e: + logger.warning(f"Failed to set prompt tokens: {e}") + + if "total_tokens" in token_usage: + try: + span.set_attribute( + SpanAttributes.LLM_USAGE_TOTAL_TOKENS, + token_usage["total_tokens"] + ) + except Exception as e: + 
logger.warning(f"Failed to set total tokens: {e}") + + # For streaming, record the total tokens streamed + if run_id in self.token_counts and self.token_counts[run_id] > 0: + try: + span.set_attribute( + SpanAttributes.LLM_USAGE_STREAMING_TOKENS, + self.token_counts[run_id] + ) + except Exception as e: + logger.warning(f"Failed to set streaming tokens: {e}") + + # End the span after setting all attributes + self._end_span(run_id) + + except Exception as e: + logger.warning(f"Error in on_llm_end: {e}") + + def on_chain_start( + self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any + ) -> None: + """Run when chain starts running.""" + try: + # Add null check for serialized + if serialized is None: + serialized = {} + + chain_type = serialized.get("name", "unknown_chain") + + attributes = { + LangChainAttributes.CHAIN_TYPE: chain_type, + LangChainAttributes.CHAIN_NAME: serialized.get("id", "unknown_chain"), + LangChainAttributes.CHAIN_VERBOSE: serialized.get("verbose", False), + "chain.inputs": safe_serialize(inputs), + } + + # Add specific chain types + if "sequential" in chain_type.lower(): + attributes[LangChainAttributes.CHAIN_KIND] = LangChainAttributeValues.CHAIN_KIND_SEQUENTIAL + elif "llm" in chain_type.lower(): + attributes[LangChainAttributes.CHAIN_KIND] = LangChainAttributeValues.CHAIN_KIND_LLM + elif "router" in chain_type.lower(): + attributes[LangChainAttributes.CHAIN_KIND] = LangChainAttributeValues.CHAIN_KIND_ROUTER + + run_id = kwargs.get("run_id", id(serialized or {})) + parent_run_id = kwargs.get("parent_run_id", None) + + # Log parent relationship for debugging + if parent_run_id: + logger.debug(f"Chain span with run_id {run_id} has parent {parent_run_id}") + + self._create_span("chain", SpanKind.CHAIN, run_id, attributes, parent_run_id) + + logger.debug(f"Started Chain span for {chain_type}") + except Exception as e: + logger.warning(f"Error in on_chain_start: {e}") + + def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None: + """Run when chain ends running.""" + try: + run_id = kwargs.get("run_id", id(outputs)) + + if run_id not in self.active_spans: + logger.warning(f"No span found for chain call {run_id}") + return + + span = self.active_spans.get(run_id) + + try: + span.set_attribute( + "chain.outputs", + safe_serialize(outputs) + ) + except Exception as e: + logger.warning(f"Failed to set chain outputs: {e}") + + # End the span after setting all attributes + self._end_span(run_id) + + except Exception as e: + logger.warning(f"Error in on_chain_end: {e}") + + def on_tool_start( + self, serialized: Dict[str, Any], input_str: str, **kwargs: Any + ) -> None: + """Run when tool starts running.""" + try: + # Add null check for serialized + if serialized is None: + serialized = {} + + tool_name = serialized.get("name", "unknown_tool") + + attributes = { + LangChainAttributes.TOOL_NAME: tool_name, + LangChainAttributes.TOOL_DESCRIPTION: serialized.get("description", ""), + LangChainAttributes.TOOL_INPUT: input_str, + } + + # Add more tool-specific attributes + if "return_direct" in serialized: + attributes[LangChainAttributes.TOOL_RETURN_DIRECT] = serialized["return_direct"] + + if "args_schema" in serialized: + schema = serialized.get("args_schema") + if schema: + schema_str = str(schema) + if len(schema_str) < 1000: # Avoid extremely large attributes + attributes[LangChainAttributes.TOOL_ARGS_SCHEMA] = schema_str + + run_id = kwargs.get("run_id", id(serialized or {})) + parent_run_id = kwargs.get("parent_run_id", None) + + 
self._create_span("tool", SpanKind.TOOL, run_id, attributes, parent_run_id) + + logger.debug(f"Started Tool span for {tool_name}") + except Exception as e: + logger.warning(f"Error in on_tool_start: {e}") + + def on_tool_end(self, output: str, **kwargs: Any) -> None: + """Run when tool ends running.""" + try: + run_id = kwargs.get("run_id", id(output)) + + if run_id not in self.active_spans: + logger.warning(f"No span found for tool call {run_id}") + return + + span = self.active_spans.get(run_id) + + try: + span.set_attribute( + LangChainAttributes.TOOL_OUTPUT, + output if isinstance(output, str) else safe_serialize(output) + ) + except Exception as e: + logger.warning(f"Failed to set tool output: {e}") + + # End the span after setting all attributes + self._end_span(run_id) + + except Exception as e: + logger.warning(f"Error in on_tool_end: {e}") + + def on_agent_action(self, action: AgentAction, **kwargs: Any) -> None: + """Run on agent action.""" + try: + tool = action.tool + tool_input = action.tool_input + log = action.log + + attributes = { + LangChainAttributes.AGENT_ACTION_TOOL: tool, + LangChainAttributes.AGENT_ACTION_INPUT: safe_serialize(tool_input), + LangChainAttributes.AGENT_ACTION_LOG: log, + } + + run_id = kwargs.get("run_id", id(action)) + parent_run_id = kwargs.get("parent_run_id", None) + + self._create_span("agent_action", SpanKind.AGENT_ACTION, run_id, attributes, parent_run_id) + + logger.debug(f"Started Agent Action span for {tool}") + except Exception as e: + logger.warning(f"Error in on_agent_action: {e}") + + def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> None: + """Run on agent end.""" + try: + run_id = kwargs.get("run_id", id(finish)) + + if run_id not in self.active_spans: + logger.warning(f"No span found for agent finish {run_id}") + return + + span = self.active_spans.get(run_id) + + try: + span.set_attribute( + LangChainAttributes.AGENT_FINISH_RETURN_VALUES, + safe_serialize(finish.return_values) + ) + except Exception as e: + logger.warning(f"Failed to set agent return values: {e}") + + try: + span.set_attribute( + LangChainAttributes.AGENT_FINISH_LOG, + finish.log + ) + except Exception as e: + logger.warning(f"Failed to set agent log: {e}") + + # End the span after setting all attributes + self._end_span(run_id) + + except Exception as e: + logger.warning(f"Error in on_agent_finish: {e}") + + def __del__(self): + """Clean up resources when the handler is deleted.""" + try: + # End any remaining spans + for run_id in list(self.active_spans.keys()): + try: + self._end_span(run_id) + except Exception as e: + logger.warning(f"Error ending span during cleanup: {e}") + + # End session span and detach session token + if self.session_span: + try: + # Detach session token if exists + if hasattr(self, 'session_token') and self.session_token: + detach(self.session_token) + + self.session_span.end() + logger.debug("Ended session span") + except Exception as e: + logger.warning(f"Error ending session span: {e}") + + except Exception as e: + logger.warning(f"Error in __del__: {e}") + + def on_llm_new_token(self, token: str, **kwargs: Any) -> None: + """Run on new token from LLM.""" + try: + run_id = kwargs.get("run_id") + + if not run_id: + logger.warning("No run_id provided for on_llm_new_token") + return + + if run_id not in self.active_spans: + logger.warning(f"No span found for token in run {run_id}") + return + + # Count tokens for later attribution + if run_id in self.token_counts: + self.token_counts[run_id] += 1 + else: + 
self.token_counts[run_id] = 1
+
+            # We don't set attributes on each token because it's inefficient
+            # and can lead to "setting attribute on ended span" errors.
+            # Instead, we count tokens and set the total at the end.
+
+        except Exception as e:
+            logger.warning(f"Error in on_llm_new_token: {e}")
+
+    def on_chat_model_start(
+        self, serialized: Dict[str, Any], messages: List[List[Any]], **kwargs: Any
+    ) -> None:
+        """Run when a chat model starts generating."""
+        try:
+            # Add null check for serialized
+            if serialized is None:
+                serialized = {}
+
+            model_info = get_model_info(serialized)
+            # Ensure default values if model_info returns unknown
+            model_name = model_info.get("model_name", "unknown")
+
+            # LangChain delivers chat prompts as a list of message lists (one
+            # list per prompt), so flatten one level while extracting message
+            # contents and roles
+            formatted_messages = []
+            roles = []
+
+            for message_list in messages:
+                for message in message_list:
+                    if hasattr(message, "content") and hasattr(message, "type"):
+                        formatted_messages.append({
+                            "content": message.content,
+                            "role": message.type
+                        })
+                        roles.append(message.type)
+
+            attributes = {
+                # Use both standard and LangChain-specific attributes
+                SpanAttributes.LLM_REQUEST_MODEL: model_name,
+                LangChainAttributes.LLM_MODEL: model_name,
+                SpanAttributes.LLM_PROMPTS: safe_serialize(formatted_messages),
+                LangChainAttributes.LLM_NAME: serialized.get("id", "unknown_chat_model"),
+                LangChainAttributes.CHAT_MESSAGE_ROLES: safe_serialize(roles),
+                LangChainAttributes.CHAT_MODEL_TYPE: "chat",
+            }
+
+            # Add generation parameters
+            if "kwargs" in serialized:
+                for key, value in serialized["kwargs"].items():
+                    if key == "temperature":
+                        attributes[SpanAttributes.LLM_REQUEST_TEMPERATURE] = value
+                    elif key == "max_tokens":
+                        attributes[SpanAttributes.LLM_REQUEST_MAX_TOKENS] = value
+                    elif key == "top_p":
+                        attributes[SpanAttributes.LLM_REQUEST_TOP_P] = value
+
+            run_id = kwargs.get("run_id", id(serialized or {}))
+            parent_run_id = kwargs.get("parent_run_id", None)
+
+            # Initialize token count for streaming if needed
+            self.token_counts[run_id] = 0
+
+            self._create_span("chat_model", SpanKind.LLM, run_id, attributes, parent_run_id)
+
+            logger.debug(f"Started Chat Model span for {model_name}")
+        except Exception as e:
+            logger.warning(f"Error in on_chat_model_start: {e}")
+
+    def on_llm_error(
+        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
+    ) -> None:
+        """Run when LLM errors."""
+        try:
+            run_id = kwargs.get("run_id")
+
+            if not run_id or run_id not in self.active_spans:
+                logger.warning(f"No span found for LLM error {run_id}")
+                return
+
+            span = self.active_spans.get(run_id)
+
+            # Record error attributes
+            try:
+                span.set_attribute("error", True)
+                span.set_attribute(CoreAttributes.ERROR_TYPE, error.__class__.__name__)
+                span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(error))
+                span.set_attribute(LangChainAttributes.LLM_ERROR, str(error))
+            except Exception as e:
+                logger.warning(f"Failed to set error attributes: {e}")
+
+            # End span with error
+            self._end_span(run_id)
+
+        except Exception as e:
+            logger.warning(f"Error in on_llm_error: {e}")
+
+    def on_chain_error(
+        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
+    ) -> None:
+        """Run when chain errors."""
+        try:
+            run_id = kwargs.get("run_id")
+
+            if not run_id or run_id not in self.active_spans:
+                logger.warning(f"No span found for chain error {run_id}")
+                return
+
+            span = self.active_spans.get(run_id)
+
+            # Record error attributes
+            try:
+                span.set_attribute("error", True)
+                span.set_attribute(CoreAttributes.ERROR_TYPE, error.__class__.__name__)
+                span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(error))
+                span.set_attribute(LangChainAttributes.CHAIN_ERROR, str(error))
+            except Exception as e:
+                logger.warning(f"Failed to set error attributes: {e}")
+
+            # End span with error
+            self._end_span(run_id)
+
+        except Exception as e:
+            logger.warning(f"Error in on_chain_error: {e}")
+
+    def on_tool_error(
+        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
+    ) -> None:
+        """Run when tool errors."""
+        try:
+            run_id = kwargs.get("run_id")
+
+            if not run_id or run_id not in self.active_spans:
+                logger.warning(f"No span found for tool error {run_id}")
+                return
+
+            span = self.active_spans.get(run_id)
+
+            # Record error attributes
+            try:
+                span.set_attribute("error", True)
+                span.set_attribute(CoreAttributes.ERROR_TYPE, error.__class__.__name__)
+                span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(error))
+                span.set_attribute(LangChainAttributes.TOOL_ERROR, str(error))
+            except Exception as e:
+                logger.warning(f"Failed to set error attributes: {e}")
+
+            # End span with error
+            self._end_span(run_id)
+
+        except Exception as e:
+            logger.warning(f"Error in on_tool_error: {e}")
+
+    def on_text(self, text: str, **kwargs: Any) -> None:
+        """
+        Run on arbitrary text.
+
+        This can be used for logging or recording intermediate steps.
+        """
+        try:
+            run_id = kwargs.get("run_id")
+
+            if run_id is None:
+                # Create a new span for this text
+                run_id = id(text)
+                parent_run_id = kwargs.get("parent_run_id")
+
+                attributes = {
+                    LangChainAttributes.TEXT_CONTENT: text,
+                }
+
+                self._create_span("text", SpanKind.TEXT, run_id, attributes, parent_run_id)
+
+                # Immediately end the span as text events are one-off
+                self._end_span(run_id)
+            else:
+                # Try to find a parent span to add the text to
+                parent_run_id = kwargs.get("parent_run_id")
+
+                if parent_run_id and parent_run_id in self.active_spans:
+                    # Add text to parent span
+                    try:
+                        parent_span = self.active_spans[parent_run_id]
+                        # The OTel span API has no attribute getter, so read
+                        # any existing text from the SDK span's `attributes`
+                        # mapping when it is exposed
+                        existing_text = ""
+                        span_attributes = getattr(parent_span, "attributes", None)
+                        if span_attributes:
+                            existing_text = span_attributes.get(LangChainAttributes.TEXT_CONTENT, "") or ""
+
+                        if existing_text:
+                            parent_span.set_attribute(
+                                LangChainAttributes.TEXT_CONTENT,
+                                f"{existing_text}\n{text}"
+                            )
+                        else:
+                            parent_span.set_attribute(
+                                LangChainAttributes.TEXT_CONTENT,
+                                text
+                            )
+                    except Exception as e:
+                        logger.warning(f"Failed to update parent span with text: {e}")
+        except Exception as e:
+            logger.warning(f"Error in on_text: {e}")
+
+
+class AsyncLangchainCallbackHandler(AsyncCallbackHandler):
+    """
+    AgentOps async callback handler for LangChain.
+
+    This handler creates spans for LLM calls and other LangChain operations,
+    maintaining proper parent-child relationships with the session span as root.
+    This is the async version of the handler.
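+
+    A minimal usage sketch (assumptions: LangChain's async API and a chat
+    model such as ``langchain_openai.ChatOpenAI``):
+
+        handler = AsyncLangchainCallbackHandler(tags=["async-example"])
+        llm = ChatOpenAI(callbacks=[handler])
+        result = await llm.ainvoke("Say hello")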
+ + Args: + api_key (str, optional): AgentOps API key + tags (List[str], optional): Tags to add to the session + auto_session (bool, optional): Whether to automatically create a session span + """ + + def __init__( + self, + api_key: Optional[str] = None, + tags: Optional[List[str]] = None, + auto_session: bool = True, + ): + """Initialize the callback handler.""" + # Create an internal sync handler to delegate to + self._sync_handler = LangchainCallbackHandler( + api_key=api_key, + tags=tags, + auto_session=auto_session + ) + + @property + def active_spans(self): + """Access to the active spans dictionary from sync handler.""" + return self._sync_handler.active_spans + + @property + def session_span(self): + """Access to the session span from sync handler.""" + return self._sync_handler.session_span + + async def on_llm_start( + self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any + ) -> None: + """Run when LLM starts running.""" + # Delegate to sync handler + self._sync_handler.on_llm_start(serialized, prompts, **kwargs) + + async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: + """Run when LLM ends running.""" + # Delegate to sync handler + self._sync_handler.on_llm_end(response, **kwargs) + + async def on_chain_start( + self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any + ) -> None: + """Run when chain starts running.""" + # Delegate to sync handler + self._sync_handler.on_chain_start(serialized, inputs, **kwargs) + + async def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None: + """Run when chain ends running.""" + # Delegate to sync handler + self._sync_handler.on_chain_end(outputs, **kwargs) + + async def on_tool_start( + self, serialized: Dict[str, Any], input_str: str, **kwargs: Any + ) -> None: + """Run when tool starts running.""" + # Delegate to sync handler + self._sync_handler.on_tool_start(serialized, input_str, **kwargs) + + async def on_tool_end(self, output: str, **kwargs: Any) -> None: + """Run when tool ends running.""" + # Delegate to sync handler + self._sync_handler.on_tool_end(output, **kwargs) + + async def on_agent_action(self, action: AgentAction, **kwargs: Any) -> None: + """Run on agent action.""" + # Delegate to sync handler + self._sync_handler.on_agent_action(action, **kwargs) + + async def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> None: + """Run on agent end.""" + # Delegate to sync handler + self._sync_handler.on_agent_finish(finish, **kwargs) + + async def on_llm_new_token(self, token: str, **kwargs: Any) -> None: + """Run on new token from LLM.""" + # Delegate to sync handler + self._sync_handler.on_llm_new_token(token, **kwargs) + + async def on_chat_model_start( + self, serialized: Dict[str, Any], messages: List[Any], **kwargs: Any + ) -> None: + """Run when a chat model starts generating.""" + # Delegate to sync handler + self._sync_handler.on_chat_model_start(serialized, messages, **kwargs) + + async def on_llm_error( + self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any + ) -> None: + """Run when LLM errors.""" + # Delegate to sync handler + self._sync_handler.on_llm_error(error, **kwargs) + + async def on_chain_error( + self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any + ) -> None: + """Run when chain errors.""" + # Delegate to sync handler + self._sync_handler.on_chain_error(error, **kwargs) + + async def on_tool_error( + self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any + ) -> None: + """Run when tool errors.""" + # 
Delegate to sync handler + self._sync_handler.on_tool_error(error, **kwargs) + + async def on_text(self, text: str, **kwargs: Any) -> None: + """Run on arbitrary text.""" + # Delegate to sync handler + self._sync_handler.on_text(text, **kwargs) + + def __del__(self): + """Clean up resources when the handler is deleted.""" + # The sync handler's __del__ will handle cleanup + if hasattr(self, '_sync_handler'): + del self._sync_handler \ No newline at end of file diff --git a/agentops/integration/callbacks/langchain/utils.py b/agentops/integration/callbacks/langchain/utils.py new file mode 100644 index 000000000..913984372 --- /dev/null +++ b/agentops/integration/callbacks/langchain/utils.py @@ -0,0 +1,55 @@ +""" +Utility functions for LangChain integration. +""" + +from typing import Any, Dict, Optional + +from agentops.helpers.serialization import safe_serialize +from agentops.logging import logger + + +def get_model_info(serialized: Optional[Dict[str, Any]]) -> Dict[str, str]: + """ + Extract model information from serialized LangChain data. + + This function attempts to extract model name information + from the serialized data of a LangChain model. + + Args: + serialized: Serialized data from LangChain + + Returns: + Dictionary with model_name key + """ + if serialized is None: + return {"model_name": "unknown"} + + model_info = {"model_name": "unknown"} + + try: + if isinstance(serialized.get("id"), list) and len(serialized["id"]) > 0: + id_list = serialized["id"] + if len(id_list) > 0: + model_info["model_name"] = id_list[-1] + + if isinstance(serialized.get("model_name"), str): + model_info["model_name"] = serialized["model_name"] + + elif serialized.get("id") and isinstance(serialized.get("id"), str): + model_id = serialized.get("id", "") + if "/" in model_id: + _, model_name = model_id.split("/", 1) + model_info["model_name"] = model_name + else: + model_info["model_name"] = model_id + + if serialized.get("kwargs") and isinstance(serialized["kwargs"], dict): + if serialized["kwargs"].get("model_name"): + model_info["model_name"] = serialized["kwargs"]["model_name"] + elif serialized["kwargs"].get("model"): + model_info["model_name"] = serialized["kwargs"]["model"] + + except Exception as e: + logger.warning(f"Error extracting model info: {e}") + + return model_info \ No newline at end of file diff --git a/agentops/semconv/__init__.py b/agentops/semconv/__init__.py index ea26eed4b..ec06895f2 100644 --- a/agentops/semconv/__init__.py +++ b/agentops/semconv/__init__.py @@ -12,6 +12,7 @@ from .meters import Meters from .span_kinds import AgentOpsSpanKindValues from .resource import ResourceAttributes +from .langchain import LangChainAttributes, LangChainAttributeValues SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY = "suppress_language_model_instrumentation" __all__ = [ @@ -26,5 +27,8 @@ "LLMRequestTypeValues", "SpanAttributes", "Meters", - "AgentOpsSpanKindValuesResourceAttributes", + "AgentOpsSpanKindValues", + "ResourceAttributes", + "LangChainAttributes", + "LangChainAttributeValues", ] diff --git a/agentops/semconv/langchain.py b/agentops/semconv/langchain.py new file mode 100644 index 000000000..c462c695c --- /dev/null +++ b/agentops/semconv/langchain.py @@ -0,0 +1,58 @@ +"""Semantic conventions for LangChain instrumentation.""" +class LangChainAttributeValues: + """Standard values for LangChain attributes.""" + + CHAIN_KIND_SEQUENTIAL = "sequential" + CHAIN_KIND_LLM = "llm" + CHAIN_KIND_ROUTER = "router" + + # Chat message roles + ROLE_SYSTEM = "system" + ROLE_USER = "user" + 
ROLE_ASSISTANT = "assistant" + ROLE_FUNCTION = "function" + ROLE_TOOL = "tool" + + +class LangChainAttributes: + """ + Attributes for LangChain instrumentation. + + Note: LLM-specific attributes are derived from SpanAttributes to maintain + consistency across instrumentations. + """ + + # Session attributes + SESSION_TAGS = "langchain.session.tags" + + # Chain attributes - specific to LangChain + CHAIN_NAME = "langchain.chain.name" + CHAIN_TYPE = "langchain.chain.type" + CHAIN_ERROR = "langchain.chain.error" + CHAIN_KIND = "langchain.chain.kind" + CHAIN_VERBOSE = "langchain.chain.verbose" + + # Agent attributes - specific to LangChain agents + AGENT_ACTION_LOG = "langchain.agent.action.log" + AGENT_ACTION_INPUT = "langchain.agent.action.input" + AGENT_ACTION_TOOL = "langchain.agent.action.tool" + AGENT_FINISH_RETURN_VALUES = "langchain.agent.finish.return_values" + AGENT_FINISH_LOG = "langchain.agent.finish.log" + + # Tool attributes - specific to LangChain tools + TOOL_NAME = "langchain.tool.name" + TOOL_INPUT = "langchain.tool.input" + TOOL_OUTPUT = "langchain.tool.output" + TOOL_DESCRIPTION = "langchain.tool.description" + TOOL_ERROR = "langchain.tool.error" + TOOL_ARGS_SCHEMA = "langchain.tool.args_schema" + TOOL_RETURN_DIRECT = "langchain.tool.return_direct" + + # Chat attributes - specific to LangChain chat models + CHAT_MESSAGE_ROLES = "langchain.chat_message.roles" + CHAT_MODEL_TYPE = "langchain.chat_model.type" + + # Text callback attributes + TEXT_CONTENT = "langchain.text.content" + + LLM_ERROR = "langchain.llm.error" \ No newline at end of file diff --git a/agentops/semconv/span_attributes.py b/agentops/semconv/span_attributes.py index 324c7b443..27476801a 100644 --- a/agentops/semconv/span_attributes.py +++ b/agentops/semconv/span_attributes.py @@ -37,6 +37,7 @@ class SpanAttributes: LLM_USAGE_TOTAL_TOKENS = "gen_ai.usage.total_tokens" LLM_USAGE_CACHE_CREATION_INPUT_TOKENS = "gen_ai.usage.cache_creation_input_tokens" LLM_USAGE_CACHE_READ_INPUT_TOKENS = "gen_ai.usage.cache_read_input_tokens" + LLM_USAGE_STREAMING_TOKENS = "gen_ai.usage.streaming_tokens" # Token type LLM_TOKEN_TYPE = "gen_ai.token.type" diff --git a/agentops/semconv/span_kinds.py b/agentops/semconv/span_kinds.py index 190afacbe..71c3fff79 100644 --- a/agentops/semconv/span_kinds.py +++ b/agentops/semconv/span_kinds.py @@ -25,7 +25,8 @@ class SpanKind: LLM = "llm" TEAM = "team" UNKNOWN = "unknown" - + CHAIN = "chain" + TEXT = "text" class AgentOpsSpanKindValues(Enum): WORKFLOW = "workflow" diff --git a/examples/openai_examples/openai_assistants_example.ipynb b/examples/openai_examples/openai_assistants_example.ipynb index f1c2595d3..b0d6c3b83 100644 --- a/examples/openai_examples/openai_assistants_example.ipynb +++ b/examples/openai_examples/openai_assistants_example.ipynb @@ -75,7 +75,7 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": 62, "metadata": {}, "outputs": [], "source": [ @@ -173,7 +173,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 63, "metadata": {}, "outputs": [], "source": [ @@ -325,7 +325,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 69, "metadata": {}, "outputs": [], "source": [ @@ -441,7 +441,7 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 73, "metadata": {}, "outputs": [], "source": [ @@ -475,7 +475,7 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": 74, "metadata": {}, "outputs": [], "source": [ @@ -630,7 +630,7 @@ }, { "cell_type": "code", - 
"execution_count": 17, + "execution_count": 78, "metadata": {}, "outputs": [], "source": [ @@ -755,7 +755,7 @@ }, { "cell_type": "code", - "execution_count": 21, + "execution_count": 51, "metadata": {}, "outputs": [], "source": [ @@ -828,7 +828,7 @@ }, { "cell_type": "code", - "execution_count": 23, + "execution_count": 53, "metadata": {}, "outputs": [], "source": [