diff --git a/agentops/__init__.py b/agentops/__init__.py index dc526e8aa..0ed1171a8 100755 --- a/agentops/__init__.py +++ b/agentops/__init__.py @@ -15,18 +15,28 @@ from typing import List, Optional, Union, Dict, Any from agentops.client import Client from agentops.sdk.core import TraceContext, tracer -from agentops.sdk.decorators import trace, session, agent, task, workflow, operation +from agentops.sdk.decorators import trace, session, agent, task, workflow, operation, tool +from agentops.enums import TraceState, SUCCESS, ERROR, UNSET +from opentelemetry.trace.status import StatusCode from agentops.logging.config import logger +import threading -# Client global instance; one per process runtime -_client = Client() +# Thread-safe client management +_client_lock = threading.Lock() +_client = None def get_client() -> Client: - """Get the singleton client instance""" + """Get the singleton client instance in a thread-safe manner""" global _client + # Double-checked locking pattern for thread safety + if _client is None: + with _client_lock: + if _client is None: + _client = Client() + return _client @@ -106,24 +116,31 @@ def init( elif default_tags: merged_tags = default_tags - return _client.init( - api_key=api_key, - endpoint=endpoint, - app_url=app_url, - max_wait_time=max_wait_time, - max_queue_size=max_queue_size, - default_tags=merged_tags, - trace_name=trace_name, - instrument_llm_calls=instrument_llm_calls, - auto_start_session=auto_start_session, - auto_init=auto_init, - skip_auto_end_session=skip_auto_end_session, - env_data_opt_out=env_data_opt_out, - log_level=log_level, - fail_safe=fail_safe, - exporter_endpoint=exporter_endpoint, + # Prepare initialization arguments + init_kwargs = { + "api_key": api_key, + "endpoint": endpoint, + "app_url": app_url, + "max_wait_time": max_wait_time, + "max_queue_size": max_queue_size, + "default_tags": merged_tags, + "trace_name": trace_name, + "instrument_llm_calls": instrument_llm_calls, + "auto_start_session": auto_start_session, + "auto_init": auto_init, + "skip_auto_end_session": skip_auto_end_session, + "env_data_opt_out": env_data_opt_out, + "log_level": log_level, + "fail_safe": fail_safe, + "exporter_endpoint": exporter_endpoint, **kwargs, - ) + } + + # Get the current client instance (creates new one if needed) + client = get_client() + + # Initialize the client directly + return client.init(**init_kwargs) def configure(**kwargs): @@ -173,7 +190,8 @@ def configure(**kwargs): if invalid_params: logger.warning(f"Invalid configuration parameters: {invalid_params}") - _client.configure(**kwargs) + client = get_client() + client.configure(**kwargs) def start_trace( @@ -207,7 +225,9 @@ def start_trace( return tracer.start_trace(trace_name=trace_name, tags=tags) -def end_trace(trace_context: Optional[TraceContext] = None, end_state: str = "Success") -> None: +def end_trace( + trace_context: Optional[TraceContext] = None, end_state: Union[TraceState, StatusCode, str] = TraceState.SUCCESS +) -> None: """ Ends a trace (its root span) and finalizes it. If no trace_context is provided, ends all active session spans. @@ -246,4 +266,12 @@ def end_trace(trace_context: Optional[TraceContext] = None, end_state: str = "Su "workflow", "operation", "tracer", + "tool", + # Trace state enums + "TraceState", + "SUCCESS", + "ERROR", + "UNSET", + # OpenTelemetry status codes (for advanced users) + "StatusCode", ] diff --git a/agentops/client/client.py b/agentops/client/client.py index 1979e6c6e..0fe95b95c 100644 --- a/agentops/client/client.py +++ b/agentops/client/client.py @@ -24,7 +24,7 @@ def _end_init_trace_atexit(): if _client_init_trace_context is not None: logger.debug("Auto-ending client's init trace during shutdown.") try: - # Use TracingCore to end the trace directly + # Use global tracer to end the trace directly if tracer.initialized and _client_init_trace_context.span.is_recording(): tracer.end_trace(_client_init_trace_context, end_state="Shutdown") except Exception as e: @@ -104,7 +104,7 @@ def init(self, **kwargs: Any) -> None: # Return type updated to None try: response = self.api.v3.fetch_auth_token(self.config.api_key) if response is None: - # If auth fails, we cannot proceed with TracingCore initialization that depends on project_id + # If auth fails, we cannot proceed with tracer initialization that depends on project_id logger.error("Failed to fetch auth token. AgentOps SDK will not be initialized.") return None # Explicitly return None if auth fails except Exception as e: @@ -161,8 +161,8 @@ def init(self, **kwargs: Any) -> None: # Return type updated to None else: logger.error("Failed to start the auto-init trace.") - # Even if auto-start fails, core services up to TracingCore might be initialized. - # Set self.initialized to True if TracingCore is up, but return None. + # Even if auto-start fails, core services up to the tracer might be initialized. + # Set self.initialized to True if tracer is up, but return None. self._initialized = tracer.initialized return None # Failed to start trace diff --git a/agentops/enums.py b/agentops/enums.py new file mode 100644 index 000000000..c3dcfda38 --- /dev/null +++ b/agentops/enums.py @@ -0,0 +1,36 @@ +""" +AgentOps enums for user-friendly API. + +This module provides simple enums that users can import from agentops +without needing to know about OpenTelemetry internals. +""" + +from enum import Enum +from opentelemetry.trace.status import StatusCode + + +class TraceState(Enum): + """ + Enum for trace end states. + + This provides a user-friendly interface that maps to OpenTelemetry StatusCode internally. + Users can simply use agentops.TraceState.SUCCESS instead of importing OpenTelemetry. + """ + + SUCCESS = StatusCode.OK + ERROR = StatusCode.ERROR + UNSET = StatusCode.UNSET + + def __str__(self) -> str: + """Return the name for string representation.""" + return self.name + + def to_status_code(self) -> StatusCode: + """Convert to OpenTelemetry StatusCode.""" + return self.value + + +# For backward compatibility, also provide these as module-level constants +SUCCESS = TraceState.SUCCESS +ERROR = TraceState.ERROR +UNSET = TraceState.UNSET diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index b48475af9..96966a8ad 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -444,7 +444,7 @@ def instrument_one(loader: InstrumentorLoader) -> Optional[BaseInstrumentor]: instrumentor = loader.get_instance() try: - instrumentor.instrument(tracer_provider=TracingCore.get_instance()._provider) + instrumentor.instrument(tracer_provider=tracer._provider) logger.info( f"AgentOps: Successfully instrumented '{loader.class_name}' for package '{loader.package_name or loader.module_name}'." ) diff --git a/agentops/legacy/__init__.py b/agentops/legacy/__init__.py index 7c7f787b6..d835c0877 100644 --- a/agentops/legacy/__init__.py +++ b/agentops/legacy/__init__.py @@ -65,7 +65,7 @@ def start_session( ) -> Session: """ @deprecated Use agentops.start_trace() instead. - Starts a legacy AgentOps session. Calls TracingCore.start_trace internally. + Starts a legacy AgentOps session. Calls tracer.start_trace internally. """ global _current_session, _current_trace_context @@ -89,7 +89,7 @@ def start_session( trace_context = tracer.start_trace(trace_name="session", tags=tags) if trace_context is None: - logger.error("Failed to start trace via TracingCore. Returning dummy session.") + logger.error("Failed to start trace via global tracer. Returning dummy session.") dummy_session = Session(None) _current_session = dummy_session _current_trace_context = None @@ -124,13 +124,13 @@ def _set_span_attributes(span: Any, attributes: Dict[str, Any]) -> None: def end_session(session_or_status: Any = None, **kwargs: Any) -> None: """ @deprecated Use agentops.end_trace() instead. - Ends a legacy AgentOps session. Calls TracingCore.end_trace internally. + Ends a legacy AgentOps session. Calls tracer.end_trace internally. Supports multiple calling patterns for backward compatibility. """ global _current_session, _current_trace_context if not tracer.initialized: - logger.debug("Ignoring end_session: TracingCore not initialized.") + logger.debug("Ignoring end_session: global tracer not initialized.") return target_trace_context: Optional[TraceContext] = None @@ -189,7 +189,7 @@ def end_session(session_or_status: Any = None, **kwargs: Any) -> None: def end_all_sessions() -> None: """@deprecated Ends all active sessions/traces.""" if not tracer.initialized: - logger.debug("Ignoring end_all_sessions: TracingCore not initialized.") + logger.debug("Ignoring end_all_sessions: global tracer not initialized.") return # Use the new end_trace functionality to end all active traces diff --git a/agentops/sdk/README.md b/agentops/sdk/README.md index b39eb52c0..e13fe6c87 100644 --- a/agentops/sdk/README.md +++ b/agentops/sdk/README.md @@ -176,11 +176,11 @@ flowchart TD ## Example Usage ```python -from agentops import Session, agent, tool -from agentops.sdk import TracingCore, TracingConfig +from agentops import Session, agent, tool, tracer +from agentops.sdk import TracingConfig -# Initialize the tracing core with a dedicated configuration -TracingCore.get_instance().initialize( +# Initialize the global tracer with a dedicated configuration +tracer.initialize( service_name="my-service", exporter_endpoint="https://my-exporter-endpoint.com", max_queue_size=1000, diff --git a/agentops/sdk/__init__.py b/agentops/sdk/__init__.py index f1be1f718..bc2f6b6f9 100644 --- a/agentops/sdk/__init__.py +++ b/agentops/sdk/__init__.py @@ -5,18 +5,16 @@ for different types of operations in AI agent workflows. """ -# Import core components -from agentops.sdk.core import TracingCore - # Import decorators from agentops.sdk.decorators import agent, operation, session, task, workflow # from agentops.sdk.traced import TracedObject # Merged into TracedObject from agentops.sdk.types import TracingConfig +from opentelemetry.trace.status import StatusCode + __all__ = [ # Core components - "TracingCore", "TracingConfig", # Decorators "session", @@ -24,4 +22,6 @@ "agent", "task", "workflow", + # OpenTelemetry status codes + "StatusCode", ] diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index acab6185f..8874fd2b6 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -2,7 +2,7 @@ import atexit import threading -from typing import Optional, Any, Dict +from typing import Optional, Any, Dict, Union from opentelemetry import metrics, trace from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter @@ -26,6 +26,7 @@ ) from agentops.semconv import SpanKind from agentops.helpers.dashboard import log_trace_url +from opentelemetry.trace.status import StatusCode # No need to create shortcuts since we're using our own ResourceAttributes class now @@ -36,6 +37,38 @@ def __init__(self, span: Span, token: Optional[context_api.Token] = None, is_ini self.span = span self.token = token self.is_init_trace = is_init_trace # Flag to identify the auto-started trace + self._end_state = StatusCode.UNSET # Default end state because we don't know yet + + def __enter__(self) -> "TraceContext": + """Enter the trace context.""" + return self + + def __exit__(self, exc_type: Optional[type], exc_val: Optional[Exception], exc_tb: Optional[Any]) -> bool: + """Exit the trace context and end the trace. + + Automatically sets the trace status based on whether an exception occurred: + - If an exception is present, sets status to ERROR + - If no exception occurred, sets status to OK + + Returns: + False: Always returns False to propagate any exceptions that occurred + within the context manager block, following Python's + context manager protocol for proper exception handling. + """ + if exc_type is not None: + self._end_state = StatusCode.ERROR + if exc_val: + logger.debug(f"Trace exiting with exception: {exc_val}") + else: + # No exception occurred, set to OK + self._end_state = StatusCode.OK + + try: + tracer.end_trace(self, self._end_state) + except Exception as e: + logger.error(f"Error ending trace in context manager: {e}") + + return False # get_imported_libraries moved to agentops.helpers.system @@ -201,7 +234,7 @@ def config(self) -> TracingConfig: """Get the tracing configuration.""" if self._config is None: # This case should ideally not be reached if initialized properly - raise AgentOpsClientNotInitializedException("TracingCore config accessed before initialization.") + raise AgentOpsClientNotInitializedException("Tracer config accessed before initialization.") return self._config def shutdown(self) -> None: @@ -316,7 +349,7 @@ def start_trace( A TraceContext object containing the span and context token, or None if not initialized. """ if not self.initialized: - logger.warning("TracingCore not initialized. Cannot start trace.") + logger.warning("Global tracer not initialized. Cannot start trace.") return None # Build trace attributes @@ -347,7 +380,9 @@ def start_trace( return trace_context - def end_trace(self, trace_context: Optional[TraceContext] = None, end_state: str = "Success") -> None: + def end_trace( + self, trace_context: Optional[TraceContext] = None, end_state: Union[Any, StatusCode, str] = None + ) -> None: """ Ends a trace (its root span) and finalizes it. If no trace_context is provided, ends all active session spans. @@ -357,9 +392,15 @@ def end_trace(self, trace_context: Optional[TraceContext] = None, end_state: str end_state: The final state of the trace (e.g., "Success", "Failure", "Error"). """ if not self.initialized: - logger.warning("TracingCore not initialized. Cannot end trace.") + logger.warning("Global tracer not initialized. Cannot end trace.") return + # Set default if not provided + if end_state is None: + from agentops.enums import TraceState + + end_state = TraceState.SUCCESS + # If no specific trace_context provided, end all active traces if trace_context is None: with self._traces_lock: @@ -373,7 +414,7 @@ def end_trace(self, trace_context: Optional[TraceContext] = None, end_state: str # End specific trace self._end_single_trace(trace_context, end_state) - def _end_single_trace(self, trace_context: TraceContext, end_state: str) -> None: + def _end_single_trace(self, trace_context: TraceContext, end_state: Union[Any, StatusCode, str]) -> None: """ Internal method to end a single trace. @@ -393,7 +434,20 @@ def _end_single_trace(self, trace_context: TraceContext, end_state: str) -> None # Handle case where span is mocked or trace_id is not a valid integer trace_id = str(span.get_span_context().trace_id) - logger.debug(f"Ending trace with span ID: {span.get_span_context().span_id}, end_state: {end_state}") + # Convert TraceState enum to StatusCode if needed + from agentops.enums import TraceState + + if isinstance(end_state, TraceState): + # It's a TraceState enum + state_str = str(end_state) + elif isinstance(end_state, StatusCode): + # It's already a StatusCode + state_str = str(end_state) + else: + # It's a string (legacy) + state_str = str(end_state) + + logger.debug(f"Ending trace with span ID: {span.get_span_context().span_id}, end_state: {state_str}") try: # Build and set session end attributes diff --git a/docs/mint.json b/docs/mint.json index f2c0865f7..00fec4adf 100644 --- a/docs/mint.json +++ b/docs/mint.json @@ -187,6 +187,7 @@ "v2/usage/dashboard-info", "v2/usage/sdk-reference", "v2/usage/advanced-configuration", + "v2/usage/context-managers", "v2/usage/tracking-llm-calls", "v2/usage/tracking-agents", "v2/usage/recording-operations", diff --git a/docs/v2/usage/context-managers.mdx b/docs/v2/usage/context-managers.mdx new file mode 100644 index 000000000..391dbd2b2 --- /dev/null +++ b/docs/v2/usage/context-managers.mdx @@ -0,0 +1,328 @@ +--- +title: "Context Managers" +description: "Use AgentOps traces as Python context managers for automatic lifecycle management" +--- + +# Context Managers + +AgentOps provides native context manager support for traces, allowing you to use Python's `with` statement for automatic trace lifecycle management. This approach ensures traces are properly started and ended, even when exceptions occur. + +## Basic Usage + +The simplest way to use context managers is with the `start_trace()` function: + +```python +import agentops + +# Initialize AgentOps +agentops.init(api_key="your-api-key") + +# Use context manager for automatic trace management +with agentops.start_trace("my_workflow") as trace: + # Your code here + print("Processing data...") + # Trace automatically ends when exiting the with block +``` + +The trace will automatically: +- Start when entering the `with` block +- End with "Success" status when exiting normally +- End with "Error" status if an exception occurs +- Clean up resources properly in all cases + +## Advanced Usage + +### Traces with Tags + +You can add tags to traces for better organization and filtering: + +```python +import agentops + +agentops.init(api_key="your-api-key") + +# Using list tags +with agentops.start_trace("data_processing", tags=["batch", "production"]): + process_batch_data() + +# Using dictionary tags for more structured metadata +with agentops.start_trace("user_request", tags={ + "user_id": "12345", + "request_type": "query", + "priority": "high" +}): + handle_user_request() +``` + +### Parallel Traces + +Context managers create independent parallel traces, not parent-child relationships: + +```python +import agentops + +agentops.init(api_key="your-api-key") + +# Sequential parallel traces +with agentops.start_trace("task_1"): + print("Task 1 executing") + +with agentops.start_trace("task_2"): + print("Task 2 executing") + +# Nested context managers create parallel traces +with agentops.start_trace("outer_workflow"): + print("Outer workflow started") + + with agentops.start_trace("inner_task"): + print("Inner task executing (parallel to outer)") + + print("Outer workflow continuing") +``` + +### Exception Handling + +Context managers automatically handle exceptions and set appropriate trace states: + +```python +import agentops + +agentops.init(api_key="your-api-key") + +# Automatic error handling +try: + with agentops.start_trace("risky_operation"): + # This will automatically set trace status to "Error" + raise ValueError("Something went wrong") +except ValueError as e: + print(f"Caught error: {e}") + # Trace has already been ended with Error status + +# Graceful degradation pattern +try: + with agentops.start_trace("primary_service"): + result = call_primary_service() +except ServiceUnavailableError: + with agentops.start_trace("fallback_service"): + result = call_fallback_service() +``` + +### Concurrent Execution + +Context managers work seamlessly with threading and asyncio: + + + +```python Threading +import agentops +import threading + +agentops.init(api_key="your-api-key") + +# With threading +def worker_function(worker_id): + with agentops.start_trace(f"worker_{worker_id}"): + # Each thread gets its own independent trace + process_work(worker_id) + +threads = [] +for i in range(3): + thread = threading.Thread(target=worker_function, args=(i,)) + threads.append(thread) + thread.start() + +for thread in threads: + thread.join() +``` + +```python Asyncio +import agentops +import asyncio + +agentops.init(api_key="your-api-key") + +# With asyncio +async def async_task(task_id): + with agentops.start_trace(f"async_task_{task_id}"): + await asyncio.sleep(0.1) # Simulate async work + return f"result_{task_id}" + +async def main(): + tasks = [async_task(i) for i in range(3)] + results = await asyncio.gather(*tasks) + return results + +# Run async tasks +results = asyncio.run(main()) +``` + + + +## Production Patterns + +### API Endpoint Monitoring + +```python +import agentops +from flask import Flask, request + +app = Flask(__name__) +agentops.init(api_key="your-api-key") + +@app.route('/api/process', methods=['POST']) +def process_request(): + # Create trace for each API request + with agentops.start_trace("api_request", tags={ + "endpoint": "/api/process", + "method": "POST", + "user_id": request.headers.get("user-id") + }): + try: + data = request.get_json() + result = process_data(data) + return {"status": "success", "result": result} + except Exception as e: + # Exception automatically sets trace to Error status + return {"status": "error", "message": str(e)}, 500 +``` + +### Batch Processing + +```python +import agentops + +agentops.init(api_key="your-api-key") + +def process_batch(items): + with agentops.start_trace("batch_processing", tags={ + "batch_size": len(items), + "batch_type": "data_processing" + }): + successful = 0 + failed = 0 + + for item in items: + try: + with agentops.start_trace("item_processing", tags={ + "item_id": item.get("id"), + "item_type": item.get("type") + }): + process_item(item) + successful += 1 + except Exception as e: + failed += 1 + print(f"Failed to process item {item.get('id')}: {e}") + + print(f"Batch completed: {successful} successful, {failed} failed") +``` + +### Retry Logic + +```python +import agentops +import time + +agentops.init(api_key="your-api-key") + +def retry_operation(operation_name, max_retries=3): + for attempt in range(max_retries): + try: + with agentops.start_trace(f"{operation_name}_attempt_{attempt + 1}", tags={ + "operation": operation_name, + "attempt": attempt + 1, + "max_retries": max_retries + }): + # Your operation here + result = perform_operation() + return result # Success - exit retry loop + + except Exception as e: + if attempt < max_retries - 1: + wait_time = 2 ** attempt # Exponential backoff + print(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait_time}s...") + time.sleep(wait_time) + else: + print(f"All {max_retries} attempts failed") + raise +``` + +## Backward Compatibility + +Context managers are fully backward compatible with existing AgentOps code patterns: + + + +```python Manual Management +import agentops + +agentops.init(api_key="your-api-key") + +# Manual trace management (legacy) +trace = agentops.start_trace("manual_trace") +# ... your code ... +agentops.end_trace(trace, "Success") +``` + +```python Context Manager +import agentops + +agentops.init(api_key="your-api-key") + +# Context manager (new, recommended) +with agentops.start_trace("context_managed_trace") as trace: + # ... your code ... + pass # Automatically ended +``` + +```python Property Access +import agentops + +agentops.init(api_key="your-api-key") + +# Accessing trace properties +with agentops.start_trace("property_access") as trace: + span = trace.span # Access underlying span + trace_id = trace.span.get_span_context().trace_id +``` + +```python Mixed Usage +import agentops + +agentops.init(api_key="your-api-key") + +# Mixed usage +trace = agentops.start_trace("mixed_usage") +try: + with trace: # Use existing trace as context manager + # ... your code ... + pass +except Exception: + agentops.end_trace(trace, "Error") +``` + + + +## Examples + +For complete working examples, see the following files in the AgentOps repository: + + + + Simple context manager patterns and error handling + + + Sequential, nested, and concurrent trace patterns + + + Exception handling, retry patterns, and graceful degradation + + + API endpoints, batch processing, microservices, and monitoring + + + +These examples demonstrate real-world usage patterns and best practices for using AgentOps context managers in production applications. + +## API Reference + +For detailed API information, see the [SDK Reference](/v2/usage/sdk-reference#trace-management) documentation. diff --git a/examples/context_manager/README.md b/examples/context_manager/README.md new file mode 100644 index 000000000..5ed88c88f --- /dev/null +++ b/examples/context_manager/README.md @@ -0,0 +1,93 @@ +# AgentOps Context Manager Examples + +This directory contains examples demonstrating how to use AgentOps with context managers for automatic trace lifecycle management using the simplified, native implementation. + +## Overview + +AgentOps provides native context manager support through the `TraceContext` class. This implementation eliminates wrapper classes and leverages OpenTelemetry's built-in capabilities for clean, automatic trace management. + +## Key Benefits + +- **Automatic cleanup**: Traces end automatically when exiting the context +- **Error handling**: Traces are marked with appropriate error states when exceptions occur +- **Native OpenTelemetry integration**: Leverages OpenTelemetry's built-in context management +- **Simplified architecture**: No wrapper classes or monkey patching required +- **Thread safety**: Works seamlessly in multi-threaded environments +- **Performance optimized**: Direct method calls without wrapper overhead + +## Quick Start + +```python +import agentops + +# Initialize AgentOps +agentops.init(api_key="your-api-key") + +# Use native context manager support +with agentops.start_trace("my_task") as trace: + # Your code here + pass # Trace automatically ends with Success/Error state +``` + +## Examples + +### `basic_usage.py` +- Native TraceContext context manager usage +- Multiple parallel traces +- Error handling basics +- Comparison with OpenTelemetry concepts + +### `parallel_traces.py` +- Sequential and nested parallel traces +- Concurrent traces with threading and ThreadPoolExecutor +- Mixed success/error scenarios +- Different tag types (lists, dictionaries, none) + +### `error_handling.py` +- Exception handling with different error types +- Nested exception handling and propagation +- Recovery patterns (retry, graceful degradation, partial success) +- Custom exceptions, finally blocks, and exception chaining + +### `production_patterns.py` +- API endpoint monitoring and metrics +- Batch processing with individual item tracking +- Microservice communication patterns +- Production monitoring and alerting + +## Running Examples + +### Step 1: Install Dependencies + +```bash +# Install required packages +pip install agentops python-dotenv +``` + +### Step 2: Set Up API Key + +To use AgentOps and send trace data to your dashboard, you need to configure your API key using one of these methods: + +**Option A: Environment Variable** +```bash +# Export directly in your shell +export AGENTOPS_API_KEY="your-api-key-here" +``` + +**Option B: .env File (Recommended)** +```bash +# Create a .env file in the project root +echo "AGENTOPS_API_KEY=your-api-key-here" > .env +``` + +**Important**: When using a `.env` file, the examples automatically load it using `load_dotenv()`. Make sure the `.env` file is in the same directory as the example scripts or in the project root. + +### Step 3: Run the Examples + +```bash +# Run each example to see different patterns +python examples/context_manager/basic_usage.py +python examples/context_manager/parallel_traces.py +python examples/context_manager/error_handling.py +python examples/context_manager/production_patterns.py +``` diff --git a/examples/context_manager/basic_usage.py b/examples/context_manager/basic_usage.py new file mode 100644 index 000000000..122991339 --- /dev/null +++ b/examples/context_manager/basic_usage.py @@ -0,0 +1,140 @@ +""" +Basic Context Manager Example + +This example demonstrates how to use AgentOps context manager with the native +TraceContext support, eliminating the need for wrappers or monkey patching. +""" + +import os +import agentops +from agentops import agent, task, tool +from dotenv import load_dotenv + +load_dotenv() + +# Get API key from environment +AGENTOPS_API_KEY = os.environ.get("AGENTOPS_API_KEY") + + +@agent +class SimpleAgent: + """A simple example agent.""" + + def __init__(self, name: str): + self.name = name + + @task + def process_data(self, data: str) -> str: + """Process some data.""" + result = f"Processed: {data}" + return self.use_tool(result) + + @tool + def use_tool(self, input_data: str) -> str: + """Use a tool to transform data.""" + return f"Tool output: {input_data.upper()}" + + +def basic_context_manager_example(): + """Example using the native TraceContext context manager.""" + print("Basic Context Manager Example") + + # Initialize AgentOps + agentops.init(api_key=AGENTOPS_API_KEY) + + # Use native TraceContext context manager + with agentops.start_trace("basic_example", tags=["basic", "demo"]): + print("Trace started") + + # Create and use agent + agent = SimpleAgent("BasicAgent") + result = agent.process_data("sample data") + print(f"Result: {result}") + + print("Trace ended automatically") + + +def multiple_parallel_traces(): + """Example showing multiple parallel traces.""" + print("\nMultiple Parallel Traces") + + agentops.init(api_key=AGENTOPS_API_KEY) + + # First trace + with agentops.start_trace("task_1", tags=["parallel", "task-1"]): + print("Task 1 started") + agent1 = SimpleAgent("Agent1") + result1 = agent1.process_data("task 1 data") + print(f"Task 1 result: {result1}") + + # Second trace (independent) + with agentops.start_trace("task_2", tags=["parallel", "task-2"]): + print("Task 2 started") + agent2 = SimpleAgent("Agent2") + result2 = agent2.process_data("task 2 data") + print(f"Task 2 result: {result2}") + + print("All parallel traces completed") + + +def error_handling_example(): + """Example showing error handling with context manager.""" + print("\nError Handling Example") + + agentops.init(api_key=AGENTOPS_API_KEY) + + try: + with agentops.start_trace("error_example", tags=["error-handling"]): + print("Trace started") + + agent = SimpleAgent("ErrorAgent") + result = agent.process_data("some data") + print(f"Processing successful: {result}") + + # Simulate an error + raise ValueError("Simulated error") + + except ValueError as e: + print(f"Caught error: {e}") + print("Trace automatically ended with Error status") + + +def nested_traces_example(): + """Example showing nested traces (which are parallel, not parent-child).""" + print("\nNested Traces Example") + + agentops.init(api_key=AGENTOPS_API_KEY) + + # Outer trace + with agentops.start_trace("main_workflow", tags=["workflow", "main"]): + print("Main workflow started") + + # Inner trace (parallel, not child) + with agentops.start_trace("sub_task", tags=["workflow", "sub"]): + print("Sub task started") + + agent = SimpleAgent("WorkflowAgent") + result = agent.process_data("workflow data") + print(f"Sub task result: {result}") + + print("Sub task completed") + print("Main workflow completed") + + +if __name__ == "__main__": + print("AgentOps Context Manager Examples") + print("=" * 40) + + # Show basic usage + basic_context_manager_example() + + # Show multiple parallel traces + multiple_parallel_traces() + + # Show error handling + error_handling_example() + + # Show nested traces + nested_traces_example() + + print("\nAll examples completed!") diff --git a/examples/context_manager/error_handling.py b/examples/context_manager/error_handling.py new file mode 100644 index 000000000..bfeffda08 --- /dev/null +++ b/examples/context_manager/error_handling.py @@ -0,0 +1,317 @@ +""" +Error Handling with Context Managers + +This example demonstrates error handling patterns with AgentOps context managers, +showing how traces automatically handle different types of exceptions. +""" + +import os +import time +import agentops +from agentops import agent, task, tool +from dotenv import load_dotenv + +load_dotenv() + +# Get API key from environment +AGENTOPS_API_KEY = os.environ.get("AGENTOPS_API_KEY") + + +@agent +class ErrorProneAgent: + """An agent that can encounter various types of errors.""" + + def __init__(self, name: str): + self.name = name + + @task + def risky_operation(self, operation_type: str) -> str: + """Perform a risky operation that might fail.""" + if operation_type == "value_error": + raise ValueError("Invalid value provided") + elif operation_type == "type_error": + raise TypeError("Wrong type provided") + elif operation_type == "runtime_error": + raise RuntimeError("Runtime error occurred") + elif operation_type == "custom_error": + raise CustomError("Custom error occurred") + else: + return f"Success: {operation_type}" + + @tool + def validate_input(self, data: str) -> str: + """Validate input data.""" + if not data or data == "invalid": + raise ValueError("Invalid input data") + return f"Validated: {data}" + + @task + def multi_step_operation(self, steps: list) -> str: + """Perform multiple steps, any of which might fail.""" + results = [] + for i, step in enumerate(steps): + if step == "fail": + raise RuntimeError(f"Step {i+1} failed") + results.append(f"Step{i+1}:{step}") + return " -> ".join(results) + + +class CustomError(Exception): + """Custom exception for demonstration.""" + + pass + + +def basic_exception_handling(): + """Basic example of exception handling with context managers.""" + print("Basic Exception Handling") + + agentops.init(api_key=AGENTOPS_API_KEY) + + error_types = ["value_error", "type_error", "runtime_error", "success"] + + for error_type in error_types: + try: + with agentops.start_trace(f"basic_{error_type}", tags=["basic", "error-handling"]): + print(f"Started trace for {error_type}") + + agent = ErrorProneAgent(f"BasicAgent_{error_type}") + result = agent.risky_operation(error_type) + print(f"Success result: {result}") + + except ValueError as e: + print(f"Caught ValueError: {e}") + except TypeError as e: + print(f"Caught TypeError: {e}") + except RuntimeError as e: + print(f"Caught RuntimeError: {e}") + + print("Basic exception handling completed") + + +def nested_exception_handling(): + """Example of exception handling in nested traces.""" + print("\nNested Exception Handling") + + agentops.init(api_key=AGENTOPS_API_KEY) + + try: + with agentops.start_trace("outer_operation", tags=["nested", "outer"]): + print("Outer trace started") + + # outer_agent = ErrorProneAgent("OuterAgent") # Not used in this example + + try: + with agentops.start_trace("inner_operation", tags=["nested", "inner"]): + print("Inner trace started") + + inner_agent = ErrorProneAgent("InnerAgent") + # This will cause an error in the inner trace + inner_agent.risky_operation("value_error") + + except ValueError as e: + print(f"Inner trace caught ValueError: {e}") + # Re-raise to affect outer trace + raise RuntimeError("Inner operation failed") from e + + except RuntimeError as e: + print(f"Outer trace caught RuntimeError: {e}") + + print("Nested exception handling completed") + + +def retry_pattern(): + """Example of retry pattern with context managers.""" + print("\nRetry Pattern") + + agentops.init(api_key=AGENTOPS_API_KEY) + + max_retries = 3 + for attempt in range(max_retries): + try: + with agentops.start_trace(f"retry_attempt_{attempt+1}", tags=["retry", f"attempt-{attempt+1}"]): + print(f"Retry attempt {attempt+1} started") + + agent = ErrorProneAgent(f"RetryAgent_Attempt{attempt+1}") + + # Simulate success on the last attempt + if attempt < max_retries - 1: + result = agent.risky_operation("runtime_error") + else: + result = agent.risky_operation("success") + + print(f"Retry attempt {attempt+1} succeeded: {result}") + break + + except RuntimeError as e: + print(f"Retry attempt {attempt+1} failed: {e}") + if attempt < max_retries - 1: + wait_time = 2**attempt # Exponential backoff + print(f"Waiting {wait_time}s before retry...") + time.sleep(wait_time * 0.01) # Shortened for demo + else: + print("All retry attempts exhausted") + raise + + print("Retry pattern completed") + + +def graceful_degradation(): + """Example of graceful degradation pattern.""" + print("\nGraceful Degradation") + + agentops.init(api_key=AGENTOPS_API_KEY) + + try: + with agentops.start_trace("primary_service", tags=["degradation", "primary"]): + print("Primary service trace started") + + agent = ErrorProneAgent("PrimaryAgent") + result = agent.risky_operation("runtime_error") + print(f"Primary service result: {result}") + + except RuntimeError as e: + print(f"Primary service failed: {e}") + print("Falling back to secondary service...") + + with agentops.start_trace("fallback_service", tags=["degradation", "fallback"]): + print("Fallback service trace started") + + fallback_agent = ErrorProneAgent("FallbackAgent") + result = fallback_agent.risky_operation("success") + print(f"Fallback service result: {result}") + + print("Graceful degradation completed") + + +def partial_success_handling(): + """Example of partial success handling.""" + print("\nPartial Success Handling") + + agentops.init(api_key=AGENTOPS_API_KEY) + + steps = ["step1", "step2", "fail", "step4"] + + with agentops.start_trace("partial_success", tags=["partial", "multi-step"]): + print("Partial success trace started") + + agent = ErrorProneAgent("PartialAgent") + + try: + result = agent.multi_step_operation(steps) + print(f"All steps completed: {result}") + except RuntimeError as e: + print(f"Operation partially failed: {e}") + + print("Partial success handling completed") + + +def custom_exception_handling(): + """Example of handling custom exceptions.""" + print("\nCustom Exception Handling") + + agentops.init(api_key=AGENTOPS_API_KEY) + + try: + with agentops.start_trace("custom_exception", tags=["custom", "exception"]): + print("Custom exception trace started") + + agent = ErrorProneAgent("CustomAgent") + result = agent.risky_operation("custom_error") + print(f"Result: {result}") + + except CustomError as e: + print(f"Caught custom exception: {e}") + except Exception as e: + print(f"Caught unexpected exception: {e}") + + print("Custom exception handling completed") + + +def finally_blocks_example(): + """Example of exception handling with finally blocks.""" + print("\nFinally Blocks Example") + + agentops.init(api_key=AGENTOPS_API_KEY) + + cleanup_actions = [] + + try: + with agentops.start_trace("finally_example", tags=["finally", "cleanup"]): + print("Finally example trace started") + + agent = ErrorProneAgent("FinallyAgent") + + try: + result = agent.risky_operation("value_error") + print(f"Result: {result}") + finally: + cleanup_actions.append("Inner cleanup executed") + print("Inner finally block executed") + + except ValueError as e: + print(f"Caught exception: {e}") + finally: + cleanup_actions.append("Outer cleanup executed") + print("Outer finally block executed") + + print(f"Cleanup actions performed: {cleanup_actions}") + print("Finally block handling completed") + + +def exception_chaining_example(): + """Example of exception chaining and context preservation.""" + print("\nException Chaining Example") + + agentops.init(api_key=AGENTOPS_API_KEY) + + try: + with agentops.start_trace("exception_chaining", tags=["chaining", "context"]): + print("Exception chaining trace started") + + agent = ErrorProneAgent("ChainingAgent") + + try: + # First operation fails + agent.validate_input("invalid") + except ValueError as e: + print(f"Validation failed: {e}") + # Chain the exception with additional context + raise RuntimeError("Operation failed due to validation error") from e + + except RuntimeError as e: + print(f"Caught chained exception: {e}") + print(f"Original cause: {e.__cause__}") + + print("Exception chaining completed") + + +if __name__ == "__main__": + print("AgentOps Error Handling Examples") + print("=" * 40) + + # Basic exception handling + basic_exception_handling() + + # Nested exception handling + nested_exception_handling() + + # Retry pattern + retry_pattern() + + # Graceful degradation + graceful_degradation() + + # Partial success handling + partial_success_handling() + + # Custom exception handling + custom_exception_handling() + + # Finally blocks + finally_blocks_example() + + # Exception chaining + exception_chaining_example() + + print("\nAll error handling examples completed!") diff --git a/examples/context_manager/parallel_traces.py b/examples/context_manager/parallel_traces.py new file mode 100644 index 000000000..cbc9c8f89 --- /dev/null +++ b/examples/context_manager/parallel_traces.py @@ -0,0 +1,245 @@ +""" +Parallel Traces Example + +This example demonstrates how AgentOps context managers create independent +parallel traces rather than parent-child relationships, which is ideal for +concurrent operations and workflow management. +""" + +import os +import time +import threading +import concurrent.futures +import agentops +from agentops import agent, task, tool +from dotenv import load_dotenv + +load_dotenv() + +# Get API key from environment +AGENTOPS_API_KEY = os.environ.get("AGENTOPS_API_KEY") + + +@agent +class WorkerAgent: + """A worker agent that can process tasks independently.""" + + def __init__(self, worker_id: str): + self.worker_id = worker_id + + @task + def process_task(self, task_data: str) -> str: + """Process a task with some simulated work.""" + # Simulate some work + time.sleep(0.1) + + result = self.analyze_data(task_data) + return self.finalize_result(result) + + @tool + def analyze_data(self, data: str) -> str: + """Analyze the input data.""" + return f"Analyzed: {data}" + + @tool + def finalize_result(self, analyzed_data: str) -> str: + """Finalize the processing result.""" + return f"Final: {analyzed_data.upper()}" + + +def sequential_parallel_traces(): + """Example of sequential parallel traces - each trace is independent.""" + print("Sequential Parallel Traces") + + agentops.init(api_key=AGENTOPS_API_KEY) + + tasks = ["task_1", "task_2", "task_3"] + results = [] + + for i, task_name in enumerate(tasks): + # Each trace is completely independent + with agentops.start_trace(f"sequential_{task_name}", tags=["sequential", f"step-{i+1}"]): + print(f"Started trace for {task_name}") + + worker = WorkerAgent(f"Worker{i+1}") + result = worker.process_task(f"data_for_{task_name}") + results.append(result) + + print(f"{task_name} result: {result}") + + print("All sequential traces completed") + print(f"Final results: {results}") + + +def nested_parallel_traces(): + """Example showing that nested context managers create parallel traces.""" + print("\nNested Parallel Traces") + + agentops.init(api_key=AGENTOPS_API_KEY) + + # Outer trace for the overall workflow + with agentops.start_trace("workflow_main", tags=["workflow", "main"]): + print("Main workflow trace started") + + # Inner trace for data preparation (parallel, not child) + with agentops.start_trace("data_preparation", tags=["workflow", "preparation"]): + print("Data preparation trace started (parallel to main)") + + prep_worker = WorkerAgent("PrepWorker") + prep_result = prep_worker.process_task("raw_data") + print(f"Preparation result: {prep_result}") + + print("Data preparation trace ended") + + # Another inner trace for data processing (also parallel) + with agentops.start_trace("data_processing", tags=["workflow", "processing"]): + print("Data processing trace started (parallel to main)") + + proc_worker = WorkerAgent("ProcessWorker") + proc_result = proc_worker.process_task("prepared_data") + print(f"Processing result: {proc_result}") + + print("Data processing trace ended") + print("Main workflow completed") + + print("Main workflow trace ended") + print("All traces were independent/parallel, not parent-child relationships") + + +def concurrent_traces_with_threads(): + """Example of truly concurrent traces using threading.""" + print("\nConcurrent Traces with Threading") + + agentops.init(api_key=AGENTOPS_API_KEY) + + def worker_function(worker_id: int, task_data: str): + """Function to run in a separate thread with its own trace.""" + trace_name = f"concurrent_worker_{worker_id}" + + with agentops.start_trace(trace_name, tags=["concurrent", f"worker-{worker_id}"]): + print(f"Thread {worker_id}: Trace started") + + worker = WorkerAgent(f"ConcurrentWorker{worker_id}") + result = worker.process_task(task_data) + + # Simulate varying work times + time.sleep(0.05 * worker_id) + + print(f"Thread {worker_id}: Result: {result}") + return result + + # Start multiple threads, each with their own trace + threads = [] + results = [] + + for i in range(3): + thread = threading.Thread(target=lambda i=i: results.append(worker_function(i, f"concurrent_data_{i}"))) + threads.append(thread) + thread.start() + + # Wait for all threads to complete + for thread in threads: + thread.join() + + print("All concurrent traces completed") + print(f"Results from concurrent execution: {len(results)} completed") + + +def concurrent_traces_with_executor(): + """Example using ThreadPoolExecutor for concurrent traces.""" + print("\nConcurrent Traces with ThreadPoolExecutor") + + agentops.init(api_key=AGENTOPS_API_KEY) + + def process_with_trace(task_id: int, data: str) -> str: + """Process data within its own trace context.""" + trace_name = f"executor_task_{task_id}" + + with agentops.start_trace(trace_name, tags=["executor", f"task-{task_id}"]): + print(f"Executor task {task_id}: Trace started") + + worker = WorkerAgent(f"ExecutorWorker{task_id}") + result = worker.process_task(data) + + print(f"Executor task {task_id}: Result: {result}") + return result + + # Use ThreadPoolExecutor for concurrent execution + tasks_data = [ + (1, "executor_data_1"), + (2, "executor_data_2"), + (3, "executor_data_3"), + (4, "executor_data_4"), + ] + + with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: + # Submit all tasks + future_to_task = {executor.submit(process_with_trace, task_id, data): task_id for task_id, data in tasks_data} + + # Collect results as they complete + results = [] + for future in concurrent.futures.as_completed(future_to_task): + task_id = future_to_task[future] + try: + result = future.result() + results.append((task_id, result)) + print(f"Task {task_id} completed successfully") + except Exception as e: + print(f"Task {task_id} failed: {e}") + + print("All executor-based traces completed") + print(f"Completed {len(results)} tasks concurrently") + + +def trace_with_different_tag_types(): + """Example showing different ways to tag parallel traces.""" + print("\nTraces with Different Tag Types") + + agentops.init(api_key=AGENTOPS_API_KEY) + + # Trace with list tags + with agentops.start_trace("list_tags_trace", tags=["list", "example", "demo"]): + print("Trace with list tags started") + worker1 = WorkerAgent("ListTagWorker") + result1 = worker1.process_task("list_tag_data") + print(f"List tags result: {result1}") + + # Trace with dictionary tags + with agentops.start_trace( + "dict_tags_trace", tags={"environment": "demo", "version": "1.0", "priority": "high", "team": "engineering"} + ): + print("Trace with dictionary tags started") + worker2 = WorkerAgent("DictTagWorker") + result2 = worker2.process_task("dict_tag_data") + print(f"Dictionary tags result: {result2}") + + # Trace with no tags + with agentops.start_trace("no_tags_trace"): + print("Trace with no tags started") + worker3 = WorkerAgent("NoTagWorker") + result3 = worker3.process_task("no_tag_data") + print(f"No tags result: {result3}") + + print("All differently tagged traces completed") + + +if __name__ == "__main__": + print("AgentOps Parallel Traces Examples") + print("=" * 40) + + # Sequential parallel traces + sequential_parallel_traces() + + # Nested parallel traces + nested_parallel_traces() + + # Concurrent traces with threading + concurrent_traces_with_threads() + + # Concurrent traces with executor + concurrent_traces_with_executor() + + # Different tag types + trace_with_different_tag_types() + + print("\nAll parallel trace examples completed!") diff --git a/examples/context_manager/production_patterns.py b/examples/context_manager/production_patterns.py new file mode 100644 index 000000000..8dedc50ec --- /dev/null +++ b/examples/context_manager/production_patterns.py @@ -0,0 +1,340 @@ +""" +Production Patterns with Context Managers + +This example demonstrates real-world production patterns using AgentOps +context managers, including monitoring, logging, and performance tracking. +""" + +import os +import time +import logging +from datetime import datetime +from typing import Dict, Any +import agentops +from agentops import agent, task, tool +from dotenv import load_dotenv + +load_dotenv() + +# Get API key from environment +AGENTOPS_API_KEY = os.environ.get("AGENTOPS_API_KEY") + +# Configure logging +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) + + +@agent +class ProductionAgent: + """A production-ready agent with monitoring.""" + + def __init__(self, name: str, config: Dict[str, Any]): + self.name = name + self.config = config + self.metrics = {"tasks_completed": 0, "errors_encountered": 0, "total_processing_time": 0.0} + logger.info(f"Initialized production agent: {self.name}") + + @task + def process_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]: + """Process a production request with monitoring.""" + start_time = time.time() + + try: + logger.info(f"{self.name} processing request: {request_data.get('id', 'unknown')}") + + # Validate request + validated_data = self.validate_request(request_data) + + # Process data + processed_data = self.transform_data(validated_data) + + # Generate response + response = self.generate_response(processed_data) + + # Update metrics + processing_time = time.time() - start_time + self.metrics["tasks_completed"] += 1 + self.metrics["total_processing_time"] += processing_time + + logger.info(f"{self.name} completed request in {processing_time:.3f}s") + return response + + except Exception as e: + self.metrics["errors_encountered"] += 1 + logger.error(f"{self.name} failed to process request: {e}") + raise + + @tool + def validate_request(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Validate incoming request data.""" + required_fields = ["id", "type", "payload"] + + for field in required_fields: + if field not in data: + raise ValueError(f"Missing required field: {field}") + + logger.debug(f"{self.name} validated request: {data['id']}") + return data + + @tool + def transform_data(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Transform and enrich the data.""" + # Simulate data transformation + time.sleep(0.01) # Simulate processing time + + transformed = { + **data, + "processed_at": datetime.now().isoformat(), + "processed_by": self.name, + "version": self.config.get("version", "1.0"), + } + + logger.debug(f"{self.name} transformed data for: {data['id']}") + return transformed + + @tool + def generate_response(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Generate the final response.""" + response = { + "status": "success", + "request_id": data["id"], + "result": f"Processed {data['type']} successfully", + "metadata": { + "processed_at": data["processed_at"], + "processed_by": data["processed_by"], + "version": data["version"], + }, + } + + logger.debug(f"{self.name} generated response for: {data['id']}") + return response + + def get_metrics(self) -> Dict[str, Any]: + """Get current agent metrics.""" + avg_processing_time = self.metrics["total_processing_time"] / max(self.metrics["tasks_completed"], 1) + + return { + **self.metrics, + "average_processing_time": avg_processing_time, + "error_rate": self.metrics["errors_encountered"] + / max(self.metrics["tasks_completed"] + self.metrics["errors_encountered"], 1), + } + + +def api_endpoint_pattern(): + """Example of using context managers in API endpoint pattern.""" + print("API Endpoint Pattern") + + agentops.init(api_key=AGENTOPS_API_KEY) + + # Simulate API requests + requests = [ + {"id": "req_001", "type": "user_query", "payload": {"query": "What is AI?"}}, + {"id": "req_002", "type": "data_analysis", "payload": {"dataset": "sales_data"}}, + {"id": "req_003", "type": "user_query", "payload": {"query": "Process this"}}, + ] + + agent_config = {"version": "2.1.0", "environment": "production"} + api_agent = ProductionAgent("APIAgent", agent_config) + + for request in requests: + try: + with agentops.start_trace("api_request", tags=["api", request["type"]]): + response = api_agent.process_request(request) + logger.info(f"API Response: {response['status']} for {response['request_id']}") + + except Exception as e: + logger.error(f"API request failed: {e}") + + # Print final metrics + print(f"Agent Metrics: {api_agent.get_metrics()}") + + +def batch_processing_pattern(): + """Example of batch processing with context managers.""" + print("\nBatch Processing Pattern") + + agentops.init(api_key=AGENTOPS_API_KEY) + + # Simulate batch data + batch_data = [{"id": f"item_{i:03d}", "type": "data_record", "payload": {"value": i * 10}} for i in range(1, 6)] + + agent_config = {"version": "1.5.0", "batch_size": 5} + batch_agent = ProductionAgent("BatchAgent", agent_config) + + with agentops.start_trace("batch_processing", tags=["batch", "bulk"]): + logger.info(f"Starting batch processing of {len(batch_data)} items") + + successful_items = 0 + failed_items = 0 + + for item in batch_data: + try: + with agentops.start_trace("item_processing", tags=["batch", "item"]): + batch_agent.process_request(item) + successful_items += 1 + logger.debug(f"Processed item: {item['id']}") + + except Exception as e: + failed_items += 1 + logger.error(f"Failed to process item {item['id']}: {e}") + + logger.info(f"Batch completed: {successful_items} successful, {failed_items} failed") + + print(f"Batch Agent Metrics: {batch_agent.get_metrics()}") + + +def microservice_pattern(): + """Example of microservice communication pattern.""" + print("\nMicroservice Communication Pattern") + + agentops.init(api_key=AGENTOPS_API_KEY) + + def authenticate_user(user_id: str) -> bool: + """Simulate authentication service.""" + with agentops.start_trace("authenticate", tags=["auth", "security"]): + logger.info(f"Authenticating user: {user_id}") + time.sleep(0.01) # Simulate auth check + return user_id != "invalid_user" + + def get_user_profile(user_id: str) -> Dict[str, Any]: + """Simulate user service.""" + with agentops.start_trace("get_profile", tags=["user", "profile"]): + logger.info(f"Fetching profile for user: {user_id}") + time.sleep(0.02) # Simulate database query + return {"user_id": user_id, "name": f"User {user_id}", "email": f"{user_id}@example.com"} + + def send_notification(user_id: str, message: str) -> bool: + """Simulate notification service.""" + with agentops.start_trace("send_notification", tags=["notification", "email"]): + logger.info(f"Sending notification to user: {user_id}") + time.sleep(0.01) # Simulate email sending + return True + + # Simulate a complete user workflow + user_requests = ["user_123", "user_456", "invalid_user"] + + for user_id in user_requests: + try: + with agentops.start_trace("user_workflow", tags=["workflow", "user"]): + logger.info(f"Processing workflow for user: {user_id}") + + # Step 1: Authenticate + if not authenticate_user(user_id): + raise ValueError(f"Authentication failed for user: {user_id}") + + # Step 2: Get profile + get_user_profile(user_id) + + # Step 3: Send welcome notification + send_notification(user_id, "Welcome to our service!") + + logger.info(f"Workflow completed for user: {user_id}") + + except Exception as e: + logger.error(f"Workflow failed for user {user_id}: {e}") + + +def monitoring_pattern(): + """Example of monitoring with context managers.""" + print("\nMonitoring Pattern") + + agentops.init(api_key=AGENTOPS_API_KEY) + + class AlertManager: + """Simple alert manager for demonstration.""" + + def __init__(self): + self.alerts = [] + + def check_and_alert(self, trace_name: str, duration: float, success: bool): + """Check conditions and generate alerts.""" + # Alert on slow operations + if duration > 0.1: # 100ms threshold + self.alerts.append( + { + "type": "SLOW_OPERATION", + "trace": trace_name, + "duration": duration, + "timestamp": datetime.now().isoformat(), + } + ) + logger.warning(f"SLOW OPERATION ALERT: {trace_name} took {duration:.3f}s") + + # Alert on failures + if not success: + self.alerts.append( + {"type": "OPERATION_FAILURE", "trace": trace_name, "timestamp": datetime.now().isoformat()} + ) + logger.warning(f"FAILURE ALERT: {trace_name} failed") + + alert_manager = AlertManager() + + class MonitoredOperation: + """Context manager with built-in monitoring and alerting.""" + + def __init__(self, operation_name: str, tags: list = None): + self.operation_name = operation_name + self.tags = tags or [] + self.start_time = None + self.trace_context = None + + def __enter__(self): + self.start_time = time.time() + self.trace_context = agentops.start_trace(self.operation_name, tags=self.tags) + return self.trace_context + + def __exit__(self, exc_type, exc_val, exc_tb): + duration = time.time() - self.start_time + success = exc_type is None + + # Check for alerts + alert_manager.check_and_alert(self.operation_name, duration, success) + + return False + + # Simulate operations with different performance characteristics + operations = [ + ("fast_operation", 0.01, False), # Fast, successful + ("slow_operation", 0.15, False), # Slow, successful (will trigger alert) + ("failing_operation", 0.05, True), # Fast, but fails (will trigger alert) + ("normal_operation", 0.03, False), # Normal, successful + ] + + for op_name, sleep_time, should_fail in operations: + try: + with MonitoredOperation(op_name, tags=["monitoring", "demo"]): + logger.info(f"Executing {op_name}") + time.sleep(sleep_time) + + if should_fail: + raise RuntimeError(f"Simulated failure in {op_name}") + + logger.info(f"{op_name} completed successfully") + + except Exception as e: + logger.error(f"{op_name} failed: {e}") + + # Print alerts + print(f"Generated {len(alert_manager.alerts)} alerts:") + for alert in alert_manager.alerts: + print(f" {alert['type']}: {alert.get('trace', 'unknown')} at {alert['timestamp']}") + + +if __name__ == "__main__": + print("AgentOps Production Patterns Examples") + print("=" * 50) + + # API endpoint pattern + api_endpoint_pattern() + + # Batch processing pattern + batch_processing_pattern() + + # Microservice pattern + microservice_pattern() + + # Monitoring pattern + monitoring_pattern() + + print("\nAll production pattern examples completed!") diff --git a/tests/benchmark/benchmark_init.py b/tests/benchmark/benchmark_init.py index 94dad2660..743df9e5d 100644 --- a/tests/benchmark/benchmark_init.py +++ b/tests/benchmark/benchmark_init.py @@ -2,13 +2,13 @@ """ -Benchmark script for measuring TracingCore initialization time. +Benchmark script for measuring global tracer initialization time. """ def run_benchmark(): """ - Run a benchmark of TracingCore initialization. + Run a benchmark of global tracer initialization. Returns: Dictionary with timing results @@ -41,6 +41,6 @@ def print_results(results): if __name__ == "__main__": - print("Running TracingCore benchmark...") + print("Running global tracer benchmark...") results = run_benchmark() print_results(results) diff --git a/tests/integration/test_session_concurrency.py b/tests/integration/test_session_concurrency.py index c89203021..928dd972a 100644 --- a/tests/integration/test_session_concurrency.py +++ b/tests/integration/test_session_concurrency.py @@ -40,7 +40,7 @@ def setup_agentops(mock_api_key): mock_api.v3.fetch_auth_token.return_value = {"token": "mock_token", "project_id": "mock_project_id"} mock_api_client.return_value = mock_api - # Mock TracingCore to avoid actual initialization + # Mock global tracer to avoid actual initialization with patch("agentops.tracer") as mock_tracer: mock_tracer.initialized = True diff --git a/tests/unit/sdk/test_internal_span_processor.py b/tests/unit/sdk/test_internal_span_processor.py index f53e1ae12..ed76a3ff5 100644 --- a/tests/unit/sdk/test_internal_span_processor.py +++ b/tests/unit/sdk/test_internal_span_processor.py @@ -12,7 +12,7 @@ class TestURLLogging(unittest.TestCase): - """Tests for URL logging functionality in TracingCore.""" + """Tests for URL logging functionality in global tracer.""" def setUp(self): self.tracing_core = tracer @@ -21,7 +21,7 @@ def setUp(self): self.tracing_core._config = {"project_id": "test_project"} @patch("agentops.sdk.core.log_trace_url") - @patch("agentops.sdk.core.TracingCore.make_span") + @patch("agentops.sdk.core.tracer.make_span") def test_start_trace_logs_url(self, mock_make_span, mock_log_trace_url): """Test that start_trace logs the trace URL.""" # Create a mock span @@ -57,7 +57,7 @@ def test_end_trace_logs_url(self, mock_finalize_span, mock_log_trace_url): mock_log_trace_url.assert_called_once_with(mock_span, title="test_trace") @patch("agentops.sdk.core.log_trace_url") - @patch("agentops.sdk.core.TracingCore.make_span") + @patch("agentops.sdk.core.tracer.make_span") def test_start_trace_url_logging_failure_does_not_break_trace(self, mock_make_span, mock_log_trace_url): """Test that URL logging failure doesn't break trace creation.""" # Create a mock span @@ -100,7 +100,7 @@ def test_end_trace_url_logging_failure_does_not_break_trace(self, mock_finalize_ mock_log_trace_url.assert_called_once_with(mock_span, title="test_trace") @patch("agentops.sdk.core.log_trace_url") - @patch("agentops.sdk.core.TracingCore.make_span") + @patch("agentops.sdk.core.tracer.make_span") def test_start_trace_with_tags_logs_url(self, mock_make_span, mock_log_trace_url): """Test that start_trace with tags logs the trace URL.""" # Create a mock span @@ -128,7 +128,7 @@ def setUp(self): self.tracing_core._config = {"project_id": "test_project"} @patch("agentops.sdk.core.log_trace_url") - @patch("agentops.sdk.core.TracingCore.make_span") + @patch("agentops.sdk.core.tracer.make_span") @patch("agentops.sdk.core.tracer.finalize_span") def test_session_decorator_logs_url_on_start_and_end(self, mock_finalize_span, mock_make_span, mock_log_trace_url): """Test that session decorator logs URLs on both start and end.""" @@ -160,7 +160,7 @@ def test_function(): self.assertEqual(result, "test_result") @patch("agentops.sdk.core.log_trace_url") - @patch("agentops.sdk.core.TracingCore.make_span") + @patch("agentops.sdk.core.tracer.make_span") @patch("agentops.sdk.core.tracer.finalize_span") def test_session_decorator_with_default_name_logs_url(self, mock_finalize_span, mock_make_span, mock_log_trace_url): """Test that session decorator with default name logs URLs.""" @@ -191,7 +191,7 @@ def my_function(): self.assertEqual(result, "result") @patch("agentops.sdk.core.log_trace_url") - @patch("agentops.sdk.core.TracingCore.make_span") + @patch("agentops.sdk.core.tracer.make_span") @patch("agentops.sdk.core.tracer.finalize_span") def test_session_decorator_handles_url_logging_failure( self, mock_finalize_span, mock_make_span, mock_log_trace_url diff --git a/tests/unit/test_context_manager.py b/tests/unit/test_context_manager.py new file mode 100644 index 000000000..3f2e612be --- /dev/null +++ b/tests/unit/test_context_manager.py @@ -0,0 +1,777 @@ +import unittest +from unittest.mock import Mock, patch +import threading +import time +import asyncio + +from agentops import start_trace +from agentops.sdk.core import TraceContext +from opentelemetry.trace import StatusCode + + +class TestContextManager(unittest.TestCase): + """Test the context manager functionality of TraceContext""" + + def test_trace_context_has_context_manager_methods(self): + """Test that TraceContext has __enter__ and __exit__ methods""" + # TraceContext should have context manager protocol methods + assert hasattr(TraceContext, "__enter__") + assert hasattr(TraceContext, "__exit__") + + @patch("agentops.sdk.core.tracer") + def test_trace_context_enter_returns_self(self, mock_tracer): + """Test that __enter__ returns the TraceContext instance""" + mock_span = Mock() + mock_token = Mock() + trace_context = TraceContext(span=mock_span, token=mock_token) + + # __enter__ should return self + result = trace_context.__enter__() + assert result is trace_context + + @patch("agentops.sdk.core.tracer") + def test_trace_context_exit_calls_end_trace(self, mock_tracer): + """Test that __exit__ calls end_trace with appropriate state""" + mock_span = Mock() + mock_token = Mock() + trace_context = TraceContext(span=mock_span, token=mock_token) + + # Test normal exit (no exception) + trace_context.__exit__(None, None, None) + mock_tracer.end_trace.assert_called_once_with(trace_context, StatusCode.OK) + + @patch("agentops.sdk.core.tracer") + def test_trace_context_exit_with_exception_sets_error_state(self, mock_tracer): + """Test that __exit__ sets ERROR state when exception occurs""" + mock_span = Mock() + mock_token = Mock() + trace_context = TraceContext(span=mock_span, token=mock_token) + + # Test exit with exception + mock_tracer.reset_mock() + exc_type = ValueError + exc_val = ValueError("test error") + exc_tb = None + + trace_context.__exit__(exc_type, exc_val, exc_tb) + mock_tracer.end_trace.assert_called_once_with(trace_context, StatusCode.ERROR) + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_context_manager_usage_pattern(self, mock_agentops_tracer, mock_core_tracer): + """Test using start_trace as a context manager""" + # Create a mock TraceContext + mock_span = Mock() + mock_token = Mock() + mock_trace_context = TraceContext(span=mock_span, token=mock_token) + + # Mock the tracer's start_trace method to return our TraceContext + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.return_value = mock_trace_context + + # Use as context manager + with start_trace("test_trace") as trace: + assert trace is mock_trace_context + assert trace.span is mock_span + assert trace.token is mock_token + + # Verify start_trace was called + mock_agentops_tracer.start_trace.assert_called_once_with(trace_name="test_trace", tags=None) + # Verify end_trace was called + mock_core_tracer.end_trace.assert_called_once_with(mock_trace_context, StatusCode.OK) + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_context_manager_with_exception(self, mock_agentops_tracer, mock_core_tracer): + """Test context manager handles exceptions properly""" + # Create a mock TraceContext + mock_span = Mock() + mock_token = Mock() + mock_trace_context = TraceContext(span=mock_span, token=mock_token) + + # Mock the tracer's start_trace method + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.return_value = mock_trace_context + + # Test exception handling + with self.assertRaises(ValueError): + with start_trace("test_trace"): + raise ValueError("Test exception") + + # Verify end_trace was called with ERROR state + mock_core_tracer.end_trace.assert_called_once_with(mock_trace_context, StatusCode.ERROR) + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + @patch("agentops.init") + def test_start_trace_auto_initializes_if_needed(self, mock_init, mock_agentops_tracer, mock_core_tracer): + """Test that start_trace attempts to initialize SDK if not initialized""" + # First call: SDK not initialized + mock_agentops_tracer.initialized = False + + # After init() is called, set initialized to True + def set_initialized(): + mock_agentops_tracer.initialized = True + + mock_init.side_effect = set_initialized + + # Create a mock TraceContext for when start_trace is called after init + mock_span = Mock() + mock_token = Mock() + mock_trace_context = TraceContext(span=mock_span, token=mock_token) + mock_agentops_tracer.start_trace.return_value = mock_trace_context + + # Call start_trace + result = start_trace("test_trace") + + # Verify init was called + mock_init.assert_called_once() + # Verify start_trace was called on tracer + mock_agentops_tracer.start_trace.assert_called_once_with(trace_name="test_trace", tags=None) + assert result is mock_trace_context + + def test_no_wrapper_classes_needed(self): + """Test that we don't need wrapper classes - TraceContext is the context manager""" + # TraceContext itself implements the context manager protocol + # No need for TraceContextManager wrapper + mock_span = Mock() + mock_token = Mock() + trace_context = TraceContext(span=mock_span, token=mock_token) + + # Can use directly as context manager + assert hasattr(trace_context, "__enter__") + assert hasattr(trace_context, "__exit__") + assert callable(trace_context.__enter__) + assert callable(trace_context.__exit__) + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_parallel_traces_independence(self, mock_agentops_tracer, mock_core_tracer): + """Test that multiple traces can run in parallel independently""" + # Create mock TraceContexts + mock_trace1 = TraceContext(span=Mock(), token=Mock()) + mock_trace2 = TraceContext(span=Mock(), token=Mock()) + + # Mock the tracer to return different traces + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.side_effect = [mock_trace1, mock_trace2] + + # Start two traces + trace1 = start_trace("trace1") + trace2 = start_trace("trace2") + + # They should be different instances + assert trace1 is not trace2 + assert trace1 is mock_trace1 + assert trace2 is mock_trace2 + + # End them independently using context manager protocol + trace1.__exit__(None, None, None) + trace2.__exit__(None, None, None) + + # Verify both were ended + assert mock_core_tracer.end_trace.call_count == 2 + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_nested_context_managers_create_parallel_traces(self, mock_agentops_tracer, mock_core_tracer): + """Test that nested context managers create parallel traces, not parent-child""" + # Create mock TraceContexts + mock_outer = TraceContext(span=Mock(), token=Mock()) + mock_inner = TraceContext(span=Mock(), token=Mock()) + + # Mock the tracer + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.side_effect = [mock_outer, mock_inner] + + # Use nested context managers + with start_trace("outer_trace") as outer: + assert outer is mock_outer + with start_trace("inner_trace") as inner: + assert inner is mock_inner + assert inner is not outer + # Both traces are active + assert mock_agentops_tracer.start_trace.call_count == 2 + + # Verify both were ended + assert mock_core_tracer.end_trace.call_count == 2 + # Inner trace ended first, then outer + calls = mock_core_tracer.end_trace.call_args_list + assert calls[0][0][0] is mock_inner + assert calls[1][0][0] is mock_outer + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_exception_in_nested_traces(self, mock_agentops_tracer, mock_core_tracer): + """Test exception handling in nested traces""" + # Create mock TraceContexts + mock_outer = TraceContext(span=Mock(), token=Mock()) + mock_inner = TraceContext(span=Mock(), token=Mock()) + + # Mock the tracer + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.side_effect = [mock_outer, mock_inner] + + # Test exception in inner trace + with self.assertRaises(ValueError): + with start_trace("outer_trace"): + with start_trace("inner_trace"): + raise ValueError("Inner exception") + + # Both traces should be ended with ERROR state + assert mock_core_tracer.end_trace.call_count == 2 + calls = mock_core_tracer.end_trace.call_args_list + # Inner trace ended with ERROR + assert calls[0][0][0] is mock_inner + assert calls[0][0][1] == StatusCode.ERROR + # Outer trace also ended with ERROR (exception propagated) + assert calls[1][0][0] is mock_outer + assert calls[1][0][1] == StatusCode.ERROR + + @patch("agentops.sdk.core.tracer") + def test_trace_context_attributes_access(self, mock_tracer): + """Test accessing span and token attributes of TraceContext""" + mock_span = Mock() + mock_token = Mock() + trace_context = TraceContext(span=mock_span, token=mock_token) + + # Direct attribute access + assert trace_context.span is mock_span + assert trace_context.token is mock_token + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_multiple_exceptions_in_sequence(self, mock_agentops_tracer, mock_core_tracer): + """Test handling multiple exceptions in sequence""" + # Mock the tracer + mock_agentops_tracer.initialized = True + + # Create different mock traces for each attempt + mock_traces = [TraceContext(span=Mock(), token=Mock()) for _ in range(3)] + mock_agentops_tracer.start_trace.side_effect = mock_traces + + # Multiple traces with exceptions + for i in range(3): + with self.assertRaises(RuntimeError): + with start_trace(f"trace_{i}"): + raise RuntimeError(f"Error {i}") + + # All should be ended with ERROR state + assert mock_core_tracer.end_trace.call_count == 3 + for i, call in enumerate(mock_core_tracer.end_trace.call_args_list): + assert call[0][0] is mock_traces[i] + assert call[0][1] == StatusCode.ERROR + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_trace_with_tags_dict(self, mock_agentops_tracer, mock_core_tracer): + """Test starting trace with tags as dictionary""" + # Create a mock TraceContext + mock_trace = TraceContext(span=Mock(), token=Mock()) + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.return_value = mock_trace + + tags = {"environment": "test", "version": "1.0"} + with start_trace("tagged_trace", tags=tags) as trace: + assert trace is mock_trace + + # Verify tags were passed + mock_agentops_tracer.start_trace.assert_called_once_with(trace_name="tagged_trace", tags=tags) + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_trace_with_tags_list(self, mock_agentops_tracer, mock_core_tracer): + """Test starting trace with tags as list""" + # Create a mock TraceContext + mock_trace = TraceContext(span=Mock(), token=Mock()) + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.return_value = mock_trace + + tags = ["test", "v1.0", "experimental"] + with start_trace("tagged_trace", tags=tags) as trace: + assert trace is mock_trace + + # Verify tags were passed + mock_agentops_tracer.start_trace.assert_called_once_with(trace_name="tagged_trace", tags=tags) + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_trace_context_manager_thread_safety(self, mock_agentops_tracer, mock_core_tracer): + """Test that context managers work correctly in multi-threaded environment""" + # Mock the tracer + mock_agentops_tracer.initialized = True + + # Create unique traces for each thread + thread_traces = {} + trace_lock = threading.Lock() + + def create_trace(trace_name=None, tags=None, **kwargs): + trace = TraceContext(span=Mock(), token=Mock()) + with trace_lock: + thread_traces[threading.current_thread().ident] = trace + return trace + + mock_agentops_tracer.start_trace.side_effect = create_trace + + results = [] + errors = [] + + def worker(thread_id): + try: + with start_trace(f"thread_{thread_id}_trace") as trace: + # Each thread should get its own trace + results.append((thread_id, trace)) + time.sleep(0.01) # Simulate some work + except Exception as e: + errors.append((thread_id, str(e))) + + # Start multiple threads + threads = [] + for i in range(5): + t = threading.Thread(target=worker, args=(i,)) + threads.append(t) + t.start() + + # Wait for all threads + for t in threads: + t.join() + + # Check results + assert len(errors) == 0, f"Errors in threads: {errors}" + assert len(results) == 5 + + # Each thread should have gotten a unique trace + traces = [r[1] for r in results] + assert len(set(id(t) for t in traces)) == 5 # All unique + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_context_manager_with_early_return(self, mock_agentops_tracer, mock_core_tracer): + """Test that context manager properly cleans up with early return""" + # Create a mock TraceContext + mock_trace = TraceContext(span=Mock(), token=Mock()) + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.return_value = mock_trace + + def function_with_early_return(): + with start_trace("early_return_trace"): + if True: # Early return condition + return "early" + return "normal" + + result = function_with_early_return() + assert result == "early" + + # Verify trace was still ended properly + mock_core_tracer.end_trace.assert_called_once_with(mock_trace, StatusCode.OK) + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_context_manager_with_finally_block(self, mock_agentops_tracer, mock_core_tracer): + """Test context manager with try-finally block""" + # Create a mock TraceContext + mock_trace = TraceContext(span=Mock(), token=Mock()) + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.return_value = mock_trace + + finally_executed = False + + try: + with start_trace("finally_trace"): + try: + raise ValueError("Test") + finally: + finally_executed = True + except ValueError: + pass + + assert finally_executed + # Trace should be ended with ERROR due to exception + mock_core_tracer.end_trace.assert_called_once_with(mock_trace, StatusCode.ERROR) + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_backwards_compatibility_existing_patterns(self, mock_agentops_tracer, mock_core_tracer): + """Test that existing usage patterns continue to work""" + # Create mock traces + mock_traces = [TraceContext(span=Mock(), token=Mock()) for _ in range(3)] + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.side_effect = mock_traces + + # Pattern 1: Basic context manager + with start_trace("basic") as trace: + assert trace is mock_traces[0] + + # Pattern 2: Manual start/end using context manager protocol + trace = start_trace("manual") + assert trace is mock_traces[1] + trace.__exit__(None, None, None) # Use context manager exit instead of end_trace + + # Pattern 3: With tags + with start_trace("tagged", tags=["production", "v2"]) as trace: + assert trace is mock_traces[2] + + # All patterns should work + assert mock_agentops_tracer.start_trace.call_count == 3 + assert mock_core_tracer.end_trace.call_count == 3 + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_edge_case_none_trace_context(self, mock_agentops_tracer, mock_core_tracer): + """Test handling when start_trace returns None""" + # Mock SDK not initialized and init fails + mock_agentops_tracer.initialized = False + + # When start_trace is called on uninitialized tracer, it returns None + with patch("agentops.init") as mock_init: + mock_init.side_effect = Exception("Init failed") + + result = start_trace("test_trace") + assert result is None + + # Verify start_trace was not called on tracer (since init failed) + mock_agentops_tracer.start_trace.assert_not_called() + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_edge_case_tracing_core_not_initialized(self, mock_agentops_tracer, mock_core_tracer): + """Test behavior when global tracer is not initialized""" + mock_agentops_tracer.initialized = False + + # Mock init to succeed but tracer still not initialized + with patch("agentops.init") as mock_init: + mock_init.return_value = None # init succeeds but doesn't set initialized + + result = start_trace("test") + assert result is None + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_edge_case_exception_in_exit_method(self, mock_agentops_tracer, mock_core_tracer): + """Test handling when exception occurs in __exit__ method""" + # Create a mock TraceContext + mock_trace = TraceContext(span=Mock(), token=Mock()) + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.return_value = mock_trace + + # Make end_trace raise an exception + mock_core_tracer.end_trace.side_effect = RuntimeError("End trace failed") + + # The exception in __exit__ should be suppressed + with start_trace("exception_in_exit"): + pass # Should not raise + + # Verify end_trace was attempted + mock_core_tracer.end_trace.assert_called_once() + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_performance_many_sequential_traces(self, mock_agentops_tracer, mock_core_tracer): + """Test performance with many sequential traces""" + # Mock the tracer + mock_agentops_tracer.initialized = True + + # Create traces on demand + def create_trace(trace_name=None, tags=None, **kwargs): + return TraceContext(span=Mock(), token=Mock()) + + mock_agentops_tracer.start_trace.side_effect = create_trace + + # Create many traces sequentially + start_time = time.time() + for i in range(100): + with start_trace(f"trace_{i}") as trace: + assert trace is not None + assert trace.span is not None + + elapsed = time.time() - start_time + + # Should complete reasonably quickly (< 1 second for 100 traces) + assert elapsed < 1.0, f"Too slow: {elapsed:.2f}s for 100 traces" + + # Verify all traces were started and ended + assert mock_agentops_tracer.start_trace.call_count == 100 + assert mock_core_tracer.end_trace.call_count == 100 + + @patch("agentops.sdk.core.tracer") + def test_trace_context_state_management(self, mock_tracer): + """Test that TraceContext properly manages its internal state""" + mock_span = Mock() + mock_token = Mock() + trace_context = TraceContext(span=mock_span, token=mock_token) + + # Initial state + assert trace_context.span is mock_span + assert trace_context.token is mock_token + + # Enter context + result = trace_context.__enter__() + assert result is trace_context + + # Exit context normally + trace_context.__exit__(None, None, None) + mock_tracer.end_trace.assert_called_once_with(trace_context, StatusCode.OK) + + # State should remain accessible after exit + assert trace_context.span is mock_span + assert trace_context.token is mock_token + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_context_manager_with_async_context(self, mock_agentops_tracer, mock_core_tracer): + """Test context manager works in async context""" + # Create a mock TraceContext + mock_trace = TraceContext(span=Mock(), token=Mock()) + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.return_value = mock_trace + + async def async_function(): + with start_trace("async_context") as trace: + assert trace is mock_trace + await asyncio.sleep(0.01) + return "done" + + # Run async function + result = asyncio.run(async_function()) + assert result == "done" + + # Verify trace was properly managed + mock_agentops_tracer.start_trace.assert_called_once_with(trace_name="async_context", tags=None) + mock_core_tracer.end_trace.assert_called_once_with(mock_trace, StatusCode.OK) + + +class TestContextManagerBackwardCompatibility(unittest.TestCase): + """Test backward compatibility for context manager usage""" + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_existing_code_patterns_still_work(self, mock_agentops_tracer, mock_core_tracer): + """Test that code using the old patterns still works""" + # Create mock traces - need more than 3 for this test + mock_traces = [TraceContext(span=Mock(), token=Mock()) for _ in range(5)] + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.side_effect = mock_traces + + # Old pattern 1: Simple context manager + with start_trace("basic") as trace: + # Should work without changes + assert trace.span is not None + + # Old pattern 2: Context manager with exception handling + try: + with start_trace("with_error") as trace: + raise ValueError("test") + except ValueError: + pass + + # Old pattern 3: Nested traces + with start_trace("outer") as outer: + with start_trace("inner") as inner: + assert outer is not inner + + # All should work - 4 calls total (basic, with_error, outer, inner) + assert mock_agentops_tracer.start_trace.call_count == 4 + assert mock_core_tracer.end_trace.call_count == 4 + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_api_compatibility(self, mock_agentops_tracer, mock_core_tracer): + """Test that the API remains compatible""" + # Create mock TraceContexts for each call + mock_traces = [TraceContext(span=Mock(), token=Mock()) for _ in range(3)] + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.side_effect = mock_traces + + # Test function signatures + # start_trace(trace_name, tags=None) + trace1 = start_trace("test1") + assert trace1 is mock_traces[0] + + trace2 = start_trace("test2", tags=["tag1", "tag2"]) + assert trace2 is mock_traces[1] + + trace3 = start_trace("test3", tags={"key": "value"}) + assert trace3 is mock_traces[2] + + # Use context manager protocol to end traces + trace1.__exit__(None, None, None) + trace2.__exit__(ValueError, ValueError("test"), None) + trace3.__exit__(None, None, None) + + # All calls should work + assert mock_agentops_tracer.start_trace.call_count == 3 + assert mock_core_tracer.end_trace.call_count == 3 + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_return_type_compatibility(self, mock_agentops_tracer, mock_core_tracer): + """Test that return types are compatible with existing code""" + mock_span = Mock() + mock_token = Mock() + mock_trace = TraceContext(span=mock_span, token=mock_token) + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.return_value = mock_trace + + # start_trace returns TraceContext (or None) + trace = start_trace("test") + assert isinstance(trace, TraceContext) + assert hasattr(trace, "span") + assert hasattr(trace, "token") + assert hasattr(trace, "__enter__") + assert hasattr(trace, "__exit__") + + # Can be used as context manager + with trace: + pass + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_context_manager_with_keyboard_interrupt(self, mock_agentops_tracer, mock_core_tracer): + """Test context manager handles KeyboardInterrupt properly""" + # Create a mock TraceContext + mock_trace = TraceContext(span=Mock(), token=Mock()) + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.return_value = mock_trace + + # Test KeyboardInterrupt handling + with self.assertRaises(KeyboardInterrupt): + with start_trace("keyboard_interrupt_trace"): + raise KeyboardInterrupt() + + # Verify end_trace was called with ERROR state + mock_core_tracer.end_trace.assert_called_once_with(mock_trace, StatusCode.ERROR) + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_context_manager_with_system_exit(self, mock_agentops_tracer, mock_core_tracer): + """Test context manager handles SystemExit properly""" + # Create a mock TraceContext + mock_trace = TraceContext(span=Mock(), token=Mock()) + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.return_value = mock_trace + + # Test SystemExit handling + with self.assertRaises(SystemExit): + with start_trace("system_exit_trace"): + raise SystemExit(1) + + # Verify end_trace was called with ERROR state + mock_core_tracer.end_trace.assert_called_once_with(mock_trace, StatusCode.ERROR) + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_context_manager_in_generator_function(self, mock_agentops_tracer, mock_core_tracer): + """Test context manager works correctly in generator functions""" + # Create mock traces + mock_traces = [TraceContext(span=Mock(), token=Mock()) for _ in range(3)] + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.side_effect = mock_traces + + def trace_generator(): + with start_trace("generator_trace"): + yield 1 + yield 2 + yield 3 + + # Consume the generator + results = list(trace_generator()) + assert results == [1, 2, 3] + + # Verify trace was properly managed + mock_agentops_tracer.start_trace.assert_called_once() + mock_core_tracer.end_trace.assert_called_once() + + @patch("agentops.sdk.core.tracer") + def test_context_manager_exit_return_value(self, mock_tracer): + """Test that __exit__ returns None (doesn't suppress exceptions)""" + mock_span = Mock() + mock_token = Mock() + trace_context = TraceContext(span=mock_span, token=mock_token) + + # __exit__ should return None (or falsy) to not suppress exceptions + result = trace_context.__exit__(None, None, None) + assert result is None or not result + + # Also with exception + result = trace_context.__exit__(ValueError, ValueError("test"), None) + assert result is None or not result + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_context_manager_with_very_large_data(self, mock_agentops_tracer, mock_core_tracer): + """Test context manager with very large trace names and tags""" + # Create a mock TraceContext + mock_trace = TraceContext(span=Mock(), token=Mock()) + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.return_value = mock_trace + + # Very large trace name and tags + large_trace_name = "x" * 10000 + large_tags = {f"key_{i}": f"value_{i}" * 100 for i in range(100)} + + with start_trace(large_trace_name, tags=large_tags) as trace: + assert trace is mock_trace + + # Should handle large data without issues + mock_agentops_tracer.start_trace.assert_called_once() + args, kwargs = mock_agentops_tracer.start_trace.call_args + assert kwargs["trace_name"] == large_trace_name + assert kwargs["tags"] == large_tags + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_context_manager_with_asyncio_tasks(self, mock_agentops_tracer, mock_core_tracer): + """Test context manager with multiple asyncio tasks""" + # Mock the tracer + mock_agentops_tracer.initialized = True + + # Create traces for each task + trace_count = 0 + + def create_trace(trace_name=None, tags=None, **kwargs): + nonlocal trace_count + trace_count += 1 + return TraceContext(span=Mock(name=f"span_{trace_count}"), token=Mock()) + + mock_agentops_tracer.start_trace.side_effect = create_trace + + async def task_with_trace(task_id): + with start_trace(f"async_task_{task_id}"): + await asyncio.sleep(0.01) + return task_id + + async def run_concurrent_tasks(): + tasks = [task_with_trace(i) for i in range(5)] + results = await asyncio.gather(*tasks) + return results + + # Run async tasks + results = asyncio.run(run_concurrent_tasks()) + assert results == [0, 1, 2, 3, 4] + + # All traces should be started and ended + assert mock_agentops_tracer.start_trace.call_count == 5 + assert mock_core_tracer.end_trace.call_count == 5 + + @patch("agentops.sdk.core.tracer") + @patch("agentops.tracer") + def test_context_manager_resource_cleanup_on_exit_failure(self, mock_agentops_tracer, mock_core_tracer): + """Test that resources are cleaned up even if __exit__ fails""" + # Create a mock TraceContext + mock_trace = TraceContext(span=Mock(), token=Mock()) + mock_agentops_tracer.initialized = True + mock_agentops_tracer.start_trace.return_value = mock_trace + + # Make end_trace fail + mock_core_tracer.end_trace.side_effect = Exception("Cleanup failed") + + # Should not raise exception from __exit__ + with start_trace("cleanup_test") as trace: + assert trace is mock_trace + + # end_trace was attempted despite failure + mock_core_tracer.end_trace.assert_called_once() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index cfcb2b903..4b40bf770 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -8,7 +8,7 @@ @pytest.fixture(scope="function") def mock_tracing_core(): - """Mock the TracingCore to avoid actual initialization""" + """Mock the global tracer to avoid actual initialization""" # Patch both the main location and where it's imported in client with ( patch("agentops.tracer") as mock_tracer, @@ -137,7 +137,7 @@ def test_start_trace_without_init(): # Reset client for test agentops._client = agentops.Client() - # Mock TracingCore to be uninitialized initially, then initialized after init + # Mock global tracer to be uninitialized initially, then initialized after init with ( patch("agentops.tracer") as mock_tracer, patch("agentops.client.client.tracer", mock_tracer), @@ -150,7 +150,7 @@ def test_start_trace_without_init(): with patch("agentops.init") as mock_init: def side_effect(): - # After init is called, mark TracingCore as initialized + # After init is called, mark global tracer as initialized mock_tracer.initialized = True mock_init.side_effect = side_effect @@ -172,7 +172,7 @@ def test_end_trace(mock_tracing_core, mock_trace_context): # End the trace agentops.end_trace(mock_trace_context, end_state="Success") - # Verify end_trace was called on TracingCore + # Verify end_trace was called on global tracer mock_tracing_core.end_trace.assert_called_once_with(trace_context=mock_trace_context, end_state="Success") @@ -233,7 +233,7 @@ def failing_function(): def test_legacy_start_session_compatibility(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): - """Test that legacy start_session still works and calls TracingCore.start_trace""" + """Test that legacy start_session still works and calls tracer.start_trace""" import agentops from agentops.legacy import Session @@ -253,13 +253,13 @@ def test_legacy_start_session_compatibility(mock_tracing_core, mock_api_client, # Check that the session's trace_context has the expected properties assert session.trace_context is not None - # Verify that TracingCore.start_trace was called + # Verify that tracer.start_trace was called # Note: May be called multiple times due to initialization assert mock_tracing_core.start_trace.call_count >= 1 def test_legacy_end_session_compatibility(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): - """Test that legacy end_session still works and calls TracingCore.end_trace""" + """Test that legacy end_session still works and calls tracer.end_trace""" import agentops from agentops.legacy import Session @@ -275,7 +275,7 @@ def test_legacy_end_session_compatibility(mock_tracing_core, mock_api_client, mo # End the session agentops.end_session(session) - # Verify that TracingCore.end_trace was called + # Verify that tracer.end_trace was called mock_tracing_core.end_trace.assert_called_once_with(mock_trace_context, end_state="Success") diff --git a/tests/unit/test_session_legacy.py b/tests/unit/test_session_legacy.py index 4b4b21aa9..e3b587f24 100644 --- a/tests/unit/test_session_legacy.py +++ b/tests/unit/test_session_legacy.py @@ -143,7 +143,7 @@ def test_crewai_kwargs_force_flush(): agentops.end_session(end_state="Success", end_state_reason="Test Finished", is_auto_end=True) # Explicitly ensure the core isn't already shut down for the test - assert tracer._initialized, "TracingCore should still be initialized" + assert tracer._initialized, "Global tracer should still be initialized" def test_crewai_task_instrumentation(instrumentation):