diff --git a/agentops/__init__.py b/agentops/__init__.py index 4150f839a..c1009a6a8 100755 --- a/agentops/__init__.py +++ b/agentops/__init__.py @@ -2,6 +2,8 @@ import sys from typing import Optional, List, Union +from agentops.telemetry.config import OTELConfig + from .client import Client from .event import Event, ActionEvent, LLMEvent, ToolEvent, ErrorEvent from .decorators import record_action, track_agent, record_tool, record_function @@ -48,6 +50,7 @@ def init( auto_start_session: Optional[bool] = None, inherited_session_id: Optional[str] = None, skip_auto_end_session: Optional[bool] = None, + telemetry: Optional[OTELConfig] = None, # OTEL configuration ) -> Union[Session, None]: """ Initializes the AgentOps singleton pattern. @@ -69,6 +72,8 @@ def init( inherited_session_id (optional, str): Init Agentops with an existing Session skip_auto_end_session (optional, bool): Don't automatically end session based on your framework's decision-making (i.e. Crew determining when tasks are complete and ending the session) + exporters (List[SpanExporter], optional): Additional OpenTelemetry exporters for sending + telemetry data to external systems. Attributes: """ Client().unsuppress_logs() @@ -84,6 +89,7 @@ def init( if default_tags is None: default_tags = tags + # Create OTEL config if exporters provided Client().configure( api_key=api_key, parent_key=parent_key, @@ -94,6 +100,7 @@ def init( instrument_llm_calls=instrument_llm_calls, auto_start_session=auto_start_session, skip_auto_end_session=skip_auto_end_session, + otel=telemetry, # Pass OTEL config through ) if inherited_session_id is not None: diff --git a/agentops/client.py b/agentops/client.py index fb3e17937..911b89f25 100644 --- a/agentops/client.py +++ b/agentops/client.py @@ -20,6 +20,8 @@ from termcolor import colored +from agentops.telemetry.config import OTELConfig + from .config import Configuration from .event import ErrorEvent, Event from .host_env import get_host_env @@ -28,10 +30,16 @@ from .meta_client import MetaClient from .session import Session, active_sessions from .singleton import conditional_singleton +from .telemetry import ClientTelemetry @conditional_singleton class Client(metaclass=MetaClient): + """ + This is the AgentOps core Client. + It's the entrypoint to all core functionality. + """ + def __init__(self): self._pre_init_messages: List[str] = [] self._initialized: bool = False @@ -40,6 +48,7 @@ def __init__(self): self._config = Configuration() self._pre_init_queue = {"agents": []} self._host_env = None # Cache host env data + self.telemetry = ClientTelemetry(self) self.configure( api_key=os.environ.get("AGENTOPS_API_KEY"), @@ -60,6 +69,7 @@ def configure( auto_start_session: Optional[bool] = None, skip_auto_end_session: Optional[bool] = None, env_data_opt_out: Optional[bool] = None, + otel: Optional[OTELConfig] = None, ): if self.has_sessions: return logger.warning( @@ -78,36 +88,34 @@ def configure( auto_start_session=auto_start_session, skip_auto_end_session=skip_auto_end_session, env_data_opt_out=env_data_opt_out, + telemetry=otel, ) def initialize(self) -> Union[Session, None]: - if self.is_initialized: - return - - self.unsuppress_logs() - if self._config.api_key is None: - return logger.error( - "Could not initialize AgentOps client - API Key is missing." 
- + "\n\t Find your API key at https://app.agentops.ai/settings/projects" - ) + """Initialize the client""" + if not self.is_initialized: + self.unsuppress_logs() + if self._config.api_key is None: + return logger.error( + "Could not initialize AgentOps client - API Key is missing." + + "\n\t Find your API key at https://app.agentops.ai/settings/projects" + ) - self._handle_unclean_exits() - self._initialized = True + self._handle_unclean_exits() + self._initialized = True - if self._config.instrument_llm_calls: - self._llm_tracker = LlmTracker(self) - self._llm_tracker.override_api() + if self._config.instrument_llm_calls: + self._llm_tracker = LlmTracker(self) + self._llm_tracker.override_api() - session = None - if self._config.auto_start_session: - session = self.start_session() + # Initialize telemetry with configuration + self.telemetry.initialize(self._config.telemetry) - if session: - for agent_args in self._pre_init_queue["agents"]: - session.create_agent(name=agent_args["name"], agent_id=agent_args["agent_id"]) - self._pre_init_queue["agents"] = [] + session = None + if self._config.auto_start_session: + session = self.start_session() - return session + return session def _initialize_autogen_logger(self) -> None: try: @@ -224,6 +232,7 @@ def start_session( session_tags.update(tags) session = Session( + client=self.telemetry, session_id=session_id, tags=list(session_tags), host_env=self.host_env, diff --git a/agentops/config.py b/agentops/config.py index 7dfb574d2..5f0c809c1 100644 --- a/agentops/config.py +++ b/agentops/config.py @@ -2,6 +2,7 @@ from uuid import UUID from .log_config import logger +from .telemetry.config import OTELConfig class Configuration: @@ -16,6 +17,7 @@ def __init__(self): self.auto_start_session: bool = True self.skip_auto_end_session: bool = False self.env_data_opt_out: bool = False + self.telemetry: OTELConfig = OTELConfig() # Default OTEL configuration def configure( self, @@ -30,6 +32,7 @@ def configure( auto_start_session: Optional[bool] = None, skip_auto_end_session: Optional[bool] = None, env_data_opt_out: Optional[bool] = None, + telemetry: Optional[OTELConfig] = None, # New parameter ): if api_key is not None: try: @@ -72,3 +75,7 @@ def configure( if env_data_opt_out is not None: self.env_data_opt_out = env_data_opt_out + + # OTEL configuration + if telemetry is not None: + self.telemetry = telemetry diff --git a/agentops/event.py b/agentops/event.py index c6200aca1..874158ff3 100644 --- a/agentops/event.py +++ b/agentops/event.py @@ -25,6 +25,7 @@ class Event: end_timestamp(str): A timestamp indicating when the event ended. Defaults to the time when this Event was instantiated. agent_id(UUID, optional): The unique identifier of the agent that triggered the event. id(UUID): A unique identifier for the event. Defaults to a new UUID. + session_id(UUID, optional): The unique identifier of the session that the event belongs to. foo(x=1) { ... @@ -43,6 +44,7 @@ class Event: end_timestamp: Optional[str] = None agent_id: Optional[UUID] = field(default_factory=check_call_stack_for_agent_id) id: UUID = field(default_factory=uuid4) + session_id: Optional[UUID] = None @dataclass @@ -105,7 +107,7 @@ class ToolEvent(Event): @dataclass -class ErrorEvent: +class ErrorEvent(Event): """ For recording any errors e.g. ones related to agent execution @@ -115,21 +117,30 @@ class ErrorEvent: code(str, optional): A code that can be used to identify the error e.g. 501. details(str, optional): Detailed information about the error. 
logs(str, optional): For detailed information/logging related to the error. - timestamp(str): A timestamp indicating when the error occurred. Defaults to the time when this ErrorEvent was instantiated. - """ - + # Inherit common Event fields + event_type: str = field(default=EventType.ERROR.value) + + # Error-specific fields trigger_event: Optional[Event] = None exception: Optional[BaseException] = None error_type: Optional[str] = None code: Optional[str] = None details: Optional[Union[str, Dict[str, str]]] = None logs: Optional[str] = field(default_factory=traceback.format_exc) - timestamp: str = field(default_factory=get_ISO_time) def __post_init__(self): - self.event_type = EventType.ERROR.value + """Process exception if provided""" if self.exception: self.error_type = self.error_type or type(self.exception).__name__ self.details = self.details or str(self.exception) self.exception = None # removes exception from serialization + + # Ensure end timestamp is set + if not self.end_timestamp: + self.end_timestamp = get_ISO_time() + + @property + def timestamp(self) -> str: + """Maintain backward compatibility with old code expecting timestamp""" + return self.init_timestamp diff --git a/agentops/log_config.py b/agentops/log_config.py index e578a4358..01047f9f2 100644 --- a/agentops/log_config.py +++ b/agentops/log_config.py @@ -1,7 +1,41 @@ +""" +AgentOps Logging Configuration + +This module serves as the single source of truth for AgentOps logging configuration. +It provides: +1. Base configuration for AgentOps loggers +2. Integration with loguru when available +3. Standard formatting and log levels + +Other modules should: +- Import and use the 'logger' instance from this module +- Use set_log_handler() from telemetry.log_handler for OTEL integration +- Avoid directly configuring the root logger +""" + import logging import os import re +import sys +import inspect + +# Import loguru conditionally +try: + from loguru import logger as loguru_logger + LOGURU_AVAILABLE = True +except ImportError: + LOGURU_AVAILABLE = False +# Get logging level from environment variable +logging_level = os.environ.get("AGENTOPS_LOGGING_LEVEL", "INFO") +LEVEL_MAP = { + "DEBUG": logging.DEBUG, + "INFO": logging.INFO, + "WARNING": logging.WARNING, + "ERROR": logging.ERROR, + "CRITICAL": logging.CRITICAL +} +LOG_LEVEL = LEVEL_MAP.get(logging_level.upper(), logging.INFO) class AgentOpsLogFormatter(logging.Formatter): blue = "\x1b[34m" @@ -22,32 +56,105 @@ def format(self, record): formatter = logging.Formatter(log_fmt) return formatter.format(record) +# Setup loguru if available +if LOGURU_AVAILABLE: + # Remove default handler + loguru_logger.remove() + + # Configure loguru with our format including colors for different levels + format_string = ( + "🖇 AgentOps: | " + "{level: <8} | " + "{name}:{function}:{line} - " + "{message}" + ) + + level_colors = { + "TRACE": "", + "DEBUG": "", + "INFO": "", + "SUCCESS": "", + "WARNING": "", + "ERROR": "", + "CRITICAL": "", + } -logger = logging.getLogger("agentops") -logger.propagate = False -logger.setLevel(logging.CRITICAL) + # Add custom colors to loguru levels + for level_name, color in level_colors.items(): + loguru_logger.level(level_name, color=color) + + # Configure loguru with enhanced format + loguru_logger.add( + sys.stderr, + format=format_string, + level=logging_level.upper(), + colorize=True, + backtrace=True, + diagnose=True, + ) + + # Create intercepting handler for AgentOps logging only + class InterceptHandler(logging.Handler): + def emit(self, record): + # 
Only handle AgentOps logs + if not record.name.startswith('agentops'): + return -# Streaming Handler -stream_handler = logging.StreamHandler() -stream_handler.setLevel(logging.DEBUG) -stream_handler.setFormatter(AgentOpsLogFormatter()) -logger.addHandler(stream_handler) + # Get corresponding Loguru level if it exists + try: + level = loguru_logger.level(record.levelname).name + except ValueError: + level = record.levelno + # Find caller from where originated the logged message + frame, depth = inspect.currentframe(), 0 + while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__): + frame = frame.f_back + depth += 1 -# File Handler -class AgentOpsLogFileFormatter(logging.Formatter): - def format(self, record): - # Remove ANSI escape codes from the message - record.msg = ANSI_ESCAPE_PATTERN.sub("", str(record.msg)) - return super().format(record) - - -ANSI_ESCAPE_PATTERN = re.compile(r"\x1b\[[0-9;]*m") -log_to_file = os.environ.get("AGENTOPS_LOGGING_TO_FILE", "True").lower() == "true" -if log_to_file: - file_handler = logging.FileHandler("agentops.log", mode="w") - file_handler.setLevel(logging.DEBUG) - formatter = AgentOpsLogFileFormatter("%(asctime)s - %(levelname)s - %(message)s") - file_handler.setFormatter(formatter) - file_handler.setFormatter(formatter) - logger.addHandler(file_handler) + loguru_logger.opt(depth=depth, exception=record.exc_info).log( + level, record.getMessage() + ) + + # Create our specific logger + logger = logging.getLogger("agentops") + logger.propagate = False + logger.setLevel(LOG_LEVEL) # Use environment variable level + + # Only handle AgentOps logs + logger.addHandler(InterceptHandler()) + + # Add a note about telemetry integration + loguru_logger.info("Loguru detected - OTEL logging will be integrated with loguru") + +else: + # Fallback to standard logging setup + logger = logging.getLogger("agentops") + logger.propagate = False + logger.setLevel(LOG_LEVEL) # Use environment variable level + + # Streaming Handler + stream_handler = logging.StreamHandler() + stream_handler.setLevel(LOG_LEVEL) # Use environment variable level + stream_handler.setFormatter(AgentOpsLogFormatter()) + logger.addHandler(stream_handler) + + # File Handler + ANSI_ESCAPE_PATTERN = re.compile(r"\x1b\[[0-9;]*m") + log_to_file = os.environ.get("AGENTOPS_LOGGING_TO_FILE", "True").lower() == "true" + + if log_to_file: + class AgentOpsLogFileFormatter(logging.Formatter): + def format(self, record): + # Remove ANSI escape codes from the message + record.msg = ANSI_ESCAPE_PATTERN.sub("", str(record.msg)) + return super().format(record) + + file_handler = logging.FileHandler("agentops.log", mode="w") + file_handler.setLevel(LOG_LEVEL) # Use environment variable level + formatter = AgentOpsLogFileFormatter("%(asctime)s - %(levelname)s - %(message)s") + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + + # Add a note about telemetry integration + logger.info("Using standard Python logging with OTEL integration") diff --git a/agentops/session.py b/agentops/session.py index b9f07d20b..dd4f50aac 100644 --- a/agentops/session.py +++ b/agentops/session.py @@ -11,57 +11,19 @@ from opentelemetry import trace from opentelemetry.context import attach, detach, set_value -from opentelemetry.sdk.resources import SERVICE_NAME, Resource -from opentelemetry.sdk.trace import ReadableSpan, TracerProvider -from opentelemetry.sdk.trace.export import ( - BatchSpanProcessor, - ConsoleSpanExporter, - SpanExporter, - SpanExportResult, -) +from opentelemetry.sdk.trace import 
ReadableSpan +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult from termcolor import colored from .config import Configuration from .enums import EndState -from .event import ErrorEvent, Event +from .event import ActionEvent, ErrorEvent, Event, LLMEvent, ToolEvent from .exceptions import ApiServerException from .helpers import filter_unjsonable, get_ISO_time, safe_serialize from .http_client import HttpClient, Response from .log_config import logger - -""" -OTEL Guidelines: - - - -- Maintain a single TracerProvider for the application runtime - - Have one global TracerProvider in the Client class - -- According to the OpenTelemetry Python documentation, Resource should be initialized once per application and shared across all telemetry (traces, metrics, logs). -- Each Session gets its own Tracer (with session-specific context) -- Allow multiple sessions to share the provider while maintaining their own context - - - -:: Resource - - '''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' - Captures information about the entity producing telemetry as Attributes. - For example, a process producing telemetry that is running in a container - on Kubernetes has a process name, a pod name, a namespace, and possibly - a deployment name. All these attributes can be included in the Resource. - '''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' - - The key insight from the documentation is: - - - Resource represents the entity producing telemetry - in our case, that's the AgentOps SDK application itself - - Session-specific information should be attributes on the spans themselves - - A Resource is meant to identify the service/process/application1 - - Sessions are units of work within that application - - The documentation example about "process name, pod name, namespace" refers to where the code is running, not the work it's doing - -""" - +from .telemetry.client import ClientTelemetry +from .telemetry.converter import AgentOpsAttributes, EventToSpanConverter class SessionExporter(SpanExporter): """ @@ -172,6 +134,7 @@ class Session: Args: session_id (UUID): The session id is used to record particular runs. config (Configuration): The configuration object for the session. + client (ClientTelemetry): The client telemetry object for the session. tags (List[str], optional): Tags that can be used for grouping or sorting later. Examples could be ["GPT-4"]. host_env (dict, optional): A dictionary containing host and environment data. 
@@ -201,23 +164,19 @@ def __init__( self, session_id: UUID, config: Configuration, + client: Optional[ClientTelemetry] = None, tags: Optional[List[str]] = None, host_env: Optional[dict] = None, ): - self.end_timestamp = None - self.end_state: Optional[str] = "Indeterminate" + """Initialize session with telemetry handled by ClientTelemetry""" self.session_id = session_id - self.init_timestamp = get_ISO_time() - self.tags: List[str] = tags or [] - self.video: Optional[str] = None - self.end_state_reason: Optional[str] = None - self.host_env = host_env self.config = config - self.jwt = None - self._lock = threading.Lock() - self._end_session_lock = threading.Lock() - self.token_cost: Decimal = Decimal(0) - self._session_url: str = "" + self.tags = tags or [] + self.init_timestamp = get_ISO_time() + self.end_timestamp = None + self.end_state = EndState.INDETERMINATE.value + self.end_state_reason = None + self.token_cost = Decimal(0) self.event_counts = { "llms": 0, "tools": 0, @@ -225,33 +184,29 @@ def __init__( "errors": 0, "apis": 0, } - # self.session_url: Optional[str] = None + self._lock = threading.Lock() + self._end_session_lock = threading.Lock() + self.is_running = True + self.jwt = None # Start session first to get JWT - self.is_running = self._start_session() - if not self.is_running: - return - - # Initialize OTEL components with a more controlled processor - self._tracer_provider = TracerProvider() - self._otel_tracer = self._tracer_provider.get_tracer( - f"agentops.session.{str(session_id)}", - ) - self._otel_exporter = SessionExporter(session=self) - - # Use smaller batch size and shorter delay to reduce buffering - self._span_processor = BatchSpanProcessor( - self._otel_exporter, - max_queue_size=self.config.max_queue_size, - schedule_delay_millis=self.config.max_wait_time, - max_export_batch_size=min( - max(self.config.max_queue_size // 20, 1), - min(self.config.max_queue_size, 32), - ), - export_timeout_millis=20000, + if not self._start_session(): + raise Exception("Failed to start session") + + # Get session-specific tracer from client telemetry + from agentops.client import Client + self._telemetry_client = client or Client().telemetry + + # Get tracer from client + self._otel_tracer = self._telemetry_client.get_session_tracer( + session_id=self.session_id, + jwt=self.jwt or "" ) - self._tracer_provider.add_span_processor(self._span_processor) + # For test compatibility + self._tracer_provider = self._telemetry_client._tracer_provider + # Use existing span processor from client + self._span_processor = self._telemetry_client._session_exporters[self.session_id] def set_video(self, video: str) -> None: """ @@ -294,37 +249,21 @@ def end_session( return None try: - # Force flush any pending spans before ending session - if hasattr(self, "_span_processor"): - self._span_processor.force_flush(timeout_millis=5000) - - # 1. Set shutdown flag on exporter first - if hasattr(self, "_otel_exporter"): - self._otel_exporter.shutdown() + # Force flush any pending spans + if self._telemetry_client: + self._telemetry_client.force_flush() - # 2. Set session end state + # Set session end state self.end_timestamp = get_ISO_time() self.end_state = end_state self.end_state_reason = end_state_reason if video is not None: self.video = video - # 3. Mark session as not running before cleanup + # Mark session as not running self.is_running = False - # 4. 
Clean up OTEL components - if hasattr(self, "_span_processor"): - try: - # Force flush any pending spans - self._span_processor.force_flush(timeout_millis=5000) - # Shutdown the processor - self._span_processor.shutdown() - except Exception as e: - logger.warning(f"Error during span processor cleanup: {e}") - finally: - del self._span_processor - - # 5. Final session update + # Get analytics before cleanup if not (analytics_stats := self.get_analytics()): return None @@ -339,18 +278,23 @@ def end_session( ) logger.info(analytics) + # Clean up telemetry + if self._telemetry_client: + self._telemetry_client.cleanup_session(self.session_id) + + return self.token_cost + except Exception as e: logger.exception(f"Error during session end: {e}") + return None finally: - active_sessions.remove(self) # First thing, get rid of the session - + active_sessions.remove(self) logger.info( colored( f"\x1b[34mSession Replay: {self.session_url}\x1b[0m", "blue", ) ) - return self.token_cost def add_tags(self, tags: List[str]) -> None: """ @@ -395,62 +339,41 @@ def record(self, event: Union[Event, ErrorEvent], flush_now=False): if not self.is_running: return - # Ensure event has all required base attributes - if not hasattr(event, "id"): - event.id = uuid4() - if not hasattr(event, "init_timestamp"): - event.init_timestamp = get_ISO_time() - if not hasattr(event, "end_timestamp") or event.end_timestamp is None: - event.end_timestamp = get_ISO_time() + # Set session_id on the event itself + event.session_id = self.session_id # Create session context token = set_value("session.id", str(self.session_id)) try: token = attach(token) - - # Create a copy of event data to modify - event_data = dict(filter_unjsonable(event.__dict__)) - - # Add required fields based on event type - if isinstance(event, ErrorEvent): - event_data["error_type"] = getattr(event, "error_type", event.event_type) - elif event.event_type == "actions": - # Ensure action events have action_type - if "action_type" not in event_data: - event_data["action_type"] = event_data.get("name", "unknown_action") - if "name" not in event_data: - event_data["name"] = event_data.get("action_type", "unknown_action") - elif event.event_type == "tools": - # Ensure tool events have name - if "name" not in event_data: - event_data["name"] = event_data.get("tool_name", "unknown_tool") - if "tool_name" not in event_data: - event_data["tool_name"] = event_data.get("name", "unknown_tool") - - with self._otel_tracer.start_as_current_span( - name=event.event_type, - attributes={ - "event.id": str(event.id), - "event.type": event.event_type, - "event.timestamp": event.init_timestamp or get_ISO_time(), - "event.end_timestamp": event.end_timestamp or get_ISO_time(), - "session.id": str(self.session_id), - "session.tags": ",".join(self.tags) if self.tags else "", - "event.data": json.dumps(event_data), - }, - ) as span: - if event.event_type in self.event_counts: - self.event_counts[event.event_type] += 1 - - if isinstance(event, ErrorEvent): - span.set_attribute("error", True) - if hasattr(event, "trigger_event") and event.trigger_event: - span.set_attribute("trigger_event.id", str(event.trigger_event.id)) - span.set_attribute("trigger_event.type", event.trigger_event.event_type) - - if flush_now and hasattr(self, "_span_processor"): - self._span_processor.force_flush() + + # Get span definitions from converter + span_definitions = EventToSpanConverter.convert_event(event) + + # Create spans based on definitions + for span_def in span_definitions: + # Add session context 
to span attributes + span_def.attributes.update({ + AgentOpsAttributes.SESSION_ID: str(self.session_id), + AgentOpsAttributes.SESSION_TAGS: ",".join(self.tags) if self.tags else "", + # Add session_id to event data + AgentOpsAttributes.EVENT_DATA: json.dumps({ + "session_id": str(self.session_id), + **span_def.attributes.get(AgentOpsAttributes.EVENT_DATA, {}) + }) + }) + + with self._otel_tracer.start_as_current_span( + name=span_def.name, + attributes=span_def.attributes, + kind=span_def.kind or trace.SpanKind.INTERNAL + ) as span: + if event.event_type in self.event_counts: + self.event_counts[event.event_type] += 1 + + if flush_now: + self._telemetry_client.force_flush() finally: detach(token) @@ -656,5 +579,50 @@ def session_url(self) -> str: # def session_url(self, url: str): # pass + def _format_event_data(self, event: Union[Event, ErrorEvent]) -> Dict[str, Any]: + """ + Format event data for telemetry export. + Extracts relevant fields from event and ensures they're JSON-serializable. + """ + # Get base event fields from Event class + event_data = { + "event_type": event.event_type, + "params": event.params, + "returns": event.returns, + "init_timestamp": event.init_timestamp, + "end_timestamp": event.end_timestamp, + } + + # Add event-type specific fields + if isinstance(event, ErrorEvent): + event_data.update({ + "error_type": event.error_type, + "code": event.code, + "details": event.details, + "logs": event.logs, + }) + elif isinstance(event, ActionEvent): + event_data.update({ + "action_type": event.action_type, + "logs": event.logs, + "screenshot": event.screenshot, + }) + elif isinstance(event, ToolEvent): + event_data.update({ + "name": event.name, + "logs": event.logs, + }) + elif isinstance(event, LLMEvent): + event_data.update({ + "model": event.model, + "prompt": event.prompt, + "completion": event.completion, + "prompt_tokens": event.prompt_tokens, + "completion_tokens": event.completion_tokens, + }) + + # Filter out None values and ensure JSON-serializable + return {k: v for k, v in filter_unjsonable(event_data).items() if v is not None} + active_sessions: List[Session] = [] diff --git a/agentops/telemetry/README.md b/agentops/telemetry/README.md new file mode 100644 index 000000000..d08db304b --- /dev/null +++ b/agentops/telemetry/README.md @@ -0,0 +1,175 @@ +# AgentOps OpenTelemetry Integration + +## Architecture Overview + +```mermaid +flowchart TB + subgraph AgentOps + Client[AgentOps Client] + Session[Session] + Events[Events] + LLMTracker[LLM Tracker] + end + + subgraph OpenTelemetry + TracerProvider[Tracer Provider] + MeterProvider[Meter Provider] + Processors[Span/Metric Processors] + OTLP[OTLP Exporters] + end + + subgraph Backends + Collector[OTEL Collector] + Backends[Observability Backends] + end + + Client --> Session + Session --> Events + Client --> LLMTracker + + %% OTEL Integration Points + Events --> TracerProvider + LLMTracker --> MeterProvider + TracerProvider --> Processors + MeterProvider --> Processors + Processors --> OTLP + OTLP --> Collector + Collector --> Backends +``` + +## Component Mapping + +```mermaid +classDiagram + class AgentOpsEvent { + +EventType event_type + +Dict params + +UUID id + +timestamp init_timestamp + +timestamp end_timestamp + } + + class OTELSpan { + +SpanContext context + +SpanKind kind + +str name + +Dict attributes + +timestamp start_time + +timestamp end_time + } + + class OTELMetric { + +str name + +str description + +str unit + +MetricType type + +Dict attributes + } + + AgentOpsEvent --> OTELSpan : maps to + 
+    LLMTracker --> OTELMetric : generates
+```
+
+## Data Flow
+
+```mermaid
+sequenceDiagram
+    participant App
+    participant AgentOps
+    participant OTEL
+    participant Collector
+
+    App->>AgentOps: record_event()
+    AgentOps->>OTEL: create_span()
+    OTEL->>OTEL: add_attributes()
+    OTEL->>OTEL: batch_processor
+    OTEL->>Collector: export_batch()
+    Collector-->>OTEL: acknowledge
+```
+
+## Implementation Guide
+
+1. **Resource Configuration**
+```python
+from opentelemetry.sdk.resources import Resource, SERVICE_NAME
+
+resource = Resource(attributes={
+    SERVICE_NAME: "agentops",
+    "library.name": "agentops",
+    "library.version": "1.0.0"
+})
+```
+
+2. **Tracer Setup**
+```python
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
+
+tracer_provider = TracerProvider(resource=resource)
+otlp_exporter = OTLPSpanExporter(endpoint="")
+span_processor = BatchSpanProcessor(otlp_exporter)
+tracer_provider.add_span_processor(span_processor)
+```
+
+3. **Metrics Setup**
+```python
+from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
+from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
+
+metric_reader = PeriodicExportingMetricReader(
+    OTLPMetricExporter(endpoint="")
+)
+meter_provider = MeterProvider(
+    resource=resource,
+    metric_readers=[metric_reader]
+)
+```
+
+## Key Metrics & Spans
+
+```mermaid
+graph LR
+    subgraph Metrics
+        A[llm.calls.count]
+        B[llm.tokens.total]
+        C[llm.latency]
+    end
+
+    subgraph Spans
+        D[action.execution]
+        E[tool.usage]
+        F[llm.completion]
+    end
+
+    subgraph Attributes
+        G[session.id]
+        H[agent.id]
+        I[event.type]
+    end
+```
+
+## Best Practices
+
+1. **Resource Attribution**
+   - Always set service name and version
+   - Include environment information
+   - Add deployment-specific tags
+
+2. **Span Management**
+   - Use context managers for automatic span lifecycle
+   - Add error handling and status codes
+   - Include relevant attributes for filtering
+
+3. **Metric Collection**
+   - Use appropriate metric types (counter, histogram, gauge)
+   - Add dimension tags for better querying
+   - Configure appropriate aggregation intervals
+
+4. **Performance**
+   - Use batch processors for spans and metrics
+   - Configure appropriate batch sizes and export intervals
+   - Enable sampling for high-volume deployments
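+
+## Usage Sketch
+
+A minimal sketch of wiring the pieces above together through the `telemetry` parameter added in this PR. It assumes `OTELConfig` is imported from `agentops.telemetry.config` and that `additional_exporters`, `resource_attributes`, and `sampler` behave as defined in `telemetry/config.py`; the collector endpoint and attribute values are illustrative only.
+
+```python
+import agentops
+from agentops.telemetry.config import OTELConfig
+from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
+from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased
+
+# Forward spans to an external collector in addition to the AgentOps backend.
+config = OTELConfig(
+    additional_exporters=[OTLPSpanExporter(endpoint="http://localhost:4317")],
+    resource_attributes={"deployment.environment": "staging"},
+    sampler=ParentBased(TraceIdRatioBased(0.25)),  # sample 25% of root traces
+)
+
+# The config flows through Client.configure() into ClientTelemetry.initialize().
+agentops.init(api_key="<your-api-key>", telemetry=config)
+```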
diff --git a/agentops/telemetry/__init__.py b/agentops/telemetry/__init__.py new file mode 100644 index 000000000..7488f51ea --- /dev/null +++ b/agentops/telemetry/__init__.py @@ -0,0 +1,4 @@ +from .client import ClientTelemetry +from .config import OTELConfig + +__all__ = [OTELConfig] diff --git a/agentops/telemetry/client.py b/agentops/telemetry/client.py new file mode 100644 index 000000000..e518bdfbb --- /dev/null +++ b/agentops/telemetry/client.py @@ -0,0 +1,152 @@ +from typing import TYPE_CHECKING, Dict, Optional, Union +from uuid import UUID +import os + +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter + +from agentops.config import Configuration +from agentops.log_config import logger +from .config import OTELConfig +from .exporter import ExportManager +from .manager import OTELManager +from .processors import LiveSpanProcessor + + +if TYPE_CHECKING: + from agentops.session import Session + from agentops.client import Client + + +class ClientTelemetry: + """Manages telemetry at the agentops.Client level, shared across sessions""" + + def __init__(self,client: "Client"): + self._otel_manager: Optional[OTELManager] = None + self._tracer_provider: Optional[TracerProvider] = None + self._session_exporters: Dict[UUID, ExportManager] = {} + self.config: Optional[OTELConfig] = None + self.client = client + + def initialize(self, config: OTELConfig) -> None: + """Initialize telemetry components""" + # Create a deep copy of the config + config_copy = OTELConfig( + additional_exporters=list(config.additional_exporters) if config.additional_exporters else None, + resource_attributes=dict(config.resource_attributes) if config.resource_attributes else None, + sampler=config.sampler, + retry_config=dict(config.retry_config) if config.retry_config else None, + custom_formatters=list(config.custom_formatters) if config.custom_formatters else None, + enable_metrics=config.enable_metrics, + metric_readers=list(config.metric_readers) if config.metric_readers else None, + enable_in_flight=config.enable_in_flight, + in_flight_interval=config.in_flight_interval, + max_queue_size=config.max_queue_size, + max_wait_time=config.max_wait_time, + endpoint=config.endpoint, + api_key=config.api_key + ) + + # Only check environment variables if no exporters are explicitly configured + if config_copy.additional_exporters is None: + endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") + service_name = os.environ.get("OTEL_SERVICE_NAME") + + if service_name and not config_copy.resource_attributes: + config_copy.resource_attributes = {"service.name": service_name} + + if endpoint: + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter + config_copy.additional_exporters = [OTLPSpanExporter(endpoint=endpoint)] + logger.info("Using OTEL configuration from environment variables") + + # Validate exporters + if config_copy.additional_exporters: + for exporter in config_copy.additional_exporters: + if not isinstance(exporter, SpanExporter): + raise ValueError(f"Invalid exporter type: {type(exporter)}. 
Must be a SpanExporter") + + # Create the OTEL manager instance + self._otel_manager = OTELManager( + config=config_copy, + exporters=config_copy.additional_exporters, + resource_attributes=config_copy.resource_attributes, + sampler=config_copy.sampler + ) + self.config = config_copy + + # Initialize the tracer provider with global service info + self._tracer_provider = self._otel_manager.initialize( + service_name="agentops", + session_id="global" + ) + + def get_session_tracer(self, session_id: UUID, jwt: str): + """Get or create a tracer for a specific session""" + if not self.client: + raise RuntimeError("Client not initialized") + + # Create session-specific exporter + exporter = ExportManager( + session_id=session_id, + endpoint=self.client._config.endpoint, + jwt=jwt, + api_key=self.client._config.api_key, + retry_config=self.config.retry_config if self.config else None, + custom_formatters=self.config.custom_formatters if self.config else None, + ) + + # Store exporter reference + self._session_exporters[session_id] = exporter + + # Add both batch and in-flight processors + batch_processor = BatchSpanProcessor( + exporter, + max_queue_size=self.client._config.max_queue_size, + schedule_delay_millis=self.client._config.max_wait_time, + max_export_batch_size=min( + max(self.client._config.max_queue_size // 20, 1), + min(self.client._config.max_queue_size, 32), + ), + export_timeout_millis=20000, + ) + + # Add in-flight processor for long-running operations + inflight_processor = LiveSpanProcessor(exporter) + + self._otel_manager.add_processor(batch_processor) + self._otel_manager.add_processor(inflight_processor) + + # Return session-specific tracer + return self._tracer_provider.get_tracer(f"agentops.session.{str(session_id)}") + + def cleanup_session(self, session_id: UUID): + """Clean up telemetry resources for a session""" + if session_id in self._session_exporters: + exporter = self._session_exporters[session_id] + exporter.shutdown() + del self._session_exporters[session_id] + + def shutdown(self): + """Shutdown all telemetry""" + if self._otel_manager: + self._otel_manager.shutdown() + for exporter in self._session_exporters.values(): + exporter.shutdown() + self._session_exporters.clear() + + def force_flush(self) -> bool: + """Force flush all processors""" + if not self._otel_manager: + return True + + success = True + for processor in self._otel_manager._processors: + try: + if not processor.force_flush(): + success = False + except Exception as e: + logger.error(f"Error flushing processor: {e}") + success = False + + return success diff --git a/agentops/telemetry/config.py b/agentops/telemetry/config.py new file mode 100644 index 000000000..e48ed3825 --- /dev/null +++ b/agentops/telemetry/config.py @@ -0,0 +1,24 @@ +from dataclasses import dataclass +from typing import Callable, Dict, List, Optional + +from opentelemetry.sdk.trace.export import SpanExporter +from opentelemetry.sdk.trace.sampling import Sampler + + +@dataclass +class OTELConfig: + """Configuration for OpenTelemetry integration""" + + additional_exporters: Optional[List[SpanExporter]] = None + resource_attributes: Optional[Dict] = None + sampler: Optional[Sampler] = None + retry_config: Optional[Dict] = None + custom_formatters: Optional[List[Callable]] = None + enable_metrics: bool = False + metric_readers: Optional[List] = None + enable_in_flight: bool = True + in_flight_interval: float = 1.0 + max_queue_size: int = 512 + max_wait_time: int = 5000 + endpoint: str = "https://api.agentops.ai" + api_key: 
Optional[str] = None diff --git a/agentops/telemetry/converter.py b/agentops/telemetry/converter.py new file mode 100644 index 000000000..55865f156 --- /dev/null +++ b/agentops/telemetry/converter.py @@ -0,0 +1,348 @@ +""" +Converts AgentOps events to OpenTelemetry spans following semantic conventions. +""" + +from dataclasses import fields +from typing import Any, Dict, List, Optional +from uuid import UUID +import json + +from opentelemetry.trace import SpanKind +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.util.types import AttributeValue +from opentelemetry import trace + + +# AgentOps semantic conventions +class AgentOpsAttributes: + """Semantic conventions for AgentOps spans""" + # Time attributes + TIME_START = "time.start" + TIME_END = "time.end" + + # Common attributes (from Event base class) + EVENT_ID = "event.id" + EVENT_TYPE = "event.type" + EVENT_DATA = "event.data" + EVENT_START_TIME = "event.start_time" + EVENT_END_TIME = "event.end_time" + EVENT_PARAMS = "event.params" + EVENT_RETURNS = "event.returns" + + # Session attributes + SESSION_ID = "session.id" + SESSION_TAGS = "session.tags" + + # Agent attributes + AGENT_ID = "agent.id" + + # Thread attributes + THREAD_ID = "thread.id" + + # Error attributes + ERROR = "error" + ERROR_TYPE = "error.type" + ERROR_MESSAGE = "error.message" + ERROR_STACKTRACE = "error.stacktrace" + ERROR_DETAILS = "error.details" + ERROR_CODE = "error.code" + TRIGGER_EVENT_ID = "trigger_event.id" + TRIGGER_EVENT_TYPE = "trigger_event.type" + + # LLM attributes + LLM_MODEL = "llm.model" + LLM_PROMPT = "llm.prompt" + LLM_COMPLETION = "llm.completion" + LLM_TOKENS_TOTAL = "llm.tokens.total" + LLM_TOKENS_PROMPT = "llm.tokens.prompt" + LLM_TOKENS_COMPLETION = "llm.tokens.completion" + LLM_COST = "llm.cost" + + # Action attributes + ACTION_TYPE = "action.type" + ACTION_PARAMS = "action.params" + ACTION_RESULT = "action.result" + ACTION_LOGS = "action.logs" + ACTION_SCREENSHOT = "action.screenshot" + + # Tool attributes + TOOL_NAME = "tool.name" + TOOL_PARAMS = "tool.params" + TOOL_RESULT = "tool.result" + TOOL_LOGS = "tool.logs" + + # Execution attributes + EXECUTION_START_TIME = "execution.start_time" + EXECUTION_END_TIME = "execution.end_time" + + +from agentops.event import ActionEvent, ErrorEvent, Event, LLMEvent, ToolEvent + + +def span_safe(value: Any) -> AttributeValue: + """Convert value to OTEL-compatible attribute value""" + if isinstance(value, (str, int, float, bool)) or value is None: + return value + if isinstance(value, UUID): + return str(value) + if isinstance(value, (dict, list)): + return json.dumps(value) + return str(value) + + +class SpanDefinition: + """Defines how a span should be created""" + def __init__( + self, + name: str, + attributes: Dict[str, AttributeValue], + parent_span_id: Optional[str] = None, + kind: Optional[SpanKind] = None + ): + self.name = name + self.attributes = {k: span_safe(v) for k, v in attributes.items()} + self.parent_span_id = parent_span_id + self.kind = kind + + +class EventToSpanConverter: + """Converts AgentOps events to OpenTelemetry spans""" + + # Field name mappings for semantic conventions + FIELD_MAPPINGS = { + 'init_timestamp': AgentOpsAttributes.TIME_START, + 'end_timestamp': AgentOpsAttributes.TIME_END, + 'error_type': AgentOpsAttributes.ERROR_TYPE, + 'details': AgentOpsAttributes.ERROR_MESSAGE, + 'logs': AgentOpsAttributes.ERROR_STACKTRACE, + + # LLM fields + 'model': AgentOpsAttributes.LLM_MODEL, + 'prompt': AgentOpsAttributes.LLM_PROMPT, + 'completion': 
AgentOpsAttributes.LLM_COMPLETION, + 'prompt_tokens': AgentOpsAttributes.LLM_TOKENS_PROMPT, + 'completion_tokens': AgentOpsAttributes.LLM_TOKENS_COMPLETION, + 'cost': AgentOpsAttributes.LLM_COST, + + # Action fields + 'action_type': AgentOpsAttributes.ACTION_TYPE, + 'params': AgentOpsAttributes.ACTION_PARAMS, + 'returns': AgentOpsAttributes.ACTION_RESULT, + 'logs': AgentOpsAttributes.ACTION_LOGS, + + # Tool fields + 'name': AgentOpsAttributes.TOOL_NAME, + } + + @staticmethod + def convert_event(event: Event) -> List[SpanDefinition]: + """Convert an event into its corresponding span(s)""" + main_span = SpanDefinition( + name=EventToSpanConverter._get_span_name(event), + attributes=EventToSpanConverter._get_span_attributes(event), + kind=EventToSpanConverter._get_span_kind(event) + ) + + spans = [main_span] + child_span = EventToSpanConverter._create_child_span(event, main_span.name) + if child_span: + spans.append(child_span) + + return spans + + @staticmethod + def _get_span_name(event: Event) -> str: + """Get semantic span name""" + if isinstance(event, LLMEvent): + return "llm.completion" + elif isinstance(event, ActionEvent): + return "agent.action" + elif isinstance(event, ToolEvent): + return "agent.tool" + elif isinstance(event, ErrorEvent): + return "error" + return "event" + + @staticmethod + def _get_span_kind(event: Event) -> Optional[SpanKind]: + """Get OTEL span kind""" + if isinstance(event, LLMEvent): + return SpanKind.CLIENT + elif isinstance(event, ErrorEvent): + return SpanKind.INTERNAL + return SpanKind.INTERNAL + + @staticmethod + def _get_span_attributes(event: Event) -> Dict[str, AttributeValue]: + """Extract span attributes using OTEL conventions""" + attributes = {} + event_type = event.__class__.__name__.lower().replace('event', '') + + # Add common timing attributes first + attributes.update({ + AgentOpsAttributes.EVENT_START_TIME: event.init_timestamp if hasattr(event, 'init_timestamp') else event.timestamp, + AgentOpsAttributes.EVENT_END_TIME: getattr(event, 'end_timestamp', None), + AgentOpsAttributes.EVENT_ID: str(event.id), + AgentOpsAttributes.SESSION_ID: str(event.session_id) if event.session_id else None, + }) + + # Add agent ID if present + if hasattr(event, 'agent_id') and event.agent_id: + attributes[AgentOpsAttributes.AGENT_ID] = str(event.agent_id) + attributes['agent_id'] = str(event.agent_id) + + # Add LLM-specific attributes + if isinstance(event, LLMEvent): + llm_attrs = { + AgentOpsAttributes.LLM_MODEL: event.model, + AgentOpsAttributes.LLM_PROMPT: event.prompt, + AgentOpsAttributes.LLM_COMPLETION: event.completion, + AgentOpsAttributes.LLM_TOKENS_PROMPT: event.prompt_tokens, + AgentOpsAttributes.LLM_TOKENS_COMPLETION: event.completion_tokens, + AgentOpsAttributes.LLM_COST: event.cost, + AgentOpsAttributes.LLM_TOKENS_TOTAL: (event.prompt_tokens or 0) + (event.completion_tokens or 0), + # Add simple keys for backward compatibility + 'model': event.model, + 'prompt': event.prompt, + 'completion': event.completion, + 'prompt_tokens': event.prompt_tokens, + 'completion_tokens': event.completion_tokens, + 'cost': event.cost, + } + attributes.update(llm_attrs) + + # Add action-specific attributes + elif isinstance(event, ActionEvent): + action_attrs = { + AgentOpsAttributes.ACTION_TYPE: event.action_type, + AgentOpsAttributes.ACTION_PARAMS: event.params, + AgentOpsAttributes.ACTION_RESULT: event.returns, + AgentOpsAttributes.ACTION_LOGS: event.logs, + # Add simple keys for backward compatibility + 'action_type': event.action_type, + 'params': 
event.params, + 'returns': event.returns, + 'logs': event.logs, + } + attributes.update(action_attrs) + + # Add tool-specific attributes + elif isinstance(event, ToolEvent): + tool_attrs = { + AgentOpsAttributes.TOOL_NAME: event.name, + AgentOpsAttributes.TOOL_PARAMS: event.params, + AgentOpsAttributes.TOOL_RESULT: event.returns, + AgentOpsAttributes.TOOL_LOGS: event.logs, + # Add simple keys for backward compatibility + 'name': event.name, + 'params': event.params, + 'returns': event.returns, + 'logs': event.logs, + } + attributes.update(tool_attrs) + + # Add error flag for error events + elif isinstance(event, ErrorEvent): + error_attrs = { + AgentOpsAttributes.ERROR: True, + AgentOpsAttributes.ERROR_TYPE: event.error_type, + AgentOpsAttributes.ERROR_DETAILS: event.details, + # Add simple keys for backward compatibility + 'error': True, + 'error_type': event.error_type, + 'details': event.details, + 'trigger_event': event.trigger_event, + } + attributes.update(error_attrs) + + if event.trigger_event: + trigger_attrs = { + AgentOpsAttributes.TRIGGER_EVENT_ID: str(event.trigger_event.id), + AgentOpsAttributes.TRIGGER_EVENT_TYPE: event.trigger_event.event_type, + # Add simple keys for backward compatibility + 'trigger_event_id': str(event.trigger_event.id), + 'trigger_event_type': event.trigger_event.event_type, + } + attributes.update(trigger_attrs) + + return attributes + + @staticmethod + def _create_child_span(event: Event, parent_span_id: str) -> Optional[SpanDefinition]: + """Create child span using OTEL conventions""" + event_type = event.__class__.__name__.lower().replace('event', '') + + # Get session_id from context + session_id = trace.get_current_span().get_span_context().trace_id + + # Base attributes for all child spans + base_attributes = { + AgentOpsAttributes.TIME_START: event.init_timestamp, + AgentOpsAttributes.TIME_END: event.end_timestamp, + AgentOpsAttributes.EVENT_ID: str(event.id), + # Simple keys for backward compatibility + 'start_time': event.init_timestamp, + 'end_time': event.end_timestamp, + 'event_id': str(event.id), + # Get session_id from context + AgentOpsAttributes.EVENT_DATA: json.dumps({ + "session_id": str(session_id), + "event_type": event_type, + }) + } + + if isinstance(event, (ActionEvent, ToolEvent)): + attributes = { + **base_attributes, + AgentOpsAttributes.EXECUTION_START_TIME: event.init_timestamp, + AgentOpsAttributes.EXECUTION_END_TIME: event.end_timestamp, + # Simple keys for backward compatibility + 'start_time': event.init_timestamp, + 'end_time': event.end_timestamp, + } + if isinstance(event, ActionEvent): + action_attrs = { + AgentOpsAttributes.ACTION_TYPE: event.action_type, + 'action_type': event.action_type, # Simple key + } + if event.params: + action_attrs.update({ + AgentOpsAttributes.ACTION_PARAMS: json.dumps(event.params), + 'params': json.dumps(event.params), # Simple key + }) + attributes.update(action_attrs) + else: # ToolEvent + tool_attrs = { + AgentOpsAttributes.TOOL_NAME: event.name, + 'name': event.name, # Simple key + } + if event.params: + tool_attrs.update({ + AgentOpsAttributes.TOOL_PARAMS: json.dumps(event.params), + 'params': json.dumps(event.params), # Simple key + }) + attributes.update(tool_attrs) + + return SpanDefinition( + name=f"{event_type}.execution", + attributes=attributes, + parent_span_id=parent_span_id, + kind=SpanKind.INTERNAL + ) + elif isinstance(event, LLMEvent): + llm_attrs = { + **base_attributes, + AgentOpsAttributes.LLM_MODEL: event.model, + 'model': event.model, # Simple key + 
"llm.request.timestamp": event.init_timestamp, + "llm.response.timestamp": event.end_timestamp, + 'request_timestamp': event.init_timestamp, # Simple key + 'response_timestamp': event.end_timestamp, # Simple key + } + return SpanDefinition( + name="llm.api.call", + attributes=llm_attrs, + parent_span_id=parent_span_id, + kind=SpanKind.CLIENT + ) + return None \ No newline at end of file diff --git a/agentops/telemetry/exporter.py b/agentops/telemetry/exporter.py new file mode 100644 index 000000000..7b84eb9f6 --- /dev/null +++ b/agentops/telemetry/exporter.py @@ -0,0 +1,155 @@ +import json +import threading +from typing import Callable, Dict, List, Optional, Sequence +from uuid import UUID + +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from opentelemetry.util.types import Attributes + +from agentops.http_client import HttpClient +from agentops.log_config import logger +from agentops.telemetry.converter import AgentOpsAttributes + + +class ExportManager(SpanExporter): + """ + Manages export strategies and batching for AgentOps telemetry + """ + + def __init__( + self, + session_id: UUID, + endpoint: str, + jwt: str, + api_key: str, + retry_config: Optional[Dict] = None, + custom_formatters: Optional[List[Callable]] = None, + ): + self.session_id = session_id + self.endpoint = endpoint + self.jwt = jwt + self.api_key = api_key + self._export_lock = threading.Lock() + self._shutdown = threading.Event() + self._wait_event = threading.Event() + self._wait_fn = self._wait_event.wait # Store the wait function + + # Allow custom retry configuration + retry_config = retry_config or {} + self._retry_count = retry_config.get("retry_count", 3) + self._retry_delay = retry_config.get("retry_delay", 1.0) + + # Support custom formatters + self._custom_formatters = custom_formatters or [] + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + """Export spans with retry logic and proper error handling""" + if self._shutdown.is_set(): + return SpanExportResult.SUCCESS + + with self._export_lock: + try: + if not spans: + return SpanExportResult.SUCCESS + + events = self._format_spans(spans) + + for attempt in range(self._retry_count): + try: + success = self._send_batch(events) + if success: + return SpanExportResult.SUCCESS + + # If not successful but not the last attempt, wait and retry + if attempt < self._retry_count - 1: + self._wait_before_retry(attempt) + continue + + except Exception as e: + logger.error(f"Export attempt {attempt + 1} failed: {e}") + if attempt < self._retry_count - 1: + self._wait_before_retry(attempt) + continue + return SpanExportResult.FAILURE + + # If we've exhausted all retries without success + return SpanExportResult.FAILURE + + except Exception as e: + logger.error(f"Error during span export: {e}") + return SpanExportResult.FAILURE + + def _format_spans(self, spans: Sequence[ReadableSpan]) -> List[Dict]: + """Format spans into AgentOps event format with custom formatters""" + events = [] + for span in spans: + try: + # Get base event data + event_data = json.loads(span.attributes.get(AgentOpsAttributes.EVENT_DATA, "{}")) + + # Ensure required fields + event = { + "id": span.attributes.get(AgentOpsAttributes.EVENT_ID), + "event_type": span.name, + "init_timestamp": span.attributes.get(AgentOpsAttributes.EVENT_START_TIME), + "end_timestamp": span.attributes.get(AgentOpsAttributes.EVENT_END_TIME), + # Always include session_id from the exporter + "session_id": 
str(self.session_id), + } + + # Add agent ID if present + agent_id = span.attributes.get(AgentOpsAttributes.AGENT_ID) + if agent_id: + event["agent_id"] = agent_id + + # Add event-specific data, but ensure session_id isn't overwritten + event_data["session_id"] = str(self.session_id) + event.update(event_data) + + # Apply custom formatters + for formatter in self._custom_formatters: + try: + event = formatter(event) + # Ensure session_id isn't removed by formatters + event["session_id"] = str(self.session_id) + except Exception as e: + logger.error(f"Custom formatter failed: {e}") + + events.append(event) + except Exception as e: + logger.error(f"Error formatting span: {e}") + + return events + + def _send_batch(self, events: List[Dict]) -> bool: + """Send a batch of events to the AgentOps backend""" + try: + endpoint = self.endpoint.rstrip('/') + '/v2/create_events' + response = HttpClient.post( + endpoint, + json.dumps({"events": events}).encode("utf-8"), + api_key=self.api_key, + jwt=self.jwt, + ) + return response.code == 200 + except Exception as e: + logger.error(f"Error sending batch: {str(e)}", exc_info=e) + return False + + def _wait_before_retry(self, attempt: int): + """Implement exponential backoff for retries""" + delay = self._retry_delay * (2**attempt) + self._wait_fn(delay) # Use the wait function + + def _set_wait_fn(self, wait_fn): + """Test helper to override wait behavior""" + self._wait_fn = wait_fn + + def force_flush(self, timeout_millis: Optional[int] = None) -> bool: + """Force flush any pending exports""" + return True + + def shutdown(self) -> None: + """Shutdown the exporter gracefully""" + self._shutdown.set() diff --git a/agentops/telemetry/instrumentation.py b/agentops/telemetry/instrumentation.py new file mode 100644 index 000000000..d604c2df0 --- /dev/null +++ b/agentops/telemetry/instrumentation.py @@ -0,0 +1,109 @@ +import logging +import os +import re +from typing import TYPE_CHECKING +from urllib.parse import urljoin +from uuid import UUID + +from opentelemetry import metrics, trace +from opentelemetry._logs import set_logger_provider +from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler +from opentelemetry.sdk._logs.export import SimpleLogRecordProcessor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider + +from agentops.http_client import HttpClient + +from .log_handler import LoggingHandler, set_log_handler +from .processors import LiveSpanProcessor + +if TYPE_CHECKING: + from opentelemetry.sdk._logs import LoggerProvider + +UUID_REGEX = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}" + +ACCOUNTS_PREFIX = "accounts/" +ACCOUNT_ID_REGEX = f"{ACCOUNTS_PREFIX}{UUID_REGEX}" + +WORKSPACES_PREFIX = "workspaces/" +WORKSPACE_ID_REGEX = f"{WORKSPACES_PREFIX}{UUID_REGEX}" + + +def extract_account_and_workspace_id(url: str) -> tuple[UUID, UUID]: + account_id, workspace_id = None, None + + if res := re.search(ACCOUNT_ID_REGEX, url): + account_id = UUID(res.group().removeprefix(ACCOUNTS_PREFIX)) + + if res := re.search(WORKSPACE_ID_REGEX, url): + workspace_id = UUID(res.group().removeprefix(WORKSPACES_PREFIX)) 
+ + if account_id and workspace_id: + return account_id, workspace_id + + raise ValueError(f"Could not extract account and workspace id from API url: {url!r}") + + +def setup_exporters(jwt: str) -> tuple[TracerProvider, MeterProvider, "LoggerProvider"]: + telemetry_url = _url_join(api_url, "telemetry/") + + resource = Resource.create( + { + "service.name": "agentops", + "service.instance.id": os.uname().nodename, + "prefect.account": str(account_id), + "prefect.workspace": str(workspace_id), + } + ) + + trace_provider = _setup_trace_provider(resource, headers, telemetry_url) + meter_provider = _setup_meter_provider(resource, headers, telemetry_url) + logger_provider = _setup_logger_provider(resource, headers, telemetry_url) + + return trace_provider, meter_provider, logger_provider + + +def _setup_trace_provider(resource: Resource, headers: dict[str, str], telemetry_url: str) -> TracerProvider: + trace_provider = TracerProvider(resource=resource) + otlp_span_exporter = OTLPSpanExporter( + endpoint=_url_join(telemetry_url, "v1/traces"), + headers=headers, + ) + trace_provider.add_span_processor(InFlightSpanProcessor(otlp_span_exporter)) + trace.set_tracer_provider(trace_provider) + + return trace_provider + + +def _setup_meter_provider(resource: Resource, headers: dict[str, str], telemetry_url: str) -> MeterProvider: + metric_reader = PeriodicExportingMetricReader( + OTLPMetricExporter( + endpoint=_url_join(telemetry_url, "v1/metrics"), + headers=headers, + ) + ) + meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) + metrics.set_meter_provider(meter_provider) + + return meter_provider + + +def _setup_logger_provider(resource: Resource, headers: dict[str, str], telemetry_url: str) -> LoggerProvider: + logger_provider = LoggerProvider(resource=resource) + otlp_exporter = OTLPLogExporter( + endpoint=_url_join(telemetry_url, "v1/logs"), + headers=headers, + ) + logger_provider.add_log_record_processor(SimpleLogRecordProcessor(otlp_exporter)) + set_logger_provider(logger_provider) + + # Create and configure handler for AgentOps-specific logger + log_handler = LoggingHandler(level=logging.NOTSET, logger_provider=logger_provider) + set_log_handler(log_handler, "agentops.telemetry") # Use a more specific logger name + + return logger_provider diff --git a/agentops/telemetry/log_handler.py b/agentops/telemetry/log_handler.py new file mode 100644 index 000000000..d4e2a8190 --- /dev/null +++ b/agentops/telemetry/log_handler.py @@ -0,0 +1,106 @@ +import logging +from typing import Optional + +from opentelemetry._logs import LoggerProvider, LogRecord +from opentelemetry.sdk._logs import LoggerProvider as SDKLoggerProvider +from opentelemetry.sdk._logs import LoggingHandler as _LoggingHandler +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter +from opentelemetry.trace import get_current_span +from opentelemetry.util.types import Attributes + + +class LoggingHandler(_LoggingHandler): + """ + Custom log handler that integrates with OpenTelemetry + """ + + def __init__( + self, + level: int = logging.NOTSET, + logger_provider: Optional[LoggerProvider] = None, + ): + super().__init__(level, logger_provider) + + def emit(self, record: logging.LogRecord) -> None: + """Emit a log record with trace context""" + try: + # Get current span context + span = get_current_span() + trace_id = span.get_span_context().trace_id if span else None + span_id = span.get_span_context().span_id if span else None + + # Create OTEL log record + log_data = { + 
"timestamp": int(record.created * 1e9), # Convert to nanoseconds + "severity_text": record.levelname, + "severity_number": record.levelno, + "body": record.getMessage(), + "attributes": { + "logger.name": record.name, + "logger.thread_name": record.threadName, + "logger.file.name": record.filename, + "logger.file.line": record.lineno, + "logger.file.path": record.pathname, + }, + } + + # Add trace context if available + if trace_id: + log_data["attributes"]["trace_id"] = format(trace_id, "032x") + if span_id: + log_data["attributes"]["span_id"] = format(span_id, "016x") + + # Create and emit OTEL log record + otel_record = LogRecord(**log_data) + self._logger.emit(otel_record) + + except Exception as e: + # Fallback to standard logging if OTEL emission fails + super().emit(record) + + @staticmethod + def setup(service_name: str) -> SDKLoggerProvider: + """Setup logging with OTEL integration""" + # Create logger provider + logger_provider = SDKLoggerProvider() + + # Add console exporter for development + console_exporter = ConsoleLogExporter() + logger_provider.add_log_record_processor(BatchLogRecordProcessor(console_exporter)) + + # Create and configure handler + handler = LoggingHandler( + level=logging.INFO, + logger_provider=logger_provider, + ) + + # Configure root logger + root_logger = logging.getLogger() + root_logger.addHandler(handler) + root_logger.setLevel(logging.INFO) + + return logger_provider + +def set_log_handler(handler: logging.Handler, logger_name: str = "agentops") -> None: + """ + Configure an AgentOps-specific logger with the provided handler. + + Args: + handler: The logging handler to set + logger_name: The name of the logger to configure (defaults to "agentops") + """ + # Get AgentOps-specific logger instead of root logger + logger = logging.getLogger(logger_name) + logger.propagate = False # Prevent duplicate logging + + # Remove existing handlers of the same type to avoid duplicates + for existing_handler in logger.handlers[:]: + if isinstance(existing_handler, type(handler)): + logger.removeHandler(existing_handler) + + # Add the new handler + logger.addHandler(handler) + + # Only set level if not already set + if logger.level == logging.NOTSET: + logger.setLevel(logging.INFO) diff --git a/agentops/telemetry/manager.py b/agentops/telemetry/manager.py new file mode 100644 index 000000000..03fc96d99 --- /dev/null +++ b/agentops/telemetry/manager.py @@ -0,0 +1,165 @@ +from typing import Dict, List, Optional + +from opentelemetry import trace +from opentelemetry.sdk.resources import SERVICE_NAME, Resource, ResourceAttributes +from opentelemetry.sdk.trace import TracerProvider, SpanProcessor +from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter +from opentelemetry.sdk.trace.sampling import ParentBased, Sampler, TraceIdRatioBased + +from agentops.config import Configuration +from agentops.telemetry.config import OTELConfig + + +class OTELManager: + """ + Manages OpenTelemetry setup and configuration for AgentOps. + + This manager follows OpenTelemetry best practices: + 1. Configuration is done at initialization + 2. TracerProvider is configured once with all necessary processors + 3. Resource attributes and sampling are set at provider creation + 4. 
Each processor handles one exporter + + The manager supports any SpanProcessor implementation that follows + the OpenTelemetry processor interface, including: + - BatchSpanProcessor: For efficient batched exports + - SimpleSpanProcessor: For immediate exports + - LiveSpanProcessor: For real-time monitoring + - EventProcessor: For event-specific processing + - Custom processors: Any class implementing SpanProcessor interface + """ + + def __init__( + self, + config: OTELConfig, + exporters: Optional[List[SpanExporter]] = None, + resource_attributes: Optional[Dict] = None, + sampler: Optional[Sampler] = None, + ): + """ + Initialize the manager with all necessary configuration. + + Args: + config: Base configuration for processors + exporters: List of exporters to use (each gets its own processor) + resource_attributes: Custom resource attributes + sampler: Custom sampling strategy + """ + self.config = config + self._tracer_provider = None + self._processors: List[SpanProcessor] = [] + self._resource_attributes = resource_attributes or {} + self._sampler = sampler + self._exporters = exporters or [] + + def initialize(self, service_name: str, session_id: str) -> TracerProvider: + """ + Initialize OTEL components with proper resource attributes. + Creates the TracerProvider and configures all processors. + + Args: + service_name: Name of the service + session_id: Unique session identifier + + Returns: + Configured TracerProvider instance + """ + # Set up resource attributes + resource_attributes = { + ResourceAttributes.SERVICE_NAME: service_name, + "session.id": session_id, + } + resource_attributes.update(self._resource_attributes) + + # Create resource with attributes + resource = Resource.create(attributes=resource_attributes) + + # Create provider with resource and sampling config + self._tracer_provider = TracerProvider( + resource=resource, + sampler=self._sampler or ParentBased(TraceIdRatioBased(1.0)), + ) + + # Set up processors for all configured exporters + for exporter in self._exporters: + processor = BatchSpanProcessor( + exporter, + max_queue_size=self.config.max_queue_size, + schedule_delay_millis=self.config.max_wait_time, + ) + self._tracer_provider.add_span_processor(processor) + self._processors.append(processor) + + # Set as global tracer provider + trace.set_tracer_provider(self._tracer_provider) + + return self._tracer_provider + + def add_processor(self, processor: SpanProcessor): + """ + Add a custom span processor to the tracer provider. + + Args: + processor: Any span processor implementation + + Raises: + RuntimeError: If manager is not initialized + """ + if not self._tracer_provider: + raise RuntimeError("OTELManager not initialized") + + self._tracer_provider.add_span_processor(processor) + self._processors.append(processor) + + def get_tracer(self, name: str) -> trace.Tracer: + """ + Get a tracer instance for the given name. + + Args: + name: Name for the tracer, typically __name__ + + Returns: + Configured tracer instance + + Raises: + RuntimeError: If manager is not initialized + """ + if not self._tracer_provider: + raise RuntimeError("OTELManager not initialized") + return self._tracer_provider.get_tracer(name) + + def shutdown(self): + """ + Shutdown all processors and cleanup resources. + Ensures proper cleanup of all processor types. 
+ """ + for processor in self._processors: + try: + if hasattr(processor, 'force_flush'): + processor.force_flush(timeout_millis=5000) + processor.shutdown() + except Exception: + pass # Ensure we continue cleanup even if one processor fails + self._processors = [] + self._tracer_provider = None + + def configure( + self, + additional_exporters: Optional[List[SpanExporter]] = None, + resource_attributes: Optional[Dict] = None, + sampler: Optional[Sampler] = None, + ) -> None: + """ + Configure the OTEL manager with additional settings. + + Args: + additional_exporters: Additional span exporters to use + resource_attributes: Custom resource attributes to add + sampler: Custom sampling strategy + """ + if additional_exporters: + self._exporters.extend(additional_exporters) + if resource_attributes: + self._resource_attributes.update(resource_attributes) + if sampler: + self._sampler = sampler diff --git a/agentops/telemetry/metrics.py b/agentops/telemetry/metrics.py new file mode 100644 index 000000000..c7621b198 --- /dev/null +++ b/agentops/telemetry/metrics.py @@ -0,0 +1,152 @@ +from typing import Iterable, Optional + +from opentelemetry import metrics +from opentelemetry.metrics import CallbackOptions, Instrument, MeterProvider, Observation +from opentelemetry.sdk.metrics import MeterProvider as SDKMeterProvider +from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader + +from agentops.log_config import logger # Import the configured AgentOps logger + + +class TelemetryMetrics: + """ + Manages metrics collection for AgentOps telemetry + """ + + def __init__(self, service_name: str): + self._meter_provider = self._setup_meter_provider() + self._meter = self._meter_provider.get_meter(name=service_name, version="1.0.0") + + # Export counters + self.export_attempts = self._meter.create_counter( + name="agentops.export.attempts", + description="Number of export attempts", + unit="1", + ) + + self.export_failures = self._meter.create_counter( + name="agentops.export.failures", + description="Number of failed exports", + unit="1", + ) + + # Export histograms + self.export_duration = self._meter.create_histogram( + name="agentops.export.duration", + description="Duration of export operations", + unit="ms", + ) + + self.batch_size = self._meter.create_histogram( + name="agentops.export.batch_size", + description="Size of export batches", + unit="1", + ) + + # Memory usage gauge + self._memory_gauge = self._meter.create_observable_gauge( + name="agentops.memory.usage", + description="Memory usage of the telemetry system", + unit="bytes", + callbacks=[self._get_memory_usage], + ) + + def _setup_meter_provider(self) -> MeterProvider: + """Setup the meter provider with appropriate exporters""" + # Create console exporter for development + console_exporter = ConsoleMetricExporter() + reader = PeriodicExportingMetricReader(console_exporter, export_interval_millis=5000) + + return SDKMeterProvider( + metric_readers=[reader], + ) + + def _get_memory_usage(self, options: CallbackOptions) -> Iterable[Observation]: + """Callback to get current memory usage""" + try: + import psutil + + process = psutil.Process() + memory = process.memory_info().rss + return [Observation(value=float(memory), attributes={"type": "process_memory"})] + except Exception as e: + logger.error(f"Failed to collect memory metrics: {e}") + return [] + + def record_export_attempt(self, success: bool, duration_ms: float, batch_size: int, error_type: str = None): + """Record metrics for an export 
attempt""" + # Record attempt + self.export_attempts.add(1) + + # Record failure if applicable + if not success: + self.export_failures.add(1, {"error_type": error_type or "unknown"}) + + # Record duration and batch size + self.export_duration.record(duration_ms) + self.batch_size.record(batch_size) + + def shutdown(self): + """Shutdown metrics collection""" + if isinstance(self._meter_provider, SDKMeterProvider): + # Force a final export before shutdown + for reader in self._meter_provider._all_metric_readers: + reader.force_flush() + # Then shutdown the provider + self._meter_provider.shutdown() + +# Add example usage +if __name__ == "__main__": + import time + import random + + # Initialize metrics with a test service name + logger.info("Initializing TelemetryMetrics...") + metrics = TelemetryMetrics("example-service") + + logger.info("Starting metrics collection example...") + logger.info("Export interval: 5s, Running 5 iterations...") + logger.info("-" * 80) + + # Example error types + ERROR_TYPES = ["timeout", "connection_error", "invalid_data", "rate_limit", "server_error"] + + # Simulate some export operations + for i in range(5): + # Simulate successful export + duration = 100.5 + i * 10 + batch_size = 5 + i + + logger.info(f"Iteration {i+1}:") + logger.info(f" ✓ Recording successful export (duration={duration}ms, batch_size={batch_size})") + metrics.record_export_attempt( + success=True, + duration_ms=duration, + batch_size=batch_size + ) + + # Simulate failed export with random error type + error_type = random.choice(ERROR_TYPES) + logger.info(f" ✗ Recording failed export (duration=200.5ms, batch_size=3, error={error_type})") + metrics.record_export_attempt( + success=False, + duration_ms=200.5, + batch_size=3, + error_type=error_type + ) + + # Log cumulative stats with error breakdown + logger.info(f" 📊 Cumulative stats:") + logger.info(f" - Total attempts: {(i+1)*2}") + logger.info(f" - Failures: {i+1}") + logger.info(f" - Last batch size: {batch_size}") + logger.info(f" - Last error: {error_type}") + logger.info("") + + # Wait to see metrics in console + logger.info("Waiting for metrics export (5s)...") + time.sleep(2) + + logger.info("-" * 80) + logger.info("Metrics collection completed. Shutting down...") + metrics.shutdown() diff --git a/agentops/telemetry/processors.py b/agentops/telemetry/processors.py new file mode 100644 index 000000000..aae0e028c --- /dev/null +++ b/agentops/telemetry/processors.py @@ -0,0 +1,229 @@ +import json +import time +from datetime import datetime, timezone +from threading import Event, Lock, Thread +from typing import Any, Dict, Optional, List +from uuid import UUID, uuid4 + +from opentelemetry import trace +from opentelemetry.context import Context, attach, detach, set_value +from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor, TracerProvider +from opentelemetry.sdk.trace.export import SpanExporter +from opentelemetry.trace import Span as OTELSpan + +from agentops.helpers import filter_unjsonable, get_ISO_time +from agentops.telemetry.converter import EventToSpanConverter +from agentops.event import ErrorEvent + + +class EventProcessor: + """ + Handles event processing and formatting for AgentOps telemetry. + + This class follows the OpenTelemetry pattern where: + 1. A TracerProvider manages span processors and creates tracers + 2. Tracers create spans + 3. 
Spans are automatically processed by all processors registered with the provider + + This design ensures: + - Loose coupling: Processors don't need to know about each other + - Flexibility: Processors can be added/removed via the provider + - Standard compliance: Follows OpenTelemetry's recommended architecture + """ + + def __init__(self, session_id: UUID, tracer_provider: Optional[TracerProvider] = None): + """ + Initialize the event processor with a session ID and optional tracer provider. + + Args: + session_id: Unique identifier for the telemetry session + tracer_provider: Optional TracerProvider. If not provided, creates a new one. + In production, you typically want to pass in a configured provider + with the desired span processors already registered. + """ + self.session_id = session_id + # Use provided provider or create new one. In production, you should pass in + # a configured provider to ensure consistent span processing across the application + self._tracer_provider = tracer_provider or TracerProvider() + self._tracer = self._tracer_provider.get_tracer(__name__) + self.event_counts: Dict[str, int] = { + "llms": 0, + "tools": 0, + "actions": 0, + "errors": 0, + "apis": 0, + } + + def process_event(self, event: Any, tags: Optional[List[str]] = None, flush_now: bool = False) -> Optional[Span]: + """Process and format an event into OpenTelemetry spans""" + # Ensure required attributes + if not hasattr(event, "id"): + event.id = uuid4() + if not hasattr(event, "init_timestamp"): + event.init_timestamp = get_ISO_time() + if not hasattr(event, "end_timestamp") or event.end_timestamp is None: + event.end_timestamp = get_ISO_time() + if not hasattr(event, "session_id"): + event.session_id = self.session_id + + # Get current span if it exists + current_span = trace.get_current_span() + + # Create session context + token = set_value("session.id", str(self.session_id)) + try: + token = attach(token) + + # Get span definitions from converter + span_definitions = EventToSpanConverter.convert_event(event) + + # If we have a current span and this is an error event, update the current span + if isinstance(event, ErrorEvent) and current_span and current_span.is_recording(): + # Update current span with error attributes + for key, value in span_definitions[0].attributes.items(): + current_span.set_attribute(key, value) + return current_span + + # Otherwise create new spans as before + primary_span = None + for span_def in span_definitions: + context = None + if span_def.parent_span_id and primary_span: + context = trace.set_span_in_context(primary_span) + + # Add common attributes + span_def.attributes.update({ + "event.id": str(event.id), + "session.id": str(self.session_id), + "session.tags": ",".join(tags) if tags else "", + "event.timestamp": event.init_timestamp, + "event.end_timestamp": event.end_timestamp, + }) + + with self._tracer.start_span( + name=span_def.name, + kind=span_def.kind, + attributes=span_def.attributes, + context=context, + ) as span: + if not primary_span: + primary_span = span + if event.event_type in self.event_counts: + self.event_counts[event.event_type] += 1 + + if flush_now: + span.end() + + return primary_span + finally: + detach(token) + + def _format_event_data(self, event: Any) -> Dict[str, Any]: + """Format event data based on event type""" + event_data = dict(filter_unjsonable(event.__dict__)) + + if hasattr(event, "error_type"): + event_data["error_type"] = getattr(event, "error_type", event.event_type) + elif event.event_type == "actions": + if 
"action_type" not in event_data: + event_data["action_type"] = event_data.get("name", "unknown_action") + if "name" not in event_data: + event_data["name"] = event_data.get("action_type", "unknown_action") + elif event.event_type == "tools": + if "name" not in event_data: + event_data["name"] = event_data.get("tool_name", "unknown_tool") + if "tool_name" not in event_data: + event_data["tool_name"] = event_data.get("name", "unknown_tool") + + return event_data + + +class LiveSpanProcessor(SpanProcessor): + """ + This processor is particularly useful for monitoring long-running operations + where you want to see progress before completion, rather than only getting + visibility after the fact. + + Integrates with the broader OpenTelemetry system through the + standard SpanProcessor interface, but adds the specialized capability of exporting + intermediate states of spans, which is not typically available in standard OTEL processors. + """ + + def __init__(self, span_exporter: SpanExporter): + self.span_exporter = span_exporter + self._in_flight: Dict[int, Span] = {} + self._lock = Lock() + self._stop_event = Event() + self._export_thread = Thread(target=self._export_periodically, daemon=True) + self._export_thread.start() + + def _export_periodically(self) -> None: + while not self._stop_event.is_set(): + time.sleep(1) # Export every second + with self._lock: + to_export = [self._readable_span(span) for span in self._in_flight.values()] + if to_export: + self.span_exporter.export(to_export) + + def _readable_span(self, span: OTELSpan) -> ReadableSpan: + """Convert an OTEL span to a readable span with additional attributes""" + if not hasattr(span, 'get_span_context'): + raise ValueError("Invalid span type") + + context = span.get_span_context() + + # Combine existing attributes with in-flight attributes + attributes = dict(span.attributes or {}) + attributes.update({ + "agentops.in_flight": True, + "agentops.event_type": span.attributes.get("event.type", "unknown"), + "agentops.duration_ms": (time.time_ns() - span.start_time) / 1e6, + }) + + # Create new ReadableSpan with all attributes + readable = ReadableSpan( + name=span.name, + context=context, + parent=None, # Parent context handled separately + resource=span.resource, + attributes=attributes, # Use combined attributes + events=span.events, + links=span.links, + kind=span.kind, + status=span.status, + start_time=span.start_time, + end_time=time.time_ns(), + instrumentation_scope=span.instrumentation_scope, + ) + + return readable + + def on_start(self, span: OTELSpan, parent_context: Optional[Context] = None) -> None: + """Handle span start event""" + context = span.get_span_context() + if not context or not context.trace_flags.sampled: + return + + with self._lock: + self._in_flight[context.span_id] = span + # Export immediately when span starts + readable = self._readable_span(span) + self.span_exporter.export([readable]) + + def on_end(self, span: ReadableSpan) -> None: + """Handle span end event""" + if not span.context or not span.context.trace_flags.sampled: + return + with self._lock: + # Remove from in-flight and export final state + if span.context.span_id in self._in_flight: + del self._in_flight[span.context.span_id] + self.span_exporter.export((span,)) + + def shutdown(self) -> None: + self._stop_event.set() + self._export_thread.join() + self.span_exporter.shutdown() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + return True diff --git a/docs/dev/CURRENT_OTEL.md b/docs/dev/CURRENT_OTEL.md new file mode 
100644 index 000000000..d7b3057ec --- /dev/null +++ b/docs/dev/CURRENT_OTEL.md @@ -0,0 +1,142 @@ +# OpenTelemetry Design in AgentOps + +## Overview + +AgentOps uses OpenTelemetry (OTEL) for its observability infrastructure. The implementation centers around a custom SessionExporter that handles the export of telemetry data to the AgentOps backend. + +## Core Components + +### 1. Session Exporter +The SessionExporter is the primary component responsible for exporting spans to the AgentOps backend. + +```python +class SessionExporter(SpanExporter): + def __init__(self, session, endpoint: str): + self._export_lock = threading.Lock() + self._shutdown = threading.Event() + self.session = session + self.endpoint = endpoint + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + # Implementation details in session.py +``` + +### 2. Session Management + +Sessions are the core organizational unit in AgentOps. Each session maintains its own OTEL context and exporter. + +Key attributes: +```python +class Session: + def __init__(self): + self.session_id: UUID + self._tracer_provider: TracerProvider + self._otel_exporter: SessionExporter +``` + +## Architecture + +```mermaid +graph TD + A[Agent Code] -->|Instrumentation| B[AgentOps SDK] + B -->|Creates| C[Session] + C -->|Initializes| D[TracerProvider] + C -->|Creates| E[SessionExporter] + D -->|Generates| F[Spans] + F -->|Exported via| E + E -->|Sends to| G[AgentOps Backend] + + subgraph "OTEL Implementation" + D + E + F + end +``` + +## Span Structure + +Each span represents an event in AgentOps and contains the following required attributes: + +```python +{ + "event.id": str(UUID), + "event.type": str, + "event.timestamp": str, # ISO format + "event.end_timestamp": str, # ISO format + "event.data": str, # JSON serialized + "session.id": str +} +``` + +## Implementation Details + +### 1. Span Export Process + +The export process follows these steps: + +1. Collect spans in batch +2. Format event data based on event type (actions, tools, etc.) +3. Add required timestamps and IDs +4. Export to AgentOps backend via HTTP + +### 2. Session States + +AgentOps supports two session management modes: + +1. Single Session Mode + - One active session at a time + - Synchronous operations + - Default mode + +2. Multi-Session Mode + - Multiple concurrent sessions + - Asynchronous operations + - Requires explicit session management + +## Best Practices + +1. **Session Management** + - Initialize sessions explicitly + - End sessions properly + - Handle session state transitions + +2. **Span Creation** + - Include all required attributes + - Use consistent event types + - Properly format timestamps + +3. **Error Handling** + - Implement proper retry logic + - Log export failures + - Maintain data consistency + +## Configuration Options + +The OTEL implementation can be configured through: + +1. **Export Settings** + - Batch size + - Export frequency + - Retry attempts + +2. **Instrumentation Options** + - Auto-instrumentation toggles + - Custom attribute addition + - Sampling rates + +## Future Improvements + +1. **Distributed Tracing** + - Cross-service trace context propagation + - Baggage support + - W3C trace context compliance + +2. **Metrics Collection** + - OTEL metrics support + - Custom metrics exporters + - Aggregation support + +3. 
**Performance Optimization** + - Batch size optimization + - Export frequency tuning + - Compression support \ No newline at end of file diff --git a/docs/dev/OTEL/backend/collector-and-grpc.md b/docs/dev/OTEL/backend/collector-and-grpc.md new file mode 100644 index 000000000..c0d5d0ec8 --- /dev/null +++ b/docs/dev/OTEL/backend/collector-and-grpc.md @@ -0,0 +1,10 @@ +Why gRPC for Logs? + 1. Performance: + • gRPC uses HTTP/2, providing low-latency, high-throughput communication. + • Efficient for streaming large amounts of log data in real time. + 2. Compression: + • gRPC supports built-in compression (e.g., gzip), reducing the overhead of transferring large log files. + 3. Interoperability: + • Many OpenTelemetry-compatible backends (e.g., Jaeger, Prometheus, Datadog) expect data in gRPC OTLP format. + 4. Standardization: + • gRPC is the recommended protocol for OTLP in production deployments. It’s more robust for structured log delivery compared to HTTP/JSON. diff --git a/docs/dev/OTEL/entity_mapping.md b/docs/dev/OTEL/entity_mapping.md new file mode 100644 index 000000000..abc7ca13e --- /dev/null +++ b/docs/dev/OTEL/entity_mapping.md @@ -0,0 +1,74 @@ +An example of how AgentOps components could map to OpenTelemetry concepts: + +1. **Core Mapping** +```mermaid +graph LR + subgraph AgentOps + A[Session] --> B[Events] + B --> C[LLMEvent] + B --> D[ActionEvent] + B --> E[ToolEvent] + end + + subgraph OpenTelemetry + F[Trace] --> G[Spans] + G --> H[LLM Spans] + G --> I[Action Spans] + G --> J[Tool Spans] + K[Metrics] --> L[LLM Metrics] + end + + A -.->|Maps to| F + C -.->|Maps to| H + D -.->|Maps to| I + E -.->|Maps to| J +``` + +Let's look at specific examples: + +1. **Session to Trace** +````python +# When AgentOps starts a session: +class Session: + def __init__(self): + # Create root span for the session + self.trace = tracer.start_span( + name="agentops.session", + attributes={ + "session.id": self.session_id, + "agent.id": self.agent_id + } + ) +```` + +2. **LLMEvent to Span** +````python +# When AgentOps records an LLM event: +class LLMEvent: + def to_span(self): + return tracer.start_span( + name="llm.completion", + attributes={ + "llm.model": self.model, + "llm.tokens.prompt": self.prompt_tokens, + "llm.tokens.completion": self.completion_tokens, + "llm.cost": self.cost + } + ) +```` + +3. **LLM Metrics** +````python +# In LlmTracker: +class LlmTracker: + def __init__(self): + self.calls_counter = meter.create_counter( + name="llm.calls", + description="Number of LLM API calls" + ) + + self.token_histogram = meter.create_histogram( + name="llm.tokens", + description="Distribution of token usage" + ) +```` diff --git a/docs/dev/OTEL/entity_mappings_v2.md b/docs/dev/OTEL/entity_mappings_v2.md new file mode 100644 index 000000000..187ba09ac --- /dev/null +++ b/docs/dev/OTEL/entity_mappings_v2.md @@ -0,0 +1,102 @@ + +Looking at CODEBASE.md, here's how the mapping should actually work: + +1. **Session → Trace** + - Each session represents a complete interaction/workflow + - Contains all related events + - Has a unique `session_id` (that becomes the `trace_id`) + +2. **Events → Spans** + ```mermaid + graph TB + subgraph Session/Trace + A[Session Start] -->|Parent Span| B[Events] + B --> C[LLMEvent
span: llm.completion] + B --> D[ActionEvent
span: agent.action] + B --> E[ToolEvent
span: agent.tool] + + C --> C1[API Call
span: llm.api.call] + D --> D1[Function Execution
span: action.execution] + E --> E1[Tool Execution
span: tool.execution] + end + ``` + +Looking at CODEBASE.md's Event class: +```python +class Event { + +EventType event_type + +Dict params + +str init_timestamp # Maps to span.start_time + +str end_timestamp # Maps to span.end_time + +UUID agent_id # Maps to span.attributes["agent.id"] + +UUID id # Maps to span.span_id +} +``` + +Each Event naturally maps to a span because: +1. Events have start/end times (like spans) +2. Events have unique IDs (like spans) +3. Events have parameters/metadata (like span attributes) +4. Events are hierarchical (like spans can be) + +The key insight is that some events might create multiple spans: + +```python +# Example LLMEvent creating multiple spans +class LLMEvent: + def to_spans(self, tracer): + # Main LLM event span + with tracer.start_span("llm.completion") as event_span: + event_span.set_attributes({ + "llm.model": self.model, + "llm.tokens.total": self.prompt_tokens + self.completion_tokens, + "llm.cost": self.cost + }) + + # Child span for API call + with tracer.start_span("llm.api.call", parent=event_span) as api_span: + api_span.set_attributes({ + "llm.provider": self.provider, + "llm.api.endpoint": self.endpoint + }) +``` + +This better reflects the reality that a single logical event (like an LLM call) might involve multiple distinct operations that we want to track separately. + + + +# Direct mapping of our events to OTEL spans + +``` +EVENT_TO_SPAN_MAPPING = { + 'LLMEvent': { + 'name': 'llm.completion', + 'attributes': { + 'llm.model': 'model', + 'llm.tokens.prompt': 'prompt_tokens', + 'llm.tokens.completion': 'completion_tokens', + 'llm.cost': 'cost' + } + }, + 'ActionEvent': { + 'name': 'agent.action', + 'attributes': { + 'action.type': 'action_type', + 'action.name': 'name' + } + }, + 'ToolEvent': { + 'name': 'agent.tool', + 'attributes': { + 'tool.name': 'name' + } + }, + 'ErrorEvent': { + 'name': 'agent.error', + 'attributes': { + 'error.type': 'error_type', + 'error.code': 'code' + } + } +} +``` diff --git a/docs/dev/OTEL/exporters-behavior-and-use-case.md b/docs/dev/OTEL/exporters-behavior-and-use-case.md new file mode 100644 index 000000000..1cb91ef9b --- /dev/null +++ b/docs/dev/OTEL/exporters-behavior-and-use-case.md @@ -0,0 +1,78 @@ +Based on the documentation, here's a high-level overview of exporters behavior and a real-world use case: + +### Exporters Behavior + +```mermaid +graph LR + A[AgentOps Events] --> B[OTEL SDK] + B --> C{Sampler} + C -->|Sampled| D[Batch Processor] + C -->|Not Sampled| E[Dropped] + D --> F[OTLP Exporter] + F -->|HTTP/gRPC| G[OTEL Collector] + G --> H1[Jaeger] + G --> H2[Prometheus] + G --> H3[Other Backends] +``` + +### Real-World Use Case Example: + +```mermaid +graph TD + A[AI Agent System] --> B[AgentOps Events] + B --> C[OTEL Integration] + + subgraph "Telemetry Pipeline" + C -->|1. LLM Call| D[Span: model=gpt-4, tokens=1500] + C -->|2. Tool Call| E[Span: tool=database_query] + C -->|3. Error| F[Span: error=API_timeout] + end + + subgraph "OTEL Processing" + D --> G[Sampler
rate=0.5] + E --> G + F --> G + G --> H[BatchProcessor
batch_size=512
schedule=5s] + H --> I[OTLP Exporter] + end + + I -->|Export| J[Collector] + J -->|Visualize| K[Jaeger UI] +``` + +Key Behaviors: + +1. **Sampling Decision**: +- Parent-based sampling ensures entire traces are sampled consistently +- Error events typically have higher sampling priority +- Default sampling rate can be configured (e.g., 0.5 = 50% of traces) + +2. **Batching**: +```python +# Example configuration +batch_processor = BatchSpanProcessor( + OTLPSpanExporter(), + # Max batch size before forcing export + max_queue_size=512, + # Scheduled export interval + schedule_delay_millis=5000 +) +``` + +3. **Export Formats**: +```python +# OTLP over gRPC (recommended for production) +otlp_exporter = OTLPSpanExporter( + endpoint="https://collector:4317", + insecure=False +) + +# Console exporter (for development) +console_exporter = ConsoleSpanExporter() +``` + +This setup allows AgentOps to: +- Efficiently batch and export telemetry data +- Maintain trace context across agent operations +- Control data volume through sampling +- Support multiple observability backends through the collector diff --git a/docs/dev/OTEL/span_vs_otlpspan.md b/docs/dev/OTEL/span_vs_otlpspan.md new file mode 100644 index 000000000..d55fa29a6 --- /dev/null +++ b/docs/dev/OTEL/span_vs_otlpspan.md @@ -0,0 +1,73 @@ +### Span vs OTLPSpan + +```mermaid +classDiagram + class Span { + +SpanContext context + +str name + +SpanKind kind + +Dict attributes + +start_time + +end_time + +set_attribute() + +add_event() + +set_status() + } + + class OTLPSpan { + +SpanContext context + +str name + +SpanKind kind + +Dict attributes + +Resource resource + +InstrumentationScope scope + +List[Event] events + +List[Link] links + +Status status + +to_protobuf() + +from_protobuf() + } + + Span <|-- ReadableSpan + ReadableSpan <|-- OTLPSpan +``` + +1. **Span (Base Class)** + - Basic span representation for in-memory operations + - Used during active tracing + - Contains core attributes and methods + ```python + from opentelemetry.trace import Span + + span = tracer.start_span("operation_name") + span.set_attribute("key", "value") + ``` + +2. **OTLPSpan (OTLP Format)** + - Specialized format for OpenTelemetry Protocol (OTLP) + - Used for exporting/transmitting spans + - Contains additional fields for interoperability + - Includes protobuf serialization capabilities + ```python + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter + + # Spans are automatically converted to OTLP format during export + exporter = OTLPSpanExporter() + ``` + +3. **Key Differences**: + ```python + # Regular Span - Used in instrumentation + with tracer.start_span("my_operation") as span: + span.set_attribute("custom.attribute", "value") + # ... do work ... + + # OTLPSpan - Used in export pipeline + class CustomExporter(SpanExporter): + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + # Spans are converted to OTLP format here + otlp_spans = [span.to_otlp() for span in spans] + # Send to backend... + ``` + +The main distinction is that `Span` is used for instrumentation while `OTLPSpan` is used for data exchange between systems. 
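+
+To make the hand-off concrete, here is a minimal, self-contained sketch (using the SDK's built-in console exporter rather than any AgentOps code): instrumentation only touches the live `Span` API, while the exporter is handed finished `ReadableSpan` objects.
+
+```python
+from opentelemetry import trace
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
+
+# Instrumentation side: configure a provider and create spans via the Span API
+provider = TracerProvider()
+provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
+trace.set_tracer_provider(provider)
+
+tracer = trace.get_tracer(__name__)
+with tracer.start_as_current_span("agent.tool") as span:
+    span.set_attribute("tool.name", "search")
+
+# Export side: when the span ends, ConsoleSpanExporter.export() receives it
+# as a ReadableSpan and prints its JSON representation.
+```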
diff --git a/docs/dev/OTEL_V2.md b/docs/dev/OTEL_V2.md new file mode 100644 index 000000000..dbe47d0c6 --- /dev/null +++ b/docs/dev/OTEL_V2.md @@ -0,0 +1,161 @@ + +Based on the OpenTelemetry Python documentation and the current codebase, I'll explain these concepts and provide recommendations for implementation: + +# OpenTelemetry Design in AgentOps + +## Current Implementation + +AgentOps uses OpenTelemetry for observability through a custom SessionExporter that handles span export and session management. + +```mermaid +graph TD + A[Agent Code] -->|Instrumentation| B[AgentOps SDK] + B -->|Creates| C[Session] + C -->|Initializes| D[TracerProvider] + C -->|Creates| E[SessionExporter] + D -->|Generates| F[Spans] + F -->|Exported via| E + E -->|Sends to| G[AgentOps Backend] +``` + +## Distributed Tracing + +### Current State +Currently, AgentOps implements basic tracing within a single service. To support distributed tracing across services: + +```mermaid +sequenceDiagram + participant Service A + participant Context Propagation + participant Service B + participant Backend + + Service A->>Context Propagation: Generate Trace Context + Context Propagation->>Service B: Propagate Context + Service B->>Backend: Send Spans with Context + Note over Service A,Backend: Spans are linked via trace/span IDs +``` + +### Implementation Plan + +1. **Cross-Service Trace Context Propagation** +```python +from opentelemetry.propagate import inject, extract +from opentelemetry.trace import get_current_span + +class DistributedSession(Session): + def propagate_context(self, headers=None): + if headers is None: + headers = {} + inject(headers) # Injects current context into headers + return headers + + def receive_context(self, headers): + context = extract(headers) + return context +``` + +2. **Baggage Support** +```python +from opentelemetry.baggage import set_baggage, get_baggage + +class Session: + def add_baggage(self, key: str, value: str): + """Add metadata that flows with the trace""" + set_baggage(key, value) + + def get_baggage_value(self, key: str) -> str: + """Retrieve baggage value""" + return get_baggage(key) +``` + +3. 
**W3C Trace Context Compliance** +```python +from opentelemetry.propagators.trace_context import TraceContextTextMapPropagator + +class Session: + def __init__(self): + self.propagator = TraceContextTextMapPropagator() + # Use W3C trace context format + set_global_textmap(self.propagator) +``` + +## Metrics Implementation + +### Custom Metrics +```python +from opentelemetry import metrics +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter + +class MetricsManager: + def __init__(self): + # Configure metrics export + reader = PeriodicExportingMetricReader( + OTLPMetricExporter(endpoint="") + ) + provider = MeterProvider(metric_readers=[reader]) + metrics.set_meter_provider(provider) + self.meter = metrics.get_meter("agentops.metrics") + + # Define custom metrics + self.llm_latency = self.meter.create_histogram( + name="llm.request.latency", + description="Time taken for LLM requests" + ) + + self.token_counter = self.meter.create_counter( + name="llm.tokens.total", + description="Total tokens processed" + ) + + def record_latency(self, duration_ms: float): + self.llm_latency.record(duration_ms) + + def increment_tokens(self, count: int): + self.token_counter.add(count) +``` + +### Integration with Session + +```python +class Session: + def __init__(self): + self.metrics = MetricsManager() + + def record_llm_call(self, duration_ms: float, token_count: int): + self.metrics.record_latency(duration_ms) + self.metrics.increment_tokens(token_count) +``` + +## Configuration + +```python +# Environment variables for metrics configuration +OTEL_EXPORTER_OTLP_METRICS_ENDPOINT="http://collector:4318/v1/metrics" +OTEL_EXPORTER_METRICS_TEMPORALITY_PREFERENCE="DELTA" + +# Metrics export interval +OTEL_METRIC_EXPORT_INTERVAL=60000 # milliseconds +``` + +## Best Practices + +1. **Context Propagation** + - Always propagate trace context in distributed systems + - Use baggage for request-scoped metadata + - Follow W3C trace context specification + +2. **Metrics** + - Use appropriate metric types (Counter, Histogram, Gauge) + - Set meaningful metric names and descriptions + - Configure appropriate export intervals + +3. **Resource Attribution** + - Tag metrics and traces with service name + - Include version information + - Add environment labels +``` + +This design document outlines both the current implementation and future enhancements for distributed tracing and metrics in AgentOps using OpenTelemetry. The implementation details are based on OpenTelemetry Python best practices and can be extended based on specific needs. diff --git a/docs/dev/otel-x-customer-painpoints.md b/docs/dev/otel-x-customer-painpoints.md new file mode 100644 index 000000000..40ba80aa7 --- /dev/null +++ b/docs/dev/otel-x-customer-painpoints.md @@ -0,0 +1,284 @@ +## Current State vs. Potential Customer Usage + +Currently, AgentOps uses OTEL primarily for internal telemetry through the SessionExporter: + +```168:254:agentops/session.py +class Session: + ... 
+ def __init__( + self, + session_id: UUID, + config: Configuration, + tags: Optional[List[str]] = None, + host_env: Optional[dict] = None, + ): + self.end_timestamp = None + self.end_state: Optional[str] = "Indeterminate" + self.session_id = session_id + self.init_timestamp = get_ISO_time() + self.tags: List[str] = tags or [] + self.video: Optional[str] = None + self.end_state_reason: Optional[str] = None + self.host_env = host_env + self.config = config + self.jwt = None + self._lock = threading.Lock() + self._end_session_lock = threading.Lock() + self.token_cost: Decimal = Decimal(0) + self._session_url: str = "" + self.event_counts = { + "llms": 0, + "tools": 0, + "actions": 0, + "errors": 0, + "apis": 0, + } + # self.session_url: Optional[str] = None + + # Start session first to get JWT + self.is_running = self._start_session() + if not self.is_running: + return + + # Initialize OTEL components with a more controlled processor + self._tracer_provider = TracerProvider() + self._otel_tracer = self._tracer_provider.get_tracer( + f"agentops.session.{str(session_id)}", + ) + self._otel_exporter = SessionExporter(session=self) + # Use smaller batch size and shorter delay to reduce buffering + self._span_processor = BatchSpanProcessor( + self._otel_exporter, + max_queue_size=self.config.max_queue_size, + schedule_delay_millis=self.config.max_wait_time, + max_export_batch_size=min( + max(self.config.max_queue_size // 20, 1), + min(self.config.max_queue_size, 32), + ), + export_timeout_millis=20000, + ) + + self._tracer_provider.add_span_processor(self._span_processor) +``` + + +However, customers might want to: + +1. **Use Their Own OTEL Setup** +Many organizations already have OTEL infrastructure and might want to: +- Send data to multiple backends (their existing + AgentOps) +- Use their own sampling/batching configurations +- Add custom attributes/resources + +2. **Custom Metrics** +Customers might want to track: +- LLM-specific metrics (token usage, latency, costs) +- Agent performance metrics (success rates, completion times) +- Custom business metrics + +Here's how I envision a more flexible integration: + +```python +# Option 1: Use AgentOps with existing OTEL setup +import agentops +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter + +# Customer's existing OTEL setup +existing_exporter = OTLPSpanExporter(endpoint="their-collector:4317") + +# Initialize AgentOps with custom OTEL config +agentops.init( + api_key="xxx", + otel_config={ + "additional_exporters": [existing_exporter], + "resource_attributes": { + "service.name": "my-agent-service", + "deployment.environment": "production" + } + } +) +``` + +```python +# Option 2: Custom metrics integration +import agentops +from opentelemetry import metrics + +# Initialize with metrics support +session = agentops.init( + api_key="xxx", + enable_metrics=True +) + +# Add custom metrics +meter = metrics.get_meter("agent.metrics") +token_counter = meter.create_counter( + name="llm.tokens.total", + description="Total tokens processed" +) + +@agentops.record_action("process_task") +def process_task(): + # Your agent code + token_counter.add(1, {"model": "gpt-4"}) +``` + +## Recommended Architecture Changes + +1. 
**Pluggable OTEL Manager** + +```9:52:agentops/telemetry/manager.py +class OTELManager: + """ + Manages OpenTelemetry setup and configuration for AgentOps + """ + + def __init__(self, config: Configuration): + self.config = config + self._tracer_provider = None + self._processors = [] + + def initialize(self, service_name: str, session_id: str): + """Initialize OTEL components with proper resource attributes""" + resource = Resource.create( + { + SERVICE_NAME: service_name, + "session.id": session_id, + } + ) + + self._tracer_provider = TracerProvider(resource=resource) + return self._tracer_provider + + def add_processor(self, processor: BatchSpanProcessor): + """Add a span processor to the tracer provider""" + if self._tracer_provider: + self._tracer_provider.add_span_processor(processor) + self._processors.append(processor) + + def get_tracer(self, name: str): + """Get a tracer instance for the given name""" + if not self._tracer_provider: + raise RuntimeError("OTELManager not initialized") + return self._tracer_provider.get_tracer(name) + + def shutdown(self): + """Shutdown all processors and cleanup resources""" + for processor in self._processors: + try: + processor.force_flush(timeout_millis=5000) + processor.shutdown() + except Exception: + pass + self._processors = [] + self._tracer_provider = None +``` + +This is a good start, but could be extended to support: +- Multiple exporters +- Custom metric providers +- Resource configuration + +2. **Enhanced Metrics Support** + +```54:87:agentops/telemetry/metrics.py + def _setup_meter_provider(self) -> MeterProvider: + """Setup the meter provider with appropriate exporters""" + # Create console exporter for development + console_exporter = ConsoleMetricExporter() + reader = PeriodicExportingMetricReader(console_exporter, export_interval_millis=5000) + + return SDKMeterProvider( + metric_readers=[reader], + ) + + def _get_memory_usage(self, options: CallbackOptions) -> Iterable[Observation]: + """Callback to get current memory usage""" + try: + import psutil + + process = psutil.Process() + memory = process.memory_info().rss + return [Observation(value=float(memory), attributes={"type": "process_memory"})] + except Exception as e: + logger.error(f"Failed to collect memory metrics: {e}") + return [] + + def record_export_attempt(self, success: bool, duration_ms: float, batch_size: int, error_type: str = None): + """Record metrics for an export attempt""" + # Record attempt + self.export_attempts.add(1) + + # Record failure if applicable + if not success: + self.export_failures.add(1, {"error_type": error_type or "unknown"}) + + # Record duration and batch size + self.export_duration.record(duration_ms) + self.batch_size.record(batch_size) +``` + +The metrics implementation could be expanded to include: +- Standard LLM metrics +- Agent performance metrics +- Custom metric registration + +3. **Context Propagation** +For distributed tracing scenarios: +```python +class Session: + def inject_context(self, carrier: dict): + """Inject OTEL context for distributed tracing""" + from opentelemetry.propagate import inject + return inject(carrier) + + def extract_context(self, carrier: dict): + """Extract OTEL context from carrier""" + from opentelemetry.propagate import extract + return extract(carrier) +``` + +## Best Practices for Integration + +1. 
**Configuration Flexibility** +```python +agentops.init( + api_key="xxx", + otel_config={ + "exporters": [...], + "processors": [...], + "samplers": {...}, + "resource_attributes": {...}, + "metric_readers": [...] + } +) +``` + +2. **Resource Attribution** +Always allow customers to add their own resource attributes: +```python +agentops.init( + api_key="xxx", + resource_attributes={ + "service.name": "agent-service", + "service.version": "1.0.0", + "deployment.environment": "production" + } +) +``` + +3. **Sampling Control** +Let customers configure sampling based on their needs: +```python +from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased + +agentops.init( + api_key="xxx", + sampler=ParentBased( + root=TraceIdRatioBased(0.1) # Sample 10% of traces + ) +) +``` + +This approach would make AgentOps more flexible for customers with existing OTEL setups while maintaining the simplicity for those who just want the default functionality. diff --git a/docs/dev/proposal.md b/docs/dev/proposal.md new file mode 100644 index 000000000..fe6a92adb --- /dev/null +++ b/docs/dev/proposal.md @@ -0,0 +1,196 @@ +# OpenTelemetry Integration Proposal + +## Current Architecture Context + +Our current architecture (from CODEBASE.md) shows we have: +- Client as central orchestrator +- Sessions managing discrete periods of activity +- Events (LLM, Action, Tool, Error) capturing agent behavior +- Provider-specific LLM instrumentation +- MetaClient for exception handling + +## Proposed OTEL Integration + +Here's how OpenTelemetry components map to our existing architecture: + +```mermaid +flowchart TB + subgraph AgentOps Core + Client[AgentOps Client] + Session[Sessions] + Events[Events] + Providers[LLM Providers] + end + + subgraph OpenTelemetry Layer + TP[TracerProvider] + SP[SpanProcessor] + Ex[OTLP Exporter] + + subgraph Spans + LLMSpan[LLM Spans] + ActionSpan[Action Spans] + ToolSpan[Tool Spans] + ErrorSpan[Error Spans] + end + end + + Client --> Session + Session --> Events + Events --> TP + Providers --> Events + + TP --> SP + SP --> Ex + + Events --> LLMSpan + Events --> ActionSpan + Events --> ToolSpan + Events --> ErrorSpan +``` + +### Entity Mapping + +1. **Events to Spans** +```python +# Direct mapping of our events to OTEL spans +EVENT_TO_SPAN_MAPPING = { + 'LLMEvent': { + 'name': 'llm.completion', + 'attributes': { + 'llm.model': 'model', + 'llm.tokens.prompt': 'prompt_tokens', + 'llm.tokens.completion': 'completion_tokens', + 'llm.cost': 'cost' + } + }, + 'ActionEvent': { + 'name': 'agent.action', + 'attributes': { + 'action.type': 'action_type', + 'action.name': 'name' + } + }, + 'ToolEvent': { + 'name': 'agent.tool', + 'attributes': { + 'tool.name': 'name' + } + }, + 'ErrorEvent': { + 'name': 'agent.error', + 'attributes': { + 'error.type': 'error_type', + 'error.code': 'code' + } + } +} +``` + +### Integration Points + +1. **Session Management** +```python +class Session: + def __init__(self): + self._tracer = get_tracer(__name__) + self._current_span = None + + def start_session(self): + self._current_span = self._tracer.start_span( + name="agent.session", + attributes={ + "session.id": self.session_id, + "agent.id": self.agent_id + } + ) + + def end_session(self): + if self._current_span: + self._current_span.end() +``` + +2. 
**Event Recording** +```python +class Session: + def record(self, event: Event): + # Create child span for event + with self._tracer.start_span( + name=EVENT_TO_SPAN_MAPPING[event.__class__.__name__]['name'], + attributes=self._map_event_attributes(event), + context=self._current_span + ) as span: + # Existing event recording logic + self.events.append(event) + span.set_status(Status(StatusCode.OK)) +``` + +3. **LLM Provider Instrumentation** +```python +class InstrumentedProvider: + def handle_response(self, response): + with self._tracer.start_span( + name="llm.api.call", + attributes={ + "llm.provider": self.provider_name, + "llm.model": response.model + } + ) as span: + # Existing response handling + span.set_status(Status(StatusCode.OK)) +``` + +### Configuration + +```python +def initialize_telemetry(): + resource = Resource.create({ + "service.name": "agentops", + "service.version": __version__, + "deployment.environment": get_environment() + }) + + provider = TracerProvider(resource=resource) + processor = BatchSpanProcessor( + OTLPSpanExporter( + endpoint=OTLP_ENDPOINT, + headers={"api-key": API_KEY} + ) + ) + provider.add_span_processor(processor) + set_tracer_provider(provider) +``` + +### Benefits + +1. **Standardized Observability** + - Events automatically converted to OTEL spans + - Standard attributes for better querying + - Built-in support for distributed tracing + +2. **Performance Impact** + - Batch processing of spans + - Minimal overhead during agent execution + - Configurable sampling for high-volume deployments + +3. **Compatibility** + - Works with existing AgentOps events + - No changes needed to agent code + - Support for multiple backends (Jaeger, Datadog, etc.) + +### Migration Strategy + +1. Phase 1: Basic Integration + - Add OTEL dependencies + - Implement basic span creation + - Map core events to spans + +2. Phase 2: Enhanced Features + - Add distributed tracing + - Implement sampling strategies + - Add custom metrics + +3. Phase 3: Production Readiness + - Performance optimization + - Error handling + - Documentation diff --git a/docs/pr/581.current.md b/docs/pr/581.current.md new file mode 100644 index 000000000..401652405 --- /dev/null +++ b/docs/pr/581.current.md @@ -0,0 +1,109 @@ +Currently, AgentOps uses OTEL primarily for internal telemetry, primarily handled by [SessionExporter](https://github.com/AgentOps-AI/agentops/blob/fbe476ffe72eb6cd5f7a21c93fdef433ac4b9115/agentops/session.py#L66). You can find more details on the implementation [here](https://github.com/AgentOps-AI/agentops/blob/otel/v2/docs/dev/CURRENT_OTEL.md). + +
+**V1 Architecture**

+ +```mermaid +graph TD + A[Agent Code] -->|Instrumentation| B[AgentOps SDK] + B -->|Creates| C[Session] + C -->|Initializes| D[TracerProvider] + C -->|Creates| E[SessionExporter] + D -->|Generates| F[Spans] + F -->|Processed by| G[BatchSpanProcessor] + G -->|Exports via| E + E -->|Sends to| H[AgentOps Backend] + + subgraph "OTEL Implementation" + D + F + G + E + end +``` + +

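+The wiring in the diagram amounts to standard SDK plumbing. A simplified sketch of that pipeline, with a stub exporter standing in for `SessionExporter` and illustrative queue settings (the real values come from `Configuration`):
+
+```python
+from typing import Sequence
+
+from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter, SpanExportResult
+
+
+class StubSessionExporter(SpanExporter):
+    """Stand-in for SessionExporter; the real one sends spans to the AgentOps backend."""
+
+    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
+        print(f"exporting {len(spans)} span(s) for the session")
+        return SpanExportResult.SUCCESS
+
+    def shutdown(self) -> None:
+        pass
+
+
+provider = TracerProvider()
+provider.add_span_processor(
+    BatchSpanProcessor(
+        StubSessionExporter(),
+        max_queue_size=512,          # illustrative; the real values come from Configuration
+        schedule_delay_millis=5000,
+    )
+)
+tracer = provider.get_tracer("agentops.session.example")
+
+with tracer.start_as_current_span("agent.session"):
+    pass
+
+provider.force_flush()  # flush the batch so the stub exporter runs before exit
+```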
+ + +Which is pretty limited and does not take full advantage of the OpenTelemetry capabilities. + +--- + +Clients might want to: + +- **Use Their Own OTEL Setup** + Many organizations already have OTEL infrastructure and might want to: + - Send data to multiple backends (their existing + AgentOps) + - Use their own sampling/batching configurations + - Add custom attributes/resources + +- **Trace Custom Metrics** + - LLM-specific metrics (token usage, latency, costs) + - Agent performance metrics (success rates, completion times) + - Custom business metrics + + +--- + +## Higher-level picture: AgentOps components mapping to OpenTelemetry concepts + + +```mermaid +graph LR + subgraph AgentOps + A[Session] --> B[Events] + B --> C[LLMEvent] + B --> D[ActionEvent] + B --> E[ToolEvent] + end + + subgraph OpenTelemetry + F[Trace] --> G[Spans] + G --> H[LLM Spans] + G --> I[Action Spans] + G --> J[Tool Spans] + K[Metrics] --> L[LLM Metrics] + end + + A -.->|Maps to| F + C -.->|Maps to| H + D -.->|Maps to| I + E -.->|Maps to| J +``` + +1. **Session → Trace** + - Each session represents a complete interaction/workflow + - Contains all related events + - Has a unique `session_id` (that becomes the `trace_id`) + +2. **Events → Spans** + + Each Event naturally maps to a span because: + - Events have start/end times _(like spans)_ + - Events have unique IDs _(like spans)_ + - Events have parameters/metadata _(like span attributes)_ + - Events are hierarchical _(like spans can be)_ + +
+**Session / Event Tracing**

+ + ```mermaid + graph TB + subgraph Session/Trace + A[Session Start] -->|Parent Span| B[Events] + B --> C[LLMEvent
span: llm.completion] + B --> D[ActionEvent
span: agent.action] + B --> E[ToolEvent
span: agent.tool] + + C --> C1[API Call
span: llm.api.call] + D --> D1[Function Execution
span: action.execution] + E --> E1[Tool Execution
span: tool.execution] + end + ``` + +

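+A minimal sketch of that mapping, using the span names from the diagram above and attribute names from the entity mapping docs in this PR (the PR's `EventToSpanConverter` does this for real events; `llm.provider` and the sample values are illustrative):
+
+```python
+from opentelemetry import trace
+
+tracer = trace.get_tracer("agentops.example")
+
+
+def record_llm_event(model: str, prompt_tokens: int, completion_tokens: int) -> None:
+    # Parent span: the logical LLMEvent
+    with tracer.start_as_current_span("llm.completion") as event_span:
+        event_span.set_attribute("llm.model", model)
+        event_span.set_attribute("llm.tokens.prompt", prompt_tokens)
+        event_span.set_attribute("llm.tokens.completion", completion_tokens)
+
+        # Child span: the underlying provider API call; nesting under
+        # start_as_current_span makes the event span its parent.
+        with tracer.start_as_current_span("llm.api.call") as api_span:
+            api_span.set_attribute("llm.provider", "openai")
+
+
+record_llm_event("gpt-4", prompt_tokens=900, completion_tokens=600)
+```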
+ +[View more details](/AgentOps-AI/agentops/blob/otel/v2/docs/dev/OTEL/entity_mapping.md) diff --git a/docs/pr/581.md b/docs/pr/581.md new file mode 100644 index 000000000..f0475491b --- /dev/null +++ b/docs/pr/581.md @@ -0,0 +1,206 @@ +# OpenTelemetry Integration PR + +## Core Components + +### 1. OTELManager (`manager.py`) +- Central management of OpenTelemetry setup +- Handles TracerProvider configuration +- Manages span processors and exporters +- Resource attribute management +- Sampling configuration + +```python +class OTELManager: + def __init__(self, config: Configuration, + exporters: Optional[List[SpanExporter]] = None, + resource_attributes: Optional[Dict] = None, + sampler: Optional[Sampler] = None) +``` + +### 2. ExportManager (`exporter.py`) +- Custom span exporter implementation +- Handles batching and retry logic +- Supports custom formatters +- Error handling and recovery + +### 3. EventProcessor (`processors.py`) +- Event to span conversion +- Session context management +- Event type handling +- Error event processing + +### 4. LiveSpanProcessor (`processors.py`) +- Real-time span monitoring +- In-flight span tracking +- Periodic export of active spans +- Custom attribute injection + +## Key Features + +1. **Flexible Configuration** +```python +agentops.init( + api_key="xxx", + otel_config={ + "additional_exporters": [custom_exporter], + "resource_attributes": {"service.name": "my-service"}, + "sampler": custom_sampler + } +) +``` + +2. **Metrics Support** (`metrics.py`) +- Export attempts/failures tracking +- Duration and batch size histograms +- Memory usage monitoring +- Custom metric support + +3. **Logging Integration** (`logging.py`) +- OpenTelemetry-aware logging +- Trace context propagation +- Structured log formatting +- Console export support + +## Implementation Details + +### Resource Attribution +```python +resource_attributes = { + SERVICE_NAME: service_name, + "session.id": session_id, + # Custom attributes +} +``` + +### Span Processing Pipeline +1. Event creation +2. Span conversion +3. Processor chain +4. Export handling + +### Error Handling +- Retry logic for failed exports +- Error event special handling +- Graceful degradation + +## Testing & Validation + +1. **Unit Tests** +- Manager configuration +- Export handling +- Processor chain +- Error scenarios + +2. **Integration Tests** +- End-to-end flow +- Multiple exporters +- Resource cleanup + +## Future Improvements + +1. **Distributed Tracing** +- Cross-service context propagation +- W3C trace context support +- Baggage implementation + +2. **Advanced Metrics** +- Custom aggregations +- Additional dimensions +- Metric export optimization + +3. **Performance** +- Export batching optimization +- Memory usage improvements +- Sampling strategies + +## Migration Guide + +1. **Basic Usage** +```python +from agentops.telemetry import OTELConfig +config = OTELConfig(enable_metrics=True) +agentops.init(otel_config=config) +``` + +2. 
**Custom Configuration** +```python +config = OTELConfig( + additional_exporters=[my_exporter], + resource_attributes={"env": "prod"}, + enable_metrics=True +) + + + +# OpenTelemetry Integration PR + +## Architecture Overview + +```mermaid +graph TD + subgraph AgentOps + Client[AgentOps Client] + Session[Session] + Events[Events] + LLMTracker[LLM Tracker] + end + + subgraph OpenTelemetry + TracerProvider[Tracer Provider] + MeterProvider[Meter Provider] + Processors[Span/Metric Processors] + OTLP[OTLP Exporters] + end + + Client --> Session + Session --> Events + Client --> LLMTracker + + Events --> TracerProvider + LLMTracker --> MeterProvider + TracerProvider --> Processors + MeterProvider --> Processors + Processors --> OTLP +``` + +## Component Mapping + +```mermaid +graph LR + subgraph AgentOps Events + A[Session] --> B[Events] + B --> C[LLMEvent] + B --> D[ActionEvent] + B --> E[ToolEvent] + end + + subgraph OpenTelemetry + F[Trace] --> G[Spans] + G --> H[LLM Spans] + G --> I[Action Spans] + G --> J[Tool Spans] + K[Metrics] --> L[LLM Metrics] + end + + A -.->|Maps to| F + C -.->|Maps to| H + D -.->|Maps to| I + E -.->|Maps to| J +``` + +## Data Flow + +```mermaid +sequenceDiagram + participant Agent + participant Session + participant EventProcessor + participant OTELManager + participant Exporters + + Agent->>Session: Record Event + Session->>EventProcessor: Process Event + EventProcessor->>OTELManager: Create Span + OTELManager->>Exporters: Export via Processors + Note over Exporters: Batching & Retry Logic +``` diff --git a/examples/open-telemetry/.gitkeep b/examples/open-telemetry/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/examples/open-telemetry/Dockerfile b/examples/open-telemetry/Dockerfile new file mode 100644 index 000000000..4d0845e89 --- /dev/null +++ b/examples/open-telemetry/Dockerfile @@ -0,0 +1,21 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements first to leverage Docker cache +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . 
+ +# Expose the application port +EXPOSE 8000 + +# Start the application with uvicorn +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/examples/open-telemetry/collector-config.yaml b/examples/open-telemetry/collector-config.yaml new file mode 100644 index 000000000..8d37005d3 --- /dev/null +++ b/examples/open-telemetry/collector-config.yaml @@ -0,0 +1,29 @@ +receivers: + otlp: + protocols: + http: + endpoint: 0.0.0.0:4318 + grpc: + endpoint: 0.0.0.0:4317 + +processors: + batch: + timeout: 1s + send_batch_size: 1000 + +exporters: + otlphttp: + endpoint: http://processing-service:8000 + tls: + insecure: true + headers: + "Content-Type": "application/x-protobuf" + debug: + verbosity: detailed + +service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [otlphttp, debug] diff --git a/examples/open-telemetry/docker-compose.yml b/examples/open-telemetry/docker-compose.yml new file mode 100644 index 000000000..03796e637 --- /dev/null +++ b/examples/open-telemetry/docker-compose.yml @@ -0,0 +1,38 @@ +services: + collector: + image: otel/opentelemetry-collector:latest + command: ["--config=/etc/otel-collector-config.yaml"] + volumes: + - ./collector-config.yaml:/etc/otel-collector-config.yaml + ports: + - "4317:4317" # OTLP gRPC + - "4318:4318" # OTLP HTTP + depends_on: + - processing-service + + processing-service: + build: . + ports: + - "8000:8000" + depends_on: + - redis + environment: + - CELERY_BROKER_URL=redis://redis:6379/0 + volumes: + - .:/app + + redis: + image: redis:latest + ports: + - "6379:6379" + + celery-worker: + build: . + command: celery -A main.celery_app worker --loglevel=info + depends_on: + - redis + - processing-service + environment: + - CELERY_BROKER_URL=redis://redis:6379/0 + volumes: + - .:/app \ No newline at end of file diff --git a/examples/open-telemetry/main.py b/examples/open-telemetry/main.py new file mode 100644 index 000000000..3accae445 --- /dev/null +++ b/examples/open-telemetry/main.py @@ -0,0 +1,104 @@ +from fastapi import FastAPI, Request, HTTPException, Response +from opentelemetry.proto.collector.trace.v1 import trace_service_pb2 +from celery import Celery +from pydantic import BaseModel +from typing import Dict, Any +import logging +import json +import gzip +import io + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +app = FastAPI(title="Span Processing Service") +celery_app = Celery('tasks', broker='redis://redis:6379/0') + +class SpanData(BaseModel): + trace_id: str + span_id: str + name: str + attributes: Dict[str, Any] + +@app.post("/v1/traces") +async def ingest_spans(request: Request): + try: + # Get content type and encoding + content_type = request.headers.get("content-type", "") + content_encoding = request.headers.get("content-encoding", "") + + # Read raw body + body = await request.body() + + # Handle gzip compression if present + if content_encoding == "gzip": + with gzip.GzipFile(fileobj=io.BytesIO(body), mode="rb") as gz: + body = gz.read() + + # Parse the protobuf message + request_proto = trace_service_pb2.ExportTraceServiceRequest() + request_proto.ParseFromString(body) + + # Process each resource spans + for resource_spans in request_proto.resource_spans: + resource_attrs = {} + + # Extract resource attributes + for attr in resource_spans.resource.attributes: + resource_attrs[attr.key] = attr.value.string_value + + # Process each scope spans + for scope_spans in resource_spans.scope_spans: + # Process each span + for 
span in scope_spans.spans: + # Extract span attributes + span_attrs = {} + for attr in span.attributes: + span_attrs[attr.key] = attr.value.string_value + + # Combine resource and span attributes + combined_attrs = {**resource_attrs, **span_attrs} + + # Convert trace and span IDs to hex strings + trace_id = span.trace_id.hex() + span_id = span.span_id.hex() + + # Queue span for processing + process_span.delay( + trace_id=trace_id, + span_id=span_id, + name=span.name, + attributes=combined_attrs + ) + + logger.info(f"Queued span for processing: {span.name} (trace_id: {trace_id})") + + return Response( + content=trace_service_pb2.ExportTraceServiceResponse().SerializeToString(), + media_type="application/x-protobuf" + ) + + except Exception as e: + logger.error(f"Error processing spans: {str(e)}") + raise HTTPException(status_code=500, detail=str(e)) + +@celery_app.task(bind=True, max_retries=3) +def process_span(self, trace_id: str, span_id: str, name: str, attributes: dict): + try: + # Your processing logic here + # For example: + logger.info(f"Processing span: {name} (trace_id: {trace_id}, span_id: {span_id})") + # Add your storage/processing logic here + + # Just dump the span payload + json.dumps({ + "trace_id": trace_id, + "span_id": span_id, + "name": name, + "attributes": attributes + }, indent=2) + + except Exception as e: + logger.error(f"Error processing span {span_id}: {str(e)}") + raise self.retry(exc=e, countdown=2 ** self.request.retries) diff --git a/examples/open-telemetry/requirements.txt b/examples/open-telemetry/requirements.txt new file mode 100644 index 000000000..2b1854c60 --- /dev/null +++ b/examples/open-telemetry/requirements.txt @@ -0,0 +1,10 @@ +fastapi +uvicorn +celery +redis +protobuf +opentelemetry-proto +opentelemetry-api +opentelemetry-sdk +opentelemetry-exporter-otlp-proto-grpc +opentelemetry-exporter-otlp-proto-http diff --git a/examples/open-telemetry/test_spans.py b/examples/open-telemetry/test_spans.py new file mode 100644 index 000000000..4e79730ca --- /dev/null +++ b/examples/open-telemetry/test_spans.py @@ -0,0 +1,24 @@ +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +import time + +# Initialize tracer +trace.set_tracer_provider(TracerProvider()) +tracer = trace.get_tracer(__name__) + +# Configure OTLP exporter +otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4317") +span_processor = BatchSpanProcessor(otlp_exporter) +trace.get_tracer_provider().add_span_processor(span_processor) + +# Generate test spans +with tracer.start_as_current_span("parent") as parent: + parent.set_attribute("custom.attribute", "test-value") + + with tracer.start_as_current_span("child") as child: + child.set_attribute("another.attribute", "child-value") + time.sleep(0.1) # Simulate some work + +print("Test spans generated!") \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 421bfd0e7..ba7de9802 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "opentelemetry-api>=1.22.0,<2.0.0", "opentelemetry-sdk>=1.22.0,<2.0.0", "opentelemetry-exporter-otlp-proto-http>=1.22.0,<2.0.0", + "opentelemetry-exporter-otlp-proto-grpc>=1.0.0,<2.0.0" ] [dependency-groups] @@ -42,29 +43,25 @@ test = [ "langchain", "pytest-cov", ] - dev = [ # Testing essentials - "pytest>=7.4.0,<8.0.0", # Testing framework with good async support - "pytest-depends", 
# For testing complex agent workflows - "pytest-asyncio", # Async test support for testing concurrent agent operations - "pytest-mock", # Mocking capabilities for isolating agent components - "pyfakefs", # File system testing - "pytest-recording", # Alternative to pytest-vcr with better Python 3.x support + "pytest>=7.4.0,<8.0.0", # Testing framework with good async support + "pytest-depends", # For testing complex agent workflows + "pytest-asyncio", # Async test support for testing concurrent agent operations + "pytest-mock", # Mocking capabilities for isolating agent components + "pyfakefs", # File system testing + "pytest-recording", # Alternative to pytest-vcr with better Python 3.x support "vcrpy @ git+https://github.com/kevin1024/vcrpy.git@81978659f1b18bbb7040ceb324a19114e4a4f328", # Code quality and type checking - "ruff", # Fast Python linter for maintaining code quality - "mypy", # Static type checking for better reliability - "types-requests", # Type stubs for requests library - + "ruff", # Fast Python linter for maintaining code quality + "mypy", # Static type checking for better reliability + "types-requests", # Type stubs for requests library # HTTP mocking and environment "requests_mock>=1.11.0", # Mock HTTP requests for testing agent external communications - "python-dotenv", # Environment management for secure testing - + "python-dotenv", # Environment management for secure testing # Agent integration testing + "pytest-sugar>=1.0.0", ] - -# CI dependencies ci = [ "tach~=0.9" # Task runner for CI/CD pipelines ] @@ -96,6 +93,8 @@ faulthandler_timeout = 30 # Reduced from 60 timeout = 60 # Reduced from 300 disable_socket = true # Add this to prevent hanging on socket cleanup +log_level = "DEBUG" + [tool.ruff] line-length = 120 diff --git a/tests/telemetry/conftest.py b/tests/telemetry/conftest.py new file mode 100644 index 000000000..bd5176dd7 --- /dev/null +++ b/tests/telemetry/conftest.py @@ -0,0 +1,114 @@ +import pytest +from opentelemetry import trace as trace_api +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + +from agentops.event import ActionEvent, ErrorEvent, LLMEvent, ToolEvent + + +class InstrumentationTester: + """Helper class for testing OTEL instrumentation""" + def __init__(self): + self.tracer_provider = TracerProvider() + self.memory_exporter = InMemorySpanExporter() + span_processor = SimpleSpanProcessor(self.memory_exporter) + self.tracer_provider.add_span_processor(span_processor) + + # Reset and set global tracer provider + trace_api.set_tracer_provider(self.tracer_provider) + self.memory_exporter.clear() + + def get_finished_spans(self): + return self.memory_exporter.get_finished_spans() + + def clear(self): + """Clear captured spans""" + self.memory_exporter.clear() + + +@pytest.fixture +def instrumentation(): + """Fixture providing instrumentation testing utilities""" + return InstrumentationTester() + + +@pytest.fixture +def mock_llm_event(): + """Creates an LLMEvent for testing""" + return LLMEvent( + prompt="What is the meaning of life?", + completion="42", + model="gpt-4", + prompt_tokens=10, + completion_tokens=1, + cost=0.01, + ) + + +@pytest.fixture +def mock_action_event(): + """Creates an ActionEvent for testing""" + return ActionEvent( + action_type="process_data", + params={"input_file": "data.csv"}, + returns="100 rows processed", + logs="Successfully processed all rows", + ) + + 
+@pytest.fixture
+def mock_tool_event():
+    """Creates a ToolEvent for testing"""
+    return ToolEvent(
+        name="searchWeb",
+        params={"query": "python testing"},
+        returns=["result1", "result2"],
+        logs={"status": "success"},
+    )
+
+
+@pytest.fixture
+def mock_error_event():
+    """Creates an ErrorEvent for testing"""
+    trigger = ActionEvent(action_type="risky_action")
+    error = ValueError("Something went wrong")
+    return ErrorEvent(
+        trigger_event=trigger,
+        exception=error,
+        error_type="ValueError",
+        details="Detailed error info"
+    )
+
+
+@pytest.fixture
+def mock_span_exporter():
+    """Creates an InMemorySpanExporter for testing"""
+    return InMemorySpanExporter()
+
+
+@pytest.fixture
+def tracer_provider(mock_span_exporter):
+    """Creates a TracerProvider with test exporter"""
+    provider = TracerProvider()
+    processor = SimpleSpanProcessor(mock_span_exporter)
+    provider.add_span_processor(processor)
+    return provider
+
+
+@pytest.fixture(autouse=True)
+def cleanup_telemetry():
+    """Cleanup telemetry after each test"""
+    yield
+    # Clean up any active telemetry
+    from agentops import Client
+    client = Client()
+    if hasattr(client, 'telemetry'):
+        try:
+            if client.telemetry._tracer_provider:
+                client.telemetry._tracer_provider.shutdown()
+            if client.telemetry._otel_manager:
+                client.telemetry._otel_manager.shutdown()
+            client.telemetry.shutdown()
+        except Exception:
+            pass  # Ensure cleanup continues even if one step fails
diff --git a/tests/telemetry/smoke.py b/tests/telemetry/smoke.py
new file mode 100644
index 000000000..770ed80c2
--- /dev/null
+++ b/tests/telemetry/smoke.py
@@ -0,0 +1,107 @@
+import time
+import uuid
+from dataclasses import dataclass, field
+
+import pytest
+
+from agentops.config import Configuration
+from agentops.telemetry.manager import OTELManager
+from agentops.telemetry.processors import EventProcessor
+from agentops.telemetry.exporter import ExportManager
+from agentops.telemetry.metrics import TelemetryMetrics
+
+@dataclass
+class TestEvent:
+    """Simple test event for smoke testing"""
+    # default_factory so each instance gets its own id; a bare uuid4() call
+    # here would be evaluated once and shared by every TestEvent
+    id: uuid.UUID = field(default_factory=uuid.uuid4)
+    event_type: str = "test_event"
+    init_timestamp: str | None = None
+    end_timestamp: str | None = None
+    data: dict | None = None
+
+def test_basic_telemetry_flow():
+    """Test the basic flow of events through the telemetry system"""
+    # Setup (Configuration takes no constructor arguments; the exporter
+    # receives its credentials directly below)
+    config = Configuration()
+    session_id = uuid.uuid4()
+
+    # Initialize components
+    manager = OTELManager(config)
+    provider = manager.initialize("test-service", str(session_id))
+    tracer = manager.get_tracer("test-tracer")
+
+    exporter = ExportManager(
+        session_id=session_id,
+        endpoint="http://localhost:8000/v2/create_events",
+        jwt="test-jwt",
+        api_key="test-key"
+    )
+
+    processor = EventProcessor(session_id, tracer)
+
+    # Create and process a test event
+    event = TestEvent(data={"test": "data"})
+    span = processor.process_event(event, tags=["test"])
+
+    # Verify event was processed
+    assert span is not None
+    assert processor.event_counts["test_event"] == 1
+
+def test_metrics_collection():
+    """Test basic metrics collection"""
+    metrics = TelemetryMetrics("test-service")
+
+    # Record some test metrics
+    metrics.record_export_attempt(True, 100.0, 10)
+    metrics.record_export_attempt(False, 200.0, 5)
+
+    # Let metrics flush (they're async)
+    time.sleep(0.1)
+
+    # Cleanup
+    metrics.shutdown()
+
+def test_manager_lifecycle():
+    """Test OTEL manager lifecycle (init -> shutdown)"""
+    config = Configuration()
+    manager = OTELManager(config)
+
+    # Initialize
+    provider = 
manager.initialize("test-service", str(uuid.uuid4())) + assert provider is not None + + # Get tracer + tracer = manager.get_tracer("test-tracer") + assert tracer is not None + + # Shutdown + manager.shutdown() + assert manager._tracer_provider is None + assert len(manager._processors) == 0 + +# def test_exporter_retry(): +# """Test exporter retry mechanism""" +# session_id = uuid.uuid4() +# exporter = ExportManager( +# session_id=session_id, +# endpoint="http://invalid-host:8000/v2/create_events", # Invalid endpoint to force retry +# jwt="test-jwt", +# api_key="test-key" +# ) +# +# # Create a test span (minimal attributes for testing) +# @dataclass +# class TestSpan: +# name: str = "test" +# attributes: dict = None +# +# span = TestSpan(attributes={ +# "event.data": "{}", +# "event.id": str(uuid.uuid4()), +# "event.timestamp": "2024-01-01T00:00:00Z", +# "event.end_timestamp": "2024-01-01T00:00:01Z" +# }) +# +# # Export should fail but not raise exception +# result = exporter.export([span]) +# assert result == exporter.SpanExportResult.FAILURE diff --git a/tests/telemetry/test_event_converter.py b/tests/telemetry/test_event_converter.py new file mode 100644 index 000000000..77db6b0c4 --- /dev/null +++ b/tests/telemetry/test_event_converter.py @@ -0,0 +1,110 @@ +import json +import pytest +from opentelemetry.trace import SpanKind + +from agentops.event import Event +from agentops.telemetry.converter import EventToSpanConverter, SpanDefinition + + +class TestEventToSpanConverter: + """Test the Event to Span conversion logic""" + + def test_llm_event_conversion(self, mock_llm_event): + """Test converting LLMEvent to spans""" + span_defs = EventToSpanConverter.convert_event(mock_llm_event) + + # Verify we get exactly two spans for LLM events + assert len(span_defs) == 2, f"Expected 2 spans for LLM event, got {len(span_defs)}" + + # Find the spans by name + completion_span = next((s for s in span_defs if s.name == "llm.completion"), None) + api_span = next((s for s in span_defs if s.name == "llm.api.call"), None) + + assert completion_span is not None, "Missing llm.completion span" + assert api_span is not None, "Missing llm.api.call span" + + # Verify completion span attributes + assert completion_span.attributes["model"] == mock_llm_event.model + assert completion_span.attributes["prompt"] == mock_llm_event.prompt + assert completion_span.attributes["completion"] == mock_llm_event.completion + assert completion_span.attributes["prompt_tokens"] == 10 + assert completion_span.attributes["completion_tokens"] == 1 + assert completion_span.attributes["cost"] == 0.01 + assert completion_span.attributes["event.start_time"] == mock_llm_event.init_timestamp + assert completion_span.attributes["event.end_time"] == mock_llm_event.end_timestamp + + # Verify API span attributes and relationships + assert api_span.parent_span_id == completion_span.name + assert api_span.kind == SpanKind.CLIENT + assert api_span.attributes["model"] == mock_llm_event.model + assert api_span.attributes["start_time"] == mock_llm_event.init_timestamp + assert api_span.attributes["end_time"] == mock_llm_event.end_timestamp + + def test_action_event_conversion(self, mock_action_event): + """Test converting ActionEvent to spans""" + span_defs = EventToSpanConverter.convert_event(mock_action_event) + + assert len(span_defs) == 2 + action_span = next((s for s in span_defs if s.name == "agent.action"), None) + execution_span = next((s for s in span_defs if s.name == "action.execution"), None) + + assert action_span is not None + 
assert execution_span is not None + + # Verify action span attributes + assert action_span.attributes["action_type"] == "process_data" + assert json.loads(action_span.attributes["params"]) == {"input_file": "data.csv"} + assert action_span.attributes["returns"] == "100 rows processed" + assert action_span.attributes["logs"] == "Successfully processed all rows" + assert action_span.attributes["event.start_time"] == mock_action_event.init_timestamp + + # Verify execution span + assert execution_span.parent_span_id == action_span.name + assert execution_span.attributes["start_time"] == mock_action_event.init_timestamp + assert execution_span.attributes["end_time"] == mock_action_event.end_timestamp + + def test_tool_event_conversion(self, mock_tool_event): + """Test converting ToolEvent to spans""" + span_defs = EventToSpanConverter.convert_event(mock_tool_event) + + assert len(span_defs) == 2 + tool_span = next((s for s in span_defs if s.name == "agent.tool"), None) + execution_span = next((s for s in span_defs if s.name == "tool.execution"), None) + + assert tool_span is not None + assert execution_span is not None + + # Verify tool span attributes + assert tool_span.attributes["name"] == "searchWeb" + assert json.loads(tool_span.attributes["params"]) == {"query": "python testing"} + assert json.loads(tool_span.attributes["returns"]) == ["result1", "result2"] + assert json.loads(tool_span.attributes["logs"]) == {"status": "success"} + + # Verify execution span + assert execution_span.parent_span_id == tool_span.name + assert execution_span.attributes["start_time"] == mock_tool_event.init_timestamp + assert execution_span.attributes["end_time"] == mock_tool_event.end_timestamp + + def test_error_event_conversion(self, mock_error_event): + """Test converting ErrorEvent to spans""" + span_defs = EventToSpanConverter.convert_event(mock_error_event) + + assert len(span_defs) == 1 + error_span = span_defs[0] + + # Verify error span attributes + assert error_span.name == "error" + assert error_span.attributes["error"] is True + assert error_span.attributes["error_type"] == "ValueError" + assert error_span.attributes["details"] == "Detailed error info" + assert "trigger_event" in error_span.attributes + + def test_unknown_event_type(self): + """Test handling of unknown event types""" + class UnknownEvent(Event): + pass + + # Should still work, just with generic event name + span_defs = EventToSpanConverter.convert_event(UnknownEvent(event_type="unknown")) + assert len(span_defs) == 1 + assert span_defs[0].name == "event" diff --git a/tests/telemetry/test_exporter.py b/tests/telemetry/test_exporter.py new file mode 100644 index 000000000..7e46da5d9 --- /dev/null +++ b/tests/telemetry/test_exporter.py @@ -0,0 +1,168 @@ +import json +import threading +import time +import uuid +from unittest.mock import Mock, patch + +import pytest +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExportResult + +from agentops.telemetry.exporter import ExportManager + + +@pytest.fixture +def mock_span(): + span = Mock(spec=ReadableSpan) + span.name = "test_span" + span.attributes = { + "event.id": str(uuid.uuid4()), + "event.data": json.dumps({"test": "data"}), + "event.timestamp": "2024-01-01T00:00:00Z", + "event.end_timestamp": "2024-01-01T00:00:01Z", + } + return span + + +@pytest.fixture +def ref(): + return ExportManager( + session_id=uuid.uuid4(), endpoint="http://test-endpoint/v2/create_events", jwt="test-jwt", api_key="test-key" + ) + + +class TestExportManager: + 
def test_initialization(self, ref: ExportManager): + """Test exporter initialization""" + assert not ref._shutdown.is_set() + assert isinstance(ref._export_lock, type(threading.Lock())) + assert ref._retry_count == 3 + assert ref._retry_delay == 1.0 + + def test_export_empty_spans(self, ref): + """Test exporting empty spans list""" + result = ref.export([]) + assert result == SpanExportResult.SUCCESS + + def test_export_single_span(self, ref, mock_span): + """Test exporting a single span""" + with patch("agentops.http_client.HttpClient.post") as mock_post: + mock_post.return_value.code = 200 + + result = ref.export([mock_span]) + assert result == SpanExportResult.SUCCESS + + # Verify request + mock_post.assert_called_once() + call_args = mock_post.call_args[0] + payload = json.loads(call_args[1].decode("utf-8")) + + assert len(payload["events"]) == 1 + assert payload["events"][0]["event_type"] == "test_span" + + def test_export_multiple_spans(self, ref, mock_span): + """Test exporting multiple spans""" + spans = [mock_span, mock_span] + + with patch("agentops.http_client.HttpClient.post") as mock_post: + mock_post.return_value.code = 200 + + result = ref.export(spans) + assert result == SpanExportResult.SUCCESS + + # Verify request + mock_post.assert_called_once() + call_args = mock_post.call_args[0] + payload = json.loads(call_args[1].decode("utf-8")) + + assert len(payload["events"]) == 2 + + def test_export_failure_retry(self, ref, mock_span): + """Test retry behavior on export failure""" + mock_wait = Mock() + ref._wait_fn = mock_wait + + with patch("agentops.http_client.HttpClient.post") as mock_post: + # Create mock responses with proper return values + mock_responses = [ + Mock(code=500), # First attempt fails + Mock(code=500), # Second attempt fails + Mock(code=200), # Third attempt succeeds + ] + mock_post.side_effect = mock_responses + + result = ref.export([mock_span]) + assert result == SpanExportResult.SUCCESS + assert mock_post.call_count == 3 + + # Verify exponential backoff delays + assert mock_wait.call_count == 2 + assert mock_wait.call_args_list[0][0][0] == 1.0 + assert mock_wait.call_args_list[1][0][0] == 2.0 + + def test_export_max_retries_exceeded(self, ref, mock_span): + """Test behavior when max retries are exceeded""" + mock_wait = Mock() + ref._wait_fn = mock_wait + + with patch("agentops.http_client.HttpClient.post") as mock_post: + # Mock consistently failing response + mock_response = Mock(ok=False, status_code=500) + mock_post.return_value = mock_response + + result = ref.export([mock_span]) + assert result == SpanExportResult.FAILURE + assert mock_post.call_count == ref._retry_count + + # Verify all retries waited + assert mock_wait.call_count == ref._retry_count - 1 + + def test_shutdown_behavior(self, ref, mock_span): + """Test exporter shutdown behavior""" + ref.shutdown() + assert ref._shutdown.is_set() + + # Should return success without exporting + result = ref.export([mock_span]) + assert result == SpanExportResult.SUCCESS + + def test_malformed_span_handling(self, ref): + """Test handling of malformed spans""" + malformed_span = Mock(spec=ReadableSpan) + malformed_span.name = "test_span" + malformed_span.attributes = {} # Missing required attributes + + with patch("agentops.http_client.HttpClient.post") as mock_post: + mock_post.return_value.code = 200 + + result = ref.export([malformed_span]) + assert result == SpanExportResult.SUCCESS + + # Verify event was formatted with defaults + call_args = mock_post.call_args[0] + payload = 
json.loads(call_args[1].decode("utf-8")) + event = payload["events"][0] + + assert "id" in event + assert event["event_type"] == "test_span" + + def test_concurrent_exports(self, ref, mock_span): + """Test concurrent export handling""" + + def export_spans(): + return ref.export([mock_span]) + + with patch("agentops.http_client.HttpClient.post") as mock_post: + mock_post.return_value.code = 200 + + # Create and start threads + threads = [threading.Thread(target=export_spans) for _ in range(3)] + for thread in threads: + thread.start() + + # Wait for all threads to complete + for thread in threads: + thread.join() + + # Verify each thread's export was processed + assert mock_post.call_count == 3 diff --git a/tests/telemetry/test_integration.py b/tests/telemetry/test_integration.py new file mode 100644 index 000000000..368139f2c --- /dev/null +++ b/tests/telemetry/test_integration.py @@ -0,0 +1,97 @@ +import time +import uuid +from dataclasses import dataclass +from typing import Optional + +import pytest + +from agentops.config import Configuration +from agentops.telemetry.manager import OTELManager +from agentops.telemetry.processors import EventProcessor +from agentops.telemetry.exporter import ExportManager +from agentops.telemetry.metrics import TelemetryMetrics +# from agentops.telemetry.log_handler import setup_logging +# +# @dataclass +# class ComplexEvent: +# """A more complex test event that mimics real usage""" +# id: uuid.UUID = uuid.uuid4() +# event_type: str = "complex_test" +# init_timestamp: Optional[str] = None +# end_timestamp: Optional[str] = None +# name: str = "test_action" +# action_type: str = "test" +# params: dict = None +# returns: dict = None +# error_type: Optional[str] = None +# trigger_event: Optional[any] = None +# +# def test_full_telemetry_pipeline(): +# """Test the full telemetry pipeline with all components""" +# # 1. Setup basic configuration +# config = Configuration(api_key="test-key") +# session_id = uuid.uuid4() +# +# # 2. Initialize logging +# logger_provider = setup_logging("test-service") +# +# # 3. Setup metrics +# metrics = TelemetryMetrics("test-service") +# +# # 4. Initialize OTEL manager +# manager = OTELManager(config) +# provider = manager.initialize("test-service", str(session_id)) +# tracer = manager.get_tracer("test-tracer") +# +# # 5. Setup exporter with metrics integration +# exporter = ExportManager( +# session_id=session_id, +# endpoint="http://localhost:8000/v2/create_events", +# jwt="test-jwt", +# api_key="test-key" +# ) +# +# # 6. Create event processor +# processor = EventProcessor(session_id, tracer) +# +# # 7. Process different types of events +# +# # Normal event +# normal_event = ComplexEvent( +# name="test_action", +# params={"input": "test"}, +# returns={"output": "success"} +# ) +# normal_span = processor.process_event(normal_event, tags=["test", "normal"]) +# assert normal_span is not None +# +# # Error event +# error_event = ComplexEvent( +# name="failed_action", +# error_type="TestError", +# params={"input": "test"}, +# trigger_event=normal_event +# ) +# error_span = processor.process_event(error_event, tags=["test", "error"]) +# assert error_span is not None +# +# # Verify event counts +# assert processor.event_counts["complex_test"] == 2 +# +# # 8. 
Export events and record metrics +# start_time = time.time() +# export_result = exporter.export([normal_span, error_span]) +# duration_ms = (time.time() - start_time) * 1000 +# +# metrics.record_export_attempt( +# success=(export_result == exporter.SpanExportResult.SUCCESS), +# duration_ms=duration_ms, +# batch_size=2 +# ) +# +# # 9. Cleanup +# metrics.shutdown() +# manager.shutdown() +# +# # Let async operations complete +# time.sleep(0.1) diff --git a/tests/telemetry/test_processor.py b/tests/telemetry/test_processor.py new file mode 100644 index 000000000..601fc851d --- /dev/null +++ b/tests/telemetry/test_processor.py @@ -0,0 +1,140 @@ +import json +import uuid +from unittest.mock import Mock, patch + +import pytest +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter +from opentelemetry.trace import SpanKind, Status, StatusCode + +from agentops.telemetry.processors import EventProcessor, LiveSpanProcessor + + +class TestSpanExporter(SpanExporter): + """Test exporter that captures spans for verification""" + def __init__(self): + self.exported_spans = [] + self._shutdown = False + + def export(self, spans): + self.exported_spans.extend(spans) + return True + + def shutdown(self): + self._shutdown = True + return True + + def force_flush(self, timeout_millis=None): + return True + + +@pytest.fixture +def test_exporter(): + """Create a test exporter that captures spans""" + return TestSpanExporter() + + +@pytest.fixture +def tracer_provider(test_exporter): + """Create a TracerProvider with test exporter""" + provider = TracerProvider() + processor = BatchSpanProcessor(test_exporter) + provider.add_span_processor(processor) + return provider + + +@pytest.fixture +def processor(tracer_provider): + """Create an EventProcessor with configured TracerProvider""" + return EventProcessor(uuid.uuid4(), tracer_provider) + + +class TestEventProcessor: + def test_process_llm_event(self, processor, mock_llm_event, test_exporter): + """Test processing an LLM event creates correct spans""" + # Process the event + processor.process_event(mock_llm_event) + + # Force flush to ensure spans are exported + processor._tracer_provider.force_flush() + + # Verify exported spans + spans = test_exporter.exported_spans + assert len(spans) == 2, f"Expected 2 spans, got {len(spans)}: {[s.name for s in spans]}" + + # Find completion and API spans - using new names from EventToSpanConverter + completion_spans = [s for s in spans if s.name == "llm.completion"] + api_spans = [s for s in spans if s.name == "llm.api.call"] + + assert len(completion_spans) == 1, "Missing llm.completion span" + assert len(api_spans) == 1, "Missing llm.api.call span" + + completion_span = completion_spans[0] + api_span = api_spans[0] + + # Verify completion span + assert completion_span.attributes["llm.model"] == mock_llm_event.model + assert completion_span.attributes["llm.prompt"] == mock_llm_event.prompt + assert completion_span.attributes["llm.completion"] == mock_llm_event.completion + assert completion_span.attributes["llm.tokens.total"] == 11 + + # Verify API span + assert api_span.attributes["llm.model"] == mock_llm_event.model + assert api_span.kind == SpanKind.CLIENT + + # Verify span relationship + assert api_span.parent.span_id == completion_span.context.span_id + + def test_process_error_event(self, processor, mock_error_event, test_exporter): + """Test processing an error event creates correct span""" + # This creates 
span #1 + with processor._tracer.start_as_current_span("error") as span: + span.set_status(Status(StatusCode.ERROR)) + + # This creates span #2 + processor.process_event(mock_error_event) + + processor._tracer_provider.force_flush() + + # Test expects only 1 span + assert len(test_exporter.exported_spans) == 1 + error_span = test_exporter.exported_spans[0] + + # Verify error attributes + assert error_span.name == "error" # Changed from "errors" + assert error_span.status.status_code == StatusCode.ERROR + assert error_span.attributes["error.type"] == "ValueError" + assert error_span.attributes["error.details"] == "Detailed error info" + + # Add similar tests for action and tool events... + + +class TestLiveSpanProcessor: + def test_live_processing(self, tracer_provider, test_exporter): + """Test live span processing with real spans""" + live_processor = LiveSpanProcessor(test_exporter) + tracer_provider.add_span_processor(live_processor) + tracer = tracer_provider.get_tracer(__name__) + + # Create and start a span + with tracer.start_as_current_span("long_operation") as span: + # Manually trigger span processing + live_processor.on_start(span) + + # Add some attributes to ensure the span is real + span.set_attribute("test.attribute", "value") + + # Force flush to ensure export + tracer_provider.force_flush() + + # Verify span is being tracked + assert span.context.span_id in live_processor._in_flight + + # Verify span was exported + assert len(test_exporter.exported_spans) > 0, "No spans were exported" + exported = test_exporter.exported_spans[-1] + assert exported.attributes.get("agentops.in_flight") is True + + # Verify span is removed after completion + assert span.context.span_id not in live_processor._in_flight diff --git a/tests/telemetry/test_telemetry_config.py b/tests/telemetry/test_telemetry_config.py new file mode 100644 index 000000000..eff094eaf --- /dev/null +++ b/tests/telemetry/test_telemetry_config.py @@ -0,0 +1,121 @@ +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +import pytest +from opentelemetry import trace as trace_api +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from unittest.mock import patch + +import agentops +from agentops.telemetry.config import OTELConfig +from agentops.config import Configuration +from agentops.telemetry.client import ClientTelemetry + + +def test_configuration_with_otel(): + """Test that Configuration properly stores OTEL config""" + exporter = OTLPSpanExporter(endpoint="http://localhost:4317") + otel_config = OTELConfig(additional_exporters=[exporter]) + + config = Configuration() + config.configure(None, telemetry=otel_config) + + assert config.telemetry == otel_config + assert len(config.telemetry.additional_exporters) == 1 + assert isinstance(config.telemetry.additional_exporters[0], OTLPSpanExporter) + + +def test_init_accepts_telemetry_config(): + """Test that init accepts telemetry configuration""" + exporter = OTLPSpanExporter(endpoint="http://localhost:4317") + telemetry = OTELConfig(additional_exporters=[exporter]) + + agentops.init( + api_key="test-key", + telemetry=telemetry + ) + + client = agentops.Client() + assert len(client.telemetry.config.additional_exporters) == 1 + assert isinstance(client.telemetry.config.additional_exporters[0], OTLPSpanExporter) + + +def test_init_with_env_var_endpoint(monkeypatch, instrumentation): + 
"""Test initialization with endpoint from environment variable""" + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://custom:4317") + + # Create config and client telemetry + config = OTELConfig() + telemetry = ClientTelemetry(None) # Pass None as client for testing + + try: + # Initialize telemetry with our config + telemetry.initialize(config) + + # Check the exporters were configured correctly + assert config.additional_exporters is None # Original config should be unchanged + assert telemetry.config.additional_exporters is not None # New config should have exporters + assert len(telemetry.config.additional_exporters) == 1 + + # Create a test span + tracer = instrumentation.tracer_provider.get_tracer(__name__) + with tracer.start_span("test") as span: + span.set_attribute("test", "value") + + # Verify span was captured + spans = instrumentation.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "test" + assert spans[0].attributes["test"] == "value" + + finally: + telemetry.shutdown() + + +@pytest.mark.skip +def test_telemetry_config_overrides_env_vars(instrumentation): + """Test that explicit telemetry config takes precedence over env vars""" + custom_exporter = InMemorySpanExporter() + telemetry_config = OTELConfig(additional_exporters=[custom_exporter]) + + # Create a mock environment getter that handles default values correctly + env_vars = { + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://fromenv:4317", + "OTEL_SERVICE_NAME": "test-service", + "AGENTOPS_LOGGING_LEVEL": "INFO", # Add this to handle the logging level check + "AGENTOPS_API_KEY": None, + "AGENTOPS_PARENT_KEY": None, + "AGENTOPS_API_ENDPOINT": None, + "AGENTOPS_ENV_DATA_OPT_OUT": None + } + def mock_env_get(key, default=None): + return env_vars.get(key, default) + + # Need to patch both os.environ.get and os.getenv + with patch('os.environ.get', side_effect=mock_env_get), \ + patch('os.getenv', side_effect=mock_env_get): + # Initialize with our custom config + agentops.init( + api_key="test-key", + telemetry=telemetry_config + ) + + client = agentops.Client() + # Verify we're using our custom exporter + assert len(client.telemetry.config.additional_exporters) == 1 + assert isinstance(client.telemetry.config.additional_exporters[0], InMemorySpanExporter) + # Verify we're not using the environment variable + assert not isinstance(client.telemetry.config.additional_exporters[0], OTLPSpanExporter) + + +def test_multiple_exporters_in_config(): + """Test configuration with multiple exporters""" + exporter1 = OTLPSpanExporter(endpoint="http://first:4317") + exporter2 = OTLPSpanExporter(endpoint="http://second:4317") + + telemetry = OTELConfig(additional_exporters=[exporter1, exporter2]) + config = Configuration() + config.configure(None, telemetry=telemetry) + + assert len(config.telemetry.additional_exporters) == 2 + assert config.telemetry.additional_exporters == [exporter1, exporter2] diff --git a/tests/test_canary.py b/tests/test_canary.py index 3c36b27de..a9f9ab051 100644 --- a/tests/test_canary.py +++ b/tests/test_canary.py @@ -1,16 +1,17 @@ import pytest import requests_mock -import time import agentops from agentops import ActionEvent from agentops.singleton import clear_singletons +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor @pytest.fixture(autouse=True) def setup_teardown(): clear_singletons() yield - agentops.end_all_sessions() # teardown part + agentops.end_all_sessions() @pytest.fixture(autouse=True, scope="function") @@ 
-21,7 +22,7 @@ def mock_req():
        m.post(url + "/v2/create_session", json={"status": "success", "jwt": "some_jwt"})
        m.post(url + "/v2/update_session", json={"status": "success", "token_cost": 5})
        m.post(url + "/v2/developer_errors", json={"status": "ok"})
-        m.post("https://pypi.org/pypi/agentops/json", status_code=404)
+        m.post(url + "/v2/telemetry", json={"status": "ok"})

        yield m

@@ -31,20 +32,35 @@ def setup_method(self):
        self.api_key = "11111111-1111-4111-8111-111111111111"
        agentops.init(api_key=self.api_key, max_wait_time=500, auto_start_session=False)

-    def test_agent_ops_record(self, mock_req):
+    def test_agent_ops_record(self, mock_req, mocker):
+        """Test that events are properly recorded and sent to the API"""
        # Arrange
+        tracer_spy = mocker.spy(TracerProvider, 'get_tracer')
+        processor_spy = mocker.spy(SimpleSpanProcessor, 'on_end')
+
        event_type = "test_event_type"
        agentops.start_session()

        # Act
        agentops.record(ActionEvent(event_type))
-        time.sleep(2)

-        # 3 requests: check_for_updates, create_session, create_events
-        assert len(mock_req.request_history) == 3
+        # Assert
+        # Verify OTEL components were used
+        assert tracer_spy.called
+        assert processor_spy.called

-        request_json = mock_req.last_request.json()
-        assert mock_req.last_request.headers["X-Agentops-Api-Key"] == self.api_key
+        # Verify HTTP requests
+        create_events_requests = [
+            req for req in mock_req.request_history
+            if req.url.endswith("/v2/create_events")
+        ]
+        assert len(create_events_requests) > 0, "No create_events requests found"
+
+        # Verify request content
+        last_event_request = create_events_requests[-1]
+        assert last_event_request.headers["X-Agentops-Api-Key"] == self.api_key
+        request_json = last_event_request.json()
        assert request_json["events"][0]["event_type"] == event_type

+        # Clean up
        agentops.end_session("Success")
diff --git a/tests/test_events.py b/tests/test_events.py
index 11fba8176..6a9945eb7 100644
--- a/tests/test_events.py
+++ b/tests/test_events.py
@@ -47,3 +47,25 @@ def test_record_error_event(self, mock_req):
        event = ErrorEvent(logs=None)
        time.sleep(0.15)
        agentops.record(event)
+
+    def test_record_timestamp(self):
+        """Test that error event timestamps are properly set"""
+        error = ErrorEvent()
+        assert error.init_timestamp is not None
+        assert error.end_timestamp is not None
+        # Test backward compatibility
+        assert error.timestamp == error.init_timestamp
+
+    def test_error_event_attributes(self):
+        """Test error event creation and attribute population"""
+        trigger = ActionEvent(action_type="test_action")
+        error = ErrorEvent(
+            trigger_event=trigger,
+            error_type="TestError",
+            details="Test error details"
+        )
+        assert error.init_timestamp is not None
+        assert error.end_timestamp is not None
+        assert error.trigger_event == trigger
+        assert error.error_type == "TestError"
+        assert error.details == "Test error details"
diff --git a/tests/test_session.py b/tests/test_session.py
index 4bbfb31d4..9b81ca5e7 100644
--- a/tests/test_session.py
+++ b/tests/test_session.py
@@ -392,7 +392,7 @@ def setup_method(self):
        agentops.init(api_key=self.api_key, max_wait_time=50, auto_start_session=False)
        self.session = agentops.start_session()
        assert self.session is not None  # Verify session was created
-        self.exporter = self.session._otel_exporter
+        self.exporter = self.session._span_processor

    def teardown_method(self):
        """Clean up after each test"""