From f5c1458a6270c7edbdfdb679f1bbad89347d4df9 Mon Sep 17 00:00:00 2001
From: Pratyush Shukla
Date: Wed, 28 May 2025 13:22:33 +0530
Subject: [PATCH 01/20] fix session decorator resource management

---
 agentops/sdk/decorators/factory.py | 172 ++++++++++++++++++-----------
 1 file changed, 110 insertions(+), 62 deletions(-)

diff --git a/agentops/sdk/decorators/factory.py b/agentops/sdk/decorators/factory.py
index cbf0e7026..4fc03601a 100644
--- a/agentops/sdk/decorators/factory.py
+++ b/agentops/sdk/decorators/factory.py
@@ -21,6 +21,114 @@
 )


+def _handle_session_trace_sync(
+    operation_name: str, tags: Optional[Union[list, dict]], wrapped_func: Callable, args: tuple, kwargs: Dict[str, Any]
+) -> Any:
+    """Helper function to handle SESSION trace lifecycle for sync functions with proper cleanup"""
+    trace_context: Optional[TraceContext] = None
+    trace_ended = False
+
+    try:
+        # Start trace
+        trace_context = TracingCore.get_instance().start_trace(trace_name=operation_name, tags=tags)
+        if not trace_context:
+            logger.error(f"Failed to start trace for @trace '{operation_name}'. Executing without trace.")
+            return wrapped_func(*args, **kwargs)
+
+        # Record input
+        try:
+            _record_entity_input(trace_context.span, args, kwargs)
+        except Exception as e:
+            logger.warning(f"Input recording failed for @trace '{operation_name}': {e}")
+
+        # Execute function
+        result = wrapped_func(*args, **kwargs)
+
+        # Record output
+        try:
+            _record_entity_output(trace_context.span, result)
+        except Exception as e:
+            logger.warning(f"Output recording failed for @trace '{operation_name}': {e}")
+
+        # End trace successfully
+        TracingCore.get_instance().end_trace(trace_context, "Success")
+        trace_ended = True
+        return result
+
+    except Exception:
+        # End trace with failure if not already ended
+        if trace_context and not trace_ended:
+            try:
+                TracingCore.get_instance().end_trace(trace_context, "Failure")
+                trace_ended = True
+            except Exception as cleanup_error:
+                logger.error(f"Failed to end trace during exception cleanup: {cleanup_error}")
+        raise
+
+    finally:
+        # Safety net - only end if not already ended and still recording
+        if trace_context and not trace_ended and trace_context.span.is_recording():
+            try:
+                TracingCore.get_instance().end_trace(trace_context, "Unknown")
+                logger.warning(f"Trace for @trace '{operation_name}' ended in finally block as 'Unknown'.")
+            except Exception as cleanup_error:
+                logger.error(f"Failed to end trace in finally block: {cleanup_error}")
+
+
+async def _handle_session_trace_async(
+    operation_name: str, tags: Optional[Union[list, dict]], wrapped_func: Callable, args: tuple, kwargs: Dict[str, Any]
+) -> Any:
+    """Helper function to handle SESSION trace lifecycle for async functions with proper cleanup"""
+    trace_context: Optional[TraceContext] = None
+    trace_ended = False
+
+    try:
+        # Start trace
+        trace_context = TracingCore.get_instance().start_trace(trace_name=operation_name, tags=tags)
+        if not trace_context:
+            logger.error(f"Failed to start trace for @trace '{operation_name}'. Executing without trace.")
+            return await wrapped_func(*args, **kwargs)
+
+        # Record input
+        try:
+            _record_entity_input(trace_context.span, args, kwargs)
+        except Exception as e:
+            logger.warning(f"Input recording failed for @trace '{operation_name}': {e}")
+
+        # Execute function
+        result = await wrapped_func(*args, **kwargs)
+
+        # Record output
+        try:
+            _record_entity_output(trace_context.span, result)
+        except Exception as e:
+            logger.warning(f"Output recording failed for @trace '{operation_name}': {e}")
+
+        # End trace successfully
+        TracingCore.get_instance().end_trace(trace_context, "Success")
+        trace_ended = True
+        return result
+
+    except Exception:
+        # End trace with failure if not already ended
+        if trace_context and not trace_ended:
+            try:
+                TracingCore.get_instance().end_trace(trace_context, "Failure")
+                trace_ended = True
+            except Exception as cleanup_error:
+                logger.error(f"Failed to end trace during exception cleanup: {cleanup_error}")
+        raise
+
+    finally:
+        # Safety net - only end if not already ended and still recording
+        if trace_context and not trace_ended and trace_context.span.is_recording():
+            try:
+                TracingCore.get_instance().end_trace(trace_context, "Unknown")
+                logger.warning(f"Trace for @trace '{operation_name}' ended in finally block as 'Unknown'.")
+            except Exception as cleanup_error:
+                logger.error(f"Failed to end trace in finally block: {cleanup_error}")
+
+
 def create_entity_decorator(entity_kind: str) -> Callable[..., Any]:
     """
     Factory that creates decorators for instrumenting functions and classes.
@@ -96,69 +204,9 @@ def wrapper(
                     )
                     # Fallthrough to existing generator logic which creates a single span.
                 elif is_async:
-
-                    async def _wrapped_session_async() -> Any:
-                        trace_context: Optional[TraceContext] = None
-                        try:
-                            trace_context = TracingCore.get_instance().start_trace(trace_name=operation_name, tags=tags)
-                            if not trace_context:
-                                logger.error(
-                                    f"Failed to start trace for @trace '{operation_name}'. Executing without trace."
-                                )
-                                return await wrapped_func(*args, **kwargs)
-                            try:
-                                _record_entity_input(trace_context.span, args, kwargs)
-                            except Exception as e:
-                                logger.warning(f"Input recording failed for @trace '{operation_name}': {e}")
-                            result = await wrapped_func(*args, **kwargs)
-                            try:
-                                _record_entity_output(trace_context.span, result)
-                            except Exception as e:
-                                logger.warning(f"Output recording failed for @trace '{operation_name}': {e}")
-                            TracingCore.get_instance().end_trace(trace_context, "Success")
-                            return result
-                        except Exception:
-                            if trace_context:
-                                TracingCore.get_instance().end_trace(trace_context, "Failure")
-                            raise
-                        finally:
-                            if trace_context and trace_context.span.is_recording():
-                                logger.warning(
-                                    f"Trace for @trace '{operation_name}' not explicitly ended. Ending as 'Unknown'."
-                                )
-                                TracingCore.get_instance().end_trace(trace_context, "Unknown")
-
-                    return _wrapped_session_async()
+                    return _handle_session_trace_async(operation_name, tags, wrapped_func, args, kwargs)
                 else:  # Sync function for SpanKind.SESSION
-                    trace_context: Optional[TraceContext] = None
-                    try:
-                        trace_context = TracingCore.get_instance().start_trace(trace_name=operation_name, tags=tags)
-                        if not trace_context:
-                            logger.error(
-                                f"Failed to start trace for @trace '{operation_name}'. Executing without trace."
-                            )
-                            return wrapped_func(*args, **kwargs)
-                        try:
-                            _record_entity_input(trace_context.span, args, kwargs)
-                        except Exception as e:
-                            logger.warning(f"Input recording failed for @trace '{operation_name}': {e}")
-                        result = wrapped_func(*args, **kwargs)
-                        try:
-                            _record_entity_output(trace_context.span, result)
-                        except Exception as e:
-                            logger.warning(f"Output recording failed for @trace '{operation_name}': {e}")
-                        TracingCore.get_instance().end_trace(trace_context, "Success")
-                        return result
-                    except Exception:
-                        if trace_context:
-                            TracingCore.get_instance().end_trace(trace_context, "Failure")
-                        raise
-                    finally:
-                        if trace_context and trace_context.span.is_recording():
-                            logger.warning(
-                                f"Trace for @trace '{operation_name}' not explicitly ended. Ending as 'Unknown'."
-                            )
-                            TracingCore.get_instance().end_trace(trace_context, "Unknown")
+                    return _handle_session_trace_sync(operation_name, tags, wrapped_func, args, kwargs)

             # Logic for non-SESSION kinds or generators under @trace (as per fallthrough)
             elif is_generator:

From 5fa1cb84ebf629cae93ddf63b6439dd2303eae16 Mon Sep 17 00:00:00 2001
From: Pratyush Shukla
Date: Wed, 28 May 2025 13:26:43 +0530
Subject: [PATCH 02/20] fix api consistency issues in `decorators/__init__.py`

---
 agentops/__init__.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/agentops/__init__.py b/agentops/__init__.py
index 3b252759a..2045c26f8 100755
--- a/agentops/__init__.py
+++ b/agentops/__init__.py
@@ -106,7 +106,8 @@ def init(
     elif default_tags:
         merged_tags = default_tags

-    return _client.init(
+    # Initialize the client and handle return value consistency
+    _client.init(
         api_key=api_key,
         endpoint=endpoint,
         app_url=app_url,
@@ -125,6 +126,10 @@
         **kwargs,
     )

+    # For API consistency, always return the client instance
+    # Users can access sessions via start_trace() or legacy functions
+    return _client
+

 def configure(**kwargs):
     """Update client configuration

From 9d46fba3fc38622664ef1d152afd401b94884570 Mon Sep 17 00:00:00 2001
From: Pratyush Shukla
Date: Wed, 28 May 2025 13:26:57 +0530
Subject: [PATCH 03/20] deprecation warning when using session decorator

---
 agentops/sdk/decorators/__init__.py | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/agentops/sdk/decorators/__init__.py b/agentops/sdk/decorators/__init__.py
index f775b45d5..994aaba31 100644
--- a/agentops/sdk/decorators/__init__.py
+++ b/agentops/sdk/decorators/__init__.py
@@ -3,7 +3,6 @@
 Provides @trace for creating trace-level spans (sessions) and other decorators for nested spans.
 """

-import functools
 from termcolor import colored

 from agentops.logging import logger
@@ -16,15 +15,21 @@
 operation_decorator = create_entity_decorator(SpanKind.OPERATION)
 workflow = create_entity_decorator(SpanKind.WORKFLOW)
 trace = create_entity_decorator(SpanKind.SESSION)
-session = create_entity_decorator(SpanKind.SESSION)
 tool = create_entity_decorator(SpanKind.TOOL)
-operation = task
-# For backward compatibility: @session decorator calls @trace decorator
-@functools.wraps(trace)
+
+# For backward compatibility: @session decorator calls @trace decorator with deprecation warning
 def session(*args, **kwargs):
     """@deprecated Use @agentops.trace instead. Wraps the @trace decorator for backward compatibility."""
+    import warnings
+
+    warnings.warn(
+        "@agentops.session decorator is deprecated. Please use @agentops.trace instead.",
+        DeprecationWarning,
+        stacklevel=2,
+    )
     logger.info(colored("@agentops.session decorator is deprecated. Please use @agentops.trace instead.", "yellow"))
+
     # If called as @session or @session(...)
     if not args or not callable(args[0]):
         # called with kwargs like @session(name=...)
         return trace(*args, **kwargs)

From 811bffadd689534c016e1eeb3451a44aff5e5962 Mon Sep 17 00:00:00 2001
From: Pratyush Shukla
Date: Wed, 28 May 2025 15:24:46 +0530
Subject: [PATCH 04/20] make `int` as `Required`

---
 agentops/sdk/types.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/agentops/sdk/types.py b/agentops/sdk/types.py
index 0d4e37bcb..3a2395735 100644
--- a/agentops/sdk/types.py
+++ b/agentops/sdk/types.py
@@ -1,4 +1,4 @@
-from typing import Annotated, Optional, TypedDict
+from typing import Annotated, Optional, Required, TypedDict

 from opentelemetry.sdk.trace import SpanProcessor
 from opentelemetry.sdk.trace.export import SpanExporter
@@ -16,6 +16,6 @@ class TracingConfig(TypedDict, total=False):
     metrics_endpoint: Optional[str]
     api_key: Optional[str]  # API key for authentication with AgentOps services
     project_id: Optional[str]  # Project ID to include in resource attributes
-    max_queue_size: int  # Required with a default value
-    max_wait_time: int  # Required with a default value
-    export_flush_interval: int  # Time interval between automatic exports
+    max_queue_size: Required[int]  # Required with a default value
+    max_wait_time: Required[int]  # Required with a default value
+    export_flush_interval: Required[int]  # Time interval between automatic exports

From 3b9585dc6591bd273e323040997b5fed1dd710dc Mon Sep 17 00:00:00 2001
From: Pratyush Shukla
Date: Wed, 28 May 2025 15:26:25 +0530
Subject: [PATCH 05/20] use method to format trace id

---
 agentops/sdk/converters.py | 17 +++++++++++++++++
 agentops/sdk/core.py       | 13 +++----------
 2 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/agentops/sdk/converters.py b/agentops/sdk/converters.py
index fee21e257..8febe470a 100644
--- a/agentops/sdk/converters.py
+++ b/agentops/sdk/converters.py
@@ -124,3 +124,20 @@ def camel_to_snake(text: str) -> str:

     text = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", text)
     return re.sub("([a-z0-9])([A-Z])", r"\1_\2", text).lower()
+
+
+def format_trace_id(trace_id: int) -> str:
+    """
+    Format trace ID consistently as hex string with error handling.
+
+    Args:
+        trace_id: The trace ID integer to format
+
+    Returns:
+        Formatted trace ID as hex string
+    """
+    try:
+        return f"{trace_id:x}"
+    except (TypeError, ValueError):
+        # Handle case where trace_id is not a valid integer
+        return str(trace_id)
diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py
index d36c55228..871a2f935 100644
--- a/agentops/sdk/core.py
+++ b/agentops/sdk/core.py
@@ -24,6 +24,7 @@
 from agentops.sdk.types import TracingConfig
 from agentops.semconv import ResourceAttributes, SpanKind, SpanAttributes, CoreAttributes
 from agentops.helpers.dashboard import log_trace_url
+from agentops.sdk.converters import format_trace_id

 # No need to create shortcuts since we're using our own ResourceAttributes class now

@@ -444,11 +445,7 @@ def start_trace(

         # Track the active trace
         with self._traces_lock:
-            try:
-                trace_id = f"{span.get_span_context().trace_id:x}"
-            except (TypeError, ValueError):
-                # Handle case where span is mocked or trace_id is not a valid integer
-                trace_id = str(span.get_span_context().trace_id)
+            trace_id = format_trace_id(span.get_span_context().trace_id)
             self._active_traces[trace_id] = trace_context
             logger.debug(f"Added trace {trace_id} to active traces. 
Total active: {len(self._active_traces)}") @@ -496,11 +493,7 @@ def _end_single_trace(self, trace_context: TraceContext, end_state: str) -> None span = trace_context.span token = trace_context.token - try: - trace_id = f"{span.get_span_context().trace_id:x}" - except (TypeError, ValueError): - # Handle case where span is mocked or trace_id is not a valid integer - trace_id = str(span.get_span_context().trace_id) + trace_id = format_trace_id(span.get_span_context().trace_id) logger.debug(f"Ending trace with span ID: {span.get_span_context().span_id}, end_state: {end_state}") From 9f4d3072656a91fbd9007c68a3fbc14a6f370b15 Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Wed, 28 May 2025 15:30:20 +0530 Subject: [PATCH 06/20] improved error handling in span processor flushing --- agentops/sdk/core.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index 871a2f935..c6e91f0da 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -331,16 +331,26 @@ def shutdown(self) -> None: logger.debug("Tracing core shut down") def _flush_span_processors(self) -> None: - """Helper to force flush all span processors.""" - if not self._provider or not hasattr(self._provider, "force_flush"): - logger.debug("No provider or provider cannot force_flush.") + """Helper to force flush all span processors with comprehensive error handling.""" + if not self._provider: + logger.debug("No provider available for force_flush.") + return + + if not hasattr(self._provider, "force_flush"): + logger.debug("Provider does not support force_flush.") return try: + logger.debug("Attempting to force flush span processors...") self._provider.force_flush() # type: ignore - logger.debug("Provider force_flush completed.") + logger.debug("Provider force_flush completed successfully.") + except AttributeError as e: + logger.warning(f"Provider force_flush method not available: {e}") + except RuntimeError as e: + logger.warning(f"Runtime error during force_flush (provider may be shutting down): {e}") except Exception as e: - logger.warning(f"Failed to force flush provider's span processors: {e}", exc_info=True) + logger.error(f"Unexpected error during force_flush: {e}", exc_info=True) + # Continue execution - don't let flush failures break the application def get_tracer(self, name: str = "agentops") -> trace.Tracer: """ From 4148fe530247a50b2ea700f00a13fdd2a21dc63f Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Wed, 28 May 2025 15:31:07 +0530 Subject: [PATCH 07/20] prevent resource leak in `LiveSpanProcessor` --- agentops/sdk/processors.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/agentops/sdk/processors.py b/agentops/sdk/processors.py index 2f65a1637..d82f9aa32 100644 --- a/agentops/sdk/processors.py +++ b/agentops/sdk/processors.py @@ -57,9 +57,27 @@ def on_end(self, span: ReadableSpan) -> None: self.span_exporter.export((span,)) def shutdown(self) -> None: - self._stop_event.set() - self._export_thread.join() - self.span_exporter.shutdown() + """Shutdown the processor with proper thread lifecycle management.""" + try: + # Signal the export thread to stop + self._stop_event.set() + + # Wait for the thread to finish with a timeout to prevent hanging + if self._export_thread.is_alive(): + self._export_thread.join(timeout=5.0) + + # If thread is still alive after timeout, log a warning + if self._export_thread.is_alive(): + logger.warning("Export thread did not shut down within timeout, 
continuing shutdown") + + except Exception as e: + logger.error(f"Error during thread shutdown: {e}") + + # Always attempt to shutdown the exporter + try: + self.span_exporter.shutdown() + except Exception as e: + logger.error(f"Error shutting down span exporter: {e}") def force_flush(self, timeout_millis: int = 30000) -> bool: return True From 339b3d10c90e8a531cab4c593e1d757f9cb4788f Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Wed, 28 May 2025 19:26:15 +0530 Subject: [PATCH 08/20] better method to create span --- agentops/sdk/decorators/factory.py | 21 +++-- agentops/sdk/decorators/utility.py | 137 ++++++++++++++++++----------- 2 files changed, 98 insertions(+), 60 deletions(-) diff --git a/agentops/sdk/decorators/factory.py b/agentops/sdk/decorators/factory.py index 4fc03601a..18da863bc 100644 --- a/agentops/sdk/decorators/factory.py +++ b/agentops/sdk/decorators/factory.py @@ -12,8 +12,8 @@ from agentops.semconv import SpanAttributes, CoreAttributes from .utility import ( + create_span, _create_as_current_span, - _make_span, _process_async_generator, _process_sync_generator, _record_entity_input, @@ -208,13 +208,15 @@ def wrapper( else: # Sync function for SpanKind.SESSION return _handle_session_trace_sync(operation_name, tags, wrapped_func, args, kwargs) - # Logic for non-SESSION kinds or generators under @trace (as per fallthrough) + # Logic for non-SESSION kinds using standardized context management elif is_generator: - span, _, token = _make_span( + # Generators require manual lifecycle management + span, _, token = create_span( operation_name, entity_kind, version=version, attributes={CoreAttributes.TAGS: tags} if tags else None, + manual_lifecycle=True, ) try: _record_entity_input(span, args, kwargs) @@ -226,11 +228,13 @@ def wrapper( result = wrapped_func(*args, **kwargs) return _process_sync_generator(span, result) elif is_async_generator: - span, _, token = _make_span( + # Async generators require manual lifecycle management + span, _, token = create_span( operation_name, entity_kind, version=version, attributes={CoreAttributes.TAGS: tags} if tags else None, + manual_lifecycle=True, ) try: _record_entity_input(span, args, kwargs) @@ -242,13 +246,14 @@ def wrapper( result = wrapped_func(*args, **kwargs) return _process_async_generator(span, token, result) elif is_async: - + # Async functions use context manager (OpenTelemetry best practice) async def _wrapped_async() -> Any: - with _create_as_current_span( + with create_span( operation_name, entity_kind, version=version, attributes={CoreAttributes.TAGS: tags} if tags else None, + manual_lifecycle=False, ) as span: try: _record_entity_input(span, args, kwargs) @@ -271,11 +276,13 @@ async def _wrapped_async() -> Any: return _wrapped_async() else: # Sync function for non-SESSION kinds - with _create_as_current_span( + # Sync functions use context manager (OpenTelemetry best practice) + with create_span( operation_name, entity_kind, version=version, attributes={CoreAttributes.TAGS: tags} if tags else None, + manual_lifecycle=False, ) as span: try: _record_entity_input(span, args, kwargs) diff --git a/agentops/sdk/decorators/utility.py b/agentops/sdk/decorators/utility.py index 5dee1d412..1a22f6187 100644 --- a/agentops/sdk/decorators/utility.py +++ b/agentops/sdk/decorators/utility.py @@ -61,6 +61,39 @@ async def _process_async_generator(span: trace.Span, context_token: Any, generat context_api.detach(context_token) +def create_span( + operation_name: str, + span_kind: str, + version: Optional[int] = None, + attributes: 
Optional[Dict[str, Any]] = None, + manual_lifecycle: bool = False, +): + """ + Unified span creation interface that chooses the appropriate method based on requirements. + + This function standardizes span creation across the SDK by choosing between: + - Context manager approach (preferred): For automatic lifecycle management + - Manual approach: For special cases like generators and SESSION spans + + Args: + operation_name: Name of the operation being traced + span_kind: Type of operation (from SpanKind) + version: Optional version identifier for the operation + attributes: Optional dictionary of attributes to set on the span + manual_lifecycle: If True, returns span with manual lifecycle control + + Returns: + - If manual_lifecycle=False: Context manager for automatic span lifecycle + - If manual_lifecycle=True: Tuple of (span, context, token) for manual control + """ + if manual_lifecycle or span_kind == SpanKind.SESSION: + # Use manual lifecycle for SESSION spans and when explicitly requested + return _make_span(operation_name, span_kind, version, attributes) + else: + # Use context manager for regular operations (OpenTelemetry best practice) + return _create_as_current_span(operation_name, span_kind, version, attributes) + + def _get_current_span_info(): """Helper to get information about the current span for debugging""" current_span = trace.get_current_span() @@ -75,16 +108,44 @@ def _get_current_span_info(): return {"name": "No current span"} +def _prepare_span_attributes( + operation_name: str, span_kind: str, version: Optional[int] = None, attributes: Optional[Dict[str, Any]] = None +) -> Dict[str, Any]: + """ + Prepare standardized span attributes for AgentOps spans. + + Args: + operation_name: Name of the operation being traced + span_kind: Type of operation (from SpanKind) + version: Optional version identifier for the operation + attributes: Optional dictionary of additional attributes + + Returns: + Dictionary of prepared span attributes + """ + if attributes is None: + attributes = {} + + # Add span kind to attributes + attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = span_kind + + # Add standard attributes + attributes[SpanAttributes.OPERATION_NAME] = operation_name + if version is not None: + attributes[SpanAttributes.OPERATION_VERSION] = version + + return attributes + + @contextmanager def _create_as_current_span( operation_name: str, span_kind: str, version: Optional[int] = None, attributes: Optional[Dict[str, Any]] = None ) -> Generator[Span, None, None]: """ - Create and yield an instrumentation span as the current span using proper context management. + Create and yield an instrumentation span as the current span using OpenTelemetry best practices. - This function creates a span that will automatically be nested properly - within any parent span based on the current execution context, using OpenTelemetry's - context management to properly handle span lifecycle. + This is the preferred method for creating spans as it uses OpenTelemetry's built-in + context management for automatic span lifecycle handling and proper nesting. 
Args: operation_name: Name of the operation being traced @@ -95,55 +156,32 @@ def _create_as_current_span( Yields: A span with proper context that will be automatically closed when exiting the context """ - # Log before we do anything - before_span = _get_current_span_info() - logger.debug(f"[DEBUG] BEFORE {operation_name}.{span_kind} - Current context: {before_span}") - # Create span with proper naming convention span_name = f"{operation_name}.{span_kind}" # Get tracer tracer = TracingCore.get_instance().get_tracer() - # Prepare attributes - if attributes is None: - attributes = {} - - # Add span kind to attributes - attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = span_kind - - # Add standard attributes - attributes[SpanAttributes.OPERATION_NAME] = operation_name - if version is not None: - attributes[SpanAttributes.OPERATION_VERSION] = version - - # Get current context explicitly to debug it - current_context = context_api.get_current() - - # Use OpenTelemetry's context manager to properly handle span lifecycle - with tracer.start_as_current_span(span_name, attributes=attributes, context=current_context) as span: - # Log after span creation - if hasattr(span, "get_span_context"): - span_ctx = span.get_span_context() - logger.debug( - f"[DEBUG] CREATED {span_name} - span_id: {span_ctx.span_id:x}, parent: {before_span.get('span_id', 'None')}" - ) + # Prepare standardized attributes + prepared_attributes = _prepare_span_attributes(operation_name, span_kind, version, attributes) + # Use OpenTelemetry's context manager for automatic lifecycle management + with tracer.start_as_current_span(span_name, attributes=prepared_attributes) as span: + logger.debug(f"Created span: {span_name}") yield span - # Log after we're done - after_span = _get_current_span_info() - logger.debug(f"[DEBUG] AFTER {operation_name}.{span_kind} - Returned to context: {after_span}") - def _make_span( operation_name: str, span_kind: str, version: Optional[int] = None, attributes: Optional[Dict[str, Any]] = None ) -> tuple: """ - Create a span without context management for manual span lifecycle control. + Create a span with manual lifecycle control for special cases. - This function creates a span that will be properly nested within any parent span - based on the current execution context, but requires manual ending via _finalize_span. + This function is used for cases that require manual span management: + - Trace spans (root spans that need special lifecycle handling) + - Generator functions (where context managers don't work well) + + For regular operations, prefer _create_as_current_span which uses OpenTelemetry best practices. 
Args: operation_name: Name of the operation being traced @@ -163,27 +201,20 @@ def _make_span( # Get tracer tracer = TracingCore.get_instance().get_tracer() - # Prepare attributes - if attributes is None: - attributes = {} - - # Add span kind to attributes - attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = span_kind - - # Add standard attributes - attributes[SpanAttributes.OPERATION_NAME] = operation_name - if version is not None: - attributes[SpanAttributes.OPERATION_VERSION] = version + # Prepare standardized attributes + prepared_attributes = _prepare_span_attributes(operation_name, span_kind, version, attributes) current_context = context_api.get_current() - # Create the span with proper context management + # Create the span with appropriate context handling if span_kind == SpanKind.SESSION: # For session spans, create as a root span - span = tracer.start_span(span_name, attributes=attributes) + span = tracer.start_span(span_name, attributes=prepared_attributes) + logger.debug(f"Created root trace span: {span_name}") else: - # For other spans, use the current context - span = tracer.start_span(span_name, context=current_context, attributes=attributes) + # For other spans, use the current context for proper nesting + span = tracer.start_span(span_name, context=current_context, attributes=prepared_attributes) + logger.debug(f"Created nested span: {span_name}") # Set as current context and get token for detachment ctx = trace.set_span_in_context(span) From bf850af191fb9b18b1109d993481c2e6e56cfb32 Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Wed, 28 May 2025 19:27:17 +0530 Subject: [PATCH 09/20] Revert "fix api consistency issues in `decorators/__init__.py`" This reverts commit 5fa1cb84ebf629cae93ddf63b6439dd2303eae16. --- agentops/__init__.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/agentops/__init__.py b/agentops/__init__.py index 2045c26f8..3b252759a 100755 --- a/agentops/__init__.py +++ b/agentops/__init__.py @@ -106,8 +106,7 @@ def init( elif default_tags: merged_tags = default_tags - # Initialize the client and handle return value consistency - _client.init( + return _client.init( api_key=api_key, endpoint=endpoint, app_url=app_url, @@ -126,10 +125,6 @@ def init( **kwargs, ) - # For API consistency, always return the client instance - # Users can access sessions via start_trace() or legacy functions - return _client - def configure(**kwargs): """Update client configuration From c9a05320937a7839d7d134d1214ea1fdb01324f3 Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Wed, 28 May 2025 22:21:02 +0530 Subject: [PATCH 10/20] Revert "better method to create span" This reverts commit 339b3d10c90e8a531cab4c593e1d757f9cb4788f. 
--- agentops/sdk/decorators/factory.py | 21 ++--- agentops/sdk/decorators/utility.py | 137 +++++++++++------------------ 2 files changed, 60 insertions(+), 98 deletions(-) diff --git a/agentops/sdk/decorators/factory.py b/agentops/sdk/decorators/factory.py index 18da863bc..4fc03601a 100644 --- a/agentops/sdk/decorators/factory.py +++ b/agentops/sdk/decorators/factory.py @@ -12,8 +12,8 @@ from agentops.semconv import SpanAttributes, CoreAttributes from .utility import ( - create_span, _create_as_current_span, + _make_span, _process_async_generator, _process_sync_generator, _record_entity_input, @@ -208,15 +208,13 @@ def wrapper( else: # Sync function for SpanKind.SESSION return _handle_session_trace_sync(operation_name, tags, wrapped_func, args, kwargs) - # Logic for non-SESSION kinds using standardized context management + # Logic for non-SESSION kinds or generators under @trace (as per fallthrough) elif is_generator: - # Generators require manual lifecycle management - span, _, token = create_span( + span, _, token = _make_span( operation_name, entity_kind, version=version, attributes={CoreAttributes.TAGS: tags} if tags else None, - manual_lifecycle=True, ) try: _record_entity_input(span, args, kwargs) @@ -228,13 +226,11 @@ def wrapper( result = wrapped_func(*args, **kwargs) return _process_sync_generator(span, result) elif is_async_generator: - # Async generators require manual lifecycle management - span, _, token = create_span( + span, _, token = _make_span( operation_name, entity_kind, version=version, attributes={CoreAttributes.TAGS: tags} if tags else None, - manual_lifecycle=True, ) try: _record_entity_input(span, args, kwargs) @@ -246,14 +242,13 @@ def wrapper( result = wrapped_func(*args, **kwargs) return _process_async_generator(span, token, result) elif is_async: - # Async functions use context manager (OpenTelemetry best practice) + async def _wrapped_async() -> Any: - with create_span( + with _create_as_current_span( operation_name, entity_kind, version=version, attributes={CoreAttributes.TAGS: tags} if tags else None, - manual_lifecycle=False, ) as span: try: _record_entity_input(span, args, kwargs) @@ -276,13 +271,11 @@ async def _wrapped_async() -> Any: return _wrapped_async() else: # Sync function for non-SESSION kinds - # Sync functions use context manager (OpenTelemetry best practice) - with create_span( + with _create_as_current_span( operation_name, entity_kind, version=version, attributes={CoreAttributes.TAGS: tags} if tags else None, - manual_lifecycle=False, ) as span: try: _record_entity_input(span, args, kwargs) diff --git a/agentops/sdk/decorators/utility.py b/agentops/sdk/decorators/utility.py index 1a22f6187..5dee1d412 100644 --- a/agentops/sdk/decorators/utility.py +++ b/agentops/sdk/decorators/utility.py @@ -61,39 +61,6 @@ async def _process_async_generator(span: trace.Span, context_token: Any, generat context_api.detach(context_token) -def create_span( - operation_name: str, - span_kind: str, - version: Optional[int] = None, - attributes: Optional[Dict[str, Any]] = None, - manual_lifecycle: bool = False, -): - """ - Unified span creation interface that chooses the appropriate method based on requirements. 
- - This function standardizes span creation across the SDK by choosing between: - - Context manager approach (preferred): For automatic lifecycle management - - Manual approach: For special cases like generators and SESSION spans - - Args: - operation_name: Name of the operation being traced - span_kind: Type of operation (from SpanKind) - version: Optional version identifier for the operation - attributes: Optional dictionary of attributes to set on the span - manual_lifecycle: If True, returns span with manual lifecycle control - - Returns: - - If manual_lifecycle=False: Context manager for automatic span lifecycle - - If manual_lifecycle=True: Tuple of (span, context, token) for manual control - """ - if manual_lifecycle or span_kind == SpanKind.SESSION: - # Use manual lifecycle for SESSION spans and when explicitly requested - return _make_span(operation_name, span_kind, version, attributes) - else: - # Use context manager for regular operations (OpenTelemetry best practice) - return _create_as_current_span(operation_name, span_kind, version, attributes) - - def _get_current_span_info(): """Helper to get information about the current span for debugging""" current_span = trace.get_current_span() @@ -108,44 +75,16 @@ def _get_current_span_info(): return {"name": "No current span"} -def _prepare_span_attributes( - operation_name: str, span_kind: str, version: Optional[int] = None, attributes: Optional[Dict[str, Any]] = None -) -> Dict[str, Any]: - """ - Prepare standardized span attributes for AgentOps spans. - - Args: - operation_name: Name of the operation being traced - span_kind: Type of operation (from SpanKind) - version: Optional version identifier for the operation - attributes: Optional dictionary of additional attributes - - Returns: - Dictionary of prepared span attributes - """ - if attributes is None: - attributes = {} - - # Add span kind to attributes - attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = span_kind - - # Add standard attributes - attributes[SpanAttributes.OPERATION_NAME] = operation_name - if version is not None: - attributes[SpanAttributes.OPERATION_VERSION] = version - - return attributes - - @contextmanager def _create_as_current_span( operation_name: str, span_kind: str, version: Optional[int] = None, attributes: Optional[Dict[str, Any]] = None ) -> Generator[Span, None, None]: """ - Create and yield an instrumentation span as the current span using OpenTelemetry best practices. + Create and yield an instrumentation span as the current span using proper context management. - This is the preferred method for creating spans as it uses OpenTelemetry's built-in - context management for automatic span lifecycle handling and proper nesting. + This function creates a span that will automatically be nested properly + within any parent span based on the current execution context, using OpenTelemetry's + context management to properly handle span lifecycle. 
Args: operation_name: Name of the operation being traced @@ -156,32 +95,55 @@ def _create_as_current_span( Yields: A span with proper context that will be automatically closed when exiting the context """ + # Log before we do anything + before_span = _get_current_span_info() + logger.debug(f"[DEBUG] BEFORE {operation_name}.{span_kind} - Current context: {before_span}") + # Create span with proper naming convention span_name = f"{operation_name}.{span_kind}" # Get tracer tracer = TracingCore.get_instance().get_tracer() - # Prepare standardized attributes - prepared_attributes = _prepare_span_attributes(operation_name, span_kind, version, attributes) + # Prepare attributes + if attributes is None: + attributes = {} + + # Add span kind to attributes + attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = span_kind + + # Add standard attributes + attributes[SpanAttributes.OPERATION_NAME] = operation_name + if version is not None: + attributes[SpanAttributes.OPERATION_VERSION] = version + + # Get current context explicitly to debug it + current_context = context_api.get_current() + + # Use OpenTelemetry's context manager to properly handle span lifecycle + with tracer.start_as_current_span(span_name, attributes=attributes, context=current_context) as span: + # Log after span creation + if hasattr(span, "get_span_context"): + span_ctx = span.get_span_context() + logger.debug( + f"[DEBUG] CREATED {span_name} - span_id: {span_ctx.span_id:x}, parent: {before_span.get('span_id', 'None')}" + ) - # Use OpenTelemetry's context manager for automatic lifecycle management - with tracer.start_as_current_span(span_name, attributes=prepared_attributes) as span: - logger.debug(f"Created span: {span_name}") yield span + # Log after we're done + after_span = _get_current_span_info() + logger.debug(f"[DEBUG] AFTER {operation_name}.{span_kind} - Returned to context: {after_span}") + def _make_span( operation_name: str, span_kind: str, version: Optional[int] = None, attributes: Optional[Dict[str, Any]] = None ) -> tuple: """ - Create a span with manual lifecycle control for special cases. + Create a span without context management for manual span lifecycle control. - This function is used for cases that require manual span management: - - Trace spans (root spans that need special lifecycle handling) - - Generator functions (where context managers don't work well) - - For regular operations, prefer _create_as_current_span which uses OpenTelemetry best practices. + This function creates a span that will be properly nested within any parent span + based on the current execution context, but requires manual ending via _finalize_span. 
Args: operation_name: Name of the operation being traced @@ -201,20 +163,27 @@ def _make_span( # Get tracer tracer = TracingCore.get_instance().get_tracer() - # Prepare standardized attributes - prepared_attributes = _prepare_span_attributes(operation_name, span_kind, version, attributes) + # Prepare attributes + if attributes is None: + attributes = {} + + # Add span kind to attributes + attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = span_kind + + # Add standard attributes + attributes[SpanAttributes.OPERATION_NAME] = operation_name + if version is not None: + attributes[SpanAttributes.OPERATION_VERSION] = version current_context = context_api.get_current() - # Create the span with appropriate context handling + # Create the span with proper context management if span_kind == SpanKind.SESSION: # For session spans, create as a root span - span = tracer.start_span(span_name, attributes=prepared_attributes) - logger.debug(f"Created root trace span: {span_name}") + span = tracer.start_span(span_name, attributes=attributes) else: - # For other spans, use the current context for proper nesting - span = tracer.start_span(span_name, context=current_context, attributes=prepared_attributes) - logger.debug(f"Created nested span: {span_name}") + # For other spans, use the current context + span = tracer.start_span(span_name, context=current_context, attributes=attributes) # Set as current context and get token for detachment ctx = trace.set_span_in_context(span) From d7f20df830d7477a02a31951d0b085348df59041 Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Wed, 28 May 2025 23:01:54 +0530 Subject: [PATCH 11/20] add some tests --- tests/unit/sdk/test_core_error_handling.py | 353 ++++++++++++++++ .../sdk/test_live_span_processor_lifecycle.py | 394 ++++++++++++++++++ tests/unit/test_init_api_consistency.py | 388 +++++++++++++++++ 3 files changed, 1135 insertions(+) create mode 100644 tests/unit/sdk/test_core_error_handling.py create mode 100644 tests/unit/sdk/test_live_span_processor_lifecycle.py create mode 100644 tests/unit/test_init_api_consistency.py diff --git a/tests/unit/sdk/test_core_error_handling.py b/tests/unit/sdk/test_core_error_handling.py new file mode 100644 index 000000000..9488c5a69 --- /dev/null +++ b/tests/unit/sdk/test_core_error_handling.py @@ -0,0 +1,353 @@ +""" +Tests for TracingCore error handling improvements. + +This module tests the enhanced error handling in TracingCore._flush_span_processors() +that was added to provide comprehensive exception handling for different failure scenarios. 
+""" + +from unittest.mock import MagicMock, patch + +from agentops.sdk.core import TracingCore + + +class TestTracingCoreFlushErrorHandling: + """Tests for the enhanced _flush_span_processors error handling.""" + + def setup_method(self): + """Set up test fixtures.""" + self.tracing_core = TracingCore.get_instance() + # Reset state for each test + self.tracing_core._initialized = False + self.tracing_core._provider = None + + @patch("agentops.sdk.core.logger") + def test_flush_no_provider(self, mock_logger): + """Test _flush_span_processors when no provider is available.""" + # Ensure no provider is set + self.tracing_core._provider = None + + # Call flush + self.tracing_core._flush_span_processors() + + # Verify debug message was logged + mock_logger.debug.assert_called_once_with("No provider available for force_flush.") + + @patch("agentops.sdk.core.logger") + def test_flush_provider_without_force_flush_method(self, mock_logger): + """Test _flush_span_processors when provider doesn't support force_flush.""" + # Create a mock provider without force_flush method + mock_provider = MagicMock() + del mock_provider.force_flush # Remove the method + self.tracing_core._provider = mock_provider + + # Call flush + self.tracing_core._flush_span_processors() + + # Verify debug message was logged + mock_logger.debug.assert_called_once_with("Provider does not support force_flush.") + + @patch("agentops.sdk.core.logger") + def test_flush_successful(self, mock_logger): + """Test _flush_span_processors with successful flush.""" + # Create a mock provider with force_flush method + mock_provider = MagicMock() + mock_provider.force_flush.return_value = True + self.tracing_core._provider = mock_provider + + # Call flush + self.tracing_core._flush_span_processors() + + # Verify force_flush was called + mock_provider.force_flush.assert_called_once() + + # Verify success messages were logged + expected_calls = [ + ("debug", "Attempting to force flush span processors..."), + ("debug", "Provider force_flush completed successfully."), + ] + + actual_calls = [(call[0], call[1][0]) for call in mock_logger.method_calls] + for level, message in expected_calls: + assert (level, message) in actual_calls + + @patch("agentops.sdk.core.logger") + def test_flush_attribute_error(self, mock_logger): + """Test _flush_span_processors when AttributeError is raised.""" + # Create a mock provider where force_flush raises AttributeError + mock_provider = MagicMock() + mock_provider.force_flush.side_effect = AttributeError("force_flush method not available") + self.tracing_core._provider = mock_provider + + # Call flush - should not raise exception + self.tracing_core._flush_span_processors() + + # Verify force_flush was attempted + mock_provider.force_flush.assert_called_once() + + # Verify warning was logged + mock_logger.warning.assert_called_once_with( + "Provider force_flush method not available: force_flush method not available" + ) + + @patch("agentops.sdk.core.logger") + def test_flush_runtime_error(self, mock_logger): + """Test _flush_span_processors when RuntimeError is raised.""" + # Create a mock provider where force_flush raises RuntimeError + mock_provider = MagicMock() + mock_provider.force_flush.side_effect = RuntimeError("Provider is shutting down") + self.tracing_core._provider = mock_provider + + # Call flush - should not raise exception + self.tracing_core._flush_span_processors() + + # Verify force_flush was attempted + mock_provider.force_flush.assert_called_once() + + # Verify warning was logged + 
mock_logger.warning.assert_called_once_with( + "Runtime error during force_flush (provider may be shutting down): Provider is shutting down" + ) + + @patch("agentops.sdk.core.logger") + def test_flush_unexpected_exception(self, mock_logger): + """Test _flush_span_processors when unexpected exception is raised.""" + # Create a mock provider where force_flush raises unexpected exception + mock_provider = MagicMock() + mock_provider.force_flush.side_effect = ValueError("Unexpected error") + self.tracing_core._provider = mock_provider + + # Call flush - should not raise exception + self.tracing_core._flush_span_processors() + + # Verify force_flush was attempted + mock_provider.force_flush.assert_called_once() + + # Verify error was logged with exc_info + mock_logger.error.assert_called_once_with( + "Unexpected error during force_flush: Unexpected error", exc_info=True + ) + + @patch("agentops.sdk.core.logger") + def test_flush_multiple_exception_types(self, mock_logger): + """Test that different exception types are handled with appropriate log levels.""" + test_cases = [ + (AttributeError("attr error"), "warning", "Provider force_flush method not available: attr error"), + ( + RuntimeError("runtime error"), + "warning", + "Runtime error during force_flush (provider may be shutting down): runtime error", + ), + (ValueError("value error"), "error", "Unexpected error during force_flush: value error"), + (TypeError("type error"), "error", "Unexpected error during force_flush: type error"), + (Exception("generic error"), "error", "Unexpected error during force_flush: generic error"), + ] + + for exception, expected_level, expected_message in test_cases: + # Reset mock + mock_logger.reset_mock() + + # Create a mock provider that raises the specific exception + mock_provider = MagicMock() + mock_provider.force_flush.side_effect = exception + self.tracing_core._provider = mock_provider + + # Call flush - should not raise exception + self.tracing_core._flush_span_processors() + + # Verify the appropriate log level was used + if expected_level == "warning": + mock_logger.warning.assert_called_once_with(expected_message) + mock_logger.error.assert_not_called() + elif expected_level == "error": + mock_logger.error.assert_called_once_with(expected_message, exc_info=True) + mock_logger.warning.assert_not_called() + + @patch("agentops.sdk.core.logger") + def test_flush_graceful_degradation(self, mock_logger): + """Test that flush failures don't break the application.""" + # Create a mock provider that always fails + mock_provider = MagicMock() + mock_provider.force_flush.side_effect = Exception("Always fails") + self.tracing_core._provider = mock_provider + + # Call flush multiple times - should never raise exception + for _ in range(3): + self.tracing_core._flush_span_processors() + + # Verify force_flush was attempted each time + assert mock_provider.force_flush.call_count == 3 + + # Verify errors were logged each time + assert mock_logger.error.call_count == 3 + + @patch("agentops.sdk.core.logger") + def test_flush_hasattr_check_behavior(self, mock_logger): + """Test the hasattr check behavior for force_flush method.""" + # Test case 1: Provider without force_flush attribute + mock_provider = MagicMock() + del mock_provider.force_flush # Remove the attribute completely + self.tracing_core._provider = mock_provider + + # This should trigger the hasattr check and log debug message + self.tracing_core._flush_span_processors() + + # When provider doesn't have force_flush attribute, it should only log the 
"does not support" message + mock_logger.debug.assert_called_once_with("Provider does not support force_flush.") + + # Reset for next test + mock_logger.reset_mock() + + # Test case 2: Provider has callable force_flush + mock_provider = MagicMock() + mock_provider.force_flush = MagicMock(return_value=True) + self.tracing_core._provider = mock_provider + + self.tracing_core._flush_span_processors() + + # Should attempt to call force_flush + mock_provider.force_flush.assert_called_once() + + @patch("agentops.sdk.core.logger") + def test_flush_logging_sequence(self, mock_logger): + """Test the complete logging sequence during successful flush.""" + # Create a mock provider with force_flush method + mock_provider = MagicMock() + mock_provider.force_flush.return_value = True + self.tracing_core._provider = mock_provider + + # Call flush + self.tracing_core._flush_span_processors() + + # Verify the complete logging sequence + expected_debug_calls = [ + "Attempting to force flush span processors...", + "Provider force_flush completed successfully.", + ] + + debug_calls = [call[0][0] for call in mock_logger.debug.call_args_list] + assert debug_calls == expected_debug_calls + + @patch("agentops.sdk.core.logger") + def test_flush_provider_none_vs_no_method(self, mock_logger): + """Test distinction between no provider and provider without method.""" + # Test 1: No provider + self.tracing_core._provider = None + self.tracing_core._flush_span_processors() + + mock_logger.debug.assert_called_with("No provider available for force_flush.") + mock_logger.reset_mock() + + # Test 2: Provider without force_flush method + mock_provider = MagicMock() + del mock_provider.force_flush + self.tracing_core._provider = mock_provider + + self.tracing_core._flush_span_processors() + + mock_logger.debug.assert_called_with("Provider does not support force_flush.") + + +class TestTracingCoreFlushIntegration: + """Integration tests for TracingCore flush functionality.""" + + def setup_method(self): + """Set up test fixtures.""" + self.tracing_core = TracingCore.get_instance() + # Reset state for each test + self.tracing_core._initialized = False + self.tracing_core._provider = None + + @patch("agentops.sdk.core.logger") + def test_flush_called_during_shutdown(self, mock_logger): + """Test that _flush_span_processors is called during shutdown.""" + # Create a mock provider + mock_provider = MagicMock() + mock_provider.force_flush.return_value = True + self.tracing_core._provider = mock_provider + self.tracing_core._initialized = True + + # Mock the shutdown method to track flush calls + with patch.object(self.tracing_core, "_flush_span_processors") as mock_flush: + # Call shutdown + self.tracing_core.shutdown() + + # Verify flush was called + mock_flush.assert_called_once() + + @patch("agentops.sdk.core.logger") + def test_flush_resilience_during_shutdown(self, mock_logger): + """Test that shutdown continues even if flush fails.""" + # Create a mock provider that fails on flush + mock_provider = MagicMock() + mock_provider.force_flush.side_effect = Exception("Flush failed") + mock_provider.shutdown.return_value = None + self.tracing_core._provider = mock_provider + self.tracing_core._initialized = True + + # Shutdown should complete successfully despite flush failure + self.tracing_core.shutdown() + + # Verify provider shutdown was still called + mock_provider.shutdown.assert_called_once() + + # Verify error was logged + mock_logger.error.assert_called_once_with("Unexpected error during force_flush: Flush failed", 
exc_info=True) + + @patch("agentops.sdk.core.logger") + def test_flush_with_real_provider_interface(self, mock_logger): + """Test flush with a more realistic provider mock.""" + # Create a mock provider that mimics real TracerProvider interface + mock_provider = MagicMock() + mock_provider.force_flush = MagicMock(return_value=True) + + # Add other typical provider methods + mock_provider.get_tracer = MagicMock() + mock_provider.shutdown = MagicMock() + + self.tracing_core._provider = mock_provider + + # Call flush + self.tracing_core._flush_span_processors() + + # Verify only force_flush was called, not other methods + mock_provider.force_flush.assert_called_once() + mock_provider.get_tracer.assert_not_called() + mock_provider.shutdown.assert_not_called() + + +class TestTracingCoreFlushBackwardCompatibility: + """Tests to ensure flush error handling doesn't break existing functionality.""" + + def setup_method(self): + """Set up test fixtures.""" + self.tracing_core = TracingCore.get_instance() + # Reset state for each test + self.tracing_core._initialized = False + self.tracing_core._provider = None + + def test_flush_preserves_existing_behavior(self): + """Test that flush behavior is preserved for existing code.""" + # Create a mock provider + mock_provider = MagicMock() + mock_provider.force_flush.return_value = True + self.tracing_core._provider = mock_provider + + # Call flush - should not raise any exceptions + self.tracing_core._flush_span_processors() + + # Verify force_flush was called + mock_provider.force_flush.assert_called_once() + + def test_flush_method_signature_unchanged(self): + """Test that _flush_span_processors method signature is unchanged.""" + # This test ensures the method can still be called without parameters + import inspect + + signature = inspect.signature(self.tracing_core._flush_span_processors) + + # Should have no required parameters + assert len(signature.parameters) == 0 + + # Should be callable without arguments + self.tracing_core._provider = None + self.tracing_core._flush_span_processors() # Should not raise diff --git a/tests/unit/sdk/test_live_span_processor_lifecycle.py b/tests/unit/sdk/test_live_span_processor_lifecycle.py new file mode 100644 index 000000000..8938730e8 --- /dev/null +++ b/tests/unit/sdk/test_live_span_processor_lifecycle.py @@ -0,0 +1,394 @@ +""" +Tests for LiveSpanProcessor lifecycle management improvements. + +This module tests the enhanced shutdown functionality that was added to prevent +resource leaks by properly managing thread lifecycle with timeouts and error handling. 
+""" + +import threading +import time +from unittest.mock import MagicMock, patch, call + +from agentops.sdk.processors import LiveSpanProcessor + + +class TestLiveSpanProcessorShutdown: + """Tests for the enhanced LiveSpanProcessor shutdown functionality.""" + + def test_shutdown_normal_thread_termination(self): + """Test shutdown with normal thread termination.""" + # Create a mock exporter + mock_exporter = MagicMock() + + # Create processor + processor = LiveSpanProcessor(mock_exporter) + + # Verify thread is running + assert processor._export_thread.is_alive() + + # Shutdown + processor.shutdown() + + # Verify thread stopped + assert not processor._export_thread.is_alive() + + # Verify exporter shutdown was called + mock_exporter.shutdown.assert_called_once() + + @patch("agentops.sdk.processors.logger") + def test_shutdown_with_thread_timeout(self, mock_logger): + """Test shutdown when thread doesn't terminate within timeout.""" + # Create a mock exporter + mock_exporter = MagicMock() + + # Create processor + processor = LiveSpanProcessor(mock_exporter) + + # Mock the thread to simulate it not terminating + with patch.object(processor._export_thread, "join") as mock_join: + with patch.object(processor._export_thread, "is_alive", return_value=True): + # Shutdown + processor.shutdown() + + # Verify join was called with timeout + mock_join.assert_called_once_with(timeout=5.0) + + # Verify warning was logged + mock_logger.warning.assert_called_once_with( + "Export thread did not shut down within timeout, continuing shutdown" + ) + + # Verify exporter shutdown was still called + mock_exporter.shutdown.assert_called_once() + + @patch("agentops.sdk.processors.logger") + def test_shutdown_thread_join_exception(self, mock_logger): + """Test shutdown when thread.join() raises an exception.""" + # Create a mock exporter + mock_exporter = MagicMock() + + # Create processor + processor = LiveSpanProcessor(mock_exporter) + + # Mock the thread join to raise an exception + with patch.object(processor._export_thread, "join", side_effect=Exception("Join failed")): + # Shutdown should not raise exception + processor.shutdown() + + # Verify error was logged + mock_logger.error.assert_called_once_with("Error during thread shutdown: Join failed") + + # Verify exporter shutdown was still called + mock_exporter.shutdown.assert_called_once() + + @patch("agentops.sdk.processors.logger") + def test_shutdown_exporter_shutdown_exception(self, mock_logger): + """Test shutdown when exporter.shutdown() raises an exception.""" + # Create a mock exporter that raises exception on shutdown + mock_exporter = MagicMock() + mock_exporter.shutdown.side_effect = Exception("Exporter shutdown failed") + + # Create processor + processor = LiveSpanProcessor(mock_exporter) + + # Shutdown should not raise exception + processor.shutdown() + + # Verify error was logged + mock_logger.error.assert_called_once_with("Error shutting down span exporter: Exporter shutdown failed") + + # Verify exporter shutdown was attempted + mock_exporter.shutdown.assert_called_once() + + @patch("agentops.sdk.processors.logger") + def test_shutdown_both_thread_and_exporter_exceptions(self, mock_logger): + """Test shutdown when both thread join and exporter shutdown raise exceptions.""" + # Create a mock exporter that raises exception on shutdown + mock_exporter = MagicMock() + mock_exporter.shutdown.side_effect = Exception("Exporter shutdown failed") + + # Create processor + processor = LiveSpanProcessor(mock_exporter) + + # Mock the thread join to raise 
an exception + with patch.object(processor._export_thread, "join", side_effect=Exception("Join failed")): + # Shutdown should not raise exception + processor.shutdown() + + # Verify both errors were logged + expected_calls = [ + call("Error during thread shutdown: Join failed"), + call("Error shutting down span exporter: Exporter shutdown failed"), + ] + mock_logger.error.assert_has_calls(expected_calls) + + # Verify exporter shutdown was attempted + mock_exporter.shutdown.assert_called_once() + + def test_shutdown_thread_already_stopped(self): + """Test shutdown when thread is already stopped.""" + # Create a mock exporter + mock_exporter = MagicMock() + + # Create processor + processor = LiveSpanProcessor(mock_exporter) + + # Stop the thread manually first + processor._stop_event.set() + processor._export_thread.join() + + # Verify thread is stopped + assert not processor._export_thread.is_alive() + + # Shutdown should still work + processor.shutdown() + + # Verify exporter shutdown was called + mock_exporter.shutdown.assert_called_once() + + def test_shutdown_multiple_calls(self): + """Test that multiple shutdown calls don't cause issues.""" + # Create a mock exporter + mock_exporter = MagicMock() + + # Create processor + processor = LiveSpanProcessor(mock_exporter) + + # Call shutdown multiple times + processor.shutdown() + processor.shutdown() + processor.shutdown() + + # Verify exporter shutdown was called multiple times (this is expected behavior) + assert mock_exporter.shutdown.call_count == 3 + + @patch("agentops.sdk.processors.logger") + def test_shutdown_timeout_behavior(self, mock_logger): + """Test the specific timeout behavior during shutdown.""" + # Create a mock exporter + mock_exporter = MagicMock() + + # Create processor + processor = LiveSpanProcessor(mock_exporter) + + # Track the actual timeout used + original_join = processor._export_thread.join + join_timeout = None + + def capture_join_timeout(timeout=None): + nonlocal join_timeout + join_timeout = timeout + # Simulate thread not terminating within timeout + if timeout is not None: + time.sleep(0.01) # Small delay to simulate timeout + return original_join(timeout=0.01) # Quick join to actually stop thread + + with patch.object(processor._export_thread, "join", side_effect=capture_join_timeout): + with patch.object(processor._export_thread, "is_alive", return_value=True): + # Shutdown + processor.shutdown() + + # Verify the timeout was 5.0 seconds + assert join_timeout == 5.0 + + # Verify warning was logged + mock_logger.warning.assert_called_once_with( + "Export thread did not shut down within timeout, continuing shutdown" + ) + + +class TestLiveSpanProcessorThreadSafety: + """Tests for thread safety during LiveSpanProcessor operations.""" + + def test_concurrent_shutdown_calls(self): + """Test that concurrent shutdown calls are handled safely.""" + # Create a mock exporter + mock_exporter = MagicMock() + + # Create processor + processor = LiveSpanProcessor(mock_exporter) + + # Create multiple threads that call shutdown + shutdown_threads = [] + exceptions = [] + + def shutdown_worker(): + try: + processor.shutdown() + except Exception as e: + exceptions.append(e) + + # Start multiple shutdown threads + for _ in range(5): + thread = threading.Thread(target=shutdown_worker) + shutdown_threads.append(thread) + thread.start() + + # Wait for all threads to complete + for thread in shutdown_threads: + thread.join() + + # Verify no exceptions occurred + assert len(exceptions) == 0 + + # Verify exporter shutdown was 
called (multiple times is expected) + assert mock_exporter.shutdown.call_count >= 1 + + def test_shutdown_during_span_processing(self): + """Test shutdown while spans are being processed.""" + # Create a mock exporter that takes time to export + mock_exporter = MagicMock() + + def slow_export(spans): + time.sleep(0.1) # Simulate slow export + return True + + mock_exporter.export.side_effect = slow_export + + # Create processor + processor = LiveSpanProcessor(mock_exporter) + + # Create a mock span with proper context + mock_span = MagicMock() + mock_span.context.span_id = 12345 + mock_span.context.trace_flags.sampled = True + + # Properly initialize the span through on_start (realistic lifecycle) + processor.on_start(mock_span, None) + + # Start processing a span in a separate thread + def process_span(): + processor.on_end(mock_span) + + process_thread = threading.Thread(target=process_span) + process_thread.start() + + # Give the processing thread a moment to start + time.sleep(0.05) + + # Shutdown while processing + processor.shutdown() + + # Wait for processing thread to complete + process_thread.join(timeout=1.0) + + # Verify shutdown completed successfully + assert not processor._export_thread.is_alive() + + +class TestLiveSpanProcessorResourceManagement: + """Tests for proper resource management in LiveSpanProcessor.""" + + def test_stop_event_set_during_shutdown(self): + """Test that stop event is properly set during shutdown.""" + # Create a mock exporter + mock_exporter = MagicMock() + + # Create processor + processor = LiveSpanProcessor(mock_exporter) + + # Verify stop event is not set initially + assert not processor._stop_event.is_set() + + # Shutdown + processor.shutdown() + + # Verify stop event is set + assert processor._stop_event.is_set() + + def test_export_thread_lifecycle(self): + """Test the complete lifecycle of the export thread.""" + # Create a mock exporter + mock_exporter = MagicMock() + + # Create processor + processor = LiveSpanProcessor(mock_exporter) + + # Verify thread is created and running + assert processor._export_thread is not None + assert processor._export_thread.is_alive() + assert processor._export_thread.daemon # Should be daemon thread + + # Shutdown + processor.shutdown() + + # Verify thread is stopped + assert not processor._export_thread.is_alive() + + @patch("agentops.sdk.processors.logger") + def test_graceful_degradation_on_errors(self, mock_logger): + """Test that processor gracefully degrades when errors occur during shutdown.""" + # Create a mock exporter + mock_exporter = MagicMock() + + # Create processor + processor = LiveSpanProcessor(mock_exporter) + + # Mock everything to fail + with patch.object(processor._stop_event, "set", side_effect=Exception("Stop event failed")): + with patch.object(processor._export_thread, "join", side_effect=Exception("Join failed")): + with patch.object(processor._export_thread, "is_alive", side_effect=Exception("is_alive failed")): + mock_exporter.shutdown.side_effect = Exception("Exporter shutdown failed") + + # Shutdown should still complete without raising exceptions + processor.shutdown() + + # Verify errors were logged but execution continued + assert mock_logger.error.call_count >= 1 + + +class TestLiveSpanProcessorBackwardCompatibility: + """Tests to ensure backward compatibility of LiveSpanProcessor.""" + + def test_force_flush_unchanged(self): + """Test that force_flush behavior is unchanged.""" + # Create a mock exporter + mock_exporter = MagicMock() + + # Create processor + processor = 
LiveSpanProcessor(mock_exporter) + + # Test force_flush + result = processor.force_flush() + + # Should return True (unchanged behavior) + assert result is True + + def test_on_start_unchanged(self): + """Test that on_start behavior is unchanged.""" + # Create a mock exporter + mock_exporter = MagicMock() + + # Create processor + processor = LiveSpanProcessor(mock_exporter) + + # Create a mock span + mock_span = MagicMock() + + # Test on_start (should not raise exception) + processor.on_start(mock_span, None) + + def test_on_end_unchanged(self): + """Test that on_end behavior is unchanged.""" + # Create a mock exporter + mock_exporter = MagicMock() + + # Create processor + processor = LiveSpanProcessor(mock_exporter) + + # Create a mock span with proper context + mock_span = MagicMock() + mock_span.context.span_id = 12345 + mock_span.context.trace_flags.sampled = True + + # Add the span to _in_flight first (simulating on_start) + processor._in_flight[mock_span.context.span_id] = mock_span + + # Test on_end + processor.on_end(mock_span) + + # Verify exporter.export was called + mock_exporter.export.assert_called_once_with((mock_span,)) + + # Verify span was removed from _in_flight + assert mock_span.context.span_id not in processor._in_flight diff --git a/tests/unit/test_init_api_consistency.py b/tests/unit/test_init_api_consistency.py new file mode 100644 index 000000000..52122f0f0 --- /dev/null +++ b/tests/unit/test_init_api_consistency.py @@ -0,0 +1,388 @@ +""" +Tests for API consistency changes in agentops.init(). + +This module tests the changes to the init() function return value behavior +that were reverted in commit bf850af to maintain backward compatibility. +""" + +import pytest +from unittest.mock import MagicMock, patch + +import agentops +from agentops.legacy import Session + + +class TestInitReturnValueConsistency: + """Tests for agentops.init() return value consistency.""" + + def setup_method(self): + """Reset client state before each test.""" + # Reset the global client + agentops._client = agentops.Client() + agentops._client._initialized = False + + @patch("agentops.client.Client.init") + def test_init_returns_client_init_result(self, mock_client_init): + """Test that init() returns the result of _client.init().""" + # Setup mock to return a specific value + expected_result = MagicMock() + mock_client_init.return_value = expected_result + + # Call init + result = agentops.init(api_key="test-key") + + # Verify the result is what client.init() returned + assert result == expected_result + + # Verify client.init was called with correct parameters + mock_client_init.assert_called_once() + + @patch("agentops.client.Client.init") + def test_init_auto_start_session_true_returns_session(self, mock_client_init): + """Test that init() with auto_start_session=True returns a Session object.""" + # Setup mock to return a Session (typical behavior when auto_start_session=True) + mock_session = MagicMock(spec=Session) + mock_client_init.return_value = mock_session + + # Call init with auto_start_session=True + result = agentops.init(api_key="test-key", auto_start_session=True) + + # Verify the result is the Session object + assert result == mock_session + + # Verify client.init was called with auto_start_session=True + mock_client_init.assert_called_once() + call_kwargs = mock_client_init.call_args[1] + assert call_kwargs["auto_start_session"] is True + + @patch("agentops.client.Client.init") + def test_init_auto_start_session_false_returns_none(self, mock_client_init): + """Test that 
init() with auto_start_session=False returns None.""" + # Setup mock to return None (typical behavior when auto_start_session=False) + mock_client_init.return_value = None + + # Call init with auto_start_session=False + result = agentops.init(api_key="test-key", auto_start_session=False) + + # Verify the result is None + assert result is None + + # Verify client.init was called with auto_start_session=False + mock_client_init.assert_called_once() + call_kwargs = mock_client_init.call_args[1] + assert call_kwargs["auto_start_session"] is False + + @patch("agentops.client.Client.init") + def test_init_default_auto_start_session_behavior(self, mock_client_init): + """Test that init() with default auto_start_session returns appropriate value.""" + # Setup mock to return a Session (default behavior typically starts a session) + mock_session = MagicMock(spec=Session) + mock_client_init.return_value = mock_session + + # Call init without specifying auto_start_session + result = agentops.init(api_key="test-key") + + # Verify the result is the Session object + assert result == mock_session + + # Verify client.init was called + mock_client_init.assert_called_once() + + @patch("agentops.client.Client.init") + def test_init_passes_all_parameters(self, mock_client_init): + """Test that init() passes all parameters to client.init().""" + # Setup mock + mock_client_init.return_value = None + + # Call init with various parameters + result = agentops.init( # noqa: F841 + api_key="test-key", + endpoint="https://test.endpoint.com", + app_url="https://test.app.com", + max_wait_time=5000, + max_queue_size=512, + default_tags=["test", "integration"], + instrument_llm_calls=True, + auto_start_session=False, + skip_auto_end_session=True, + env_data_opt_out=True, + custom_param="custom_value", + ) + + # Verify client.init was called with all parameters + mock_client_init.assert_called_once() + call_args, call_kwargs = mock_client_init.call_args + + # Check that all expected parameters were passed + expected_params = { + "api_key": "test-key", + "endpoint": "https://test.endpoint.com", + "app_url": "https://test.app.com", + "max_wait_time": 5000, + "max_queue_size": 512, + "default_tags": ["test", "integration"], + "instrument_llm_calls": True, + "auto_start_session": False, + "skip_auto_end_session": True, + "env_data_opt_out": True, + "custom_param": "custom_value", + } + + for param, value in expected_params.items(): + assert call_kwargs[param] == value + + @patch("agentops.client.Client.init") + def test_init_exception_propagation(self, mock_client_init): + """Test that exceptions from client.init() are properly propagated.""" + # Setup mock to raise an exception + test_exception = ValueError("Test initialization error") + mock_client_init.side_effect = test_exception + + # Call init and expect the exception to be raised + with pytest.raises(ValueError, match="Test initialization error"): + agentops.init(api_key="test-key") + + # Verify client.init was called + mock_client_init.assert_called_once() + + +class TestInitBackwardCompatibility: + """Tests for backward compatibility of agentops.init().""" + + def setup_method(self): + """Reset client state before each test.""" + # Reset the global client + agentops._client = agentops.Client() + agentops._client._initialized = False + + @patch("agentops.client.Client.init") + def test_init_return_value_used_in_conditional(self, mock_client_init): + """Test that init() return value can be used in conditional statements.""" + # Test case 1: init returns a Session 
(truthy) + mock_session = MagicMock(spec=Session) + mock_client_init.return_value = mock_session + + result = agentops.init(api_key="test-key", auto_start_session=True) + + # Should be able to use in conditional + if result: + session_available = True + else: + session_available = False + + assert session_available is True + assert result == mock_session + + # Test case 2: init returns None (falsy) + mock_client_init.return_value = None + + result = agentops.init(api_key="test-key", auto_start_session=False) + + # Should be able to use in conditional + if result: + session_available = True + else: + session_available = False + + assert session_available is False + assert result is None + + @patch("agentops.client.Client.init") + def test_init_return_value_assignment_patterns(self, mock_client_init): + """Test common assignment patterns with init() return value.""" + # Pattern 1: Direct assignment + mock_session = MagicMock(spec=Session) + mock_client_init.return_value = mock_session + + session = agentops.init(api_key="test-key") + assert session == mock_session + + # Pattern 2: Assignment with default + mock_client_init.return_value = None + + session = agentops.init(api_key="test-key") or "no_session" + assert session == "no_session" + + # Pattern 3: Conditional assignment + mock_client_init.return_value = mock_session + + result = agentops.init(api_key="test-key") + session = result if result else None + assert session == mock_session + + @patch("agentops.client.Client.init") + def test_init_chaining_with_session_methods(self, mock_client_init): + """Test that init() return value can be used for method chaining when appropriate.""" + # Create a mock session with methods + mock_session = MagicMock(spec=Session) + # Add the method to the mock before using it + mock_session.some_method = MagicMock(return_value="method_result") + mock_client_init.return_value = mock_session + + # Should be able to chain methods when session is returned + result = agentops.init(api_key="test-key") + if result: + method_result = result.some_method() + assert method_result == "method_result" + + @patch("agentops.client.Client.init") + def test_init_multiple_calls_behavior(self, mock_client_init): + """Test behavior of multiple init() calls.""" + # First call returns a session + mock_session1 = MagicMock(spec=Session) + mock_client_init.return_value = mock_session1 + + result1 = agentops.init(api_key="test-key") + assert result1 == mock_session1 + + # Second call returns None (already initialized) + mock_client_init.return_value = None + + result2 = agentops.init(api_key="test-key") + assert result2 is None + + # Verify both calls were made + assert mock_client_init.call_count == 2 + + +class TestInitAPIConsistencyRegression: + """Regression tests for the API consistency changes that were reverted.""" + + def setup_method(self): + """Reset client state before each test.""" + # Reset the global client + agentops._client = agentops.Client() + agentops._client._initialized = False + + @patch("agentops.client.Client.init") + def test_init_does_not_always_return_client(self, mock_client_init): + """Test that init() does NOT always return the client instance (reverted behavior).""" + # This test ensures the reverted change is working correctly + # The original change made init() always return the client, but this was reverted + + # Setup mock to return None + mock_client_init.return_value = None + + # Call init + result = agentops.init(api_key="test-key", auto_start_session=False) + + # Should return None, not the 
client instance + assert result is None + assert result != agentops._client + + @patch("agentops.client.Client.init") + def test_init_return_value_matches_client_init(self, mock_client_init): + """Test that init() return value exactly matches client.init() return value.""" + # Test with various return values + test_values = [None, MagicMock(spec=Session), "string_value", 42, {"dict": "value"}, ["list", "value"]] + + for test_value in test_values: + # Reset mock + mock_client_init.reset_mock() + mock_client_init.return_value = test_value + + # Call init + result = agentops.init(api_key="test-key") + + # Should return exactly what client.init() returned + assert result == test_value + assert result is test_value # Same object reference + + def test_init_function_signature_unchanged(self): + """Test that init() function signature is unchanged.""" + import inspect + + # Get the signature of the init function + signature = inspect.signature(agentops.init) + + # Verify key parameters exist + expected_params = [ + "api_key", + "endpoint", + "app_url", + "max_wait_time", + "max_queue_size", + "default_tags", + "instrument_llm_calls", + "auto_start_session", + "skip_auto_end_session", + "env_data_opt_out", + ] + + for param in expected_params: + assert param in signature.parameters, f"Parameter {param} missing from init() signature" + + @patch("agentops.client.Client.init") + def test_init_preserves_client_state(self, mock_client_init): + """Test that init() preserves client state regardless of return value.""" + # Setup mock + mock_client_init.return_value = None + + # Get initial client reference + initial_client = agentops._client + + # Call init + result = agentops.init(api_key="test-key") + + # Client reference should be unchanged + assert agentops._client is initial_client + + # Return value should not affect client state + assert result != agentops._client + + +class TestInitDocumentationExamples: + """Tests based on common documentation examples to ensure compatibility.""" + + def setup_method(self): + """Reset client state before each test.""" + # Reset the global client + agentops._client = agentops.Client() + agentops._client._initialized = False + + @patch("agentops.client.Client.init") + def test_common_usage_pattern_1(self, mock_client_init): + """Test: session = agentops.init(api_key="...", auto_start_session=True)""" + mock_session = MagicMock(spec=Session) + mock_client_init.return_value = mock_session + + # Common pattern from documentation + session = agentops.init(api_key="test-key", auto_start_session=True) + + # Should work as expected + assert session == mock_session + + @patch("agentops.client.Client.init") + def test_common_usage_pattern_2(self, mock_client_init): + """Test: agentops.init(api_key="...", auto_start_session=False)""" + mock_client_init.return_value = None + + # Common pattern from documentation + result = agentops.init(api_key="test-key", auto_start_session=False) + + # Should return None + assert result is None + + @patch("agentops.client.Client.init") + def test_common_usage_pattern_3(self, mock_client_init): + """Test: if agentops.init(...): # do something""" + # Test truthy case + mock_session = MagicMock(spec=Session) + mock_client_init.return_value = mock_session + + if agentops.init(api_key="test-key"): + truthy_result = True + else: + truthy_result = False + + assert truthy_result is True + + # Test falsy case + mock_client_init.return_value = None + + if agentops.init(api_key="test-key"): + falsy_result = True + else: + falsy_result = False + + 
assert falsy_result is False From d4fd2574acee06e5622e5ea704251fa473d3e2e5 Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Wed, 28 May 2025 23:18:00 +0530 Subject: [PATCH 12/20] remove `Annotated` --- agentops/sdk/types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agentops/sdk/types.py b/agentops/sdk/types.py index 3a2395735..a27701833 100644 --- a/agentops/sdk/types.py +++ b/agentops/sdk/types.py @@ -1,4 +1,4 @@ -from typing import Annotated, Optional, Required, TypedDict +from typing import Optional, Required, TypedDict from opentelemetry.sdk.trace import SpanProcessor from opentelemetry.sdk.trace.export import SpanExporter From f2dcc41af57d16aaf8c0d01b08ff1e2c5f45d09c Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Wed, 28 May 2025 23:25:40 +0530 Subject: [PATCH 13/20] bad GitHub copilot --- agentops/sdk/types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agentops/sdk/types.py b/agentops/sdk/types.py index a27701833..3a2395735 100644 --- a/agentops/sdk/types.py +++ b/agentops/sdk/types.py @@ -1,4 +1,4 @@ -from typing import Optional, Required, TypedDict +from typing import Annotated, Optional, Required, TypedDict from opentelemetry.sdk.trace import SpanProcessor from opentelemetry.sdk.trace.export import SpanExporter From 3cc4bf8b4d7c8716c6a0d0af3183f5d42fe911cf Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Wed, 28 May 2025 23:31:44 +0530 Subject: [PATCH 14/20] add compatibility for both Python <3.11 and >=3.11 --- agentops/sdk/types.py | 7 ++++++- pyproject.toml | 1 + 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/agentops/sdk/types.py b/agentops/sdk/types.py index 3a2395735..a792339f8 100644 --- a/agentops/sdk/types.py +++ b/agentops/sdk/types.py @@ -1,4 +1,9 @@ -from typing import Annotated, Optional, Required, TypedDict +from typing import Annotated, Optional, TypedDict + +try: + from typing import Required +except ImportError: + from typing_extensions import Required from opentelemetry.sdk.trace import SpanProcessor from opentelemetry.sdk.trace.export import SpanExporter diff --git a/pyproject.toml b/pyproject.toml index 532e4248a..ec394477e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,7 @@ dependencies = [ "opentelemetry-instrumentation>=0.50b0; python_version>='3.10'", "opentelemetry-semantic-conventions==0.50b0; python_version<'3.10'", "opentelemetry-semantic-conventions>=0.50b0; python_version>='3.10'", + "typing_extensions>=4.0.0; python_version<'3.11'", # Required for Required and NotRequired in Python <3.11 ] [dependency-groups] From de37a130061b01b6f26e97f2b68d112031d50a4a Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Thu, 29 May 2025 17:57:59 +0530 Subject: [PATCH 15/20] use `logger` instead of `warnings` --- agentops/sdk/decorators/__init__.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/agentops/sdk/decorators/__init__.py b/agentops/sdk/decorators/__init__.py index 994aaba31..749a4ef14 100644 --- a/agentops/sdk/decorators/__init__.py +++ b/agentops/sdk/decorators/__init__.py @@ -21,14 +21,7 @@ # For backward compatibility: @session decorator calls @trace decorator with deprecation warning def session(*args, **kwargs): """@deprecated Use @agentops.trace instead. Wraps the @trace decorator for backward compatibility.""" - import warnings - - warnings.warn( - "@agentops.session decorator is deprecated. Please use @agentops.trace instead.", - DeprecationWarning, - stacklevel=2, - ) - logger.info(colored("@agentops.session decorator is deprecated. 
Please use @agentops.trace instead.", "yellow")) + logger.warning(colored("@agentops.session decorator is deprecated. Please use @agentops.trace instead.", "yellow")) # If called as @session or @session(...) if not args or not callable(args[0]): # called with kwargs like @session(name=...) From 48291790f9921d9d02b7fe5af4ca5b79f846a05a Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Thu, 29 May 2025 17:58:36 +0530 Subject: [PATCH 16/20] purge `LiveSpanProcessor` from codebase --- agentops/sdk/processors.py | 83 +--- .../sdk/test_live_span_processor_lifecycle.py | 394 ------------------ 2 files changed, 1 insertion(+), 476 deletions(-) delete mode 100644 tests/unit/sdk/test_live_span_processor_lifecycle.py diff --git a/agentops/sdk/processors.py b/agentops/sdk/processors.py index d82f9aa32..87a0d2d93 100644 --- a/agentops/sdk/processors.py +++ b/agentops/sdk/processors.py @@ -4,96 +4,15 @@ This module contains processors for OpenTelemetry spans. """ -import time -from threading import Event, Lock, Thread -from typing import Dict, Optional +from typing import Optional from opentelemetry.context import Context from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor -from opentelemetry.sdk.trace.export import SpanExporter from agentops.logging import logger -from agentops.semconv.core import CoreAttributes from agentops.logging import upload_logfile -class LiveSpanProcessor(SpanProcessor): - def __init__(self, span_exporter: SpanExporter, **kwargs): - self.span_exporter = span_exporter - self._in_flight: Dict[int, Span] = {} - self._lock = Lock() - self._stop_event = Event() - self._export_thread = Thread(target=self._export_periodically, daemon=True) - self._export_thread.start() - - def _export_periodically(self) -> None: - while not self._stop_event.is_set(): - time.sleep(1) - with self._lock: - to_export = [self._readable_span(span) for span in self._in_flight.values()] - if to_export: - self.span_exporter.export(to_export) - - def _readable_span(self, span: Span) -> ReadableSpan: - readable = span._readable_span() - readable._end_time = time.time_ns() - readable._attributes = { - **(readable._attributes or {}), - CoreAttributes.IN_FLIGHT: True, - } - return readable - - def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: - if not span.context or not span.context.trace_flags.sampled: - return - with self._lock: - self._in_flight[span.context.span_id] = span - - def on_end(self, span: ReadableSpan) -> None: - if not span.context or not span.context.trace_flags.sampled: - return - with self._lock: - del self._in_flight[span.context.span_id] - self.span_exporter.export((span,)) - - def shutdown(self) -> None: - """Shutdown the processor with proper thread lifecycle management.""" - try: - # Signal the export thread to stop - self._stop_event.set() - - # Wait for the thread to finish with a timeout to prevent hanging - if self._export_thread.is_alive(): - self._export_thread.join(timeout=5.0) - - # If thread is still alive after timeout, log a warning - if self._export_thread.is_alive(): - logger.warning("Export thread did not shut down within timeout, continuing shutdown") - - except Exception as e: - logger.error(f"Error during thread shutdown: {e}") - - # Always attempt to shutdown the exporter - try: - self.span_exporter.shutdown() - except Exception as e: - logger.error(f"Error shutting down span exporter: {e}") - - def force_flush(self, timeout_millis: int = 30000) -> bool: - return True - - def export_in_flight_spans(self) -> None: - 
"""Export all in-flight spans without ending them. - - This method is primarily used for testing to ensure all spans - are exported before assertions are made. - """ - with self._lock: - to_export = [self._readable_span(span) for span in self._in_flight.values()] - if to_export: - self.span_exporter.export(to_export) - - class InternalSpanProcessor(SpanProcessor): """ A span processor that prints information about spans. diff --git a/tests/unit/sdk/test_live_span_processor_lifecycle.py b/tests/unit/sdk/test_live_span_processor_lifecycle.py deleted file mode 100644 index 8938730e8..000000000 --- a/tests/unit/sdk/test_live_span_processor_lifecycle.py +++ /dev/null @@ -1,394 +0,0 @@ -""" -Tests for LiveSpanProcessor lifecycle management improvements. - -This module tests the enhanced shutdown functionality that was added to prevent -resource leaks by properly managing thread lifecycle with timeouts and error handling. -""" - -import threading -import time -from unittest.mock import MagicMock, patch, call - -from agentops.sdk.processors import LiveSpanProcessor - - -class TestLiveSpanProcessorShutdown: - """Tests for the enhanced LiveSpanProcessor shutdown functionality.""" - - def test_shutdown_normal_thread_termination(self): - """Test shutdown with normal thread termination.""" - # Create a mock exporter - mock_exporter = MagicMock() - - # Create processor - processor = LiveSpanProcessor(mock_exporter) - - # Verify thread is running - assert processor._export_thread.is_alive() - - # Shutdown - processor.shutdown() - - # Verify thread stopped - assert not processor._export_thread.is_alive() - - # Verify exporter shutdown was called - mock_exporter.shutdown.assert_called_once() - - @patch("agentops.sdk.processors.logger") - def test_shutdown_with_thread_timeout(self, mock_logger): - """Test shutdown when thread doesn't terminate within timeout.""" - # Create a mock exporter - mock_exporter = MagicMock() - - # Create processor - processor = LiveSpanProcessor(mock_exporter) - - # Mock the thread to simulate it not terminating - with patch.object(processor._export_thread, "join") as mock_join: - with patch.object(processor._export_thread, "is_alive", return_value=True): - # Shutdown - processor.shutdown() - - # Verify join was called with timeout - mock_join.assert_called_once_with(timeout=5.0) - - # Verify warning was logged - mock_logger.warning.assert_called_once_with( - "Export thread did not shut down within timeout, continuing shutdown" - ) - - # Verify exporter shutdown was still called - mock_exporter.shutdown.assert_called_once() - - @patch("agentops.sdk.processors.logger") - def test_shutdown_thread_join_exception(self, mock_logger): - """Test shutdown when thread.join() raises an exception.""" - # Create a mock exporter - mock_exporter = MagicMock() - - # Create processor - processor = LiveSpanProcessor(mock_exporter) - - # Mock the thread join to raise an exception - with patch.object(processor._export_thread, "join", side_effect=Exception("Join failed")): - # Shutdown should not raise exception - processor.shutdown() - - # Verify error was logged - mock_logger.error.assert_called_once_with("Error during thread shutdown: Join failed") - - # Verify exporter shutdown was still called - mock_exporter.shutdown.assert_called_once() - - @patch("agentops.sdk.processors.logger") - def test_shutdown_exporter_shutdown_exception(self, mock_logger): - """Test shutdown when exporter.shutdown() raises an exception.""" - # Create a mock exporter that raises exception on shutdown - mock_exporter = 
MagicMock() - mock_exporter.shutdown.side_effect = Exception("Exporter shutdown failed") - - # Create processor - processor = LiveSpanProcessor(mock_exporter) - - # Shutdown should not raise exception - processor.shutdown() - - # Verify error was logged - mock_logger.error.assert_called_once_with("Error shutting down span exporter: Exporter shutdown failed") - - # Verify exporter shutdown was attempted - mock_exporter.shutdown.assert_called_once() - - @patch("agentops.sdk.processors.logger") - def test_shutdown_both_thread_and_exporter_exceptions(self, mock_logger): - """Test shutdown when both thread join and exporter shutdown raise exceptions.""" - # Create a mock exporter that raises exception on shutdown - mock_exporter = MagicMock() - mock_exporter.shutdown.side_effect = Exception("Exporter shutdown failed") - - # Create processor - processor = LiveSpanProcessor(mock_exporter) - - # Mock the thread join to raise an exception - with patch.object(processor._export_thread, "join", side_effect=Exception("Join failed")): - # Shutdown should not raise exception - processor.shutdown() - - # Verify both errors were logged - expected_calls = [ - call("Error during thread shutdown: Join failed"), - call("Error shutting down span exporter: Exporter shutdown failed"), - ] - mock_logger.error.assert_has_calls(expected_calls) - - # Verify exporter shutdown was attempted - mock_exporter.shutdown.assert_called_once() - - def test_shutdown_thread_already_stopped(self): - """Test shutdown when thread is already stopped.""" - # Create a mock exporter - mock_exporter = MagicMock() - - # Create processor - processor = LiveSpanProcessor(mock_exporter) - - # Stop the thread manually first - processor._stop_event.set() - processor._export_thread.join() - - # Verify thread is stopped - assert not processor._export_thread.is_alive() - - # Shutdown should still work - processor.shutdown() - - # Verify exporter shutdown was called - mock_exporter.shutdown.assert_called_once() - - def test_shutdown_multiple_calls(self): - """Test that multiple shutdown calls don't cause issues.""" - # Create a mock exporter - mock_exporter = MagicMock() - - # Create processor - processor = LiveSpanProcessor(mock_exporter) - - # Call shutdown multiple times - processor.shutdown() - processor.shutdown() - processor.shutdown() - - # Verify exporter shutdown was called multiple times (this is expected behavior) - assert mock_exporter.shutdown.call_count == 3 - - @patch("agentops.sdk.processors.logger") - def test_shutdown_timeout_behavior(self, mock_logger): - """Test the specific timeout behavior during shutdown.""" - # Create a mock exporter - mock_exporter = MagicMock() - - # Create processor - processor = LiveSpanProcessor(mock_exporter) - - # Track the actual timeout used - original_join = processor._export_thread.join - join_timeout = None - - def capture_join_timeout(timeout=None): - nonlocal join_timeout - join_timeout = timeout - # Simulate thread not terminating within timeout - if timeout is not None: - time.sleep(0.01) # Small delay to simulate timeout - return original_join(timeout=0.01) # Quick join to actually stop thread - - with patch.object(processor._export_thread, "join", side_effect=capture_join_timeout): - with patch.object(processor._export_thread, "is_alive", return_value=True): - # Shutdown - processor.shutdown() - - # Verify the timeout was 5.0 seconds - assert join_timeout == 5.0 - - # Verify warning was logged - mock_logger.warning.assert_called_once_with( - "Export thread did not shut down within timeout, 
continuing shutdown" - ) - - -class TestLiveSpanProcessorThreadSafety: - """Tests for thread safety during LiveSpanProcessor operations.""" - - def test_concurrent_shutdown_calls(self): - """Test that concurrent shutdown calls are handled safely.""" - # Create a mock exporter - mock_exporter = MagicMock() - - # Create processor - processor = LiveSpanProcessor(mock_exporter) - - # Create multiple threads that call shutdown - shutdown_threads = [] - exceptions = [] - - def shutdown_worker(): - try: - processor.shutdown() - except Exception as e: - exceptions.append(e) - - # Start multiple shutdown threads - for _ in range(5): - thread = threading.Thread(target=shutdown_worker) - shutdown_threads.append(thread) - thread.start() - - # Wait for all threads to complete - for thread in shutdown_threads: - thread.join() - - # Verify no exceptions occurred - assert len(exceptions) == 0 - - # Verify exporter shutdown was called (multiple times is expected) - assert mock_exporter.shutdown.call_count >= 1 - - def test_shutdown_during_span_processing(self): - """Test shutdown while spans are being processed.""" - # Create a mock exporter that takes time to export - mock_exporter = MagicMock() - - def slow_export(spans): - time.sleep(0.1) # Simulate slow export - return True - - mock_exporter.export.side_effect = slow_export - - # Create processor - processor = LiveSpanProcessor(mock_exporter) - - # Create a mock span with proper context - mock_span = MagicMock() - mock_span.context.span_id = 12345 - mock_span.context.trace_flags.sampled = True - - # Properly initialize the span through on_start (realistic lifecycle) - processor.on_start(mock_span, None) - - # Start processing a span in a separate thread - def process_span(): - processor.on_end(mock_span) - - process_thread = threading.Thread(target=process_span) - process_thread.start() - - # Give the processing thread a moment to start - time.sleep(0.05) - - # Shutdown while processing - processor.shutdown() - - # Wait for processing thread to complete - process_thread.join(timeout=1.0) - - # Verify shutdown completed successfully - assert not processor._export_thread.is_alive() - - -class TestLiveSpanProcessorResourceManagement: - """Tests for proper resource management in LiveSpanProcessor.""" - - def test_stop_event_set_during_shutdown(self): - """Test that stop event is properly set during shutdown.""" - # Create a mock exporter - mock_exporter = MagicMock() - - # Create processor - processor = LiveSpanProcessor(mock_exporter) - - # Verify stop event is not set initially - assert not processor._stop_event.is_set() - - # Shutdown - processor.shutdown() - - # Verify stop event is set - assert processor._stop_event.is_set() - - def test_export_thread_lifecycle(self): - """Test the complete lifecycle of the export thread.""" - # Create a mock exporter - mock_exporter = MagicMock() - - # Create processor - processor = LiveSpanProcessor(mock_exporter) - - # Verify thread is created and running - assert processor._export_thread is not None - assert processor._export_thread.is_alive() - assert processor._export_thread.daemon # Should be daemon thread - - # Shutdown - processor.shutdown() - - # Verify thread is stopped - assert not processor._export_thread.is_alive() - - @patch("agentops.sdk.processors.logger") - def test_graceful_degradation_on_errors(self, mock_logger): - """Test that processor gracefully degrades when errors occur during shutdown.""" - # Create a mock exporter - mock_exporter = MagicMock() - - # Create processor - processor = 
LiveSpanProcessor(mock_exporter) - - # Mock everything to fail - with patch.object(processor._stop_event, "set", side_effect=Exception("Stop event failed")): - with patch.object(processor._export_thread, "join", side_effect=Exception("Join failed")): - with patch.object(processor._export_thread, "is_alive", side_effect=Exception("is_alive failed")): - mock_exporter.shutdown.side_effect = Exception("Exporter shutdown failed") - - # Shutdown should still complete without raising exceptions - processor.shutdown() - - # Verify errors were logged but execution continued - assert mock_logger.error.call_count >= 1 - - -class TestLiveSpanProcessorBackwardCompatibility: - """Tests to ensure backward compatibility of LiveSpanProcessor.""" - - def test_force_flush_unchanged(self): - """Test that force_flush behavior is unchanged.""" - # Create a mock exporter - mock_exporter = MagicMock() - - # Create processor - processor = LiveSpanProcessor(mock_exporter) - - # Test force_flush - result = processor.force_flush() - - # Should return True (unchanged behavior) - assert result is True - - def test_on_start_unchanged(self): - """Test that on_start behavior is unchanged.""" - # Create a mock exporter - mock_exporter = MagicMock() - - # Create processor - processor = LiveSpanProcessor(mock_exporter) - - # Create a mock span - mock_span = MagicMock() - - # Test on_start (should not raise exception) - processor.on_start(mock_span, None) - - def test_on_end_unchanged(self): - """Test that on_end behavior is unchanged.""" - # Create a mock exporter - mock_exporter = MagicMock() - - # Create processor - processor = LiveSpanProcessor(mock_exporter) - - # Create a mock span with proper context - mock_span = MagicMock() - mock_span.context.span_id = 12345 - mock_span.context.trace_flags.sampled = True - - # Add the span to _in_flight first (simulating on_start) - processor._in_flight[mock_span.context.span_id] = mock_span - - # Test on_end - processor.on_end(mock_span) - - # Verify exporter.export was called - mock_exporter.export.assert_called_once_with((mock_span,)) - - # Verify span was removed from _in_flight - assert mock_span.context.span_id not in processor._in_flight From c184be2079b3a1677eae27328abad951f574950b Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Thu, 29 May 2025 23:10:12 +0530 Subject: [PATCH 17/20] add `dev-llm` to make @Dwij1704 happy :) --- pyproject.toml | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index ec394477e..13be9d9d5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -90,6 +90,20 @@ dev = [ "ipython>=8.18.1", ] +dev-llm = [ + # LLM Provider Packages + "openai>=1.82.1", + "anthropic>=0.52.1", + "ibm-watsonx-ai>=1.3.23", + "google-genai>=1.17.0", + # Agentic Framework Packages + "crewai>=0.121.1", + "autogen-agentchat>=0.5.7", # microsoft autogen, the one where it all began + "ag2>=0.9.1.post0", # same as autogen package, references ag2 + "openai-agents>=0.0.16", + "google-adk>=1.1.1", +] + [project.urls] Homepage = "https://github.com/AgentOps-AI/agentops" Issues = "https://github.com/AgentOps-AI/agentops/issues" @@ -190,4 +204,3 @@ exclude = [ [tool.hatch.metadata] allow-direct-references = true - From 9ecfeb9bac4731b4dbe1f001b8ca6144d2dde279 Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Thu, 29 May 2025 23:46:41 +0530 Subject: [PATCH 18/20] Revert "add `dev-llm` to make @Dwij1704 happy :)" This reverts commit c184be2079b3a1677eae27328abad951f574950b. 
--- pyproject.toml | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 13be9d9d5..ec394477e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -90,20 +90,6 @@ dev = [ "ipython>=8.18.1", ] -dev-llm = [ - # LLM Provider Packages - "openai>=1.82.1", - "anthropic>=0.52.1", - "ibm-watsonx-ai>=1.3.23", - "google-genai>=1.17.0", - # Agentic Framework Packages - "crewai>=0.121.1", - "autogen-agentchat>=0.5.7", # microsoft autogen, the one where it all began - "ag2>=0.9.1.post0", # same as autogen package, references ag2 - "openai-agents>=0.0.16", - "google-adk>=1.1.1", -] - [project.urls] Homepage = "https://github.com/AgentOps-AI/agentops" Issues = "https://github.com/AgentOps-AI/agentops/issues" @@ -204,3 +190,4 @@ exclude = [ [tool.hatch.metadata] allow-direct-references = true + From 21bf90d851a1ec1c9c39cd909ec7352d1a6094f8 Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Tue, 3 Jun 2025 01:16:16 +0530 Subject: [PATCH 19/20] import `functools` --- agentops/sdk/decorators/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/agentops/sdk/decorators/__init__.py b/agentops/sdk/decorators/__init__.py index a30598389..3296231b7 100644 --- a/agentops/sdk/decorators/__init__.py +++ b/agentops/sdk/decorators/__init__.py @@ -3,6 +3,7 @@ Provides @trace for creating trace-level spans (sessions) and other decorators for nested spans. """ +import functools from termcolor import colored from agentops.logging import logger From dc3573062974ec7cfb3fe60e5cafa67e65799fe6 Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Thu, 5 Jun 2025 04:11:34 +0530 Subject: [PATCH 20/20] @bboynton97 use the new tracer variable pls #1030 --- agentops/__init__.py | 8 +- agentops/client/client.py | 44 +++---- agentops/legacy/__init__.py | 78 ++++++------ agentops/sdk/core.py | 28 ++--- agentops/sdk/decorators/factory.py | 40 +++--- .../unit/sdk/test_internal_span_processor.py | 24 ++-- tests/unit/test_session.py | 116 +++++++++--------- 7 files changed, 166 insertions(+), 172 deletions(-) diff --git a/agentops/__init__.py b/agentops/__init__.py index 3b252759a..52f56e0bf 100755 --- a/agentops/__init__.py +++ b/agentops/__init__.py @@ -208,20 +208,20 @@ def start_trace( return tracing_core.start_trace(trace_name=trace_name, tags=tags) -def end_trace(trace_context: Optional[TraceContext] = None, end_state: str = "Success") -> None: +def end_trace(tracer: Optional[TraceContext] = None, end_state: str = "Success") -> None: """ Ends a trace (its root span) and finalizes it. - If no trace_context is provided, ends all active session spans. + If no tracer is provided, ends all active session spans. Args: - trace_context: The TraceContext object returned by start_trace. If None, ends all active traces. + tracer: The TraceContext object returned by start_trace. If None, ends all active traces. end_state: The final state of the trace (e.g., "Success", "Failure", "Error"). """ tracing_core = TracingCore.get_instance() if not tracing_core.initialized: logger.warning("AgentOps SDK not initialized. 
Cannot end trace.") return - tracing_core.end_trace(trace_context=trace_context, end_state=end_state) + tracing_core.end_trace(tracer=tracer, end_state=end_state) __all__ = [ diff --git a/agentops/client/client.py b/agentops/client/client.py index 2ceacd90e..9666e7a2c 100644 --- a/agentops/client/client.py +++ b/agentops/client/client.py @@ -11,7 +11,7 @@ from agentops.legacy import Session # Global variables to hold the client's auto-started trace and its legacy session wrapper -_client_init_trace_context: Optional[TraceContext] = None +_client_init_tracer: Optional[TraceContext] = None _client_legacy_session_for_init_trace: Optional[Session] = None # Single atexit handler registered flag @@ -20,18 +20,18 @@ def _end_init_trace_atexit(): """Global atexit handler to end the client's auto-initialized trace during shutdown.""" - global _client_init_trace_context, _client_legacy_session_for_init_trace - if _client_init_trace_context is not None: + global _client_init_tracer, _client_legacy_session_for_init_trace + if _client_init_tracer is not None: logger.debug("Auto-ending client's init trace during shutdown.") try: # Use TracingCore to end the trace directly tracing_core = TracingCore.get_instance() - if tracing_core.initialized and _client_init_trace_context.span.is_recording(): - tracing_core.end_trace(_client_init_trace_context, end_state="Shutdown") + if tracing_core.initialized and _client_init_tracer.span.is_recording(): + tracing_core.end_trace(_client_init_tracer, end_state="Shutdown") except Exception as e: logger.warning(f"Error ending client's init trace during shutdown: {e}") finally: - _client_init_trace_context = None + _client_init_tracer = None _client_legacy_session_for_init_trace = None # Clear its legacy wrapper too @@ -40,7 +40,7 @@ class Client: config: Config _initialized: bool - _init_trace_context: Optional[TraceContext] = None # Stores the context of the auto-started trace + _init_tracer: Optional[TraceContext] = None # Stores the context of the auto-started trace _legacy_session_for_init_trace: Optional[ Session ] = None # Stores the legacy Session wrapper for the auto-started trace @@ -53,7 +53,7 @@ def __new__(cls, *args: Any, **kwargs: Any) -> "Client": if cls.__instance is None: cls.__instance = super(Client, cls).__new__(cls) # Initialize instance variables that should only be set once per instance - cls.__instance._init_trace_context = None + cls.__instance._init_tracer = None cls.__instance._legacy_session_for_init_trace = None return cls.__instance @@ -66,7 +66,7 @@ def __init__(self): ): # Ensure init logic runs only once per actual initialization intent self.config = Config() # Initialize config here for the instance self._initialized = False - # self._init_trace_context = None # Already done in __new__ + # self._init_tracer = None # Already done in __new__ # self._legacy_session_for_init_trace = None # Already done in __new__ def init(self, **kwargs: Any) -> None: # Return type updated to None @@ -81,10 +81,10 @@ def init(self, **kwargs: Any) -> None: # Return type updated to None logger.warning("AgentOps Client being re-initialized with a different API key. 
This is unusual.") # Reset initialization status to allow re-init with new key/config self._initialized = False - if self._init_trace_context and self._init_trace_context.span.is_recording(): + if self._init_tracer and self._init_tracer.span.is_recording(): logger.warning("Ending previously auto-started trace due to re-initialization.") - TracingCore.get_instance().end_trace(self._init_trace_context, "Reinitialized") - self._init_trace_context = None + TracingCore.get_instance().end_trace(self._init_tracer, "Reinitialized") + self._init_tracer = None self._legacy_session_for_init_trace = None if self.initialized: @@ -133,31 +133,31 @@ def init(self, **kwargs: Any) -> None: # Return type updated to None # Auto-start trace if configured if self.config.auto_start_session: - if self._init_trace_context is None or not self._init_trace_context.span.is_recording(): + if self._init_tracer is None or not self._init_tracer.span.is_recording(): logger.debug("Auto-starting init trace.") trace_name = self.config.trace_name or "default" - self._init_trace_context = tracing_core.start_trace( + self._init_tracer = tracing_core.start_trace( trace_name=trace_name, tags=list(self.config.default_tags) if self.config.default_tags else None, is_init_trace=True, ) - if self._init_trace_context: - self._legacy_session_for_init_trace = Session(self._init_trace_context) + if self._init_tracer: + self._legacy_session_for_init_trace = Session(self._init_tracer) # For backward compatibility, also update the global references in legacy and client modules # These globals are what old code might have been using via agentops.legacy.get_session() or similar indirect access. - global _client_init_trace_context, _client_legacy_session_for_init_trace - _client_init_trace_context = self._init_trace_context + global _client_init_tracer, _client_legacy_session_for_init_trace + _client_init_tracer = self._init_tracer _client_legacy_session_for_init_trace = self._legacy_session_for_init_trace - # Update legacy module's _current_session and _current_trace_context + # Update legacy module's _current_session and _current_tracer # This is tricky; direct access to another module's globals is not ideal. # Prefer explicit calls if possible, but for maximum BC: try: import agentops.legacy agentops.legacy._current_session = self._legacy_session_for_init_trace - agentops.legacy._current_trace_context = self._init_trace_context + agentops.legacy._current_tracer = self._init_tracer except ImportError: pass # Should not happen @@ -196,7 +196,7 @@ def initialized(self, value: bool) -> None: # Remove the old __instance = None at the end of the class definition if it's a repeat # __instance = None # This was a class variable, should be defined once - # Make _init_trace_context and _legacy_session_for_init_trace accessible + # Make _init_tracer and _legacy_session_for_init_trace accessible # to the atexit handler if it becomes a static/class method or needs access # For now, the atexit handler is global and uses global vars copied from these. @@ -210,4 +210,4 @@ def initialized(self, value: bool) -> None: # For now, _client_legacy_session_for_init_trace is the primary global for the auto-init trace's legacy Session. # Remove the old global _active_session defined at the top of this file if it's no longer the primary mechanism. -# The new globals _client_init_trace_context and _client_legacy_session_for_init_trace handle the auto-init trace. +# The new globals _client_init_tracer and _client_legacy_session_for_init_trace handle the auto-init trace. 
diff --git a/agentops/legacy/__init__.py b/agentops/legacy/__init__.py index ff32beb8b..213644d98 100644 --- a/agentops/legacy/__init__.py +++ b/agentops/legacy/__init__.py @@ -15,7 +15,7 @@ from agentops.sdk.core import TracingCore, TraceContext _current_session: Optional["Session"] = None -_current_trace_context: Optional[TraceContext] = None +_current_tracer: Optional[TraceContext] = None class Session: @@ -28,22 +28,22 @@ class Session: - end_session(): Called when a CrewAI run completes """ - def __init__(self, trace_context: Optional[TraceContext]): - self.trace_context = trace_context + def __init__(self, tracer: Optional[TraceContext]): + self.tracer = tracer @property def span(self) -> Optional[Any]: - return self.trace_context.span if self.trace_context else None + return self.tracer.span if self.tracer else None @property def token(self) -> Optional[Any]: - return self.trace_context.token if self.trace_context else None + return self.tracer.token if self.tracer else None def __del__(self): - if self.trace_context and self.trace_context.span and self.trace_context.span.is_recording(): - if not self.trace_context.is_init_trace: + if self.tracer and self.tracer.span and self.tracer.span.is_recording(): + if not self.tracer.is_init_trace: logger.warning( - f"Legacy Session (trace ID: {self.trace_context.span.get_span_context().span_id}) \ + f"Legacy Session (trace ID: {self.tracer.span.get_span_context().span_id}) \ was garbage collected but its trace might still be recording. Ensure legacy sessions are ended with end_session()." ) @@ -67,7 +67,7 @@ def start_session( @deprecated Use agentops.start_trace() instead. Starts a legacy AgentOps session. Calls TracingCore.start_trace internally. """ - global _current_session, _current_trace_context + global _current_session, _current_tracer tracing_core = TracingCore.get_instance() if not tracing_core.initialized: @@ -79,33 +79,33 @@ def start_session( logger.warning("AgentOps client init failed during legacy start_session. Creating dummy session.") dummy_session = Session(None) _current_session = dummy_session - _current_trace_context = None + _current_tracer = None return dummy_session except Exception as e: logger.warning(f"AgentOps client init failed: {str(e)}. Creating dummy session.") dummy_session = Session(None) _current_session = dummy_session - _current_trace_context = None + _current_tracer = None return dummy_session - trace_context = tracing_core.start_trace(trace_name="session", tags=tags) - if trace_context is None: + tracer = tracing_core.start_trace(trace_name="session", tags=tags) + if tracer is None: logger.error("Failed to start trace via TracingCore. 
Returning dummy session.") dummy_session = Session(None) _current_session = dummy_session - _current_trace_context = None + _current_tracer = None return dummy_session - session_obj = Session(trace_context) + session_obj = Session(tracer) _current_session = session_obj - _current_trace_context = trace_context + _current_tracer = tracer try: import agentops.client.client agentops.client.client._active_session = session_obj # type: ignore - if hasattr(agentops.client.client, "_active_trace_context"): - agentops.client.client._active_trace_context = trace_context # type: ignore + if hasattr(agentops.client.client, "_active_tracer"): + agentops.client.client._active_tracer = tracer # type: ignore except (ImportError, AttributeError): pass return session_obj @@ -128,61 +128,55 @@ def end_session(session_or_status: Any = None, **kwargs: Any) -> None: Ends a legacy AgentOps session. Calls TracingCore.end_trace internally. Supports multiple calling patterns for backward compatibility. """ - global _current_session, _current_trace_context + global _current_session, _current_tracer tracing_core = TracingCore.get_instance() if not tracing_core.initialized: logger.debug("Ignoring end_session: TracingCore not initialized.") return - target_trace_context: Optional[TraceContext] = None + target_tracer: Optional[TraceContext] = None end_state_from_args = "Success" extra_attributes = kwargs.copy() if isinstance(session_or_status, Session): - target_trace_context = session_or_status.trace_context + target_tracer = session_or_status.tracer if "end_state" in extra_attributes: end_state_from_args = str(extra_attributes.pop("end_state")) elif isinstance(session_or_status, str): end_state_from_args = session_or_status - target_trace_context = _current_trace_context + target_tracer = _current_tracer if "end_state" in extra_attributes: end_state_from_args = str(extra_attributes.pop("end_state")) elif session_or_status is None and kwargs: - target_trace_context = _current_trace_context + target_tracer = _current_tracer if "end_state" in extra_attributes: end_state_from_args = str(extra_attributes.pop("end_state")) else: - target_trace_context = _current_trace_context + target_tracer = _current_tracer if "end_state" in extra_attributes: end_state_from_args = str(extra_attributes.pop("end_state")) - if not target_trace_context: + if not target_tracer: logger.warning("end_session called but no active trace context found.") return - if target_trace_context.span and extra_attributes: - _set_span_attributes(target_trace_context.span, extra_attributes) + if target_tracer.span and extra_attributes: + _set_span_attributes(target_tracer.span, extra_attributes) - tracing_core.end_trace(target_trace_context, end_state=end_state_from_args) + tracing_core.end_trace(target_tracer, end_state=end_state_from_args) - if target_trace_context is _current_trace_context: + if target_tracer is _current_tracer: _current_session = None - _current_trace_context = None + _current_tracer = None try: import agentops.client.client - if ( - hasattr(agentops.client.client, "_active_trace_context") - and agentops.client.client._active_trace_context is target_trace_context - ): # type: ignore - agentops.client.client._active_trace_context = None # type: ignore + if hasattr(agentops.client.client, "_active_tracer") and agentops.client.client._active_tracer is target_tracer: # type: ignore + agentops.client.client._active_tracer = None # type: ignore agentops.client.client._active_session = None # type: ignore - elif ( - 
hasattr(agentops.client.client, "_init_trace_context") - and agentops.client.client._init_trace_context is target_trace_context - ): # type: ignore + elif hasattr(agentops.client.client, "_init_tracer") and agentops.client.client._init_tracer is target_tracer: # type: ignore logger.debug("Legacy end_session called on client's auto-init trace. This is unusual.") except (ImportError, AttributeError): pass @@ -198,12 +192,12 @@ def end_all_sessions() -> None: return # Use the new end_trace functionality to end all active traces - tracing_core.end_trace(trace_context=None, end_state="Success") + tracing_core.end_trace(tracer=None, end_state="Success") # Clear legacy global state - global _current_session, _current_trace_context + global _current_session, _current_tracer _current_session = None - _current_trace_context = None + _current_tracer = None def ToolEvent(*args: Any, **kwargs: Any) -> None: diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index c6e91f0da..a01142cc5 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -451,31 +451,31 @@ def start_trace( except Exception as e: logger.warning(f"Failed to log trace URL for '{trace_name}': {e}") - trace_context = TraceContext(span, token=context_token, is_init_trace=is_init_trace) + tracer = TraceContext(span, token=context_token, is_init_trace=is_init_trace) # Track the active trace with self._traces_lock: trace_id = format_trace_id(span.get_span_context().trace_id) - self._active_traces[trace_id] = trace_context + self._active_traces[trace_id] = tracer logger.debug(f"Added trace {trace_id} to active traces. Total active: {len(self._active_traces)}") - return trace_context + return tracer - def end_trace(self, trace_context: Optional[TraceContext] = None, end_state: str = "Success") -> None: + def end_trace(self, tracer: Optional[TraceContext] = None, end_state: str = "Success") -> None: """ Ends a trace (its root span) and finalizes it. - If no trace_context is provided, ends all active session spans. + If no tracer is provided, ends all active session spans. Args: - trace_context: The TraceContext object returned by start_trace. If None, ends all active traces. + tracer: The TraceContext object returned by start_trace. If None, ends all active traces. end_state: The final state of the trace (e.g., "Success", "Failure", "Error"). """ if not self.initialized: logger.warning("TracingCore not initialized. Cannot end trace.") return - # If no specific trace_context provided, end all active traces - if trace_context is None: + # If no specific tracer provided, end all active traces + if tracer is None: with self._traces_lock: active_traces = list(self._active_traces.values()) logger.debug(f"Ending all {len(active_traces)} active traces with state: {end_state}") @@ -485,24 +485,24 @@ def end_trace(self, trace_context: Optional[TraceContext] = None, end_state: str return # End specific trace - self._end_single_trace(trace_context, end_state) + self._end_single_trace(tracer, end_state) - def _end_single_trace(self, trace_context: TraceContext, end_state: str) -> None: + def _end_single_trace(self, tracer: TraceContext, end_state: str) -> None: """ Internal method to end a single trace. Args: - trace_context: The TraceContext object to end. + tracer: The TraceContext object to end. end_state: The final state of the trace. 
""" from agentops.sdk.decorators.utility import _finalize_span # Local import - if not trace_context or not trace_context.span: + if not tracer or not tracer.span: logger.warning("Invalid TraceContext or span provided to end trace.") return - span = trace_context.span - token = trace_context.token + span = tracer.span + token = tracer.token trace_id = format_trace_id(span.get_span_context().trace_id) logger.debug(f"Ending trace with span ID: {span.get_span_context().span_id}, end_state: {end_state}") diff --git a/agentops/sdk/decorators/factory.py b/agentops/sdk/decorators/factory.py index 4fc03601a..aab690e79 100644 --- a/agentops/sdk/decorators/factory.py +++ b/agentops/sdk/decorators/factory.py @@ -25,19 +25,19 @@ def _handle_session_trace_sync( operation_name: str, tags: Optional[Union[list, dict]], wrapped_func: Callable, args: tuple, kwargs: Dict[str, Any] ) -> Any: """Helper function to handle SESSION trace lifecycle for sync functions with proper cleanup""" - trace_context: Optional[TraceContext] = None + tracer: Optional[TraceContext] = None trace_ended = False try: # Start trace - trace_context = TracingCore.get_instance().start_trace(trace_name=operation_name, tags=tags) - if not trace_context: + tracer = TracingCore.get_instance().start_trace(trace_name=operation_name, tags=tags) + if not tracer: logger.error(f"Failed to start trace for @trace '{operation_name}'. Executing without trace.") return wrapped_func(*args, **kwargs) # Record input try: - _record_entity_input(trace_context.span, args, kwargs) + _record_entity_input(tracer.span, args, kwargs) except Exception as e: logger.warning(f"Input recording failed for @trace '{operation_name}': {e}") @@ -46,20 +46,20 @@ def _handle_session_trace_sync( # Record output try: - _record_entity_output(trace_context.span, result) + _record_entity_output(tracer.span, result) except Exception as e: logger.warning(f"Output recording failed for @trace '{operation_name}': {e}") # End trace successfully - TracingCore.get_instance().end_trace(trace_context, "Success") + TracingCore.get_instance().end_trace(tracer, "Success") trace_ended = True return result except Exception: # End trace with failure if not already ended - if trace_context and not trace_ended: + if tracer and not trace_ended: try: - TracingCore.get_instance().end_trace(trace_context, "Failure") + TracingCore.get_instance().end_trace(tracer, "Failure") trace_ended = True except Exception as cleanup_error: logger.error(f"Failed to end trace during exception cleanup: {cleanup_error}") @@ -67,9 +67,9 @@ def _handle_session_trace_sync( finally: # Safety net - only end if not already ended and still recording - if trace_context and not trace_ended and trace_context.span.is_recording(): + if tracer and not trace_ended and tracer.span.is_recording(): try: - TracingCore.get_instance().end_trace(trace_context, "Unknown") + TracingCore.get_instance().end_trace(tracer, "Unknown") logger.warning(f"Trace for @trace '{operation_name}' ended in finally block as 'Unknown'.") except Exception as cleanup_error: logger.error(f"Failed to end trace in finally block: {cleanup_error}") @@ -79,19 +79,19 @@ async def _handle_session_trace_async( operation_name: str, tags: Optional[Union[list, dict]], wrapped_func: Callable, args: tuple, kwargs: Dict[str, Any] ) -> Any: """Helper function to handle SESSION trace lifecycle for async functions with proper cleanup""" - trace_context: Optional[TraceContext] = None + tracer: Optional[TraceContext] = None trace_ended = False try: # Start trace - trace_context 
= TracingCore.get_instance().start_trace(trace_name=operation_name, tags=tags) - if not trace_context: + tracer = TracingCore.get_instance().start_trace(trace_name=operation_name, tags=tags) + if not tracer: logger.error(f"Failed to start trace for @trace '{operation_name}'. Executing without trace.") return await wrapped_func(*args, **kwargs) # Record input try: - _record_entity_input(trace_context.span, args, kwargs) + _record_entity_input(tracer.span, args, kwargs) except Exception as e: logger.warning(f"Input recording failed for @trace '{operation_name}': {e}") @@ -100,20 +100,20 @@ async def _handle_session_trace_async( # Record output try: - _record_entity_output(trace_context.span, result) + _record_entity_output(tracer.span, result) except Exception as e: logger.warning(f"Output recording failed for @trace '{operation_name}': {e}") # End trace successfully - TracingCore.get_instance().end_trace(trace_context, "Success") + TracingCore.get_instance().end_trace(tracer, "Success") trace_ended = True return result except Exception: # End trace with failure if not already ended - if trace_context and not trace_ended: + if tracer and not trace_ended: try: - TracingCore.get_instance().end_trace(trace_context, "Failure") + TracingCore.get_instance().end_trace(tracer, "Failure") trace_ended = True except Exception as cleanup_error: logger.error(f"Failed to end trace during exception cleanup: {cleanup_error}") @@ -121,9 +121,9 @@ async def _handle_session_trace_async( finally: # Safety net - only end if not already ended and still recording - if trace_context and not trace_ended and trace_context.span.is_recording(): + if tracer and not trace_ended and tracer.span.is_recording(): try: - TracingCore.get_instance().end_trace(trace_context, "Unknown") + TracingCore.get_instance().end_trace(tracer, "Unknown") logger.warning(f"Trace for @trace '{operation_name}' ended in finally block as 'Unknown'.") except Exception as cleanup_error: logger.error(f"Failed to end trace in finally block: {cleanup_error}") diff --git a/tests/unit/sdk/test_internal_span_processor.py b/tests/unit/sdk/test_internal_span_processor.py index dc397d3ad..d163f84eb 100644 --- a/tests/unit/sdk/test_internal_span_processor.py +++ b/tests/unit/sdk/test_internal_span_processor.py @@ -32,12 +32,12 @@ def test_start_trace_logs_url(self, mock_make_span, mock_log_trace_url): mock_make_span.return_value = (mock_span, mock_context, mock_token) # Call start_trace - trace_context = self.tracing_core.start_trace(trace_name="test_trace") + tracer = self.tracing_core.start_trace(trace_name="test_trace") # Assert that log_trace_url was called with the span and title mock_log_trace_url.assert_called_once_with(mock_span, title="test_trace") - self.assertIsInstance(trace_context, TraceContext) - self.assertEqual(trace_context.span, mock_span) + self.assertIsInstance(tracer, TraceContext) + self.assertEqual(tracer.span, mock_span) @patch("agentops.sdk.core.log_trace_url") @patch("agentops.sdk.decorators.utility._finalize_span") @@ -48,10 +48,10 @@ def test_end_trace_logs_url(self, mock_finalize_span, mock_log_trace_url): mock_span.name = "test_trace" mock_span.get_span_context.return_value.span_id = 12345 mock_token = MagicMock() - trace_context = TraceContext(mock_span, mock_token) + tracer = TraceContext(mock_span, mock_token) # Call end_trace - self.tracing_core.end_trace(trace_context, "Success") + self.tracing_core.end_trace(tracer, "Success") # Assert that log_trace_url was called with the span and title 
mock_log_trace_url.assert_called_once_with(mock_span, title="test_trace") @@ -71,11 +71,11 @@ def test_start_trace_url_logging_failure_does_not_break_trace(self, mock_make_sp mock_log_trace_url.side_effect = Exception("URL logging failed") # Call start_trace - should not raise exception - trace_context = self.tracing_core.start_trace(trace_name="test_trace") + tracer = self.tracing_core.start_trace(trace_name="test_trace") # Assert that trace was still created successfully - self.assertIsInstance(trace_context, TraceContext) - self.assertEqual(trace_context.span, mock_span) + self.assertIsInstance(tracer, TraceContext) + self.assertEqual(tracer.span, mock_span) mock_log_trace_url.assert_called_once_with(mock_span, title="test_trace") @patch("agentops.sdk.core.log_trace_url") @@ -87,13 +87,13 @@ def test_end_trace_url_logging_failure_does_not_break_trace(self, mock_finalize_ mock_span.name = "test_trace" mock_span.get_span_context.return_value.span_id = 12345 mock_token = MagicMock() - trace_context = TraceContext(mock_span, mock_token) + tracer = TraceContext(mock_span, mock_token) # Make log_trace_url raise an exception mock_log_trace_url.side_effect = Exception("URL logging failed") # Call end_trace - should not raise exception - self.tracing_core.end_trace(trace_context, "Success") + self.tracing_core.end_trace(tracer, "Success") # Assert that finalize_span was still called mock_finalize_span.assert_called_once() @@ -111,11 +111,11 @@ def test_start_trace_with_tags_logs_url(self, mock_make_span, mock_log_trace_url mock_make_span.return_value = (mock_span, mock_context, mock_token) # Call start_trace with tags - trace_context = self.tracing_core.start_trace(trace_name="tagged_trace", tags=["test", "integration"]) + tracer = self.tracing_core.start_trace(trace_name="tagged_trace", tags=["test", "integration"]) # Assert that log_trace_url was called with the span and title mock_log_trace_url.assert_called_once_with(mock_span, title="tagged_trace") - self.assertIsInstance(trace_context, TraceContext) + self.assertIsInstance(tracer, TraceContext) class TestSessionDecoratorURLLogging(unittest.TestCase): diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index bf20aa764..803548ce9 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -31,16 +31,16 @@ def mock_api_client(): @pytest.fixture(scope="function") -def mock_trace_context(): +def mock_tracer(): """Mock the TraceContext creation""" mock_span = MagicMock() mock_token = MagicMock() - mock_trace_context_instance = MagicMock() - mock_trace_context_instance.span = mock_span - mock_trace_context_instance.token = mock_token - mock_trace_context_instance.is_init_trace = False + mock_tracer_instance = MagicMock() + mock_tracer_instance.span = mock_span + mock_tracer_instance.token = mock_token + mock_tracer_instance.is_init_trace = False - return mock_trace_context_instance + return mock_tracer_instance @pytest.fixture(scope="function") @@ -52,22 +52,22 @@ def reset_client(): agentops._client = agentops.Client() # Reset all client state agentops._client._initialized = False - agentops._client._init_trace_context = None + agentops._client._init_tracer = None agentops._client._legacy_session_for_init_trace = None yield # Clean up after test try: if hasattr(agentops._client, "_initialized"): agentops._client._initialized = False - if hasattr(agentops._client, "_init_trace_context"): - agentops._client._init_trace_context = None + if hasattr(agentops._client, "_init_tracer"): + agentops._client._init_tracer = 
None if hasattr(agentops._client, "_legacy_session_for_init_trace"): agentops._client._legacy_session_for_init_trace = None except: pass -def test_explicit_init_then_explicit_start_trace(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): +def test_explicit_init_then_explicit_start_trace(mock_tracing_core, mock_api_client, mock_tracer, reset_client): """Test explicitly initializing followed by explicitly starting a trace""" import agentops @@ -78,23 +78,23 @@ def test_explicit_init_then_explicit_start_trace(mock_tracing_core, mock_api_cli mock_tracing_core.start_trace.assert_not_called() # Mock the start_trace method to return our mock trace context - mock_tracing_core.start_trace.return_value = mock_trace_context + mock_tracing_core.start_trace.return_value = mock_tracer # Explicitly start a trace - trace_context = agentops.start_trace(trace_name="test_trace", tags=["test"]) + tracer = agentops.start_trace(trace_name="test_trace", tags=["test"]) # Verify the trace was created mock_tracing_core.start_trace.assert_called_once_with(trace_name="test_trace", tags=["test"]) - assert trace_context == mock_trace_context + assert tracer == mock_tracer -def test_auto_start_session_true(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): +def test_auto_start_session_true(mock_tracing_core, mock_api_client, mock_tracer, reset_client): """Test initializing with auto_start_session=True""" import agentops from agentops.legacy import Session # Mock the start_trace method to return our mock trace context - mock_tracing_core.start_trace.return_value = mock_trace_context + mock_tracing_core.start_trace.return_value = mock_tracer # Initialize with auto_start_session=True result = agentops.init(api_key="test-api-key", auto_start_session=True) @@ -103,16 +103,16 @@ def test_auto_start_session_true(mock_tracing_core, mock_api_client, mock_trace_ mock_tracing_core.start_trace.assert_called_once() # init() should return a Session object when auto-starting a session assert isinstance(result, Session) - assert result.trace_context == mock_trace_context + assert result.tracer == mock_tracer -def test_auto_start_session_default(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): +def test_auto_start_session_default(mock_tracing_core, mock_api_client, mock_tracer, reset_client): """Test initializing with default auto_start_session behavior""" import agentops from agentops.legacy import Session # Mock the start_trace method to return our mock trace context - mock_tracing_core.start_trace.return_value = mock_trace_context + mock_tracing_core.start_trace.return_value = mock_tracer # Initialize without explicitly setting auto_start_session (defaults to True) result = agentops.init(api_key="test-api-key") @@ -121,7 +121,7 @@ def test_auto_start_session_default(mock_tracing_core, mock_api_client, mock_tra assert agentops._client.initialized # Since auto_start_session defaults to True, init() should return a Session object assert isinstance(result, Session) - assert result.trace_context == mock_trace_context + assert result.tracer == mock_tracer def test_start_trace_without_init(): @@ -156,18 +156,18 @@ def side_effect(): assert result is None -def test_end_trace(mock_tracing_core, mock_trace_context): +def test_end_trace(mock_tracing_core, mock_tracer): """Test ending a trace""" import agentops # End the trace - agentops.end_trace(mock_trace_context, end_state="Success") + agentops.end_trace(mock_tracer, end_state="Success") # Verify end_trace was called on 
TracingCore - mock_tracing_core.end_trace.assert_called_once_with(trace_context=mock_trace_context, end_state="Success") + mock_tracing_core.end_trace.assert_called_once_with(tracer=mock_tracer, end_state="Success") -def test_session_decorator_creates_trace(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): +def test_session_decorator_creates_trace(mock_tracing_core, mock_api_client, mock_tracer, reset_client): """Test that the @session decorator creates a trace-level span""" import agentops from agentops.sdk.decorators import session @@ -176,7 +176,7 @@ def test_session_decorator_creates_trace(mock_tracing_core, mock_api_client, moc agentops.init(api_key="test-api-key", auto_start_session=False) # Mock the start_trace and end_trace methods - mock_tracing_core.start_trace.return_value = mock_trace_context + mock_tracing_core.start_trace.return_value = mock_tracer @session(name="test_session", tags=["decorator_test"]) def test_function(): @@ -194,7 +194,7 @@ def test_function(): assert mock_tracing_core.end_trace.call_count >= 1 -def test_session_decorator_with_exception(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): +def test_session_decorator_with_exception(mock_tracing_core, mock_api_client, mock_tracer, reset_client): """Test that the @session decorator handles exceptions properly""" import agentops from agentops.sdk.decorators import session @@ -203,7 +203,7 @@ def test_session_decorator_with_exception(mock_tracing_core, mock_api_client, mo agentops.init(api_key="test-api-key", auto_start_session=False) # Mock the start_trace method - mock_tracing_core.start_trace.return_value = mock_trace_context + mock_tracing_core.start_trace.return_value = mock_tracer @session(name="failing_session") def failing_function(): @@ -219,7 +219,7 @@ def failing_function(): assert mock_tracing_core.end_trace.call_count >= 1 -def test_legacy_start_session_compatibility(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): +def test_legacy_start_session_compatibility(mock_tracing_core, mock_api_client, mock_tracer, reset_client): """Test that legacy start_session still works and calls TracingCore.start_trace""" import agentops from agentops.legacy import Session @@ -228,21 +228,21 @@ def test_legacy_start_session_compatibility(mock_tracing_core, mock_api_client, agentops.init(api_key="test-api-key", auto_start_session=False) # Mock the start_trace method - mock_tracing_core.start_trace.return_value = mock_trace_context + mock_tracing_core.start_trace.return_value = mock_tracer # Start a legacy session session = agentops.start_session(tags=["legacy_test"]) # Verify the session was created assert isinstance(session, Session) - assert session.trace_context == mock_trace_context + assert session.tracer == mock_tracer # Verify that TracingCore.start_trace was called # Note: May be called multiple times due to initialization assert mock_tracing_core.start_trace.call_count >= 1 -def test_legacy_end_session_compatibility(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): +def test_legacy_end_session_compatibility(mock_tracing_core, mock_api_client, mock_tracer, reset_client): """Test that legacy end_session still works and calls TracingCore.end_trace""" import agentops from agentops.legacy import Session @@ -251,13 +251,13 @@ def test_legacy_end_session_compatibility(mock_tracing_core, mock_api_client, mo agentops.init(api_key="test-api-key", auto_start_session=False) # Create a legacy session object - session = 
Session(mock_trace_context) + session = Session(mock_tracer) # End the session agentops.end_session(session) # Verify that TracingCore.end_trace was called - mock_tracing_core.end_trace.assert_called_once_with(mock_trace_context, end_state="Success") + mock_tracing_core.end_trace.assert_called_once_with(mock_tracer, end_state="Success") def test_no_double_init(mock_tracing_core, mock_api_client, reset_client): @@ -277,12 +277,12 @@ def test_no_double_init(mock_tracing_core, mock_api_client, reset_client): assert mock_api_client.call_count == call_count -def test_client_initialization_behavior(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): +def test_client_initialization_behavior(mock_tracing_core, mock_api_client, mock_tracer, reset_client): """Test basic client initialization behavior""" import agentops # Mock the start_trace method - mock_tracing_core.start_trace.return_value = mock_trace_context + mock_tracing_core.start_trace.return_value = mock_tracer # Test that initialization works agentops.init(api_key="test-api-key", auto_start_session=False) @@ -308,13 +308,13 @@ def test_multiple_concurrent_traces(mock_tracing_core, mock_api_client, reset_cl agentops.init(api_key="test-api-key", auto_start_session=False) # Create mock trace contexts for different traces - mock_trace_context_1 = MagicMock() - mock_trace_context_2 = MagicMock() + mock_tracer_1 = MagicMock() + mock_tracer_2 = MagicMock() # Mock start_trace to return different contexts mock_tracing_core.start_trace.side_effect = [ - mock_trace_context_1, - mock_trace_context_2, + mock_tracer_1, + mock_tracer_2, ] # Start multiple traces @@ -322,27 +322,27 @@ def test_multiple_concurrent_traces(mock_tracing_core, mock_api_client, reset_cl trace2 = agentops.start_trace(trace_name="trace2", tags=["test2"]) # Verify both traces were created - assert trace1 == mock_trace_context_1 - assert trace2 == mock_trace_context_2 + assert trace1 == mock_tracer_1 + assert trace2 == mock_tracer_2 # Verify start_trace was called twice assert mock_tracing_core.start_trace.call_count == 2 -def test_trace_context_properties(mock_trace_context): +def test_tracer_properties(mock_tracer): """Test that TraceContext properties work correctly""" from agentops.legacy import Session # Create a legacy session with the mock trace context - session = Session(mock_trace_context) + session = Session(mock_tracer) # Test that properties are accessible - assert session.span == mock_trace_context.span - assert session.token == mock_trace_context.token - assert session.trace_context == mock_trace_context + assert session.span == mock_tracer.span + assert session.token == mock_tracer.token + assert session.tracer == mock_tracer -def test_session_decorator_async_function(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): +def test_session_decorator_async_function(mock_tracing_core, mock_api_client, mock_tracer, reset_client): """Test that the @session decorator works with async functions""" import agentops import asyncio @@ -352,7 +352,7 @@ def test_session_decorator_async_function(mock_tracing_core, mock_api_client, mo agentops.init(api_key="test-api-key", auto_start_session=False) # Mock the start_trace method - mock_tracing_core.start_trace.return_value = mock_trace_context + mock_tracing_core.start_trace.return_value = mock_tracer @session(name="async_test_session") async def async_test_function(): @@ -370,7 +370,7 @@ async def async_test_function(): assert mock_tracing_core.end_trace.call_count >= 1 -def 
test_trace_context_creation(): +def test_tracer_creation(): """Test that TraceContext can be created with proper attributes""" from agentops.sdk.core import TraceContext @@ -378,11 +378,11 @@ def test_trace_context_creation(): mock_token = MagicMock() # Test creating a TraceContext - trace_context = TraceContext(span=mock_span, token=mock_token, is_init_trace=False) + tracer = TraceContext(span=mock_span, token=mock_token, is_init_trace=False) - assert trace_context.span == mock_span - assert trace_context.token == mock_token - assert trace_context.is_init_trace == False + assert tracer.span == mock_span + assert tracer.token == mock_token + assert tracer.is_init_trace == False def test_session_management_integration(): @@ -408,19 +408,19 @@ def test_session_management_integration(): agentops.init(api_key="test-api-key", auto_start_session=False) # Create mock trace context - mock_trace_context = MagicMock() - mock_instance.start_trace.return_value = mock_trace_context + mock_tracer = MagicMock() + mock_instance.start_trace.return_value = mock_tracer # Test new API - trace_context = agentops.start_trace(trace_name="new_api_trace") - assert trace_context == mock_trace_context + tracer = agentops.start_trace(trace_name="new_api_trace") + assert tracer == mock_tracer # Test legacy API session = agentops.start_session(tags=["legacy"]) - assert session.trace_context == mock_trace_context + assert session.tracer == mock_tracer # Test ending both - agentops.end_trace(trace_context) + agentops.end_trace(tracer) agentops.end_session(session) # Verify calls were made
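
A minimal usage sketch of the renamed surface after this patch (illustrative only; it is not part of the diff above). It is based on the calls exercised by the updated tests: start_trace() returns the TraceContext that the tests and the legacy Session now refer to as "tracer", end_trace() accepts it and forwards it to TracingCore as the tracer= keyword, and the @session decorator relies on the new _handle_session_trace_sync/_handle_session_trace_async helpers to guarantee the trace is ended exactly once (Success, Failure, or Unknown). The api_key value and do_work() below are placeholders.

    import agentops
    from agentops.sdk.decorators import session

    def do_work():
        # Placeholder for real application code.
        return "ok"

    # Initialize without auto-starting a session so traces are managed explicitly.
    agentops.init(api_key="<your-api-key>", auto_start_session=False)

    # New API: keep the returned TraceContext ("tracer") and end it explicitly.
    tracer = agentops.start_trace(trace_name="example_trace", tags=["example"])
    try:
        do_work()
        agentops.end_trace(tracer, end_state="Success")
    except Exception:
        agentops.end_trace(tracer, end_state="Failure")
        raise

    # Decorator form: the SESSION helpers start the trace, record input/output,
    # and end the trace even if the wrapped function raises.
    @session(name="decorated_session", tags=["example"])
    def decorated_work():
        return do_work()

    decorated_work()

    # Legacy API: Session now exposes the trace context as .tracer (was .trace_context).
    legacy_session = agentops.start_session(tags=["legacy"])
    print(legacy_session.tracer)
    agentops.end_session(legacy_session)

In the sketch, the explicit try/except around end_trace mirrors what the decorator helpers do internally, so code that adopts either style releases the trace resources on both the success and failure paths.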