diff --git a/agentops/__init__.py b/agentops/__init__.py
index dc526e8aa..0ed1171a8 100755
--- a/agentops/__init__.py
+++ b/agentops/__init__.py
@@ -15,18 +15,28 @@
from typing import List, Optional, Union, Dict, Any
from agentops.client import Client
from agentops.sdk.core import TraceContext, tracer
-from agentops.sdk.decorators import trace, session, agent, task, workflow, operation
+from agentops.sdk.decorators import trace, session, agent, task, workflow, operation, tool
+from agentops.enums import TraceState, SUCCESS, ERROR, UNSET
+from opentelemetry.trace.status import StatusCode
from agentops.logging.config import logger
+import threading
-# Client global instance; one per process runtime
-_client = Client()
+# Thread-safe client management
+_client_lock = threading.Lock()
+_client = None
def get_client() -> Client:
- """Get the singleton client instance"""
+ """Get the singleton client instance in a thread-safe manner"""
global _client
+ # Double-checked locking pattern for thread safety
+ if _client is None:
+ with _client_lock:
+ if _client is None:
+ _client = Client()
+
return _client
@@ -106,24 +116,31 @@ def init(
elif default_tags:
merged_tags = default_tags
- return _client.init(
- api_key=api_key,
- endpoint=endpoint,
- app_url=app_url,
- max_wait_time=max_wait_time,
- max_queue_size=max_queue_size,
- default_tags=merged_tags,
- trace_name=trace_name,
- instrument_llm_calls=instrument_llm_calls,
- auto_start_session=auto_start_session,
- auto_init=auto_init,
- skip_auto_end_session=skip_auto_end_session,
- env_data_opt_out=env_data_opt_out,
- log_level=log_level,
- fail_safe=fail_safe,
- exporter_endpoint=exporter_endpoint,
+ # Prepare initialization arguments
+ init_kwargs = {
+ "api_key": api_key,
+ "endpoint": endpoint,
+ "app_url": app_url,
+ "max_wait_time": max_wait_time,
+ "max_queue_size": max_queue_size,
+ "default_tags": merged_tags,
+ "trace_name": trace_name,
+ "instrument_llm_calls": instrument_llm_calls,
+ "auto_start_session": auto_start_session,
+ "auto_init": auto_init,
+ "skip_auto_end_session": skip_auto_end_session,
+ "env_data_opt_out": env_data_opt_out,
+ "log_level": log_level,
+ "fail_safe": fail_safe,
+ "exporter_endpoint": exporter_endpoint,
**kwargs,
- )
+ }
+
+ # Get the current client instance (creates new one if needed)
+ client = get_client()
+
+ # Initialize the client directly
+ return client.init(**init_kwargs)
def configure(**kwargs):
@@ -173,7 +190,8 @@ def configure(**kwargs):
if invalid_params:
logger.warning(f"Invalid configuration parameters: {invalid_params}")
- _client.configure(**kwargs)
+ client = get_client()
+ client.configure(**kwargs)
def start_trace(
@@ -207,7 +225,9 @@ def start_trace(
return tracer.start_trace(trace_name=trace_name, tags=tags)
-def end_trace(trace_context: Optional[TraceContext] = None, end_state: str = "Success") -> None:
+def end_trace(
+ trace_context: Optional[TraceContext] = None, end_state: Union[TraceState, StatusCode, str] = TraceState.SUCCESS
+) -> None:
"""
Ends a trace (its root span) and finalizes it.
If no trace_context is provided, ends all active session spans.
@@ -246,4 +266,12 @@ def end_trace(trace_context: Optional[TraceContext] = None, end_state: str = "Su
"workflow",
"operation",
"tracer",
+ "tool",
+ # Trace state enums
+ "TraceState",
+ "SUCCESS",
+ "ERROR",
+ "UNSET",
+ # OpenTelemetry status codes (for advanced users)
+ "StatusCode",
]
diff --git a/agentops/client/client.py b/agentops/client/client.py
index 1979e6c6e..0fe95b95c 100644
--- a/agentops/client/client.py
+++ b/agentops/client/client.py
@@ -24,7 +24,7 @@ def _end_init_trace_atexit():
if _client_init_trace_context is not None:
logger.debug("Auto-ending client's init trace during shutdown.")
try:
- # Use TracingCore to end the trace directly
+ # Use global tracer to end the trace directly
if tracer.initialized and _client_init_trace_context.span.is_recording():
tracer.end_trace(_client_init_trace_context, end_state="Shutdown")
except Exception as e:
@@ -104,7 +104,7 @@ def init(self, **kwargs: Any) -> None: # Return type updated to None
try:
response = self.api.v3.fetch_auth_token(self.config.api_key)
if response is None:
- # If auth fails, we cannot proceed with TracingCore initialization that depends on project_id
+ # If auth fails, we cannot proceed with tracer initialization that depends on project_id
logger.error("Failed to fetch auth token. AgentOps SDK will not be initialized.")
return None # Explicitly return None if auth fails
except Exception as e:
@@ -161,8 +161,8 @@ def init(self, **kwargs: Any) -> None: # Return type updated to None
else:
logger.error("Failed to start the auto-init trace.")
- # Even if auto-start fails, core services up to TracingCore might be initialized.
- # Set self.initialized to True if TracingCore is up, but return None.
+ # Even if auto-start fails, core services up to the tracer might be initialized.
+ # Set self.initialized to True if tracer is up, but return None.
self._initialized = tracer.initialized
return None # Failed to start trace
diff --git a/agentops/enums.py b/agentops/enums.py
new file mode 100644
index 000000000..c3dcfda38
--- /dev/null
+++ b/agentops/enums.py
@@ -0,0 +1,36 @@
+"""
+AgentOps enums for user-friendly API.
+
+This module provides simple enums that users can import from agentops
+without needing to know about OpenTelemetry internals.
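+
+A minimal usage sketch (assuming agentops.init() has already been called;
+"my_trace" is just a placeholder name):
+
+    import agentops
+
+    trace = agentops.start_trace("my_trace")
+    agentops.end_trace(trace, agentops.TraceState.SUCCESS)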
+"""
+
+from enum import Enum
+from opentelemetry.trace.status import StatusCode
+
+
+class TraceState(Enum):
+ """
+ Enum for trace end states.
+
+ This provides a user-friendly interface that maps to OpenTelemetry StatusCode internally.
+ Users can simply use agentops.TraceState.SUCCESS instead of importing OpenTelemetry.
+ """
+
+ SUCCESS = StatusCode.OK
+ ERROR = StatusCode.ERROR
+ UNSET = StatusCode.UNSET
+
+ def __str__(self) -> str:
+ """Return the name for string representation."""
+ return self.name
+
+ def to_status_code(self) -> StatusCode:
+ """Convert to OpenTelemetry StatusCode."""
+ return self.value
+
+
+# Module-level aliases for convenience: agentops.SUCCESS, agentops.ERROR, agentops.UNSET
+SUCCESS = TraceState.SUCCESS
+ERROR = TraceState.ERROR
+UNSET = TraceState.UNSET
diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py
index b48475af9..96966a8ad 100644
--- a/agentops/instrumentation/__init__.py
+++ b/agentops/instrumentation/__init__.py
@@ -444,7 +444,7 @@ def instrument_one(loader: InstrumentorLoader) -> Optional[BaseInstrumentor]:
instrumentor = loader.get_instance()
try:
- instrumentor.instrument(tracer_provider=TracingCore.get_instance()._provider)
+ instrumentor.instrument(tracer_provider=tracer._provider)
logger.info(
f"AgentOps: Successfully instrumented '{loader.class_name}' for package '{loader.package_name or loader.module_name}'."
)
diff --git a/agentops/legacy/__init__.py b/agentops/legacy/__init__.py
index 7c7f787b6..d835c0877 100644
--- a/agentops/legacy/__init__.py
+++ b/agentops/legacy/__init__.py
@@ -65,7 +65,7 @@ def start_session(
) -> Session:
"""
@deprecated Use agentops.start_trace() instead.
- Starts a legacy AgentOps session. Calls TracingCore.start_trace internally.
+ Starts a legacy AgentOps session. Calls tracer.start_trace internally.
"""
global _current_session, _current_trace_context
@@ -89,7 +89,7 @@ def start_session(
trace_context = tracer.start_trace(trace_name="session", tags=tags)
if trace_context is None:
- logger.error("Failed to start trace via TracingCore. Returning dummy session.")
+ logger.error("Failed to start trace via global tracer. Returning dummy session.")
dummy_session = Session(None)
_current_session = dummy_session
_current_trace_context = None
@@ -124,13 +124,13 @@ def _set_span_attributes(span: Any, attributes: Dict[str, Any]) -> None:
def end_session(session_or_status: Any = None, **kwargs: Any) -> None:
"""
@deprecated Use agentops.end_trace() instead.
- Ends a legacy AgentOps session. Calls TracingCore.end_trace internally.
+ Ends a legacy AgentOps session. Calls tracer.end_trace internally.
Supports multiple calling patterns for backward compatibility.
"""
global _current_session, _current_trace_context
if not tracer.initialized:
- logger.debug("Ignoring end_session: TracingCore not initialized.")
+ logger.debug("Ignoring end_session: global tracer not initialized.")
return
target_trace_context: Optional[TraceContext] = None
@@ -189,7 +189,7 @@ def end_session(session_or_status: Any = None, **kwargs: Any) -> None:
def end_all_sessions() -> None:
"""@deprecated Ends all active sessions/traces."""
if not tracer.initialized:
- logger.debug("Ignoring end_all_sessions: TracingCore not initialized.")
+ logger.debug("Ignoring end_all_sessions: global tracer not initialized.")
return
# Use the new end_trace functionality to end all active traces
diff --git a/agentops/sdk/README.md b/agentops/sdk/README.md
index b39eb52c0..e13fe6c87 100644
--- a/agentops/sdk/README.md
+++ b/agentops/sdk/README.md
@@ -176,11 +176,11 @@ flowchart TD
## Example Usage
```python
-from agentops import Session, agent, tool
-from agentops.sdk import TracingCore, TracingConfig
+from agentops import Session, agent, tool, tracer
+from agentops.sdk import TracingConfig
-# Initialize the tracing core with a dedicated configuration
-TracingCore.get_instance().initialize(
+# Initialize the global tracer with a dedicated configuration
+tracer.initialize(
service_name="my-service",
exporter_endpoint="https://my-exporter-endpoint.com",
max_queue_size=1000,
diff --git a/agentops/sdk/__init__.py b/agentops/sdk/__init__.py
index f1be1f718..bc2f6b6f9 100644
--- a/agentops/sdk/__init__.py
+++ b/agentops/sdk/__init__.py
@@ -5,18 +5,16 @@
for different types of operations in AI agent workflows.
"""
-# Import core components
-from agentops.sdk.core import TracingCore
-
# Import decorators
from agentops.sdk.decorators import agent, operation, session, task, workflow
# from agentops.sdk.traced import TracedObject # Merged into TracedObject
from agentops.sdk.types import TracingConfig
+from opentelemetry.trace.status import StatusCode
+
__all__ = [
# Core components
- "TracingCore",
"TracingConfig",
# Decorators
"session",
@@ -24,4 +22,6 @@
"agent",
"task",
"workflow",
+ # OpenTelemetry status codes
+ "StatusCode",
]
diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py
index acab6185f..8874fd2b6 100644
--- a/agentops/sdk/core.py
+++ b/agentops/sdk/core.py
@@ -2,7 +2,7 @@
import atexit
import threading
-from typing import Optional, Any, Dict
+from typing import Optional, Any, Dict, Union
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
@@ -26,6 +26,7 @@
)
from agentops.semconv import SpanKind
from agentops.helpers.dashboard import log_trace_url
+from opentelemetry.trace.status import StatusCode
# No need to create shortcuts since we're using our own ResourceAttributes class now
@@ -36,6 +37,38 @@ def __init__(self, span: Span, token: Optional[context_api.Token] = None, is_ini
self.span = span
self.token = token
self.is_init_trace = is_init_trace # Flag to identify the auto-started trace
+        self._end_state = StatusCode.UNSET  # Final end state; unknown until the trace ends
+
+ def __enter__(self) -> "TraceContext":
+ """Enter the trace context."""
+ return self
+
+ def __exit__(self, exc_type: Optional[type], exc_val: Optional[Exception], exc_tb: Optional[Any]) -> bool:
+ """Exit the trace context and end the trace.
+
+ Automatically sets the trace status based on whether an exception occurred:
+ - If an exception is present, sets status to ERROR
+ - If no exception occurred, sets status to OK
+
+ Returns:
+            False: Exceptions raised inside the block are never suppressed; they
+                propagate to the caller after the trace has been ended.
+ """
+ if exc_type is not None:
+ self._end_state = StatusCode.ERROR
+ if exc_val:
+ logger.debug(f"Trace exiting with exception: {exc_val}")
+ else:
+ # No exception occurred, set to OK
+ self._end_state = StatusCode.OK
+
+ try:
+ tracer.end_trace(self, self._end_state)
+ except Exception as e:
+ logger.error(f"Error ending trace in context manager: {e}")
+
+ return False
# get_imported_libraries moved to agentops.helpers.system
@@ -201,7 +234,7 @@ def config(self) -> TracingConfig:
"""Get the tracing configuration."""
if self._config is None:
# This case should ideally not be reached if initialized properly
- raise AgentOpsClientNotInitializedException("TracingCore config accessed before initialization.")
+ raise AgentOpsClientNotInitializedException("Tracer config accessed before initialization.")
return self._config
def shutdown(self) -> None:
@@ -316,7 +349,7 @@ def start_trace(
A TraceContext object containing the span and context token, or None if not initialized.
"""
if not self.initialized:
- logger.warning("TracingCore not initialized. Cannot start trace.")
+ logger.warning("Global tracer not initialized. Cannot start trace.")
return None
# Build trace attributes
@@ -347,7 +380,9 @@ def start_trace(
return trace_context
- def end_trace(self, trace_context: Optional[TraceContext] = None, end_state: str = "Success") -> None:
+ def end_trace(
+        self, trace_context: Optional[TraceContext] = None, end_state: Optional[Union[Any, StatusCode, str]] = None
+ ) -> None:
"""
Ends a trace (its root span) and finalizes it.
If no trace_context is provided, ends all active session spans.
@@ -357,9 +392,15 @@ def end_trace(self, trace_context: Optional[TraceContext] = None, end_state: str
end_state: The final state of the trace (e.g., "Success", "Failure", "Error").
"""
if not self.initialized:
- logger.warning("TracingCore not initialized. Cannot end trace.")
+ logger.warning("Global tracer not initialized. Cannot end trace.")
return
+ # Set default if not provided
+ if end_state is None:
+ from agentops.enums import TraceState
+
+ end_state = TraceState.SUCCESS
+
# If no specific trace_context provided, end all active traces
if trace_context is None:
with self._traces_lock:
@@ -373,7 +414,7 @@ def end_trace(self, trace_context: Optional[TraceContext] = None, end_state: str
# End specific trace
self._end_single_trace(trace_context, end_state)
- def _end_single_trace(self, trace_context: TraceContext, end_state: str) -> None:
+ def _end_single_trace(self, trace_context: TraceContext, end_state: Union[Any, StatusCode, str]) -> None:
"""
Internal method to end a single trace.
@@ -393,7 +434,20 @@ def _end_single_trace(self, trace_context: TraceContext, end_state: str) -> None
# Handle case where span is mocked or trace_id is not a valid integer
trace_id = str(span.get_span_context().trace_id)
- logger.debug(f"Ending trace with span ID: {span.get_span_context().span_id}, end_state: {end_state}")
+        # Normalize end_state (TraceState enum, StatusCode, or legacy string) to a readable string for logging
+        state_str = str(end_state)
+
+ logger.debug(f"Ending trace with span ID: {span.get_span_context().span_id}, end_state: {state_str}")
try:
# Build and set session end attributes
diff --git a/docs/mint.json b/docs/mint.json
index f2c0865f7..00fec4adf 100644
--- a/docs/mint.json
+++ b/docs/mint.json
@@ -187,6 +187,7 @@
"v2/usage/dashboard-info",
"v2/usage/sdk-reference",
"v2/usage/advanced-configuration",
+ "v2/usage/context-managers",
"v2/usage/tracking-llm-calls",
"v2/usage/tracking-agents",
"v2/usage/recording-operations",
diff --git a/docs/v2/usage/context-managers.mdx b/docs/v2/usage/context-managers.mdx
new file mode 100644
index 000000000..391dbd2b2
--- /dev/null
+++ b/docs/v2/usage/context-managers.mdx
@@ -0,0 +1,328 @@
+---
+title: "Context Managers"
+description: "Use AgentOps traces as Python context managers for automatic lifecycle management"
+---
+
+# Context Managers
+
+AgentOps provides native context manager support for traces, allowing you to use Python's `with` statement for automatic trace lifecycle management. This approach ensures traces are properly started and ended, even when exceptions occur.
+
+## Basic Usage
+
+The simplest way to use context managers is with the `start_trace()` function:
+
+```python
+import agentops
+
+# Initialize AgentOps
+agentops.init(api_key="your-api-key")
+
+# Use context manager for automatic trace management
+with agentops.start_trace("my_workflow") as trace:
+ # Your code here
+ print("Processing data...")
+ # Trace automatically ends when exiting the with block
+```
+
+The trace will automatically:
+- Start when entering the `with` block
+- End with "Success" status when exiting normally
+- End with "Error" status if an exception occurs
+- Clean up resources properly in all cases
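+
+If you prefer to end a trace explicitly rather than relying on the automatic
+handling above, `end_trace()` also accepts the `TraceState` enum exported in
+this release (a minimal sketch; `do_work()` is a placeholder for your own code,
+and the legacy string form such as `"Success"` continues to work):
+
+```python
+import agentops
+
+agentops.init(api_key="your-api-key")
+
+trace = agentops.start_trace("explicit_end_example")
+try:
+    do_work()  # placeholder for your own logic
+    agentops.end_trace(trace, agentops.TraceState.SUCCESS)
+except Exception:
+    agentops.end_trace(trace, agentops.TraceState.ERROR)
+    raise
+```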
+
+## Advanced Usage
+
+### Traces with Tags
+
+You can add tags to traces for better organization and filtering:
+
+```python
+import agentops
+
+agentops.init(api_key="your-api-key")
+
+# Using list tags
+with agentops.start_trace("data_processing", tags=["batch", "production"]):
+ process_batch_data()
+
+# Using dictionary tags for more structured metadata
+with agentops.start_trace("user_request", tags={
+ "user_id": "12345",
+ "request_type": "query",
+ "priority": "high"
+}):
+ handle_user_request()
+```
+
+### Parallel Traces
+
+Context managers create independent parallel traces, not parent-child relationships:
+
+```python
+import agentops
+
+agentops.init(api_key="your-api-key")
+
+# Sequential parallel traces
+with agentops.start_trace("task_1"):
+ print("Task 1 executing")
+
+with agentops.start_trace("task_2"):
+ print("Task 2 executing")
+
+# Nested context managers create parallel traces
+with agentops.start_trace("outer_workflow"):
+ print("Outer workflow started")
+
+ with agentops.start_trace("inner_task"):
+ print("Inner task executing (parallel to outer)")
+
+ print("Outer workflow continuing")
+```
+
+### Exception Handling
+
+Context managers automatically handle exceptions and set appropriate trace states:
+
+```python
+import agentops
+
+agentops.init(api_key="your-api-key")
+
+# Automatic error handling
+try:
+ with agentops.start_trace("risky_operation"):
+ # This will automatically set trace status to "Error"
+ raise ValueError("Something went wrong")
+except ValueError as e:
+ print(f"Caught error: {e}")
+ # Trace has already been ended with Error status
+
+# Graceful degradation pattern
+try:
+ with agentops.start_trace("primary_service"):
+ result = call_primary_service()
+except ServiceUnavailableError:
+ with agentops.start_trace("fallback_service"):
+ result = call_fallback_service()
+```
+
+### Concurrent Execution
+
+Context managers work seamlessly with threading and asyncio:
+
+
+
+```python Threading
+import agentops
+import threading
+
+agentops.init(api_key="your-api-key")
+
+# With threading
+def worker_function(worker_id):
+ with agentops.start_trace(f"worker_{worker_id}"):
+ # Each thread gets its own independent trace
+ process_work(worker_id)
+
+threads = []
+for i in range(3):
+ thread = threading.Thread(target=worker_function, args=(i,))
+ threads.append(thread)
+ thread.start()
+
+for thread in threads:
+ thread.join()
+```
+
+```python Asyncio
+import agentops
+import asyncio
+
+agentops.init(api_key="your-api-key")
+
+# With asyncio
+async def async_task(task_id):
+ with agentops.start_trace(f"async_task_{task_id}"):
+ await asyncio.sleep(0.1) # Simulate async work
+ return f"result_{task_id}"
+
+async def main():
+ tasks = [async_task(i) for i in range(3)]
+ results = await asyncio.gather(*tasks)
+ return results
+
+# Run async tasks
+results = asyncio.run(main())
+```
+
+
+
+## Production Patterns
+
+### API Endpoint Monitoring
+
+```python
+import agentops
+from flask import Flask, request
+
+app = Flask(__name__)
+agentops.init(api_key="your-api-key")
+
+@app.route('/api/process', methods=['POST'])
+def process_request():
+ # Create trace for each API request
+ with agentops.start_trace("api_request", tags={
+ "endpoint": "/api/process",
+ "method": "POST",
+ "user_id": request.headers.get("user-id")
+ }):
+ try:
+ data = request.get_json()
+ result = process_data(data)
+ return {"status": "success", "result": result}
+ except Exception as e:
+            # The exception is handled inside the with block, so the trace still
+            # ends with a Success status; re-raise it (or see the variant below)
+            # if the trace should be recorded as an Error instead
+            return {"status": "error", "message": str(e)}, 500
+```
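+
+If you want failed requests to be recorded with an Error status, wrap the trace
+itself in the try/except instead (a sketch of the same hypothetical endpoint):
+
+```python
+@app.route('/api/process', methods=['POST'])
+def process_request():
+    try:
+        with agentops.start_trace("api_request", tags={"endpoint": "/api/process"}):
+            data = request.get_json()
+            result = process_data(data)
+            return {"status": "success", "result": result}
+    except Exception as e:
+        # The exception propagated through the trace, so it was ended with an Error status
+        return {"status": "error", "message": str(e)}, 500
+```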
+
+### Batch Processing
+
+```python
+import agentops
+
+agentops.init(api_key="your-api-key")
+
+def process_batch(items):
+ with agentops.start_trace("batch_processing", tags={
+ "batch_size": len(items),
+ "batch_type": "data_processing"
+ }):
+ successful = 0
+ failed = 0
+
+ for item in items:
+ try:
+ with agentops.start_trace("item_processing", tags={
+ "item_id": item.get("id"),
+ "item_type": item.get("type")
+ }):
+ process_item(item)
+ successful += 1
+ except Exception as e:
+ failed += 1
+ print(f"Failed to process item {item.get('id')}: {e}")
+
+ print(f"Batch completed: {successful} successful, {failed} failed")
+```
+
+### Retry Logic
+
+```python
+import agentops
+import time
+
+agentops.init(api_key="your-api-key")
+
+def retry_operation(operation_name, max_retries=3):
+ for attempt in range(max_retries):
+ try:
+ with agentops.start_trace(f"{operation_name}_attempt_{attempt + 1}", tags={
+ "operation": operation_name,
+ "attempt": attempt + 1,
+ "max_retries": max_retries
+ }):
+ # Your operation here
+ result = perform_operation()
+ return result # Success - exit retry loop
+
+ except Exception as e:
+ if attempt < max_retries - 1:
+ wait_time = 2 ** attempt # Exponential backoff
+ print(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait_time}s...")
+ time.sleep(wait_time)
+ else:
+ print(f"All {max_retries} attempts failed")
+ raise
+```
+
+## Backward Compatibility
+
+Context managers are fully backward compatible with existing AgentOps code patterns:
+
+
+
+```python Manual Management
+import agentops
+
+agentops.init(api_key="your-api-key")
+
+# Manual trace management (legacy)
+trace = agentops.start_trace("manual_trace")
+# ... your code ...
+agentops.end_trace(trace, "Success")
+```
+
+```python Context Manager
+import agentops
+
+agentops.init(api_key="your-api-key")
+
+# Context manager (new, recommended)
+with agentops.start_trace("context_managed_trace") as trace:
+ # ... your code ...
+ pass # Automatically ended
+```
+
+```python Property Access
+import agentops
+
+agentops.init(api_key="your-api-key")
+
+# Accessing trace properties
+with agentops.start_trace("property_access") as trace:
+ span = trace.span # Access underlying span
+ trace_id = trace.span.get_span_context().trace_id
+```
+
+```python Mixed Usage
+import agentops
+
+agentops.init(api_key="your-api-key")
+
+# Mixed usage
+trace = agentops.start_trace("mixed_usage")
+try:
+ with trace: # Use existing trace as context manager
+ # ... your code ...
+ pass
+except Exception:
+    # The context manager has already ended the trace with an Error status;
+    # this call is only a defensive fallback
+    agentops.end_trace(trace, "Error")
+```
+
+
+
+## Examples
+
+For complete working examples, see the following files in the AgentOps repository:
+
+- `examples/context_manager/basic_usage.py`: Simple context manager patterns and error handling
+- `examples/context_manager/parallel_traces.py`: Sequential, nested, and concurrent trace patterns
+- `examples/context_manager/error_handling.py`: Exception handling, retry patterns, and graceful degradation
+- `examples/context_manager/production_patterns.py`: API endpoints, batch processing, microservices, and monitoring
+
+These examples demonstrate real-world usage patterns and best practices for using AgentOps context managers in production applications.
+
+## API Reference
+
+For detailed API information, see the [SDK Reference](/v2/usage/sdk-reference#trace-management) documentation.
diff --git a/examples/context_manager/README.md b/examples/context_manager/README.md
new file mode 100644
index 000000000..5ed88c88f
--- /dev/null
+++ b/examples/context_manager/README.md
@@ -0,0 +1,93 @@
+# AgentOps Context Manager Examples
+
+This directory contains examples demonstrating how to use AgentOps with context managers for automatic trace lifecycle management using the simplified, native implementation.
+
+## Overview
+
+AgentOps provides native context manager support through the `TraceContext` class. This implementation eliminates wrapper classes and leverages OpenTelemetry's built-in capabilities for clean, automatic trace management.
+
+## Key Benefits
+
+- **Automatic cleanup**: Traces end automatically when exiting the context
+- **Error handling**: Traces are marked with appropriate error states when exceptions occur
+- **Native OpenTelemetry integration**: Leverages OpenTelemetry's built-in context management
+- **Simplified architecture**: No wrapper classes or monkey patching required
+- **Thread safety**: Works seamlessly in multi-threaded environments
+- **Performance optimized**: Direct method calls without wrapper overhead
+
+## Quick Start
+
+```python
+import agentops
+
+# Initialize AgentOps
+agentops.init(api_key="your-api-key")
+
+# Use native context manager support
+with agentops.start_trace("my_task") as trace:
+ # Your code here
+ pass # Trace automatically ends with Success/Error state
+```
+
+## Examples
+
+### `basic_usage.py`
+- Native TraceContext context manager usage
+- Multiple parallel traces
+- Error handling basics
+- Comparison with OpenTelemetry concepts
+
+### `parallel_traces.py`
+- Sequential and nested parallel traces
+- Concurrent traces with threading and ThreadPoolExecutor
+- Mixed success/error scenarios
+- Different tag types (lists, dictionaries, none)
+
+### `error_handling.py`
+- Exception handling with different error types
+- Nested exception handling and propagation
+- Recovery patterns (retry, graceful degradation, partial success)
+- Custom exceptions, finally blocks, and exception chaining
+
+### `production_patterns.py`
+- API endpoint monitoring and metrics
+- Batch processing with individual item tracking
+- Microservice communication patterns
+- Production monitoring and alerting
+
+## Running Examples
+
+### Step 1: Install Dependencies
+
+```bash
+# Install required packages
+pip install agentops python-dotenv
+```
+
+### Step 2: Set Up API Key
+
+To use AgentOps and send trace data to your dashboard, you need to configure your API key using one of these methods:
+
+**Option A: Environment Variable**
+```bash
+# Export directly in your shell
+export AGENTOPS_API_KEY="your-api-key-here"
+```
+
+**Option B: .env File (Recommended)**
+```bash
+# Create a .env file in the project root
+echo "AGENTOPS_API_KEY=your-api-key-here" > .env
+```
+
+**Important**: When using a `.env` file, the examples automatically load it using `load_dotenv()`. Make sure the `.env` file is in the same directory as the example scripts or in the project root.
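+
+For reference, each example loads the key with a pattern roughly like this:
+
+```python
+import os
+from dotenv import load_dotenv
+
+load_dotenv()  # reads AGENTOPS_API_KEY from a local .env file, if present
+AGENTOPS_API_KEY = os.environ.get("AGENTOPS_API_KEY")
+```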
+
+### Step 3: Run the Examples
+
+```bash
+# Run each example to see different patterns
+python examples/context_manager/basic_usage.py
+python examples/context_manager/parallel_traces.py
+python examples/context_manager/error_handling.py
+python examples/context_manager/production_patterns.py
+```
diff --git a/examples/context_manager/basic_usage.py b/examples/context_manager/basic_usage.py
new file mode 100644
index 000000000..122991339
--- /dev/null
+++ b/examples/context_manager/basic_usage.py
@@ -0,0 +1,140 @@
+"""
+Basic Context Manager Example
+
+This example demonstrates how to use AgentOps context manager with the native
+TraceContext support, eliminating the need for wrappers or monkey patching.
+"""
+
+import os
+import agentops
+from agentops import agent, task, tool
+from dotenv import load_dotenv
+
+load_dotenv()
+
+# Get API key from environment
+AGENTOPS_API_KEY = os.environ.get("AGENTOPS_API_KEY")
+
+
+@agent
+class SimpleAgent:
+ """A simple example agent."""
+
+ def __init__(self, name: str):
+ self.name = name
+
+ @task
+ def process_data(self, data: str) -> str:
+ """Process some data."""
+ result = f"Processed: {data}"
+ return self.use_tool(result)
+
+ @tool
+ def use_tool(self, input_data: str) -> str:
+ """Use a tool to transform data."""
+ return f"Tool output: {input_data.upper()}"
+
+
+def basic_context_manager_example():
+ """Example using the native TraceContext context manager."""
+ print("Basic Context Manager Example")
+
+ # Initialize AgentOps
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ # Use native TraceContext context manager
+ with agentops.start_trace("basic_example", tags=["basic", "demo"]):
+ print("Trace started")
+
+ # Create and use agent
+ agent = SimpleAgent("BasicAgent")
+ result = agent.process_data("sample data")
+ print(f"Result: {result}")
+
+ print("Trace ended automatically")
+
+
+def multiple_parallel_traces():
+ """Example showing multiple parallel traces."""
+ print("\nMultiple Parallel Traces")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ # First trace
+ with agentops.start_trace("task_1", tags=["parallel", "task-1"]):
+ print("Task 1 started")
+ agent1 = SimpleAgent("Agent1")
+ result1 = agent1.process_data("task 1 data")
+ print(f"Task 1 result: {result1}")
+
+ # Second trace (independent)
+ with agentops.start_trace("task_2", tags=["parallel", "task-2"]):
+ print("Task 2 started")
+ agent2 = SimpleAgent("Agent2")
+ result2 = agent2.process_data("task 2 data")
+ print(f"Task 2 result: {result2}")
+
+ print("All parallel traces completed")
+
+
+def error_handling_example():
+ """Example showing error handling with context manager."""
+ print("\nError Handling Example")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ try:
+ with agentops.start_trace("error_example", tags=["error-handling"]):
+ print("Trace started")
+
+ agent = SimpleAgent("ErrorAgent")
+ result = agent.process_data("some data")
+ print(f"Processing successful: {result}")
+
+ # Simulate an error
+ raise ValueError("Simulated error")
+
+ except ValueError as e:
+ print(f"Caught error: {e}")
+ print("Trace automatically ended with Error status")
+
+
+def nested_traces_example():
+ """Example showing nested traces (which are parallel, not parent-child)."""
+ print("\nNested Traces Example")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ # Outer trace
+ with agentops.start_trace("main_workflow", tags=["workflow", "main"]):
+ print("Main workflow started")
+
+ # Inner trace (parallel, not child)
+ with agentops.start_trace("sub_task", tags=["workflow", "sub"]):
+ print("Sub task started")
+
+ agent = SimpleAgent("WorkflowAgent")
+ result = agent.process_data("workflow data")
+ print(f"Sub task result: {result}")
+
+ print("Sub task completed")
+ print("Main workflow completed")
+
+
+if __name__ == "__main__":
+ print("AgentOps Context Manager Examples")
+ print("=" * 40)
+
+ # Show basic usage
+ basic_context_manager_example()
+
+ # Show multiple parallel traces
+ multiple_parallel_traces()
+
+ # Show error handling
+ error_handling_example()
+
+ # Show nested traces
+ nested_traces_example()
+
+ print("\nAll examples completed!")
diff --git a/examples/context_manager/error_handling.py b/examples/context_manager/error_handling.py
new file mode 100644
index 000000000..bfeffda08
--- /dev/null
+++ b/examples/context_manager/error_handling.py
@@ -0,0 +1,317 @@
+"""
+Error Handling with Context Managers
+
+This example demonstrates error handling patterns with AgentOps context managers,
+showing how traces automatically handle different types of exceptions.
+"""
+
+import os
+import time
+import agentops
+from agentops import agent, task, tool
+from dotenv import load_dotenv
+
+load_dotenv()
+
+# Get API key from environment
+AGENTOPS_API_KEY = os.environ.get("AGENTOPS_API_KEY")
+
+
+@agent
+class ErrorProneAgent:
+ """An agent that can encounter various types of errors."""
+
+ def __init__(self, name: str):
+ self.name = name
+
+ @task
+ def risky_operation(self, operation_type: str) -> str:
+ """Perform a risky operation that might fail."""
+ if operation_type == "value_error":
+ raise ValueError("Invalid value provided")
+ elif operation_type == "type_error":
+ raise TypeError("Wrong type provided")
+ elif operation_type == "runtime_error":
+ raise RuntimeError("Runtime error occurred")
+ elif operation_type == "custom_error":
+ raise CustomError("Custom error occurred")
+ else:
+ return f"Success: {operation_type}"
+
+ @tool
+ def validate_input(self, data: str) -> str:
+ """Validate input data."""
+ if not data or data == "invalid":
+ raise ValueError("Invalid input data")
+ return f"Validated: {data}"
+
+ @task
+ def multi_step_operation(self, steps: list) -> str:
+ """Perform multiple steps, any of which might fail."""
+ results = []
+ for i, step in enumerate(steps):
+ if step == "fail":
+ raise RuntimeError(f"Step {i+1} failed")
+ results.append(f"Step{i+1}:{step}")
+ return " -> ".join(results)
+
+
+class CustomError(Exception):
+ """Custom exception for demonstration."""
+
+ pass
+
+
+def basic_exception_handling():
+ """Basic example of exception handling with context managers."""
+ print("Basic Exception Handling")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ error_types = ["value_error", "type_error", "runtime_error", "success"]
+
+ for error_type in error_types:
+ try:
+ with agentops.start_trace(f"basic_{error_type}", tags=["basic", "error-handling"]):
+ print(f"Started trace for {error_type}")
+
+ agent = ErrorProneAgent(f"BasicAgent_{error_type}")
+ result = agent.risky_operation(error_type)
+ print(f"Success result: {result}")
+
+ except ValueError as e:
+ print(f"Caught ValueError: {e}")
+ except TypeError as e:
+ print(f"Caught TypeError: {e}")
+ except RuntimeError as e:
+ print(f"Caught RuntimeError: {e}")
+
+ print("Basic exception handling completed")
+
+
+def nested_exception_handling():
+ """Example of exception handling in nested traces."""
+ print("\nNested Exception Handling")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ try:
+ with agentops.start_trace("outer_operation", tags=["nested", "outer"]):
+ print("Outer trace started")
+
+ # outer_agent = ErrorProneAgent("OuterAgent") # Not used in this example
+
+ try:
+ with agentops.start_trace("inner_operation", tags=["nested", "inner"]):
+ print("Inner trace started")
+
+ inner_agent = ErrorProneAgent("InnerAgent")
+ # This will cause an error in the inner trace
+ inner_agent.risky_operation("value_error")
+
+ except ValueError as e:
+ print(f"Inner trace caught ValueError: {e}")
+ # Re-raise to affect outer trace
+ raise RuntimeError("Inner operation failed") from e
+
+ except RuntimeError as e:
+ print(f"Outer trace caught RuntimeError: {e}")
+
+ print("Nested exception handling completed")
+
+
+def retry_pattern():
+ """Example of retry pattern with context managers."""
+ print("\nRetry Pattern")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ max_retries = 3
+ for attempt in range(max_retries):
+ try:
+ with agentops.start_trace(f"retry_attempt_{attempt+1}", tags=["retry", f"attempt-{attempt+1}"]):
+ print(f"Retry attempt {attempt+1} started")
+
+ agent = ErrorProneAgent(f"RetryAgent_Attempt{attempt+1}")
+
+                # Fail the early attempts and succeed on the last one to demonstrate the retry loop
+ if attempt < max_retries - 1:
+ result = agent.risky_operation("runtime_error")
+ else:
+ result = agent.risky_operation("success")
+
+ print(f"Retry attempt {attempt+1} succeeded: {result}")
+ break
+
+ except RuntimeError as e:
+ print(f"Retry attempt {attempt+1} failed: {e}")
+ if attempt < max_retries - 1:
+ wait_time = 2**attempt # Exponential backoff
+ print(f"Waiting {wait_time}s before retry...")
+ time.sleep(wait_time * 0.01) # Shortened for demo
+ else:
+ print("All retry attempts exhausted")
+ raise
+
+ print("Retry pattern completed")
+
+
+def graceful_degradation():
+ """Example of graceful degradation pattern."""
+ print("\nGraceful Degradation")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ try:
+ with agentops.start_trace("primary_service", tags=["degradation", "primary"]):
+ print("Primary service trace started")
+
+ agent = ErrorProneAgent("PrimaryAgent")
+ result = agent.risky_operation("runtime_error")
+ print(f"Primary service result: {result}")
+
+ except RuntimeError as e:
+ print(f"Primary service failed: {e}")
+ print("Falling back to secondary service...")
+
+ with agentops.start_trace("fallback_service", tags=["degradation", "fallback"]):
+ print("Fallback service trace started")
+
+ fallback_agent = ErrorProneAgent("FallbackAgent")
+ result = fallback_agent.risky_operation("success")
+ print(f"Fallback service result: {result}")
+
+ print("Graceful degradation completed")
+
+
+def partial_success_handling():
+ """Example of partial success handling."""
+ print("\nPartial Success Handling")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ steps = ["step1", "step2", "fail", "step4"]
+
+ with agentops.start_trace("partial_success", tags=["partial", "multi-step"]):
+ print("Partial success trace started")
+
+ agent = ErrorProneAgent("PartialAgent")
+
+ try:
+ result = agent.multi_step_operation(steps)
+ print(f"All steps completed: {result}")
+ except RuntimeError as e:
+ print(f"Operation partially failed: {e}")
+
+ print("Partial success handling completed")
+
+
+def custom_exception_handling():
+ """Example of handling custom exceptions."""
+ print("\nCustom Exception Handling")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ try:
+ with agentops.start_trace("custom_exception", tags=["custom", "exception"]):
+ print("Custom exception trace started")
+
+ agent = ErrorProneAgent("CustomAgent")
+ result = agent.risky_operation("custom_error")
+ print(f"Result: {result}")
+
+ except CustomError as e:
+ print(f"Caught custom exception: {e}")
+ except Exception as e:
+ print(f"Caught unexpected exception: {e}")
+
+ print("Custom exception handling completed")
+
+
+def finally_blocks_example():
+ """Example of exception handling with finally blocks."""
+ print("\nFinally Blocks Example")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ cleanup_actions = []
+
+ try:
+ with agentops.start_trace("finally_example", tags=["finally", "cleanup"]):
+ print("Finally example trace started")
+
+ agent = ErrorProneAgent("FinallyAgent")
+
+ try:
+ result = agent.risky_operation("value_error")
+ print(f"Result: {result}")
+ finally:
+ cleanup_actions.append("Inner cleanup executed")
+ print("Inner finally block executed")
+
+ except ValueError as e:
+ print(f"Caught exception: {e}")
+ finally:
+ cleanup_actions.append("Outer cleanup executed")
+ print("Outer finally block executed")
+
+ print(f"Cleanup actions performed: {cleanup_actions}")
+ print("Finally block handling completed")
+
+
+def exception_chaining_example():
+ """Example of exception chaining and context preservation."""
+ print("\nException Chaining Example")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ try:
+ with agentops.start_trace("exception_chaining", tags=["chaining", "context"]):
+ print("Exception chaining trace started")
+
+ agent = ErrorProneAgent("ChainingAgent")
+
+ try:
+ # First operation fails
+ agent.validate_input("invalid")
+ except ValueError as e:
+ print(f"Validation failed: {e}")
+ # Chain the exception with additional context
+ raise RuntimeError("Operation failed due to validation error") from e
+
+ except RuntimeError as e:
+ print(f"Caught chained exception: {e}")
+ print(f"Original cause: {e.__cause__}")
+
+ print("Exception chaining completed")
+
+
+if __name__ == "__main__":
+ print("AgentOps Error Handling Examples")
+ print("=" * 40)
+
+ # Basic exception handling
+ basic_exception_handling()
+
+ # Nested exception handling
+ nested_exception_handling()
+
+ # Retry pattern
+ retry_pattern()
+
+ # Graceful degradation
+ graceful_degradation()
+
+ # Partial success handling
+ partial_success_handling()
+
+ # Custom exception handling
+ custom_exception_handling()
+
+ # Finally blocks
+ finally_blocks_example()
+
+ # Exception chaining
+ exception_chaining_example()
+
+ print("\nAll error handling examples completed!")
diff --git a/examples/context_manager/parallel_traces.py b/examples/context_manager/parallel_traces.py
new file mode 100644
index 000000000..cbc9c8f89
--- /dev/null
+++ b/examples/context_manager/parallel_traces.py
@@ -0,0 +1,245 @@
+"""
+Parallel Traces Example
+
+This example demonstrates how AgentOps context managers create independent
+parallel traces rather than parent-child relationships, which is ideal for
+concurrent operations and workflow management.
+"""
+
+import os
+import time
+import threading
+import concurrent.futures
+import agentops
+from agentops import agent, task, tool
+from dotenv import load_dotenv
+
+load_dotenv()
+
+# Get API key from environment
+AGENTOPS_API_KEY = os.environ.get("AGENTOPS_API_KEY")
+
+
+@agent
+class WorkerAgent:
+ """A worker agent that can process tasks independently."""
+
+ def __init__(self, worker_id: str):
+ self.worker_id = worker_id
+
+ @task
+ def process_task(self, task_data: str) -> str:
+ """Process a task with some simulated work."""
+ # Simulate some work
+ time.sleep(0.1)
+
+ result = self.analyze_data(task_data)
+ return self.finalize_result(result)
+
+ @tool
+ def analyze_data(self, data: str) -> str:
+ """Analyze the input data."""
+ return f"Analyzed: {data}"
+
+ @tool
+ def finalize_result(self, analyzed_data: str) -> str:
+ """Finalize the processing result."""
+ return f"Final: {analyzed_data.upper()}"
+
+
+def sequential_parallel_traces():
+ """Example of sequential parallel traces - each trace is independent."""
+ print("Sequential Parallel Traces")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ tasks = ["task_1", "task_2", "task_3"]
+ results = []
+
+ for i, task_name in enumerate(tasks):
+ # Each trace is completely independent
+ with agentops.start_trace(f"sequential_{task_name}", tags=["sequential", f"step-{i+1}"]):
+ print(f"Started trace for {task_name}")
+
+ worker = WorkerAgent(f"Worker{i+1}")
+ result = worker.process_task(f"data_for_{task_name}")
+ results.append(result)
+
+ print(f"{task_name} result: {result}")
+
+ print("All sequential traces completed")
+ print(f"Final results: {results}")
+
+
+def nested_parallel_traces():
+ """Example showing that nested context managers create parallel traces."""
+ print("\nNested Parallel Traces")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ # Outer trace for the overall workflow
+ with agentops.start_trace("workflow_main", tags=["workflow", "main"]):
+ print("Main workflow trace started")
+
+ # Inner trace for data preparation (parallel, not child)
+ with agentops.start_trace("data_preparation", tags=["workflow", "preparation"]):
+ print("Data preparation trace started (parallel to main)")
+
+ prep_worker = WorkerAgent("PrepWorker")
+ prep_result = prep_worker.process_task("raw_data")
+ print(f"Preparation result: {prep_result}")
+
+ print("Data preparation trace ended")
+
+ # Another inner trace for data processing (also parallel)
+ with agentops.start_trace("data_processing", tags=["workflow", "processing"]):
+ print("Data processing trace started (parallel to main)")
+
+ proc_worker = WorkerAgent("ProcessWorker")
+ proc_result = proc_worker.process_task("prepared_data")
+ print(f"Processing result: {proc_result}")
+
+ print("Data processing trace ended")
+ print("Main workflow completed")
+
+ print("Main workflow trace ended")
+ print("All traces were independent/parallel, not parent-child relationships")
+
+
+def concurrent_traces_with_threads():
+ """Example of truly concurrent traces using threading."""
+ print("\nConcurrent Traces with Threading")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ def worker_function(worker_id: int, task_data: str):
+ """Function to run in a separate thread with its own trace."""
+ trace_name = f"concurrent_worker_{worker_id}"
+
+ with agentops.start_trace(trace_name, tags=["concurrent", f"worker-{worker_id}"]):
+ print(f"Thread {worker_id}: Trace started")
+
+ worker = WorkerAgent(f"ConcurrentWorker{worker_id}")
+ result = worker.process_task(task_data)
+
+ # Simulate varying work times
+ time.sleep(0.05 * worker_id)
+
+ print(f"Thread {worker_id}: Result: {result}")
+ return result
+
+ # Start multiple threads, each with their own trace
+ threads = []
+ results = []
+
+ for i in range(3):
+ thread = threading.Thread(target=lambda i=i: results.append(worker_function(i, f"concurrent_data_{i}")))
+ threads.append(thread)
+ thread.start()
+
+ # Wait for all threads to complete
+ for thread in threads:
+ thread.join()
+
+ print("All concurrent traces completed")
+ print(f"Results from concurrent execution: {len(results)} completed")
+
+
+def concurrent_traces_with_executor():
+ """Example using ThreadPoolExecutor for concurrent traces."""
+ print("\nConcurrent Traces with ThreadPoolExecutor")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ def process_with_trace(task_id: int, data: str) -> str:
+ """Process data within its own trace context."""
+ trace_name = f"executor_task_{task_id}"
+
+ with agentops.start_trace(trace_name, tags=["executor", f"task-{task_id}"]):
+ print(f"Executor task {task_id}: Trace started")
+
+ worker = WorkerAgent(f"ExecutorWorker{task_id}")
+ result = worker.process_task(data)
+
+ print(f"Executor task {task_id}: Result: {result}")
+ return result
+
+ # Use ThreadPoolExecutor for concurrent execution
+ tasks_data = [
+ (1, "executor_data_1"),
+ (2, "executor_data_2"),
+ (3, "executor_data_3"),
+ (4, "executor_data_4"),
+ ]
+
+ with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
+ # Submit all tasks
+ future_to_task = {executor.submit(process_with_trace, task_id, data): task_id for task_id, data in tasks_data}
+
+ # Collect results as they complete
+ results = []
+ for future in concurrent.futures.as_completed(future_to_task):
+ task_id = future_to_task[future]
+ try:
+ result = future.result()
+ results.append((task_id, result))
+ print(f"Task {task_id} completed successfully")
+ except Exception as e:
+ print(f"Task {task_id} failed: {e}")
+
+ print("All executor-based traces completed")
+ print(f"Completed {len(results)} tasks concurrently")
+
+
+def trace_with_different_tag_types():
+ """Example showing different ways to tag parallel traces."""
+ print("\nTraces with Different Tag Types")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ # Trace with list tags
+ with agentops.start_trace("list_tags_trace", tags=["list", "example", "demo"]):
+ print("Trace with list tags started")
+ worker1 = WorkerAgent("ListTagWorker")
+ result1 = worker1.process_task("list_tag_data")
+ print(f"List tags result: {result1}")
+
+ # Trace with dictionary tags
+ with agentops.start_trace(
+ "dict_tags_trace", tags={"environment": "demo", "version": "1.0", "priority": "high", "team": "engineering"}
+ ):
+ print("Trace with dictionary tags started")
+ worker2 = WorkerAgent("DictTagWorker")
+ result2 = worker2.process_task("dict_tag_data")
+ print(f"Dictionary tags result: {result2}")
+
+ # Trace with no tags
+ with agentops.start_trace("no_tags_trace"):
+ print("Trace with no tags started")
+ worker3 = WorkerAgent("NoTagWorker")
+ result3 = worker3.process_task("no_tag_data")
+ print(f"No tags result: {result3}")
+
+ print("All differently tagged traces completed")
+
+
+if __name__ == "__main__":
+ print("AgentOps Parallel Traces Examples")
+ print("=" * 40)
+
+ # Sequential parallel traces
+ sequential_parallel_traces()
+
+ # Nested parallel traces
+ nested_parallel_traces()
+
+ # Concurrent traces with threading
+ concurrent_traces_with_threads()
+
+ # Concurrent traces with executor
+ concurrent_traces_with_executor()
+
+ # Different tag types
+ trace_with_different_tag_types()
+
+ print("\nAll parallel trace examples completed!")
diff --git a/examples/context_manager/production_patterns.py b/examples/context_manager/production_patterns.py
new file mode 100644
index 000000000..8dedc50ec
--- /dev/null
+++ b/examples/context_manager/production_patterns.py
@@ -0,0 +1,340 @@
+"""
+Production Patterns with Context Managers
+
+This example demonstrates real-world production patterns using AgentOps
+context managers, including monitoring, logging, and performance tracking.
+"""
+
+import os
+import time
+import logging
+from datetime import datetime
+from typing import Dict, Any
+import agentops
+from agentops import agent, task, tool
+from dotenv import load_dotenv
+
+load_dotenv()
+
+# Get API key from environment
+AGENTOPS_API_KEY = os.environ.get("AGENTOPS_API_KEY")
+
+# Configure logging
+logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+logger = logging.getLogger(__name__)
+
+
+@agent
+class ProductionAgent:
+ """A production-ready agent with monitoring."""
+
+ def __init__(self, name: str, config: Dict[str, Any]):
+ self.name = name
+ self.config = config
+ self.metrics = {"tasks_completed": 0, "errors_encountered": 0, "total_processing_time": 0.0}
+ logger.info(f"Initialized production agent: {self.name}")
+
+ @task
+ def process_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
+ """Process a production request with monitoring."""
+ start_time = time.time()
+
+ try:
+ logger.info(f"{self.name} processing request: {request_data.get('id', 'unknown')}")
+
+ # Validate request
+ validated_data = self.validate_request(request_data)
+
+ # Process data
+ processed_data = self.transform_data(validated_data)
+
+ # Generate response
+ response = self.generate_response(processed_data)
+
+ # Update metrics
+ processing_time = time.time() - start_time
+ self.metrics["tasks_completed"] += 1
+ self.metrics["total_processing_time"] += processing_time
+
+ logger.info(f"{self.name} completed request in {processing_time:.3f}s")
+ return response
+
+ except Exception as e:
+ self.metrics["errors_encountered"] += 1
+ logger.error(f"{self.name} failed to process request: {e}")
+ raise
+
+ @tool
+ def validate_request(self, data: Dict[str, Any]) -> Dict[str, Any]:
+ """Validate incoming request data."""
+ required_fields = ["id", "type", "payload"]
+
+ for field in required_fields:
+ if field not in data:
+ raise ValueError(f"Missing required field: {field}")
+
+ logger.debug(f"{self.name} validated request: {data['id']}")
+ return data
+
+ @tool
+ def transform_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
+ """Transform and enrich the data."""
+ # Simulate data transformation
+ time.sleep(0.01) # Simulate processing time
+
+ transformed = {
+ **data,
+ "processed_at": datetime.now().isoformat(),
+ "processed_by": self.name,
+ "version": self.config.get("version", "1.0"),
+ }
+
+ logger.debug(f"{self.name} transformed data for: {data['id']}")
+ return transformed
+
+ @tool
+ def generate_response(self, data: Dict[str, Any]) -> Dict[str, Any]:
+ """Generate the final response."""
+ response = {
+ "status": "success",
+ "request_id": data["id"],
+ "result": f"Processed {data['type']} successfully",
+ "metadata": {
+ "processed_at": data["processed_at"],
+ "processed_by": data["processed_by"],
+ "version": data["version"],
+ },
+ }
+
+ logger.debug(f"{self.name} generated response for: {data['id']}")
+ return response
+
+ def get_metrics(self) -> Dict[str, Any]:
+ """Get current agent metrics."""
+ avg_processing_time = self.metrics["total_processing_time"] / max(self.metrics["tasks_completed"], 1)
+
+ return {
+ **self.metrics,
+ "average_processing_time": avg_processing_time,
+ "error_rate": self.metrics["errors_encountered"]
+ / max(self.metrics["tasks_completed"] + self.metrics["errors_encountered"], 1),
+ }
+
+
+def api_endpoint_pattern():
+ """Example of using context managers in API endpoint pattern."""
+ print("API Endpoint Pattern")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ # Simulate API requests
+ requests = [
+ {"id": "req_001", "type": "user_query", "payload": {"query": "What is AI?"}},
+ {"id": "req_002", "type": "data_analysis", "payload": {"dataset": "sales_data"}},
+ {"id": "req_003", "type": "user_query", "payload": {"query": "Process this"}},
+ ]
+
+ agent_config = {"version": "2.1.0", "environment": "production"}
+ api_agent = ProductionAgent("APIAgent", agent_config)
+
+ for request in requests:
+ try:
+ with agentops.start_trace("api_request", tags=["api", request["type"]]):
+ response = api_agent.process_request(request)
+ logger.info(f"API Response: {response['status']} for {response['request_id']}")
+
+ except Exception as e:
+ logger.error(f"API request failed: {e}")
+
+ # Print final metrics
+ print(f"Agent Metrics: {api_agent.get_metrics()}")
+
+
+def batch_processing_pattern():
+ """Example of batch processing with context managers."""
+ print("\nBatch Processing Pattern")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ # Simulate batch data
+ batch_data = [{"id": f"item_{i:03d}", "type": "data_record", "payload": {"value": i * 10}} for i in range(1, 6)]
+
+ agent_config = {"version": "1.5.0", "batch_size": 5}
+ batch_agent = ProductionAgent("BatchAgent", agent_config)
+
+ with agentops.start_trace("batch_processing", tags=["batch", "bulk"]):
+ logger.info(f"Starting batch processing of {len(batch_data)} items")
+
+ successful_items = 0
+ failed_items = 0
+
+ for item in batch_data:
+ try:
+ with agentops.start_trace("item_processing", tags=["batch", "item"]):
+ batch_agent.process_request(item)
+ successful_items += 1
+ logger.debug(f"Processed item: {item['id']}")
+
+ except Exception as e:
+ failed_items += 1
+ logger.error(f"Failed to process item {item['id']}: {e}")
+
+ logger.info(f"Batch completed: {successful_items} successful, {failed_items} failed")
+
+ print(f"Batch Agent Metrics: {batch_agent.get_metrics()}")
+
+
+def microservice_pattern():
+ """Example of microservice communication pattern."""
+ print("\nMicroservice Communication Pattern")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ def authenticate_user(user_id: str) -> bool:
+ """Simulate authentication service."""
+ with agentops.start_trace("authenticate", tags=["auth", "security"]):
+ logger.info(f"Authenticating user: {user_id}")
+ time.sleep(0.01) # Simulate auth check
+ return user_id != "invalid_user"
+
+ def get_user_profile(user_id: str) -> Dict[str, Any]:
+ """Simulate user service."""
+ with agentops.start_trace("get_profile", tags=["user", "profile"]):
+ logger.info(f"Fetching profile for user: {user_id}")
+ time.sleep(0.02) # Simulate database query
+ return {"user_id": user_id, "name": f"User {user_id}", "email": f"{user_id}@example.com"}
+
+ def send_notification(user_id: str, message: str) -> bool:
+ """Simulate notification service."""
+ with agentops.start_trace("send_notification", tags=["notification", "email"]):
+ logger.info(f"Sending notification to user: {user_id}")
+ time.sleep(0.01) # Simulate email sending
+ return True
+
+ # Simulate a complete user workflow
+ user_requests = ["user_123", "user_456", "invalid_user"]
+
+ for user_id in user_requests:
+ try:
+ with agentops.start_trace("user_workflow", tags=["workflow", "user"]):
+ logger.info(f"Processing workflow for user: {user_id}")
+
+ # Step 1: Authenticate
+ if not authenticate_user(user_id):
+ raise ValueError(f"Authentication failed for user: {user_id}")
+
+ # Step 2: Get profile
+ get_user_profile(user_id)
+
+ # Step 3: Send welcome notification
+ send_notification(user_id, "Welcome to our service!")
+
+ logger.info(f"Workflow completed for user: {user_id}")
+
+ except Exception as e:
+ logger.error(f"Workflow failed for user {user_id}: {e}")
+
+
+def monitoring_pattern():
+ """Example of monitoring with context managers."""
+ print("\nMonitoring Pattern")
+
+ agentops.init(api_key=AGENTOPS_API_KEY)
+
+ class AlertManager:
+ """Simple alert manager for demonstration."""
+
+ def __init__(self):
+ self.alerts = []
+
+ def check_and_alert(self, trace_name: str, duration: float, success: bool):
+ """Check conditions and generate alerts."""
+ # Alert on slow operations
+ if duration > 0.1: # 100ms threshold
+ self.alerts.append(
+ {
+ "type": "SLOW_OPERATION",
+ "trace": trace_name,
+ "duration": duration,
+ "timestamp": datetime.now().isoformat(),
+ }
+ )
+ logger.warning(f"SLOW OPERATION ALERT: {trace_name} took {duration:.3f}s")
+
+ # Alert on failures
+ if not success:
+ self.alerts.append(
+ {"type": "OPERATION_FAILURE", "trace": trace_name, "timestamp": datetime.now().isoformat()}
+ )
+ logger.warning(f"FAILURE ALERT: {trace_name} failed")
+
+ alert_manager = AlertManager()
+
+ class MonitoredOperation:
+ """Context manager with built-in monitoring and alerting."""
+
+ def __init__(self, operation_name: str, tags: list = None):
+ self.operation_name = operation_name
+ self.tags = tags or []
+ self.start_time = None
+ self.trace_context = None
+
+ def __enter__(self):
+ self.start_time = time.time()
+ self.trace_context = agentops.start_trace(self.operation_name, tags=self.tags)
+ return self.trace_context
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ duration = time.time() - self.start_time
+ success = exc_type is None
+
+            # End the underlying AgentOps trace with the appropriate state
+            if self.trace_context is not None:
+                end_state = agentops.TraceState.SUCCESS if success else agentops.TraceState.ERROR
+                agentops.end_trace(self.trace_context, end_state)
+
+            # Check for alerts
+            alert_manager.check_and_alert(self.operation_name, duration, success)
+
+            return False
+
+ # Simulate operations with different performance characteristics
+ operations = [
+ ("fast_operation", 0.01, False), # Fast, successful
+ ("slow_operation", 0.15, False), # Slow, successful (will trigger alert)
+ ("failing_operation", 0.05, True), # Fast, but fails (will trigger alert)
+ ("normal_operation", 0.03, False), # Normal, successful
+ ]
+
+ for op_name, sleep_time, should_fail in operations:
+ try:
+ with MonitoredOperation(op_name, tags=["monitoring", "demo"]):
+ logger.info(f"Executing {op_name}")
+ time.sleep(sleep_time)
+
+ if should_fail:
+ raise RuntimeError(f"Simulated failure in {op_name}")
+
+ logger.info(f"{op_name} completed successfully")
+
+ except Exception as e:
+ logger.error(f"{op_name} failed: {e}")
+
+ # Print alerts
+ print(f"Generated {len(alert_manager.alerts)} alerts:")
+ for alert in alert_manager.alerts:
+ print(f" {alert['type']}: {alert.get('trace', 'unknown')} at {alert['timestamp']}")
+
+
+if __name__ == "__main__":
+ print("AgentOps Production Patterns Examples")
+ print("=" * 50)
+
+ # API endpoint pattern
+ api_endpoint_pattern()
+
+ # Batch processing pattern
+ batch_processing_pattern()
+
+ # Microservice pattern
+ microservice_pattern()
+
+ # Monitoring pattern
+ monitoring_pattern()
+
+ print("\nAll production pattern examples completed!")
diff --git a/tests/benchmark/benchmark_init.py b/tests/benchmark/benchmark_init.py
index 94dad2660..743df9e5d 100644
--- a/tests/benchmark/benchmark_init.py
+++ b/tests/benchmark/benchmark_init.py
@@ -2,13 +2,13 @@
"""
-Benchmark script for measuring TracingCore initialization time.
+Benchmark script for measuring global tracer initialization time.
"""
def run_benchmark():
"""
- Run a benchmark of TracingCore initialization.
+ Run a benchmark of global tracer initialization.
Returns:
Dictionary with timing results
@@ -41,6 +41,6 @@ def print_results(results):
if __name__ == "__main__":
- print("Running TracingCore benchmark...")
+ print("Running global tracer benchmark...")
results = run_benchmark()
print_results(results)
diff --git a/tests/integration/test_session_concurrency.py b/tests/integration/test_session_concurrency.py
index c89203021..928dd972a 100644
--- a/tests/integration/test_session_concurrency.py
+++ b/tests/integration/test_session_concurrency.py
@@ -40,7 +40,7 @@ def setup_agentops(mock_api_key):
mock_api.v3.fetch_auth_token.return_value = {"token": "mock_token", "project_id": "mock_project_id"}
mock_api_client.return_value = mock_api
- # Mock TracingCore to avoid actual initialization
+ # Mock global tracer to avoid actual initialization
with patch("agentops.tracer") as mock_tracer:
mock_tracer.initialized = True
diff --git a/tests/unit/sdk/test_internal_span_processor.py b/tests/unit/sdk/test_internal_span_processor.py
index f53e1ae12..ed76a3ff5 100644
--- a/tests/unit/sdk/test_internal_span_processor.py
+++ b/tests/unit/sdk/test_internal_span_processor.py
@@ -12,7 +12,7 @@
class TestURLLogging(unittest.TestCase):
- """Tests for URL logging functionality in TracingCore."""
+ """Tests for URL logging functionality in global tracer."""
def setUp(self):
self.tracing_core = tracer
@@ -21,7 +21,7 @@ def setUp(self):
self.tracing_core._config = {"project_id": "test_project"}
@patch("agentops.sdk.core.log_trace_url")
- @patch("agentops.sdk.core.TracingCore.make_span")
+ @patch("agentops.sdk.core.tracer.make_span")
def test_start_trace_logs_url(self, mock_make_span, mock_log_trace_url):
"""Test that start_trace logs the trace URL."""
# Create a mock span
@@ -57,7 +57,7 @@ def test_end_trace_logs_url(self, mock_finalize_span, mock_log_trace_url):
mock_log_trace_url.assert_called_once_with(mock_span, title="test_trace")
@patch("agentops.sdk.core.log_trace_url")
- @patch("agentops.sdk.core.TracingCore.make_span")
+ @patch("agentops.sdk.core.tracer.make_span")
def test_start_trace_url_logging_failure_does_not_break_trace(self, mock_make_span, mock_log_trace_url):
"""Test that URL logging failure doesn't break trace creation."""
# Create a mock span
@@ -100,7 +100,7 @@ def test_end_trace_url_logging_failure_does_not_break_trace(self, mock_finalize_
mock_log_trace_url.assert_called_once_with(mock_span, title="test_trace")
@patch("agentops.sdk.core.log_trace_url")
- @patch("agentops.sdk.core.TracingCore.make_span")
+ @patch("agentops.sdk.core.tracer.make_span")
def test_start_trace_with_tags_logs_url(self, mock_make_span, mock_log_trace_url):
"""Test that start_trace with tags logs the trace URL."""
# Create a mock span
@@ -128,7 +128,7 @@ def setUp(self):
self.tracing_core._config = {"project_id": "test_project"}
@patch("agentops.sdk.core.log_trace_url")
- @patch("agentops.sdk.core.TracingCore.make_span")
+ @patch("agentops.sdk.core.tracer.make_span")
@patch("agentops.sdk.core.tracer.finalize_span")
def test_session_decorator_logs_url_on_start_and_end(self, mock_finalize_span, mock_make_span, mock_log_trace_url):
"""Test that session decorator logs URLs on both start and end."""
@@ -160,7 +160,7 @@ def test_function():
self.assertEqual(result, "test_result")
@patch("agentops.sdk.core.log_trace_url")
- @patch("agentops.sdk.core.TracingCore.make_span")
+ @patch("agentops.sdk.core.tracer.make_span")
@patch("agentops.sdk.core.tracer.finalize_span")
def test_session_decorator_with_default_name_logs_url(self, mock_finalize_span, mock_make_span, mock_log_trace_url):
"""Test that session decorator with default name logs URLs."""
@@ -191,7 +191,7 @@ def my_function():
self.assertEqual(result, "result")
@patch("agentops.sdk.core.log_trace_url")
- @patch("agentops.sdk.core.TracingCore.make_span")
+ @patch("agentops.sdk.core.tracer.make_span")
@patch("agentops.sdk.core.tracer.finalize_span")
def test_session_decorator_handles_url_logging_failure(
self, mock_finalize_span, mock_make_span, mock_log_trace_url
diff --git a/tests/unit/test_context_manager.py b/tests/unit/test_context_manager.py
new file mode 100644
index 000000000..3f2e612be
--- /dev/null
+++ b/tests/unit/test_context_manager.py
@@ -0,0 +1,777 @@
+import unittest
+from unittest.mock import Mock, patch
+import threading
+import time
+import asyncio
+
+from agentops import start_trace
+from agentops.sdk.core import TraceContext
+from opentelemetry.trace import StatusCode
+
+
+class TestContextManager(unittest.TestCase):
+ """Test the context manager functionality of TraceContext"""
+
+ def test_trace_context_has_context_manager_methods(self):
+ """Test that TraceContext has __enter__ and __exit__ methods"""
+ # TraceContext should have context manager protocol methods
+ assert hasattr(TraceContext, "__enter__")
+ assert hasattr(TraceContext, "__exit__")
+
+ @patch("agentops.sdk.core.tracer")
+ def test_trace_context_enter_returns_self(self, mock_tracer):
+ """Test that __enter__ returns the TraceContext instance"""
+ mock_span = Mock()
+ mock_token = Mock()
+ trace_context = TraceContext(span=mock_span, token=mock_token)
+
+ # __enter__ should return self
+ result = trace_context.__enter__()
+ assert result is trace_context
+
+ @patch("agentops.sdk.core.tracer")
+ def test_trace_context_exit_calls_end_trace(self, mock_tracer):
+ """Test that __exit__ calls end_trace with appropriate state"""
+ mock_span = Mock()
+ mock_token = Mock()
+ trace_context = TraceContext(span=mock_span, token=mock_token)
+
+ # Test normal exit (no exception)
+ trace_context.__exit__(None, None, None)
+ mock_tracer.end_trace.assert_called_once_with(trace_context, StatusCode.OK)
+
+ @patch("agentops.sdk.core.tracer")
+ def test_trace_context_exit_with_exception_sets_error_state(self, mock_tracer):
+ """Test that __exit__ sets ERROR state when exception occurs"""
+ mock_span = Mock()
+ mock_token = Mock()
+ trace_context = TraceContext(span=mock_span, token=mock_token)
+
+ # Test exit with exception
+ mock_tracer.reset_mock()
+ exc_type = ValueError
+ exc_val = ValueError("test error")
+ exc_tb = None
+
+ trace_context.__exit__(exc_type, exc_val, exc_tb)
+ mock_tracer.end_trace.assert_called_once_with(trace_context, StatusCode.ERROR)
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_context_manager_usage_pattern(self, mock_agentops_tracer, mock_core_tracer):
+ """Test using start_trace as a context manager"""
+ # Create a mock TraceContext
+ mock_span = Mock()
+ mock_token = Mock()
+ mock_trace_context = TraceContext(span=mock_span, token=mock_token)
+
+ # Mock the tracer's start_trace method to return our TraceContext
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.return_value = mock_trace_context
+
+ # Use as context manager
+ with start_trace("test_trace") as trace:
+ assert trace is mock_trace_context
+ assert trace.span is mock_span
+ assert trace.token is mock_token
+
+ # Verify start_trace was called
+ mock_agentops_tracer.start_trace.assert_called_once_with(trace_name="test_trace", tags=None)
+ # Verify end_trace was called
+ mock_core_tracer.end_trace.assert_called_once_with(mock_trace_context, StatusCode.OK)
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_context_manager_with_exception(self, mock_agentops_tracer, mock_core_tracer):
+ """Test context manager handles exceptions properly"""
+ # Create a mock TraceContext
+ mock_span = Mock()
+ mock_token = Mock()
+ mock_trace_context = TraceContext(span=mock_span, token=mock_token)
+
+ # Mock the tracer's start_trace method
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.return_value = mock_trace_context
+
+ # Test exception handling
+ with self.assertRaises(ValueError):
+ with start_trace("test_trace"):
+ raise ValueError("Test exception")
+
+ # Verify end_trace was called with ERROR state
+ mock_core_tracer.end_trace.assert_called_once_with(mock_trace_context, StatusCode.ERROR)
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ @patch("agentops.init")
+ def test_start_trace_auto_initializes_if_needed(self, mock_init, mock_agentops_tracer, mock_core_tracer):
+ """Test that start_trace attempts to initialize SDK if not initialized"""
+ # First call: SDK not initialized
+ mock_agentops_tracer.initialized = False
+
+ # After init() is called, set initialized to True
+ def set_initialized():
+ mock_agentops_tracer.initialized = True
+
+ mock_init.side_effect = set_initialized
+
+ # Create a mock TraceContext for when start_trace is called after init
+ mock_span = Mock()
+ mock_token = Mock()
+ mock_trace_context = TraceContext(span=mock_span, token=mock_token)
+ mock_agentops_tracer.start_trace.return_value = mock_trace_context
+
+ # Call start_trace
+ result = start_trace("test_trace")
+
+ # Verify init was called
+ mock_init.assert_called_once()
+ # Verify start_trace was called on tracer
+ mock_agentops_tracer.start_trace.assert_called_once_with(trace_name="test_trace", tags=None)
+ assert result is mock_trace_context
+
+ def test_no_wrapper_classes_needed(self):
+ """Test that we don't need wrapper classes - TraceContext is the context manager"""
+ # TraceContext itself implements the context manager protocol
+ # No need for TraceContextManager wrapper
+ mock_span = Mock()
+ mock_token = Mock()
+ trace_context = TraceContext(span=mock_span, token=mock_token)
+
+ # Can use directly as context manager
+ assert hasattr(trace_context, "__enter__")
+ assert hasattr(trace_context, "__exit__")
+ assert callable(trace_context.__enter__)
+ assert callable(trace_context.__exit__)
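+        # Since agentops.start_trace() returns a TraceContext directly, callers
+        # can write "with start_trace(...) as trace:" without any wrapper object.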
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_parallel_traces_independence(self, mock_agentops_tracer, mock_core_tracer):
+ """Test that multiple traces can run in parallel independently"""
+ # Create mock TraceContexts
+ mock_trace1 = TraceContext(span=Mock(), token=Mock())
+ mock_trace2 = TraceContext(span=Mock(), token=Mock())
+
+ # Mock the tracer to return different traces
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.side_effect = [mock_trace1, mock_trace2]
+
+ # Start two traces
+ trace1 = start_trace("trace1")
+ trace2 = start_trace("trace2")
+
+ # They should be different instances
+ assert trace1 is not trace2
+ assert trace1 is mock_trace1
+ assert trace2 is mock_trace2
+
+ # End them independently using context manager protocol
+ trace1.__exit__(None, None, None)
+ trace2.__exit__(None, None, None)
+
+ # Verify both were ended
+ assert mock_core_tracer.end_trace.call_count == 2
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_nested_context_managers_create_parallel_traces(self, mock_agentops_tracer, mock_core_tracer):
+ """Test that nested context managers create parallel traces, not parent-child"""
+ # Create mock TraceContexts
+ mock_outer = TraceContext(span=Mock(), token=Mock())
+ mock_inner = TraceContext(span=Mock(), token=Mock())
+
+ # Mock the tracer
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.side_effect = [mock_outer, mock_inner]
+
+ # Use nested context managers
+ with start_trace("outer_trace") as outer:
+ assert outer is mock_outer
+ with start_trace("inner_trace") as inner:
+ assert inner is mock_inner
+ assert inner is not outer
+ # Both traces are active
+ assert mock_agentops_tracer.start_trace.call_count == 2
+
+ # Verify both were ended
+ assert mock_core_tracer.end_trace.call_count == 2
+ # Inner trace ended first, then outer
+ calls = mock_core_tracer.end_trace.call_args_list
+ assert calls[0][0][0] is mock_inner
+ assert calls[1][0][0] is mock_outer
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_exception_in_nested_traces(self, mock_agentops_tracer, mock_core_tracer):
+ """Test exception handling in nested traces"""
+ # Create mock TraceContexts
+ mock_outer = TraceContext(span=Mock(), token=Mock())
+ mock_inner = TraceContext(span=Mock(), token=Mock())
+
+ # Mock the tracer
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.side_effect = [mock_outer, mock_inner]
+
+ # Test exception in inner trace
+ with self.assertRaises(ValueError):
+ with start_trace("outer_trace"):
+ with start_trace("inner_trace"):
+ raise ValueError("Inner exception")
+
+ # Both traces should be ended with ERROR state
+ assert mock_core_tracer.end_trace.call_count == 2
+ calls = mock_core_tracer.end_trace.call_args_list
+ # Inner trace ended with ERROR
+ assert calls[0][0][0] is mock_inner
+ assert calls[0][0][1] == StatusCode.ERROR
+ # Outer trace also ended with ERROR (exception propagated)
+ assert calls[1][0][0] is mock_outer
+ assert calls[1][0][1] == StatusCode.ERROR
+
+ @patch("agentops.sdk.core.tracer")
+ def test_trace_context_attributes_access(self, mock_tracer):
+ """Test accessing span and token attributes of TraceContext"""
+ mock_span = Mock()
+ mock_token = Mock()
+ trace_context = TraceContext(span=mock_span, token=mock_token)
+
+ # Direct attribute access
+ assert trace_context.span is mock_span
+ assert trace_context.token is mock_token
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_multiple_exceptions_in_sequence(self, mock_agentops_tracer, mock_core_tracer):
+ """Test handling multiple exceptions in sequence"""
+ # Mock the tracer
+ mock_agentops_tracer.initialized = True
+
+ # Create different mock traces for each attempt
+ mock_traces = [TraceContext(span=Mock(), token=Mock()) for _ in range(3)]
+ mock_agentops_tracer.start_trace.side_effect = mock_traces
+
+ # Multiple traces with exceptions
+ for i in range(3):
+ with self.assertRaises(RuntimeError):
+ with start_trace(f"trace_{i}"):
+ raise RuntimeError(f"Error {i}")
+
+ # All should be ended with ERROR state
+ assert mock_core_tracer.end_trace.call_count == 3
+ for i, call in enumerate(mock_core_tracer.end_trace.call_args_list):
+ assert call[0][0] is mock_traces[i]
+ assert call[0][1] == StatusCode.ERROR
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_trace_with_tags_dict(self, mock_agentops_tracer, mock_core_tracer):
+ """Test starting trace with tags as dictionary"""
+ # Create a mock TraceContext
+ mock_trace = TraceContext(span=Mock(), token=Mock())
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.return_value = mock_trace
+
+ tags = {"environment": "test", "version": "1.0"}
+ with start_trace("tagged_trace", tags=tags) as trace:
+ assert trace is mock_trace
+
+ # Verify tags were passed
+ mock_agentops_tracer.start_trace.assert_called_once_with(trace_name="tagged_trace", tags=tags)
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_trace_with_tags_list(self, mock_agentops_tracer, mock_core_tracer):
+ """Test starting trace with tags as list"""
+ # Create a mock TraceContext
+ mock_trace = TraceContext(span=Mock(), token=Mock())
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.return_value = mock_trace
+
+ tags = ["test", "v1.0", "experimental"]
+ with start_trace("tagged_trace", tags=tags) as trace:
+ assert trace is mock_trace
+
+ # Verify tags were passed
+ mock_agentops_tracer.start_trace.assert_called_once_with(trace_name="tagged_trace", tags=tags)
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_trace_context_manager_thread_safety(self, mock_agentops_tracer, mock_core_tracer):
+ """Test that context managers work correctly in multi-threaded environment"""
+ # Mock the tracer
+ mock_agentops_tracer.initialized = True
+
+ # Create unique traces for each thread
+ thread_traces = {}
+ trace_lock = threading.Lock()
+
+ def create_trace(trace_name=None, tags=None, **kwargs):
+ trace = TraceContext(span=Mock(), token=Mock())
+ with trace_lock:
+ thread_traces[threading.current_thread().ident] = trace
+ return trace
+
+ mock_agentops_tracer.start_trace.side_effect = create_trace
+
+ results = []
+ errors = []
+
+ def worker(thread_id):
+ try:
+ with start_trace(f"thread_{thread_id}_trace") as trace:
+ # Each thread should get its own trace
+ results.append((thread_id, trace))
+ time.sleep(0.01) # Simulate some work
+ except Exception as e:
+ errors.append((thread_id, str(e)))
+
+ # Start multiple threads
+ threads = []
+ for i in range(5):
+ t = threading.Thread(target=worker, args=(i,))
+ threads.append(t)
+ t.start()
+
+ # Wait for all threads
+ for t in threads:
+ t.join()
+
+ # Check results
+ assert len(errors) == 0, f"Errors in threads: {errors}"
+ assert len(results) == 5
+
+ # Each thread should have gotten a unique trace
+ traces = [r[1] for r in results]
+ assert len(set(id(t) for t in traces)) == 5 # All unique
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_context_manager_with_early_return(self, mock_agentops_tracer, mock_core_tracer):
+ """Test that context manager properly cleans up with early return"""
+ # Create a mock TraceContext
+ mock_trace = TraceContext(span=Mock(), token=Mock())
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.return_value = mock_trace
+
+ def function_with_early_return():
+ with start_trace("early_return_trace"):
+ if True: # Early return condition
+ return "early"
+ return "normal"
+
+ result = function_with_early_return()
+ assert result == "early"
+
+ # Verify trace was still ended properly
+ mock_core_tracer.end_trace.assert_called_once_with(mock_trace, StatusCode.OK)
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_context_manager_with_finally_block(self, mock_agentops_tracer, mock_core_tracer):
+ """Test context manager with try-finally block"""
+ # Create a mock TraceContext
+ mock_trace = TraceContext(span=Mock(), token=Mock())
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.return_value = mock_trace
+
+ finally_executed = False
+
+ try:
+ with start_trace("finally_trace"):
+ try:
+ raise ValueError("Test")
+ finally:
+ finally_executed = True
+ except ValueError:
+ pass
+
+ assert finally_executed
+ # Trace should be ended with ERROR due to exception
+ mock_core_tracer.end_trace.assert_called_once_with(mock_trace, StatusCode.ERROR)
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_backwards_compatibility_existing_patterns(self, mock_agentops_tracer, mock_core_tracer):
+ """Test that existing usage patterns continue to work"""
+ # Create mock traces
+ mock_traces = [TraceContext(span=Mock(), token=Mock()) for _ in range(3)]
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.side_effect = mock_traces
+
+ # Pattern 1: Basic context manager
+ with start_trace("basic") as trace:
+ assert trace is mock_traces[0]
+
+ # Pattern 2: Manual start/end using context manager protocol
+ trace = start_trace("manual")
+ assert trace is mock_traces[1]
+ trace.__exit__(None, None, None) # Use context manager exit instead of end_trace
+
+ # Pattern 3: With tags
+ with start_trace("tagged", tags=["production", "v2"]) as trace:
+ assert trace is mock_traces[2]
+
+ # All patterns should work
+ assert mock_agentops_tracer.start_trace.call_count == 3
+ assert mock_core_tracer.end_trace.call_count == 3
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_edge_case_none_trace_context(self, mock_agentops_tracer, mock_core_tracer):
+ """Test handling when start_trace returns None"""
+ # Mock SDK not initialized and init fails
+ mock_agentops_tracer.initialized = False
+
+ # When start_trace is called on uninitialized tracer, it returns None
+ with patch("agentops.init") as mock_init:
+ mock_init.side_effect = Exception("Init failed")
+
+ result = start_trace("test_trace")
+ assert result is None
+
+ # Verify start_trace was not called on tracer (since init failed)
+ mock_agentops_tracer.start_trace.assert_not_called()
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_edge_case_tracing_core_not_initialized(self, mock_agentops_tracer, mock_core_tracer):
+ """Test behavior when global tracer is not initialized"""
+ mock_agentops_tracer.initialized = False
+
+ # Mock init to succeed but tracer still not initialized
+ with patch("agentops.init") as mock_init:
+ mock_init.return_value = None # init succeeds but doesn't set initialized
+
+ result = start_trace("test")
+ assert result is None
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_edge_case_exception_in_exit_method(self, mock_agentops_tracer, mock_core_tracer):
+ """Test handling when exception occurs in __exit__ method"""
+ # Create a mock TraceContext
+ mock_trace = TraceContext(span=Mock(), token=Mock())
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.return_value = mock_trace
+
+ # Make end_trace raise an exception
+ mock_core_tracer.end_trace.side_effect = RuntimeError("End trace failed")
+
+ # The exception in __exit__ should be suppressed
+ with start_trace("exception_in_exit"):
+ pass # Should not raise
+
+ # Verify end_trace was attempted
+ mock_core_tracer.end_trace.assert_called_once()
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_performance_many_sequential_traces(self, mock_agentops_tracer, mock_core_tracer):
+ """Test performance with many sequential traces"""
+ # Mock the tracer
+ mock_agentops_tracer.initialized = True
+
+ # Create traces on demand
+ def create_trace(trace_name=None, tags=None, **kwargs):
+ return TraceContext(span=Mock(), token=Mock())
+
+ mock_agentops_tracer.start_trace.side_effect = create_trace
+
+ # Create many traces sequentially
+ start_time = time.time()
+ for i in range(100):
+ with start_trace(f"trace_{i}") as trace:
+ assert trace is not None
+ assert trace.span is not None
+
+ elapsed = time.time() - start_time
+
+ # Should complete reasonably quickly (< 1 second for 100 traces)
+ assert elapsed < 1.0, f"Too slow: {elapsed:.2f}s for 100 traces"
+
+ # Verify all traces were started and ended
+ assert mock_agentops_tracer.start_trace.call_count == 100
+ assert mock_core_tracer.end_trace.call_count == 100
+
+ @patch("agentops.sdk.core.tracer")
+ def test_trace_context_state_management(self, mock_tracer):
+ """Test that TraceContext properly manages its internal state"""
+ mock_span = Mock()
+ mock_token = Mock()
+ trace_context = TraceContext(span=mock_span, token=mock_token)
+
+ # Initial state
+ assert trace_context.span is mock_span
+ assert trace_context.token is mock_token
+
+ # Enter context
+ result = trace_context.__enter__()
+ assert result is trace_context
+
+ # Exit context normally
+ trace_context.__exit__(None, None, None)
+ mock_tracer.end_trace.assert_called_once_with(trace_context, StatusCode.OK)
+
+ # State should remain accessible after exit
+ assert trace_context.span is mock_span
+ assert trace_context.token is mock_token
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_context_manager_with_async_context(self, mock_agentops_tracer, mock_core_tracer):
+ """Test context manager works in async context"""
+ # Create a mock TraceContext
+ mock_trace = TraceContext(span=Mock(), token=Mock())
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.return_value = mock_trace
+
+ async def async_function():
+ with start_trace("async_context") as trace:
+ assert trace is mock_trace
+ await asyncio.sleep(0.01)
+ return "done"
+
+ # Run async function
+ result = asyncio.run(async_function())
+ assert result == "done"
+
+ # Verify trace was properly managed
+ mock_agentops_tracer.start_trace.assert_called_once_with(trace_name="async_context", tags=None)
+ mock_core_tracer.end_trace.assert_called_once_with(mock_trace, StatusCode.OK)
+
+
+class TestContextManagerBackwardCompatibility(unittest.TestCase):
+ """Test backward compatibility for context manager usage"""
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_existing_code_patterns_still_work(self, mock_agentops_tracer, mock_core_tracer):
+ """Test that code using the old patterns still works"""
+ # Create mock traces - need more than 3 for this test
+ mock_traces = [TraceContext(span=Mock(), token=Mock()) for _ in range(5)]
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.side_effect = mock_traces
+
+ # Old pattern 1: Simple context manager
+ with start_trace("basic") as trace:
+ # Should work without changes
+ assert trace.span is not None
+
+ # Old pattern 2: Context manager with exception handling
+ try:
+ with start_trace("with_error") as trace:
+ raise ValueError("test")
+ except ValueError:
+ pass
+
+ # Old pattern 3: Nested traces
+ with start_trace("outer") as outer:
+ with start_trace("inner") as inner:
+ assert outer is not inner
+
+ # All should work - 4 calls total (basic, with_error, outer, inner)
+ assert mock_agentops_tracer.start_trace.call_count == 4
+ assert mock_core_tracer.end_trace.call_count == 4
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_api_compatibility(self, mock_agentops_tracer, mock_core_tracer):
+ """Test that the API remains compatible"""
+ # Create mock TraceContexts for each call
+ mock_traces = [TraceContext(span=Mock(), token=Mock()) for _ in range(3)]
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.side_effect = mock_traces
+
+ # Test function signatures
+ # start_trace(trace_name, tags=None)
+ trace1 = start_trace("test1")
+ assert trace1 is mock_traces[0]
+
+ trace2 = start_trace("test2", tags=["tag1", "tag2"])
+ assert trace2 is mock_traces[1]
+
+ trace3 = start_trace("test3", tags={"key": "value"})
+ assert trace3 is mock_traces[2]
+
+ # Use context manager protocol to end traces
+ trace1.__exit__(None, None, None)
+ trace2.__exit__(ValueError, ValueError("test"), None)
+ trace3.__exit__(None, None, None)
+
+ # All calls should work
+ assert mock_agentops_tracer.start_trace.call_count == 3
+ assert mock_core_tracer.end_trace.call_count == 3
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_return_type_compatibility(self, mock_agentops_tracer, mock_core_tracer):
+ """Test that return types are compatible with existing code"""
+ mock_span = Mock()
+ mock_token = Mock()
+ mock_trace = TraceContext(span=mock_span, token=mock_token)
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.return_value = mock_trace
+
+ # start_trace returns TraceContext (or None)
+ trace = start_trace("test")
+ assert isinstance(trace, TraceContext)
+ assert hasattr(trace, "span")
+ assert hasattr(trace, "token")
+ assert hasattr(trace, "__enter__")
+ assert hasattr(trace, "__exit__")
+
+ # Can be used as context manager
+ with trace:
+ pass
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_context_manager_with_keyboard_interrupt(self, mock_agentops_tracer, mock_core_tracer):
+ """Test context manager handles KeyboardInterrupt properly"""
+ # Create a mock TraceContext
+ mock_trace = TraceContext(span=Mock(), token=Mock())
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.return_value = mock_trace
+
+ # Test KeyboardInterrupt handling
+ with self.assertRaises(KeyboardInterrupt):
+ with start_trace("keyboard_interrupt_trace"):
+ raise KeyboardInterrupt()
+
+ # Verify end_trace was called with ERROR state
+ mock_core_tracer.end_trace.assert_called_once_with(mock_trace, StatusCode.ERROR)
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_context_manager_with_system_exit(self, mock_agentops_tracer, mock_core_tracer):
+ """Test context manager handles SystemExit properly"""
+ # Create a mock TraceContext
+ mock_trace = TraceContext(span=Mock(), token=Mock())
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.return_value = mock_trace
+
+ # Test SystemExit handling
+ with self.assertRaises(SystemExit):
+ with start_trace("system_exit_trace"):
+ raise SystemExit(1)
+
+ # Verify end_trace was called with ERROR state
+ mock_core_tracer.end_trace.assert_called_once_with(mock_trace, StatusCode.ERROR)
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_context_manager_in_generator_function(self, mock_agentops_tracer, mock_core_tracer):
+ """Test context manager works correctly in generator functions"""
+ # Create mock traces
+ mock_traces = [TraceContext(span=Mock(), token=Mock()) for _ in range(3)]
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.side_effect = mock_traces
+
+ def trace_generator():
+ with start_trace("generator_trace"):
+ yield 1
+ yield 2
+ yield 3
+
+ # Consume the generator
+ results = list(trace_generator())
+ assert results == [1, 2, 3]
+
+ # Verify trace was properly managed
+ mock_agentops_tracer.start_trace.assert_called_once()
+ mock_core_tracer.end_trace.assert_called_once()
+
+ @patch("agentops.sdk.core.tracer")
+ def test_context_manager_exit_return_value(self, mock_tracer):
+ """Test that __exit__ returns None (doesn't suppress exceptions)"""
+ mock_span = Mock()
+ mock_token = Mock()
+ trace_context = TraceContext(span=mock_span, token=mock_token)
+
+ # __exit__ should return None (or falsy) to not suppress exceptions
+ result = trace_context.__exit__(None, None, None)
+ assert result is None or not result
+
+ # Also with exception
+ result = trace_context.__exit__(ValueError, ValueError("test"), None)
+ assert result is None or not result
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_context_manager_with_very_large_data(self, mock_agentops_tracer, mock_core_tracer):
+ """Test context manager with very large trace names and tags"""
+ # Create a mock TraceContext
+ mock_trace = TraceContext(span=Mock(), token=Mock())
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.return_value = mock_trace
+
+ # Very large trace name and tags
+ large_trace_name = "x" * 10000
+ large_tags = {f"key_{i}": f"value_{i}" * 100 for i in range(100)}
+
+ with start_trace(large_trace_name, tags=large_tags) as trace:
+ assert trace is mock_trace
+
+ # Should handle large data without issues
+ mock_agentops_tracer.start_trace.assert_called_once()
+ args, kwargs = mock_agentops_tracer.start_trace.call_args
+ assert kwargs["trace_name"] == large_trace_name
+ assert kwargs["tags"] == large_tags
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_context_manager_with_asyncio_tasks(self, mock_agentops_tracer, mock_core_tracer):
+ """Test context manager with multiple asyncio tasks"""
+ # Mock the tracer
+ mock_agentops_tracer.initialized = True
+
+ # Create traces for each task
+ trace_count = 0
+
+ def create_trace(trace_name=None, tags=None, **kwargs):
+ nonlocal trace_count
+ trace_count += 1
+ return TraceContext(span=Mock(name=f"span_{trace_count}"), token=Mock())
+
+ mock_agentops_tracer.start_trace.side_effect = create_trace
+
+ async def task_with_trace(task_id):
+ with start_trace(f"async_task_{task_id}"):
+ await asyncio.sleep(0.01)
+ return task_id
+
+ async def run_concurrent_tasks():
+ tasks = [task_with_trace(i) for i in range(5)]
+ results = await asyncio.gather(*tasks)
+ return results
+
+ # Run async tasks
+ results = asyncio.run(run_concurrent_tasks())
+ assert results == [0, 1, 2, 3, 4]
+
+ # All traces should be started and ended
+ assert mock_agentops_tracer.start_trace.call_count == 5
+ assert mock_core_tracer.end_trace.call_count == 5
+
+ @patch("agentops.sdk.core.tracer")
+ @patch("agentops.tracer")
+ def test_context_manager_resource_cleanup_on_exit_failure(self, mock_agentops_tracer, mock_core_tracer):
+ """Test that resources are cleaned up even if __exit__ fails"""
+ # Create a mock TraceContext
+ mock_trace = TraceContext(span=Mock(), token=Mock())
+ mock_agentops_tracer.initialized = True
+ mock_agentops_tracer.start_trace.return_value = mock_trace
+
+ # Make end_trace fail
+ mock_core_tracer.end_trace.side_effect = Exception("Cleanup failed")
+
+ # Should not raise exception from __exit__
+ with start_trace("cleanup_test") as trace:
+ assert trace is mock_trace
+
+ # end_trace was attempted despite failure
+ mock_core_tracer.end_trace.assert_called_once()
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py
index cfcb2b903..4b40bf770 100644
--- a/tests/unit/test_session.py
+++ b/tests/unit/test_session.py
@@ -8,7 +8,7 @@
@pytest.fixture(scope="function")
def mock_tracing_core():
- """Mock the TracingCore to avoid actual initialization"""
+ """Mock the global tracer to avoid actual initialization"""
# Patch both the main location and where it's imported in client
with (
patch("agentops.tracer") as mock_tracer,
@@ -137,7 +137,7 @@ def test_start_trace_without_init():
# Reset client for test
agentops._client = agentops.Client()
- # Mock TracingCore to be uninitialized initially, then initialized after init
+ # Mock global tracer to be uninitialized initially, then initialized after init
with (
patch("agentops.tracer") as mock_tracer,
patch("agentops.client.client.tracer", mock_tracer),
@@ -150,7 +150,7 @@ def test_start_trace_without_init():
with patch("agentops.init") as mock_init:
def side_effect():
- # After init is called, mark TracingCore as initialized
+ # After init is called, mark global tracer as initialized
mock_tracer.initialized = True
mock_init.side_effect = side_effect
@@ -172,7 +172,7 @@ def test_end_trace(mock_tracing_core, mock_trace_context):
# End the trace
agentops.end_trace(mock_trace_context, end_state="Success")
- # Verify end_trace was called on TracingCore
+ # Verify end_trace was called on global tracer
mock_tracing_core.end_trace.assert_called_once_with(trace_context=mock_trace_context, end_state="Success")
@@ -233,7 +233,7 @@ def failing_function():
def test_legacy_start_session_compatibility(mock_tracing_core, mock_api_client, mock_trace_context, reset_client):
- """Test that legacy start_session still works and calls TracingCore.start_trace"""
+ """Test that legacy start_session still works and calls tracer.start_trace"""
import agentops
from agentops.legacy import Session
@@ -253,13 +253,13 @@ def test_legacy_start_session_compatibility(mock_tracing_core, mock_api_client,
# Check that the session's trace_context has the expected properties
assert session.trace_context is not None
- # Verify that TracingCore.start_trace was called
+ # Verify that tracer.start_trace was called
# Note: May be called multiple times due to initialization
assert mock_tracing_core.start_trace.call_count >= 1
def test_legacy_end_session_compatibility(mock_tracing_core, mock_api_client, mock_trace_context, reset_client):
- """Test that legacy end_session still works and calls TracingCore.end_trace"""
+ """Test that legacy end_session still works and calls tracer.end_trace"""
import agentops
from agentops.legacy import Session
@@ -275,7 +275,7 @@ def test_legacy_end_session_compatibility(mock_tracing_core, mock_api_client, mo
# End the session
agentops.end_session(session)
- # Verify that TracingCore.end_trace was called
+ # Verify that tracer.end_trace was called
mock_tracing_core.end_trace.assert_called_once_with(mock_trace_context, end_state="Success")
diff --git a/tests/unit/test_session_legacy.py b/tests/unit/test_session_legacy.py
index 4b4b21aa9..e3b587f24 100644
--- a/tests/unit/test_session_legacy.py
+++ b/tests/unit/test_session_legacy.py
@@ -143,7 +143,7 @@ def test_crewai_kwargs_force_flush():
agentops.end_session(end_state="Success", end_state_reason="Test Finished", is_auto_end=True)
# Explicitly ensure the core isn't already shut down for the test
- assert tracer._initialized, "TracingCore should still be initialized"
+ assert tracer._initialized, "Global tracer should still be initialized"
def test_crewai_task_instrumentation(instrumentation):