From 06eca5933141447d7a3fb493b1f3e5f8cfc74068 Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Thu, 29 May 2025 21:31:01 +0530 Subject: [PATCH 01/13] add context management to use `with` with `start_trace` --- agentops/__init__.py | 63 +++++++++++++++++++++++++++++++------------- agentops/sdk/core.py | 19 +++++++++++++ 2 files changed, 63 insertions(+), 19 deletions(-) diff --git a/agentops/__init__.py b/agentops/__init__.py index 3b252759a..17bd5a6f0 100755 --- a/agentops/__init__.py +++ b/agentops/__init__.py @@ -15,7 +15,7 @@ from typing import List, Optional, Union, Dict, Any from agentops.client import Client from agentops.sdk.core import TracingCore, TraceContext -from agentops.sdk.decorators import trace, session, agent, task, workflow, operation +from agentops.sdk.decorators import trace, session, agent, task, workflow, operation, tool from agentops.logging.config import logger @@ -27,6 +27,21 @@ def get_client() -> Client: """Get the singleton client instance""" global _client + # If _client is None create a new one + if _client is None: + _client = Client() + + # Check if the current client instance is still valid + # This handles the case where test fixtures reset the Client singleton + # Only check if we have a real Client instance (not a mock) + if hasattr(_client, "__class__") and _client.__class__.__name__ == "Client": + # The Client class uses __instance as its singleton, so we check if it's changed + current_singleton = Client() + + # If the singleton has changed (due to test resets), update our reference + if _client is not current_singleton: + _client = current_singleton + return _client @@ -106,24 +121,31 @@ def init( elif default_tags: merged_tags = default_tags - return _client.init( - api_key=api_key, - endpoint=endpoint, - app_url=app_url, - max_wait_time=max_wait_time, - max_queue_size=max_queue_size, - default_tags=merged_tags, - trace_name=trace_name, - instrument_llm_calls=instrument_llm_calls, - 
auto_start_session=auto_start_session, - auto_init=auto_init, - skip_auto_end_session=skip_auto_end_session, - env_data_opt_out=env_data_opt_out, - log_level=log_level, - fail_safe=fail_safe, - exporter_endpoint=exporter_endpoint, + # Prepare initialization arguments + init_kwargs = { + "api_key": api_key, + "endpoint": endpoint, + "app_url": app_url, + "max_wait_time": max_wait_time, + "max_queue_size": max_queue_size, + "default_tags": merged_tags, + "trace_name": trace_name, + "instrument_llm_calls": instrument_llm_calls, + "auto_start_session": auto_start_session, + "auto_init": auto_init, + "skip_auto_end_session": skip_auto_end_session, + "env_data_opt_out": env_data_opt_out, + "log_level": log_level, + "fail_safe": fail_safe, + "exporter_endpoint": exporter_endpoint, **kwargs, - ) + } + + # Get the current client instance (creates new one if needed) + client = get_client() + + # Initialize the client directly + return client.init(**init_kwargs) def configure(**kwargs): @@ -173,7 +195,8 @@ def configure(**kwargs): if invalid_params: logger.warning(f"Invalid configuration parameters: {invalid_params}") - _client.configure(**kwargs) + client = get_client() + client.configure(**kwargs) def start_trace( @@ -205,6 +228,7 @@ def start_trace( logger.error(f"SDK auto-initialization failed during start_trace: {e}. 
Cannot start trace.") return None + # Return the native TraceContext directly - it already has context manager support return tracing_core.start_trace(trace_name=trace_name, tags=tags) @@ -247,4 +271,5 @@ def end_trace(trace_context: Optional[TraceContext] = None, end_state: str = "Su "task", "workflow", "operation", + "tool", ] diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index d36c55228..28319056a 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -34,6 +34,25 @@ def __init__(self, span: Span, token: Optional[context_api.Token] = None, is_ini self.span = span self.token = token self.is_init_trace = is_init_trace # Flag to identify the auto-started trace + self._end_state = "Indeterminate" # Default end state because we don't know yet + + def __enter__(self) -> "TraceContext": + """Enter the trace context.""" + return self + + def __exit__(self, exc_type: Optional[type], exc_val: Optional[Exception], exc_tb: Optional[Any]) -> bool: + """Exit the trace context and end the trace.""" + if exc_type is not None: + self._end_state = "Error" + if exc_val: + logger.debug(f"Trace exiting with exception: {exc_val}") + + try: + TracingCore.get_instance().end_trace(self, self._end_state) + except Exception as e: + logger.error(f"Error ending trace in context manager: {e}") + + return False def get_imported_libraries(): From 513e7d351c31f45e600d6987aa093ec3a40bb682 Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Thu, 29 May 2025 22:11:26 +0530 Subject: [PATCH 02/13] set to `Success` when no error encountered --- agentops/sdk/core.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index 28319056a..25ede14dd 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -46,6 +46,9 @@ def __exit__(self, exc_type: Optional[type], exc_val: Optional[Exception], exc_t self._end_state = "Error" if exc_val: logger.debug(f"Trace exiting with exception: {exc_val}") + else: + # No exception occurred, set to 
Success + self._end_state = "Success" try: TracingCore.get_instance().end_trace(self, self._end_state) From 4f7102629fe9383e7863a49ef094d935813955d8 Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Thu, 29 May 2025 22:16:58 +0530 Subject: [PATCH 03/13] add examples --- examples/context_manager/README.md | 93 +++++ examples/context_manager/basic_usage.py | 140 ++++++++ examples/context_manager/error_handling.py | 317 ++++++++++++++++ examples/context_manager/parallel_traces.py | 245 +++++++++++++ .../context_manager/production_patterns.py | 340 ++++++++++++++++++ 5 files changed, 1135 insertions(+) create mode 100644 examples/context_manager/README.md create mode 100644 examples/context_manager/basic_usage.py create mode 100644 examples/context_manager/error_handling.py create mode 100644 examples/context_manager/parallel_traces.py create mode 100644 examples/context_manager/production_patterns.py diff --git a/examples/context_manager/README.md b/examples/context_manager/README.md new file mode 100644 index 000000000..5ed88c88f --- /dev/null +++ b/examples/context_manager/README.md @@ -0,0 +1,93 @@ +# AgentOps Context Manager Examples + +This directory contains examples demonstrating how to use AgentOps with context managers for automatic trace lifecycle management using the simplified, native implementation. + +## Overview + +AgentOps provides native context manager support through the `TraceContext` class. This implementation eliminates wrapper classes and leverages OpenTelemetry's built-in capabilities for clean, automatic trace management. 
+ +## Key Benefits + +- **Automatic cleanup**: Traces end automatically when exiting the context +- **Error handling**: Traces are marked with appropriate error states when exceptions occur +- **Native OpenTelemetry integration**: Leverages OpenTelemetry's built-in context management +- **Simplified architecture**: No wrapper classes or monkey patching required +- **Thread safety**: Works seamlessly in multi-threaded environments +- **Performance optimized**: Direct method calls without wrapper overhead + +## Quick Start + +```python +import agentops + +# Initialize AgentOps +agentops.init(api_key="your-api-key") + +# Use native context manager support +with agentops.start_trace("my_task") as trace: + # Your code here + pass # Trace automatically ends with Success/Error state +``` + +## Examples + +### `basic_usage.py` +- Native TraceContext context manager usage +- Multiple parallel traces +- Error handling basics +- Nested traces (parallel, not parent-child) + +### `parallel_traces.py` +- Sequential and nested parallel traces +- Concurrent traces with threading and ThreadPoolExecutor +- Mixed success/error scenarios +- Different tag types (lists, dictionaries, none) + +### `error_handling.py` +- Exception handling with different error types +- Nested exception handling and propagation +- Recovery patterns (retry, graceful degradation, partial success) +- Custom exceptions, finally blocks, and exception chaining + +### `production_patterns.py` +- API endpoint monitoring and metrics +- Batch processing with individual item tracking +- Microservice communication patterns +- Production monitoring and alerting + +## Running Examples + +### Step 1: Install Dependencies + +```bash +# Install required packages +pip install agentops python-dotenv +``` + +### Step 2: Set Up API Key + +To use AgentOps and send trace data to your dashboard, you need to configure your API key using one of these methods: + +**Option A: Environment Variable** +```bash +# Export directly in your
shell +export AGENTOPS_API_KEY="your-api-key-here" +``` + +**Option B: .env File (Recommended)** +```bash +# Create a .env file in the project root +echo "AGENTOPS_API_KEY=your-api-key-here" > .env +``` + +**Important**: When using a `.env` file, the examples automatically load it using `load_dotenv()`. Make sure the `.env` file is in the same directory as the example scripts or in the project root. + +### Step 3: Run the Examples + +```bash +# Run each example to see different patterns +python examples/context_manager/basic_usage.py +python examples/context_manager/parallel_traces.py +python examples/context_manager/error_handling.py +python examples/context_manager/production_patterns.py +``` diff --git a/examples/context_manager/basic_usage.py b/examples/context_manager/basic_usage.py new file mode 100644 index 000000000..122991339 --- /dev/null +++ b/examples/context_manager/basic_usage.py @@ -0,0 +1,140 @@ +""" +Basic Context Manager Example + +This example demonstrates how to use AgentOps context manager with the native +TraceContext support, eliminating the need for wrappers or monkey patching. 
+""" + +import os +import agentops +from agentops import agent, task, tool +from dotenv import load_dotenv + +load_dotenv() + +# Get API key from environment +AGENTOPS_API_KEY = os.environ.get("AGENTOPS_API_KEY") + + +@agent +class SimpleAgent: + """A simple example agent.""" + + def __init__(self, name: str): + self.name = name + + @task + def process_data(self, data: str) -> str: + """Process some data.""" + result = f"Processed: {data}" + return self.use_tool(result) + + @tool + def use_tool(self, input_data: str) -> str: + """Use a tool to transform data.""" + return f"Tool output: {input_data.upper()}" + + +def basic_context_manager_example(): + """Example using the native TraceContext context manager.""" + print("Basic Context Manager Example") + + # Initialize AgentOps + agentops.init(api_key=AGENTOPS_API_KEY) + + # Use native TraceContext context manager + with agentops.start_trace("basic_example", tags=["basic", "demo"]): + print("Trace started") + + # Create and use agent + agent = SimpleAgent("BasicAgent") + result = agent.process_data("sample data") + print(f"Result: {result}") + + print("Trace ended automatically") + + +def multiple_parallel_traces(): + """Example showing multiple parallel traces.""" + print("\nMultiple Parallel Traces") + + agentops.init(api_key=AGENTOPS_API_KEY) + + # First trace + with agentops.start_trace("task_1", tags=["parallel", "task-1"]): + print("Task 1 started") + agent1 = SimpleAgent("Agent1") + result1 = agent1.process_data("task 1 data") + print(f"Task 1 result: {result1}") + + # Second trace (independent) + with agentops.start_trace("task_2", tags=["parallel", "task-2"]): + print("Task 2 started") + agent2 = SimpleAgent("Agent2") + result2 = agent2.process_data("task 2 data") + print(f"Task 2 result: {result2}") + + print("All parallel traces completed") + + +def error_handling_example(): + """Example showing error handling with context manager.""" + print("\nError Handling Example") + + 
agentops.init(api_key=AGENTOPS_API_KEY) + + try: + with agentops.start_trace("error_example", tags=["error-handling"]): + print("Trace started") + + agent = SimpleAgent("ErrorAgent") + result = agent.process_data("some data") + print(f"Processing successful: {result}") + + # Simulate an error + raise ValueError("Simulated error") + + except ValueError as e: + print(f"Caught error: {e}") + print("Trace automatically ended with Error status") + + +def nested_traces_example(): + """Example showing nested traces (which are parallel, not parent-child).""" + print("\nNested Traces Example") + + agentops.init(api_key=AGENTOPS_API_KEY) + + # Outer trace + with agentops.start_trace("main_workflow", tags=["workflow", "main"]): + print("Main workflow started") + + # Inner trace (parallel, not child) + with agentops.start_trace("sub_task", tags=["workflow", "sub"]): + print("Sub task started") + + agent = SimpleAgent("WorkflowAgent") + result = agent.process_data("workflow data") + print(f"Sub task result: {result}") + + print("Sub task completed") + print("Main workflow completed") + + +if __name__ == "__main__": + print("AgentOps Context Manager Examples") + print("=" * 40) + + # Show basic usage + basic_context_manager_example() + + # Show multiple parallel traces + multiple_parallel_traces() + + # Show error handling + error_handling_example() + + # Show nested traces + nested_traces_example() + + print("\nAll examples completed!") diff --git a/examples/context_manager/error_handling.py b/examples/context_manager/error_handling.py new file mode 100644 index 000000000..bfeffda08 --- /dev/null +++ b/examples/context_manager/error_handling.py @@ -0,0 +1,317 @@ +""" +Error Handling with Context Managers + +This example demonstrates error handling patterns with AgentOps context managers, +showing how traces automatically handle different types of exceptions. 
+""" + +import os +import time +import agentops +from agentops import agent, task, tool +from dotenv import load_dotenv + +load_dotenv() + +# Get API key from environment +AGENTOPS_API_KEY = os.environ.get("AGENTOPS_API_KEY") + + +@agent +class ErrorProneAgent: + """An agent that can encounter various types of errors.""" + + def __init__(self, name: str): + self.name = name + + @task + def risky_operation(self, operation_type: str) -> str: + """Perform a risky operation that might fail.""" + if operation_type == "value_error": + raise ValueError("Invalid value provided") + elif operation_type == "type_error": + raise TypeError("Wrong type provided") + elif operation_type == "runtime_error": + raise RuntimeError("Runtime error occurred") + elif operation_type == "custom_error": + raise CustomError("Custom error occurred") + else: + return f"Success: {operation_type}" + + @tool + def validate_input(self, data: str) -> str: + """Validate input data.""" + if not data or data == "invalid": + raise ValueError("Invalid input data") + return f"Validated: {data}" + + @task + def multi_step_operation(self, steps: list) -> str: + """Perform multiple steps, any of which might fail.""" + results = [] + for i, step in enumerate(steps): + if step == "fail": + raise RuntimeError(f"Step {i+1} failed") + results.append(f"Step{i+1}:{step}") + return " -> ".join(results) + + +class CustomError(Exception): + """Custom exception for demonstration.""" + + pass + + +def basic_exception_handling(): + """Basic example of exception handling with context managers.""" + print("Basic Exception Handling") + + agentops.init(api_key=AGENTOPS_API_KEY) + + error_types = ["value_error", "type_error", "runtime_error", "success"] + + for error_type in error_types: + try: + with agentops.start_trace(f"basic_{error_type}", tags=["basic", "error-handling"]): + print(f"Started trace for {error_type}") + + agent = ErrorProneAgent(f"BasicAgent_{error_type}") + result = agent.risky_operation(error_type) + 
print(f"Success result: {result}") + + except ValueError as e: + print(f"Caught ValueError: {e}") + except TypeError as e: + print(f"Caught TypeError: {e}") + except RuntimeError as e: + print(f"Caught RuntimeError: {e}") + + print("Basic exception handling completed") + + +def nested_exception_handling(): + """Example of exception handling in nested traces.""" + print("\nNested Exception Handling") + + agentops.init(api_key=AGENTOPS_API_KEY) + + try: + with agentops.start_trace("outer_operation", tags=["nested", "outer"]): + print("Outer trace started") + + # outer_agent = ErrorProneAgent("OuterAgent") # Not used in this example + + try: + with agentops.start_trace("inner_operation", tags=["nested", "inner"]): + print("Inner trace started") + + inner_agent = ErrorProneAgent("InnerAgent") + # This will cause an error in the inner trace + inner_agent.risky_operation("value_error") + + except ValueError as e: + print(f"Inner trace caught ValueError: {e}") + # Re-raise to affect outer trace + raise RuntimeError("Inner operation failed") from e + + except RuntimeError as e: + print(f"Outer trace caught RuntimeError: {e}") + + print("Nested exception handling completed") + + +def retry_pattern(): + """Example of retry pattern with context managers.""" + print("\nRetry Pattern") + + agentops.init(api_key=AGENTOPS_API_KEY) + + max_retries = 3 + for attempt in range(max_retries): + try: + with agentops.start_trace(f"retry_attempt_{attempt+1}", tags=["retry", f"attempt-{attempt+1}"]): + print(f"Retry attempt {attempt+1} started") + + agent = ErrorProneAgent(f"RetryAgent_Attempt{attempt+1}") + + # Simulate success on the last attempt + if attempt < max_retries - 1: + result = agent.risky_operation("runtime_error") + else: + result = agent.risky_operation("success") + + print(f"Retry attempt {attempt+1} succeeded: {result}") + break + + except RuntimeError as e: + print(f"Retry attempt {attempt+1} failed: {e}") + if attempt < max_retries - 1: + wait_time = 2**attempt # 
Exponential backoff + print(f"Waiting {wait_time}s before retry...") + time.sleep(wait_time * 0.01) # Shortened for demo + else: + print("All retry attempts exhausted") + raise + + print("Retry pattern completed") + + +def graceful_degradation(): + """Example of graceful degradation pattern.""" + print("\nGraceful Degradation") + + agentops.init(api_key=AGENTOPS_API_KEY) + + try: + with agentops.start_trace("primary_service", tags=["degradation", "primary"]): + print("Primary service trace started") + + agent = ErrorProneAgent("PrimaryAgent") + result = agent.risky_operation("runtime_error") + print(f"Primary service result: {result}") + + except RuntimeError as e: + print(f"Primary service failed: {e}") + print("Falling back to secondary service...") + + with agentops.start_trace("fallback_service", tags=["degradation", "fallback"]): + print("Fallback service trace started") + + fallback_agent = ErrorProneAgent("FallbackAgent") + result = fallback_agent.risky_operation("success") + print(f"Fallback service result: {result}") + + print("Graceful degradation completed") + + +def partial_success_handling(): + """Example of partial success handling.""" + print("\nPartial Success Handling") + + agentops.init(api_key=AGENTOPS_API_KEY) + + steps = ["step1", "step2", "fail", "step4"] + + with agentops.start_trace("partial_success", tags=["partial", "multi-step"]): + print("Partial success trace started") + + agent = ErrorProneAgent("PartialAgent") + + try: + result = agent.multi_step_operation(steps) + print(f"All steps completed: {result}") + except RuntimeError as e: + print(f"Operation partially failed: {e}") + + print("Partial success handling completed") + + +def custom_exception_handling(): + """Example of handling custom exceptions.""" + print("\nCustom Exception Handling") + + agentops.init(api_key=AGENTOPS_API_KEY) + + try: + with agentops.start_trace("custom_exception", tags=["custom", "exception"]): + print("Custom exception trace started") + + agent = 
ErrorProneAgent("CustomAgent") + result = agent.risky_operation("custom_error") + print(f"Result: {result}") + + except CustomError as e: + print(f"Caught custom exception: {e}") + except Exception as e: + print(f"Caught unexpected exception: {e}") + + print("Custom exception handling completed") + + +def finally_blocks_example(): + """Example of exception handling with finally blocks.""" + print("\nFinally Blocks Example") + + agentops.init(api_key=AGENTOPS_API_KEY) + + cleanup_actions = [] + + try: + with agentops.start_trace("finally_example", tags=["finally", "cleanup"]): + print("Finally example trace started") + + agent = ErrorProneAgent("FinallyAgent") + + try: + result = agent.risky_operation("value_error") + print(f"Result: {result}") + finally: + cleanup_actions.append("Inner cleanup executed") + print("Inner finally block executed") + + except ValueError as e: + print(f"Caught exception: {e}") + finally: + cleanup_actions.append("Outer cleanup executed") + print("Outer finally block executed") + + print(f"Cleanup actions performed: {cleanup_actions}") + print("Finally block handling completed") + + +def exception_chaining_example(): + """Example of exception chaining and context preservation.""" + print("\nException Chaining Example") + + agentops.init(api_key=AGENTOPS_API_KEY) + + try: + with agentops.start_trace("exception_chaining", tags=["chaining", "context"]): + print("Exception chaining trace started") + + agent = ErrorProneAgent("ChainingAgent") + + try: + # First operation fails + agent.validate_input("invalid") + except ValueError as e: + print(f"Validation failed: {e}") + # Chain the exception with additional context + raise RuntimeError("Operation failed due to validation error") from e + + except RuntimeError as e: + print(f"Caught chained exception: {e}") + print(f"Original cause: {e.__cause__}") + + print("Exception chaining completed") + + +if __name__ == "__main__": + print("AgentOps Error Handling Examples") + print("=" * 40) + + # 
Basic exception handling + basic_exception_handling() + + # Nested exception handling + nested_exception_handling() + + # Retry pattern + retry_pattern() + + # Graceful degradation + graceful_degradation() + + # Partial success handling + partial_success_handling() + + # Custom exception handling + custom_exception_handling() + + # Finally blocks + finally_blocks_example() + + # Exception chaining + exception_chaining_example() + + print("\nAll error handling examples completed!") diff --git a/examples/context_manager/parallel_traces.py b/examples/context_manager/parallel_traces.py new file mode 100644 index 000000000..cbc9c8f89 --- /dev/null +++ b/examples/context_manager/parallel_traces.py @@ -0,0 +1,245 @@ +""" +Parallel Traces Example + +This example demonstrates how AgentOps context managers create independent +parallel traces rather than parent-child relationships, which is ideal for +concurrent operations and workflow management. +""" + +import os +import time +import threading +import concurrent.futures +import agentops +from agentops import agent, task, tool +from dotenv import load_dotenv + +load_dotenv() + +# Get API key from environment +AGENTOPS_API_KEY = os.environ.get("AGENTOPS_API_KEY") + + +@agent +class WorkerAgent: + """A worker agent that can process tasks independently.""" + + def __init__(self, worker_id: str): + self.worker_id = worker_id + + @task + def process_task(self, task_data: str) -> str: + """Process a task with some simulated work.""" + # Simulate some work + time.sleep(0.1) + + result = self.analyze_data(task_data) + return self.finalize_result(result) + + @tool + def analyze_data(self, data: str) -> str: + """Analyze the input data.""" + return f"Analyzed: {data}" + + @tool + def finalize_result(self, analyzed_data: str) -> str: + """Finalize the processing result.""" + return f"Final: {analyzed_data.upper()}" + + +def sequential_parallel_traces(): + """Example of sequential parallel traces - each trace is independent.""" + 
print("Sequential Parallel Traces") + + agentops.init(api_key=AGENTOPS_API_KEY) + + tasks = ["task_1", "task_2", "task_3"] + results = [] + + for i, task_name in enumerate(tasks): + # Each trace is completely independent + with agentops.start_trace(f"sequential_{task_name}", tags=["sequential", f"step-{i+1}"]): + print(f"Started trace for {task_name}") + + worker = WorkerAgent(f"Worker{i+1}") + result = worker.process_task(f"data_for_{task_name}") + results.append(result) + + print(f"{task_name} result: {result}") + + print("All sequential traces completed") + print(f"Final results: {results}") + + +def nested_parallel_traces(): + """Example showing that nested context managers create parallel traces.""" + print("\nNested Parallel Traces") + + agentops.init(api_key=AGENTOPS_API_KEY) + + # Outer trace for the overall workflow + with agentops.start_trace("workflow_main", tags=["workflow", "main"]): + print("Main workflow trace started") + + # Inner trace for data preparation (parallel, not child) + with agentops.start_trace("data_preparation", tags=["workflow", "preparation"]): + print("Data preparation trace started (parallel to main)") + + prep_worker = WorkerAgent("PrepWorker") + prep_result = prep_worker.process_task("raw_data") + print(f"Preparation result: {prep_result}") + + print("Data preparation trace ended") + + # Another inner trace for data processing (also parallel) + with agentops.start_trace("data_processing", tags=["workflow", "processing"]): + print("Data processing trace started (parallel to main)") + + proc_worker = WorkerAgent("ProcessWorker") + proc_result = proc_worker.process_task("prepared_data") + print(f"Processing result: {proc_result}") + + print("Data processing trace ended") + print("Main workflow completed") + + print("Main workflow trace ended") + print("All traces were independent/parallel, not parent-child relationships") + + +def concurrent_traces_with_threads(): + """Example of truly concurrent traces using threading.""" + 
print("\nConcurrent Traces with Threading") + + agentops.init(api_key=AGENTOPS_API_KEY) + + def worker_function(worker_id: int, task_data: str): + """Function to run in a separate thread with its own trace.""" + trace_name = f"concurrent_worker_{worker_id}" + + with agentops.start_trace(trace_name, tags=["concurrent", f"worker-{worker_id}"]): + print(f"Thread {worker_id}: Trace started") + + worker = WorkerAgent(f"ConcurrentWorker{worker_id}") + result = worker.process_task(task_data) + + # Simulate varying work times + time.sleep(0.05 * worker_id) + + print(f"Thread {worker_id}: Result: {result}") + return result + + # Start multiple threads, each with their own trace + threads = [] + results = [] + + for i in range(3): + thread = threading.Thread(target=lambda i=i: results.append(worker_function(i, f"concurrent_data_{i}"))) + threads.append(thread) + thread.start() + + # Wait for all threads to complete + for thread in threads: + thread.join() + + print("All concurrent traces completed") + print(f"Results from concurrent execution: {len(results)} completed") + + +def concurrent_traces_with_executor(): + """Example using ThreadPoolExecutor for concurrent traces.""" + print("\nConcurrent Traces with ThreadPoolExecutor") + + agentops.init(api_key=AGENTOPS_API_KEY) + + def process_with_trace(task_id: int, data: str) -> str: + """Process data within its own trace context.""" + trace_name = f"executor_task_{task_id}" + + with agentops.start_trace(trace_name, tags=["executor", f"task-{task_id}"]): + print(f"Executor task {task_id}: Trace started") + + worker = WorkerAgent(f"ExecutorWorker{task_id}") + result = worker.process_task(data) + + print(f"Executor task {task_id}: Result: {result}") + return result + + # Use ThreadPoolExecutor for concurrent execution + tasks_data = [ + (1, "executor_data_1"), + (2, "executor_data_2"), + (3, "executor_data_3"), + (4, "executor_data_4"), + ] + + with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: + # Submit all 
tasks + future_to_task = {executor.submit(process_with_trace, task_id, data): task_id for task_id, data in tasks_data} + + # Collect results as they complete + results = [] + for future in concurrent.futures.as_completed(future_to_task): + task_id = future_to_task[future] + try: + result = future.result() + results.append((task_id, result)) + print(f"Task {task_id} completed successfully") + except Exception as e: + print(f"Task {task_id} failed: {e}") + + print("All executor-based traces completed") + print(f"Completed {len(results)} tasks concurrently") + + +def trace_with_different_tag_types(): + """Example showing different ways to tag parallel traces.""" + print("\nTraces with Different Tag Types") + + agentops.init(api_key=AGENTOPS_API_KEY) + + # Trace with list tags + with agentops.start_trace("list_tags_trace", tags=["list", "example", "demo"]): + print("Trace with list tags started") + worker1 = WorkerAgent("ListTagWorker") + result1 = worker1.process_task("list_tag_data") + print(f"List tags result: {result1}") + + # Trace with dictionary tags + with agentops.start_trace( + "dict_tags_trace", tags={"environment": "demo", "version": "1.0", "priority": "high", "team": "engineering"} + ): + print("Trace with dictionary tags started") + worker2 = WorkerAgent("DictTagWorker") + result2 = worker2.process_task("dict_tag_data") + print(f"Dictionary tags result: {result2}") + + # Trace with no tags + with agentops.start_trace("no_tags_trace"): + print("Trace with no tags started") + worker3 = WorkerAgent("NoTagWorker") + result3 = worker3.process_task("no_tag_data") + print(f"No tags result: {result3}") + + print("All differently tagged traces completed") + + +if __name__ == "__main__": + print("AgentOps Parallel Traces Examples") + print("=" * 40) + + # Sequential parallel traces + sequential_parallel_traces() + + # Nested parallel traces + nested_parallel_traces() + + # Concurrent traces with threading + concurrent_traces_with_threads() + + # Concurrent traces 
with executor + concurrent_traces_with_executor() + + # Different tag types + trace_with_different_tag_types() + + print("\nAll parallel trace examples completed!") diff --git a/examples/context_manager/production_patterns.py b/examples/context_manager/production_patterns.py new file mode 100644 index 000000000..8dedc50ec --- /dev/null +++ b/examples/context_manager/production_patterns.py @@ -0,0 +1,340 @@ +""" +Production Patterns with Context Managers + +This example demonstrates real-world production patterns using AgentOps +context managers, including monitoring, logging, and performance tracking. +""" + +import os +import time +import logging +from datetime import datetime +from typing import Dict, Any +import agentops +from agentops import agent, task, tool +from dotenv import load_dotenv + +load_dotenv() + +# Get API key from environment +AGENTOPS_API_KEY = os.environ.get("AGENTOPS_API_KEY") + +# Configure logging +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) + + +@agent +class ProductionAgent: + """A production-ready agent with monitoring.""" + + def __init__(self, name: str, config: Dict[str, Any]): + self.name = name + self.config = config + self.metrics = {"tasks_completed": 0, "errors_encountered": 0, "total_processing_time": 0.0} + logger.info(f"Initialized production agent: {self.name}") + + @task + def process_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]: + """Process a production request with monitoring.""" + start_time = time.time() + + try: + logger.info(f"{self.name} processing request: {request_data.get('id', 'unknown')}") + + # Validate request + validated_data = self.validate_request(request_data) + + # Process data + processed_data = self.transform_data(validated_data) + + # Generate response + response = self.generate_response(processed_data) + + # Update metrics + processing_time = time.time() - start_time + 
self.metrics["tasks_completed"] += 1 + self.metrics["total_processing_time"] += processing_time + + logger.info(f"{self.name} completed request in {processing_time:.3f}s") + return response + + except Exception as e: + self.metrics["errors_encountered"] += 1 + logger.error(f"{self.name} failed to process request: {e}") + raise + + @tool + def validate_request(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Validate incoming request data.""" + required_fields = ["id", "type", "payload"] + + for field in required_fields: + if field not in data: + raise ValueError(f"Missing required field: {field}") + + logger.debug(f"{self.name} validated request: {data['id']}") + return data + + @tool + def transform_data(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Transform and enrich the data.""" + # Simulate data transformation + time.sleep(0.01) # Simulate processing time + + transformed = { + **data, + "processed_at": datetime.now().isoformat(), + "processed_by": self.name, + "version": self.config.get("version", "1.0"), + } + + logger.debug(f"{self.name} transformed data for: {data['id']}") + return transformed + + @tool + def generate_response(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Generate the final response.""" + response = { + "status": "success", + "request_id": data["id"], + "result": f"Processed {data['type']} successfully", + "metadata": { + "processed_at": data["processed_at"], + "processed_by": data["processed_by"], + "version": data["version"], + }, + } + + logger.debug(f"{self.name} generated response for: {data['id']}") + return response + + def get_metrics(self) -> Dict[str, Any]: + """Get current agent metrics.""" + avg_processing_time = self.metrics["total_processing_time"] / max(self.metrics["tasks_completed"], 1) + + return { + **self.metrics, + "average_processing_time": avg_processing_time, + "error_rate": self.metrics["errors_encountered"] + / max(self.metrics["tasks_completed"] + self.metrics["errors_encountered"], 1), + } + + 
def api_endpoint_pattern():
    """Handle a few simulated API requests, each inside its own trace.

    Every request is processed within an ``api_request`` trace tagged with the
    request type. A failing request is logged and does not stop the loop.
    """
    print("API Endpoint Pattern")

    agentops.init(api_key=AGENTOPS_API_KEY)

    handler = ProductionAgent("APIAgent", {"version": "2.1.0", "environment": "production"})

    # Stand-ins for requests arriving at an endpoint.
    incoming = [
        {"id": "req_001", "type": "user_query", "payload": {"query": "What is AI?"}},
        {"id": "req_002", "type": "data_analysis", "payload": {"dataset": "sales_data"}},
        {"id": "req_003", "type": "user_query", "payload": {"query": "Process this"}},
    ]

    for item in incoming:
        try:
            with agentops.start_trace("api_request", tags=["api", item["type"]]):
                reply = handler.process_request(item)
                logger.info(f"API Response: {reply['status']} for {reply['request_id']}")
        except Exception as exc:
            logger.error(f"API request failed: {exc}")

    # Summarize what the agent recorded across all requests.
    print(f"Agent Metrics: {handler.get_metrics()}")
def monitoring_pattern():
    """Example of wrapping traces in a custom monitoring context manager.

    Runs four simulated operations (fast, slow, failing, normal) through
    ``MonitoredOperation``, which times each one, ends its trace with the
    matching state, and raises alerts for slow or failed operations.
    """
    print("\nMonitoring Pattern")

    agentops.init(api_key=AGENTOPS_API_KEY)

    class AlertManager:
        """Simple alert manager for demonstration."""

        def __init__(self):
            self.alerts = []

        def check_and_alert(self, trace_name: str, duration: float, success: bool):
            """Append an alert for a slow and/or failed operation.

            A single operation can produce both a SLOW_OPERATION and an
            OPERATION_FAILURE alert.
            """
            # Alert on slow operations
            if duration > 0.1:  # 100ms threshold
                self.alerts.append(
                    {
                        "type": "SLOW_OPERATION",
                        "trace": trace_name,
                        "duration": duration,
                        "timestamp": datetime.now().isoformat(),
                    }
                )
                logger.warning(f"SLOW OPERATION ALERT: {trace_name} took {duration:.3f}s")

            # Alert on failures
            if not success:
                self.alerts.append(
                    {"type": "OPERATION_FAILURE", "trace": trace_name, "timestamp": datetime.now().isoformat()}
                )
                logger.warning(f"FAILURE ALERT: {trace_name} failed")

    alert_manager = AlertManager()

    class MonitoredOperation:
        """Context manager with built-in monitoring and alerting.

        Starts an AgentOps trace on entry and ends it on exit, so the trace is
        closed with "Success" or "Error" even though the trace context itself
        is not used in a ``with`` statement here.
        """

        def __init__(self, operation_name: str, tags: list = None):
            self.operation_name = operation_name
            self.tags = tags or []
            self.start_time = None
            self.trace_context = None

        def __enter__(self):
            # Monotonic clock: durations stay correct if the wall clock changes.
            self.start_time = time.perf_counter()
            self.trace_context = agentops.start_trace(self.operation_name, tags=self.tags)
            return self.trace_context

        def __exit__(self, exc_type, exc_val, exc_tb):
            duration = time.perf_counter() - self.start_time
            success = exc_type is None

            try:
                # The trace started in __enter__ must be ended here; otherwise
                # it is left open. start_trace can return None (e.g. when the
                # SDK failed to initialize), in which case there is nothing to end.
                if self.trace_context is not None:
                    agentops.end_trace(self.trace_context, "Success" if success else "Error")
            finally:
                # Check for alerts even if ending the trace raised.
                alert_manager.check_and_alert(self.operation_name, duration, success)

            # Never suppress the caller's exception.
            return False

    # Simulate operations with different performance characteristics
    operations = [
        ("fast_operation", 0.01, False),  # Fast, successful
        ("slow_operation", 0.15, False),  # Slow, successful (will trigger alert)
        ("failing_operation", 0.05, True),  # Fast, but fails (will trigger alert)
        ("normal_operation", 0.03, False),  # Normal, successful
    ]

    for op_name, sleep_time, should_fail in operations:
        try:
            with MonitoredOperation(op_name, tags=["monitoring", "demo"]):
                logger.info(f"Executing {op_name}")
                time.sleep(sleep_time)

                if should_fail:
                    raise RuntimeError(f"Simulated failure in {op_name}")

                logger.info(f"{op_name} completed successfully")

        except Exception as e:
            logger.error(f"{op_name} failed: {e}")

    # Print alerts
    print(f"Generated {len(alert_manager.alerts)} alerts:")
    for alert in alert_manager.alerts:
        print(f"  {alert['type']}: {alert.get('trace', 'unknown')} at {alert['timestamp']}")
+""" + +import pytest +import threading +import time +import asyncio +from unittest.mock import Mock, patch, call +from agentops.sdk.core import TraceContext +from agentops import start_trace, end_trace + + +class TestContextManager: + """Comprehensive test suite for AgentOps context manager functionality.""" + + def test_trace_context_has_context_manager_methods(self): + """Test that TraceContext has native context manager methods.""" + mock_span = Mock() + mock_span.get_span_context.return_value.span_id = "test_span_id" + mock_span.get_span_context.return_value.trace_id = 12345 + + trace_context = TraceContext(mock_span) + + # Verify it has context manager methods + assert hasattr(trace_context, "__enter__") + assert hasattr(trace_context, "__exit__") + assert callable(trace_context.__enter__) + assert callable(trace_context.__exit__) + + def test_trace_context_enter_returns_self(self): + """Test that __enter__ returns the TraceContext instance.""" + mock_span = Mock() + trace_context = TraceContext(mock_span) + + result = trace_context.__enter__() + assert result is trace_context + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_trace_context_exit_calls_end_trace(self, mock_get_instance): + """Test that __exit__ calls end_trace on TracingCore.""" + mock_tracing_core = Mock() + mock_get_instance.return_value = mock_tracing_core + + mock_span = Mock() + trace_context = TraceContext(mock_span) + + # Call __exit__ without exception + result = trace_context.__exit__(None, None, None) + + # Verify end_trace was called with Success state + mock_tracing_core.end_trace.assert_called_once_with(trace_context, "Success") + assert result is False # Should not suppress exceptions + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_trace_context_exit_with_exception_sets_error_state(self, mock_get_instance): + """Test that __exit__ sets Error state when exception occurs.""" + mock_tracing_core = Mock() + mock_get_instance.return_value = 
mock_tracing_core + + mock_span = Mock() + trace_context = TraceContext(mock_span) + + # Call __exit__ with exception + exc_type = ValueError + exc_val = ValueError("test error") + exc_tb = None + + result = trace_context.__exit__(exc_type, exc_val, exc_tb) + + # Verify end_trace was called with Error state + mock_tracing_core.end_trace.assert_called_once_with(trace_context, "Error") + assert result is False # Should not suppress exceptions + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_context_manager_usage_pattern(self, mock_get_instance): + """Test the actual context manager usage pattern.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + mock_span = Mock() + mock_span.get_span_context.return_value.span_id = "test_span_id" + mock_span.get_span_context.return_value.trace_id = 12345 + mock_span.name = "test_trace" + + mock_trace_context = TraceContext(mock_span) + mock_tracing_core.start_trace.return_value = mock_trace_context + + # Test the context manager pattern + with start_trace("test_trace") as trace: + assert trace is mock_trace_context + # Do some work + pass + + # Verify start_trace and end_trace were called + mock_tracing_core.start_trace.assert_called_once_with(trace_name="test_trace", tags=None) + mock_tracing_core.end_trace.assert_called_once_with(mock_trace_context, "Success") + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_context_manager_with_exception(self, mock_get_instance): + """Test context manager behavior when exception is raised.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + mock_span = Mock() + mock_trace_context = TraceContext(mock_span) + mock_tracing_core.start_trace.return_value = mock_trace_context + + # Test exception handling + with pytest.raises(ValueError): + with start_trace("test_trace"): + raise ValueError("test error") + + # 
Verify end_trace was called with Error state + mock_tracing_core.end_trace.assert_called_once_with(mock_trace_context, "Error") + + @patch("agentops.init") + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_start_trace_auto_initializes_if_needed(self, mock_get_instance, mock_init): + """Test that start_trace auto-initializes if SDK not initialized.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = False + mock_get_instance.return_value = mock_tracing_core + + # Mock the init call to set initialized to True + def side_effect(): + mock_tracing_core.initialized = True + + mock_init.side_effect = side_effect + + mock_span = Mock() + mock_trace_context = TraceContext(mock_span) + mock_tracing_core.start_trace.return_value = mock_trace_context + + # Call start_trace + result = start_trace("test_trace") + + # Verify init was called and trace was started + mock_init.assert_called_once() + mock_tracing_core.start_trace.assert_called_once_with(trace_name="test_trace", tags=None) + assert result is mock_trace_context + + def test_no_wrapper_classes_needed(self): + """Test that we don't need wrapper classes anymore.""" + mock_span = Mock() + trace_context = TraceContext(mock_span) + + # Should be able to use directly as context manager + assert hasattr(trace_context, "__enter__") + assert hasattr(trace_context, "__exit__") + + # Should not need any wrapper + with trace_context as ctx: + assert ctx is trace_context + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_parallel_traces_independence(self, mock_get_instance): + """Test that parallel traces are independent and don't interfere.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + # Create mock spans for different traces + mock_span1 = Mock() + mock_span1.get_span_context.return_value.trace_id = 111 + mock_span1.name = "trace1" + + mock_span2 = Mock() + 
mock_span2.get_span_context.return_value.trace_id = 222 + mock_span2.name = "trace2" + + mock_trace1 = TraceContext(mock_span1) + mock_trace2 = TraceContext(mock_span2) + + # Mock start_trace to return different traces + mock_tracing_core.start_trace.side_effect = [mock_trace1, mock_trace2] + + # Start two parallel traces + trace1 = start_trace("trace1") + trace2 = start_trace("trace2") + + assert trace1 is mock_trace1 + assert trace2 is mock_trace2 + assert trace1 is not trace2 + + # Verify both traces were started independently + assert mock_tracing_core.start_trace.call_count == 2 + mock_tracing_core.start_trace.assert_has_calls( + [call(trace_name="trace1", tags=None), call(trace_name="trace2", tags=None)] + ) + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_nested_context_managers_create_parallel_traces(self, mock_get_instance): + """Test that nested context managers create parallel traces, not parent-child.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + # Create mock traces + mock_span1 = Mock() + mock_span2 = Mock() + mock_trace1 = TraceContext(mock_span1) + mock_trace2 = TraceContext(mock_span2) + + mock_tracing_core.start_trace.side_effect = [mock_trace1, mock_trace2] + + # Test nested context managers + with start_trace("outer_trace") as outer: + assert outer is mock_trace1 + + with start_trace("inner_trace") as inner: + assert inner is mock_trace2 + assert inner is not outer + + # Verify both traces were started and ended independently + assert mock_tracing_core.start_trace.call_count == 2 + assert mock_tracing_core.end_trace.call_count == 2 + + # Verify the order of end_trace calls (inner first, then outer) + mock_tracing_core.end_trace.assert_has_calls( + [ + call(mock_trace2, "Success"), # inner trace ends first + call(mock_trace1, "Success"), # outer trace ends second + ] + ) + + @patch("agentops.sdk.core.TracingCore.get_instance") + def 
test_exception_in_nested_traces(self, mock_get_instance): + """Test exception handling in nested traces.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + mock_span1 = Mock() + mock_span2 = Mock() + mock_trace1 = TraceContext(mock_span1) + mock_trace2 = TraceContext(mock_span2) + + mock_tracing_core.start_trace.side_effect = [mock_trace1, mock_trace2] + + # Test exception in nested trace + with pytest.raises(ValueError): + with start_trace("outer_trace"): + with start_trace("inner_trace"): + raise ValueError("inner error") + + # Verify both traces ended with appropriate states + mock_tracing_core.end_trace.assert_has_calls( + [ + call(mock_trace2, "Error"), # inner trace ends with Error + call(mock_trace1, "Error"), # outer trace also ends with Error due to exception propagation + ] + ) + + def test_trace_context_attributes_access(self): + """Test that TraceContext attributes are accessible.""" + mock_span = Mock() + mock_span.name = "test_span" + mock_span.get_span_context.return_value.trace_id = 12345 + + mock_token = Mock() + + trace_context = TraceContext(mock_span, token=mock_token, is_init_trace=True) + + # Test attribute access + assert trace_context.span is mock_span + assert trace_context.token is mock_token + assert trace_context.is_init_trace is True + assert trace_context._end_state == "Indeterminate" # Default state before exit + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_multiple_exceptions_in_sequence(self, mock_get_instance): + """Test handling multiple exceptions in sequence.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + mock_spans = [Mock() for _ in range(3)] + mock_traces = [TraceContext(span) for span in mock_spans] + mock_tracing_core.start_trace.side_effect = mock_traces + + # Test multiple traces with exceptions + for i, exception_type in enumerate([ValueError, 
TypeError, RuntimeError]): + with pytest.raises(exception_type): + with start_trace(f"trace_{i}"): + raise exception_type(f"error_{i}") + + # Verify all traces ended with Error state + assert mock_tracing_core.end_trace.call_count == 3 + for i, mock_trace in enumerate(mock_traces): + mock_tracing_core.end_trace.assert_any_call(mock_trace, "Error") + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_trace_with_tags_dict(self, mock_get_instance): + """Test trace creation with dictionary tags.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + mock_span = Mock() + mock_trace = TraceContext(mock_span) + mock_tracing_core.start_trace.return_value = mock_trace + + tags = {"environment": "test", "version": "1.0", "priority": "high"} + + with start_trace("tagged_trace", tags=tags) as trace: + assert trace is mock_trace + + # Verify tags were passed correctly + mock_tracing_core.start_trace.assert_called_once_with(trace_name="tagged_trace", tags=tags) + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_trace_with_tags_list(self, mock_get_instance): + """Test trace creation with list tags.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + mock_span = Mock() + mock_trace = TraceContext(mock_span) + mock_tracing_core.start_trace.return_value = mock_trace + + tags = ["test", "integration", "high-priority"] + + with start_trace("tagged_trace", tags=tags) as trace: + assert trace is mock_trace + + # Verify tags were passed correctly + mock_tracing_core.start_trace.assert_called_once_with(trace_name="tagged_trace", tags=tags) + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_trace_context_manager_thread_safety(self, mock_get_instance): + """Test that context managers work correctly in multi-threaded environment.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = 
True + mock_get_instance.return_value = mock_tracing_core + + results = [] + errors = [] + + def create_mock_trace(name): + mock_span = Mock() + mock_span.name = name + mock_span.get_span_context.return_value.trace_id = hash(name) + return TraceContext(mock_span) + + # Create different mock traces for each thread + mock_traces = [create_mock_trace(f"thread_{i}") for i in range(5)] + mock_tracing_core.start_trace.side_effect = mock_traces + + def worker(thread_id): + try: + with start_trace(f"thread_{thread_id}") as trace: + # Simulate some work + time.sleep(0.01) + results.append((thread_id, trace.span.name)) + except Exception as e: + errors.append((thread_id, str(e))) + + # Start multiple threads + threads = [] + for i in range(5): + thread = threading.Thread(target=worker, args=(i,)) + threads.append(thread) + thread.start() + + # Wait for all threads to complete + for thread in threads: + thread.join() + + # Verify no errors occurred + assert len(errors) == 0, f"Errors in threads: {errors}" + + # Verify all threads completed successfully + assert len(results) == 5 + + # Verify each thread got its own trace + thread_names = [result[1] for result in results] + expected_names = [f"thread_{i}" for i in range(5)] + assert sorted(thread_names) == sorted(expected_names) + + # Verify all traces were started and ended + assert mock_tracing_core.start_trace.call_count == 5 + assert mock_tracing_core.end_trace.call_count == 5 + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_context_manager_with_early_return(self, mock_get_instance): + """Test context manager behavior with early return statements.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + mock_span = Mock() + mock_trace = TraceContext(mock_span) + mock_tracing_core.start_trace.return_value = mock_trace + + def function_with_early_return(): + with start_trace("early_return_trace"): + if True: # Simulate condition for 
early return + return "early_result" + # This code should not be reached + return "normal_result" + + result = function_with_early_return() + + # Verify early return worked + assert result == "early_result" + + # Verify trace was still properly ended + mock_tracing_core.start_trace.assert_called_once() + mock_tracing_core.end_trace.assert_called_once_with(mock_trace, "Success") + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_context_manager_with_finally_block(self, mock_get_instance): + """Test context manager interaction with finally blocks.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + mock_span = Mock() + mock_trace = TraceContext(mock_span) + mock_tracing_core.start_trace.return_value = mock_trace + + finally_executed = [] + + try: + with start_trace("finally_trace"): + try: + raise ValueError("test error") + finally: + finally_executed.append("finally_block") + except ValueError: + pass + + # Verify finally block was executed + assert finally_executed == ["finally_block"] + + # Verify trace was ended with Error state + mock_tracing_core.end_trace.assert_called_once_with(mock_trace, "Error") + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_backwards_compatibility_existing_patterns(self, mock_get_instance): + """Test that existing code patterns continue to work.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + mock_span = Mock() + mock_trace = TraceContext(mock_span) + mock_tracing_core.start_trace.return_value = mock_trace + + # Test various existing patterns that should still work + + # Pattern 1: Basic usage + with start_trace("basic") as trace: + assert trace is not None + assert hasattr(trace, "span") + + # Pattern 2: With tags + with start_trace("tagged", tags=["test", "example"]) as trace: + assert trace is not None + + # Pattern 3: Accessing trace properties 
+ with start_trace("properties") as trace: + span = trace.span + assert span is mock_span + + # Pattern 4: Manual end_trace (should still work) + trace = start_trace("manual") + end_trace(trace, "Success") + + # Verify all calls were made correctly + assert mock_tracing_core.start_trace.call_count == 4 + assert mock_tracing_core.end_trace.call_count == 4 + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_edge_case_none_trace_context(self, mock_get_instance): + """Test edge case where start_trace returns None.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + # Mock start_trace to return None (edge case) + mock_tracing_core.start_trace.return_value = None + + # This should not raise an exception + result = start_trace("none_trace") + assert result is None + + # Verify start_trace was called + mock_tracing_core.start_trace.assert_called_once() + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_edge_case_tracing_core_not_initialized(self, mock_get_instance): + """Test edge case where TracingCore is not initialized.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = False + mock_get_instance.return_value = mock_tracing_core + + # Mock init to fail + with patch("agentops.init") as mock_init: + mock_init.side_effect = Exception("Init failed") + + # This should return None and not raise + result = start_trace("uninitialized") + assert result is None + + # Verify init was attempted + mock_init.assert_called_once() + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_edge_case_exception_in_exit_method(self, mock_get_instance): + """Test edge case where exception occurs in __exit__ method.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + # Mock end_trace to raise an exception + mock_tracing_core.end_trace.side_effect = Exception("End trace failed") + + 
mock_span = Mock() + mock_trace = TraceContext(mock_span) + mock_tracing_core.start_trace.return_value = mock_trace + + # The context manager should handle the exception gracefully + with start_trace("exception_in_exit"): + pass # Normal execution + + # Verify end_trace was called despite the exception + mock_tracing_core.end_trace.assert_called_once() + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_performance_many_sequential_traces(self, mock_get_instance): + """Test performance with many sequential traces.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + def create_mock_trace(i): + mock_span = Mock() + mock_span.name = f"trace_{i}" + return TraceContext(mock_span) + + # Create many mock traces + num_traces = 100 + mock_traces = [create_mock_trace(i) for i in range(num_traces)] + mock_tracing_core.start_trace.side_effect = mock_traces + + # Execute many sequential traces + start_time = time.time() + + for i in range(num_traces): + with start_trace(f"trace_{i}") as trace: + assert trace is mock_traces[i] + + end_time = time.time() + + # Verify all traces were processed + assert mock_tracing_core.start_trace.call_count == num_traces + assert mock_tracing_core.end_trace.call_count == num_traces + + # Performance should be reasonable (less than 1 second for 100 traces) + execution_time = end_time - start_time + assert execution_time < 1.0, f"Execution took too long: {execution_time}s" + + def test_trace_context_state_management(self): + """Test TraceContext internal state management.""" + mock_span = Mock() + trace_context = TraceContext(mock_span) + + # Test initial state + assert trace_context._end_state == "Indeterminate" + + # Test state change on exception + trace_context.__exit__(ValueError, ValueError("test"), None) + assert trace_context._end_state == "Error" + + @patch("agentops.sdk.core.TracingCore.get_instance") + def 
test_context_manager_with_async_context(self, mock_get_instance): + """Test context manager behavior in async context (sync usage).""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + mock_span = Mock() + mock_trace = TraceContext(mock_span) + mock_tracing_core.start_trace.return_value = mock_trace + + async def async_function(): + with start_trace("async_context") as trace: + await asyncio.sleep(0.001) # Simulate async work + return trace + + # Run the async function + result = asyncio.run(async_function()) + + # Verify trace was handled correctly + assert result is mock_trace + mock_tracing_core.start_trace.assert_called_once() + mock_tracing_core.end_trace.assert_called_once_with(mock_trace, "Success") + + +class TestContextManagerBackwardCompatibility: + """Test backward compatibility with existing code patterns.""" + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_existing_code_patterns_still_work(self, mock_get_instance): + """Test that all existing code patterns continue to work.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + mock_span = Mock() + mock_trace = TraceContext(mock_span) + mock_tracing_core.start_trace.return_value = mock_trace + + # Pattern 1: Basic context manager + with start_trace("basic") as trace: + assert trace is not None + + # Pattern 2: With tags as list + with start_trace("list_tags", tags=["tag1", "tag2"]) as trace: + assert trace is not None + + # Pattern 3: With tags as dict + with start_trace("dict_tags", tags={"env": "test"}) as trace: + assert trace is not None + + # Pattern 4: Accessing span + with start_trace("span_access") as trace: + span = trace.span + assert span is mock_span + + # Pattern 5: Manual trace management (legacy) + trace = start_trace("manual") + end_trace(trace) + + # All patterns should work identically + assert 
mock_tracing_core.start_trace.call_count == 5 + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_api_compatibility(self, mock_get_instance): + """Test that the API remains exactly the same.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + mock_span = Mock() + mock_trace = TraceContext(mock_span) + mock_tracing_core.start_trace.return_value = mock_trace + + # Test function signatures haven't changed + + # start_trace with all parameters + trace1 = start_trace("test", tags=["tag"]) + assert trace1 is mock_trace + + # start_trace with positional args + trace2 = start_trace("test2") + assert trace2 is mock_trace + + # end_trace with all parameters + end_trace(trace1, "Success") + + # end_trace with defaults + end_trace(trace2) + + # Verify calls were made correctly + mock_tracing_core.start_trace.assert_has_calls( + [call(trace_name="test", tags=["tag"]), call(trace_name="test2", tags=None)] + ) + mock_tracing_core.end_trace.assert_has_calls( + [call(trace_context=trace1, end_state="Success"), call(trace_context=trace2, end_state="Success")] + ) + + def test_return_type_compatibility(self): + """Test that return types are compatible with existing code.""" + mock_span = Mock() + trace_context = TraceContext(mock_span) + + # Test that TraceContext has all expected attributes + assert hasattr(trace_context, "span") + assert hasattr(trace_context, "token") + assert hasattr(trace_context, "is_init_trace") + assert hasattr(trace_context, "_end_state") + + # Test that it can be used as a context manager + assert hasattr(trace_context, "__enter__") + assert hasattr(trace_context, "__exit__") + + # Test that __enter__ returns self (standard pattern) + assert trace_context.__enter__() is trace_context + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_context_manager_with_keyboard_interrupt(self, mock_get_instance): + """Test context manager behavior with 
KeyboardInterrupt.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + mock_span = Mock() + mock_trace = TraceContext(mock_span) + mock_tracing_core.start_trace.return_value = mock_trace + + # Test KeyboardInterrupt handling + with pytest.raises(KeyboardInterrupt): + with start_trace("keyboard_interrupt_trace"): + raise KeyboardInterrupt("User interrupted") + + # Verify trace was ended with Error state + mock_tracing_core.end_trace.assert_called_once_with(mock_trace, "Error") + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_context_manager_with_system_exit(self, mock_get_instance): + """Test context manager behavior with SystemExit.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + mock_span = Mock() + mock_trace = TraceContext(mock_span) + mock_tracing_core.start_trace.return_value = mock_trace + + # Test SystemExit handling + with pytest.raises(SystemExit): + with start_trace("system_exit_trace"): + raise SystemExit(1) + + # Verify trace was ended with Error state + mock_tracing_core.end_trace.assert_called_once_with(mock_trace, "Error") + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_context_manager_in_generator_function(self, mock_get_instance): + """Test context manager usage within generator functions.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + mock_span = Mock() + mock_trace = TraceContext(mock_span) + mock_tracing_core.start_trace.return_value = mock_trace + + def trace_generator(): + with start_trace("generator_trace") as trace: + yield f"value_1_{trace.span}" + yield f"value_2_{trace.span}" + yield f"value_3_{trace.span}" + + # Consume the generator + results = list(trace_generator()) + + # Verify generator worked correctly + assert len(results) == 3 + assert all("value_" in result 
for result in results) + + # Verify trace was properly managed + mock_tracing_core.start_trace.assert_called_once() + mock_tracing_core.end_trace.assert_called_once_with(mock_trace, "Success") + + def test_context_manager_exit_return_value(self): + """Test that __exit__ always returns False (doesn't suppress exceptions).""" + mock_span = Mock() + trace_context = TraceContext(mock_span) + + # Test with no exception + result = trace_context.__exit__(None, None, None) + assert result is False + + # Test with exception + result = trace_context.__exit__(ValueError, ValueError("test"), None) + assert result is False + + # Test with different exception types + for exc_type in [RuntimeError, TypeError, KeyboardInterrupt, SystemExit]: + result = trace_context.__exit__(exc_type, exc_type("test"), None) + assert result is False, f"__exit__ should return False for {exc_type.__name__}" + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_context_manager_with_very_large_data(self, mock_get_instance): + """Test context manager with very large trace names and tags.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + mock_span = Mock() + mock_trace = TraceContext(mock_span) + mock_tracing_core.start_trace.return_value = mock_trace + + # Test with very large trace name + large_trace_name = "x" * 10000 # 10KB trace name + large_tags = {f"key_{i}": "x" * 1000 for i in range(100)} # Large tags + + with start_trace(large_trace_name, tags=large_tags) as trace: + assert trace is mock_trace + + # Verify the large data was passed correctly + mock_tracing_core.start_trace.assert_called_once_with(trace_name=large_trace_name, tags=large_tags) + mock_tracing_core.end_trace.assert_called_once_with(mock_trace, "Success") + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_context_manager_with_asyncio_tasks(self, mock_get_instance): + """Test context manager with actual asyncio tasks (not 
just async functions).""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + # Create different mock traces for each task + mock_spans = [Mock() for _ in range(3)] + mock_traces = [TraceContext(span) for span in mock_spans] + mock_tracing_core.start_trace.side_effect = mock_traces + + async def task_with_trace(task_id): + with start_trace(f"async_task_{task_id}"): + await asyncio.sleep(0.001) # Simulate async work + return f"result_{task_id}" + + async def run_concurrent_tasks(): + # Create multiple asyncio tasks + tasks = [ + asyncio.create_task(task_with_trace(1)), + asyncio.create_task(task_with_trace(2)), + asyncio.create_task(task_with_trace(3)), + ] + + # Wait for all tasks to complete + results = await asyncio.gather(*tasks) + return results + + # Run the concurrent tasks + results = asyncio.run(run_concurrent_tasks()) + + # Verify all tasks completed + assert len(results) == 3 + assert results == ["result_1", "result_2", "result_3"] + + # Verify all traces were started and ended + assert mock_tracing_core.start_trace.call_count == 3 + assert mock_tracing_core.end_trace.call_count == 3 + + @patch("agentops.sdk.core.TracingCore.get_instance") + def test_context_manager_resource_cleanup_on_exit_failure(self, mock_get_instance): + """Test that resources are cleaned up even if __exit__ fails.""" + mock_tracing_core = Mock() + mock_tracing_core.initialized = True + mock_get_instance.return_value = mock_tracing_core + + mock_span = Mock() + mock_trace = TraceContext(mock_span) + mock_tracing_core.start_trace.return_value = mock_trace + + # Mock end_trace to fail + mock_tracing_core.end_trace.side_effect = Exception("End trace failed") + + # The context manager should handle the failure gracefully + with start_trace("cleanup_test") as trace: + assert trace is mock_trace + # Normal execution + + # Verify end_trace was attempted despite the failure + 
mock_tracing_core.end_trace.assert_called_once_with(mock_trace, "Success") + + # Verify the trace state was still updated + assert mock_trace._end_state == "Success" From c57001a53e7158af7e39f38489af6db374148818 Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Thu, 29 May 2025 22:49:06 +0530 Subject: [PATCH 05/13] add docs --- docs/mint.json | 1 + docs/v2/usage/context-managers.mdx | 328 +++++++++++++++++++++++++++++ 2 files changed, 329 insertions(+) create mode 100644 docs/v2/usage/context-managers.mdx diff --git a/docs/mint.json b/docs/mint.json index f2c0865f7..00fec4adf 100644 --- a/docs/mint.json +++ b/docs/mint.json @@ -187,6 +187,7 @@ "v2/usage/dashboard-info", "v2/usage/sdk-reference", "v2/usage/advanced-configuration", + "v2/usage/context-managers", "v2/usage/tracking-llm-calls", "v2/usage/tracking-agents", "v2/usage/recording-operations", diff --git a/docs/v2/usage/context-managers.mdx b/docs/v2/usage/context-managers.mdx new file mode 100644 index 000000000..391dbd2b2 --- /dev/null +++ b/docs/v2/usage/context-managers.mdx @@ -0,0 +1,328 @@ +--- +title: "Context Managers" +description: "Use AgentOps traces as Python context managers for automatic lifecycle management" +--- + +# Context Managers + +AgentOps provides native context manager support for traces, allowing you to use Python's `with` statement for automatic trace lifecycle management. This approach ensures traces are properly started and ended, even when exceptions occur. 
+ +## Basic Usage + +The simplest way to use context managers is with the `start_trace()` function: + +```python +import agentops + +# Initialize AgentOps +agentops.init(api_key="your-api-key") + +# Use context manager for automatic trace management +with agentops.start_trace("my_workflow") as trace: + # Your code here + print("Processing data...") + # Trace automatically ends when exiting the with block +``` + +The trace will automatically: +- Start when entering the `with` block +- End with "Success" status when exiting normally +- End with "Error" status if an exception occurs +- Clean up resources properly in all cases + +## Advanced Usage + +### Traces with Tags + +You can add tags to traces for better organization and filtering: + +```python +import agentops + +agentops.init(api_key="your-api-key") + +# Using list tags +with agentops.start_trace("data_processing", tags=["batch", "production"]): + process_batch_data() + +# Using dictionary tags for more structured metadata +with agentops.start_trace("user_request", tags={ + "user_id": "12345", + "request_type": "query", + "priority": "high" +}): + handle_user_request() +``` + +### Parallel Traces + +Context managers create independent parallel traces, not parent-child relationships: + +```python +import agentops + +agentops.init(api_key="your-api-key") + +# Sequential parallel traces +with agentops.start_trace("task_1"): + print("Task 1 executing") + +with agentops.start_trace("task_2"): + print("Task 2 executing") + +# Nested context managers create parallel traces +with agentops.start_trace("outer_workflow"): + print("Outer workflow started") + + with agentops.start_trace("inner_task"): + print("Inner task executing (parallel to outer)") + + print("Outer workflow continuing") +``` + +### Exception Handling + +Context managers automatically handle exceptions and set appropriate trace states: + +```python +import agentops + +agentops.init(api_key="your-api-key") + +# Automatic error handling +try: + with 
agentops.start_trace("risky_operation"): + # This will automatically set trace status to "Error" + raise ValueError("Something went wrong") +except ValueError as e: + print(f"Caught error: {e}") + # Trace has already been ended with Error status + +# Graceful degradation pattern +try: + with agentops.start_trace("primary_service"): + result = call_primary_service() +except ServiceUnavailableError: + with agentops.start_trace("fallback_service"): + result = call_fallback_service() +``` + +### Concurrent Execution + +Context managers work seamlessly with threading and asyncio: + + + +```python Threading +import agentops +import threading + +agentops.init(api_key="your-api-key") + +# With threading +def worker_function(worker_id): + with agentops.start_trace(f"worker_{worker_id}"): + # Each thread gets its own independent trace + process_work(worker_id) + +threads = [] +for i in range(3): + thread = threading.Thread(target=worker_function, args=(i,)) + threads.append(thread) + thread.start() + +for thread in threads: + thread.join() +``` + +```python Asyncio +import agentops +import asyncio + +agentops.init(api_key="your-api-key") + +# With asyncio +async def async_task(task_id): + with agentops.start_trace(f"async_task_{task_id}"): + await asyncio.sleep(0.1) # Simulate async work + return f"result_{task_id}" + +async def main(): + tasks = [async_task(i) for i in range(3)] + results = await asyncio.gather(*tasks) + return results + +# Run async tasks +results = asyncio.run(main()) +``` + + + +## Production Patterns + +### API Endpoint Monitoring + +```python +import agentops +from flask import Flask, request + +app = Flask(__name__) +agentops.init(api_key="your-api-key") + +@app.route('/api/process', methods=['POST']) +def process_request(): + # Create trace for each API request + with agentops.start_trace("api_request", tags={ + "endpoint": "/api/process", + "method": "POST", + "user_id": request.headers.get("user-id") + }): + try: + data = request.get_json() + 
result = process_data(data) + return {"status": "success", "result": result} + except Exception as e: + # Exception automatically sets trace to Error status + return {"status": "error", "message": str(e)}, 500 +``` + +### Batch Processing + +```python +import agentops + +agentops.init(api_key="your-api-key") + +def process_batch(items): + with agentops.start_trace("batch_processing", tags={ + "batch_size": len(items), + "batch_type": "data_processing" + }): + successful = 0 + failed = 0 + + for item in items: + try: + with agentops.start_trace("item_processing", tags={ + "item_id": item.get("id"), + "item_type": item.get("type") + }): + process_item(item) + successful += 1 + except Exception as e: + failed += 1 + print(f"Failed to process item {item.get('id')}: {e}") + + print(f"Batch completed: {successful} successful, {failed} failed") +``` + +### Retry Logic + +```python +import agentops +import time + +agentops.init(api_key="your-api-key") + +def retry_operation(operation_name, max_retries=3): + for attempt in range(max_retries): + try: + with agentops.start_trace(f"{operation_name}_attempt_{attempt + 1}", tags={ + "operation": operation_name, + "attempt": attempt + 1, + "max_retries": max_retries + }): + # Your operation here + result = perform_operation() + return result # Success - exit retry loop + + except Exception as e: + if attempt < max_retries - 1: + wait_time = 2 ** attempt # Exponential backoff + print(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait_time}s...") + time.sleep(wait_time) + else: + print(f"All {max_retries} attempts failed") + raise +``` + +## Backward Compatibility + +Context managers are fully backward compatible with existing AgentOps code patterns: + + + +```python Manual Management +import agentops + +agentops.init(api_key="your-api-key") + +# Manual trace management (legacy) +trace = agentops.start_trace("manual_trace") +# ... your code ... 
+agentops.end_trace(trace, "Success") +``` + +```python Context Manager +import agentops + +agentops.init(api_key="your-api-key") + +# Context manager (new, recommended) +with agentops.start_trace("context_managed_trace") as trace: + # ... your code ... + pass # Automatically ended +``` + +```python Property Access +import agentops + +agentops.init(api_key="your-api-key") + +# Accessing trace properties +with agentops.start_trace("property_access") as trace: + span = trace.span # Access underlying span + trace_id = trace.span.get_span_context().trace_id +``` + +```python Mixed Usage +import agentops + +agentops.init(api_key="your-api-key") + +# Mixed usage +trace = agentops.start_trace("mixed_usage") +try: + with trace: # Use existing trace as context manager + # ... your code ... + pass +except Exception: + agentops.end_trace(trace, "Error") +``` + + + +## Examples + +For complete working examples, see the following files in the AgentOps repository: + + + + Simple context manager patterns and error handling + + + Sequential, nested, and concurrent trace patterns + + + Exception handling, retry patterns, and graceful degradation + + + API endpoints, batch processing, microservices, and monitoring + + + +These examples demonstrate real-world usage patterns and best practices for using AgentOps context managers in production applications. + +## API Reference + +For detailed API information, see the [SDK Reference](/v2/usage/sdk-reference#trace-management) documentation. 
From 5c8b995bf034546be9512774b28767a86a9a8ef1 Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Fri, 30 May 2025 01:35:18 +0530 Subject: [PATCH 06/13] add `TraceState` for handling status codes and handle thread management for client --- agentops/__init__.py | 38 +++++++++++++++++++++----------------- agentops/enums.py | 36 ++++++++++++++++++++++++++++++++++++ agentops/sdk/__init__.py | 4 ++++ agentops/sdk/core.py | 38 +++++++++++++++++++++++++++++--------- 4 files changed, 90 insertions(+), 26 deletions(-) create mode 100644 agentops/enums.py diff --git a/agentops/__init__.py b/agentops/__init__.py index 17bd5a6f0..0520b8f5b 100755 --- a/agentops/__init__.py +++ b/agentops/__init__.py @@ -16,31 +16,26 @@ from agentops.client import Client from agentops.sdk.core import TracingCore, TraceContext from agentops.sdk.decorators import trace, session, agent, task, workflow, operation, tool +from agentops.enums import TraceState, SUCCESS, ERROR, UNSET +from opentelemetry.trace.status import StatusCode from agentops.logging.config import logger +import threading -# Client global instance; one per process runtime -_client = Client() +# Thread-safe client management +_client_lock = threading.Lock() +_client = None def get_client() -> Client: - """Get the singleton client instance""" + """Get the singleton client instance in a thread-safe manner""" global _client - # If _client is None create a new one + # Double-checked locking pattern for thread safety if _client is None: - _client = Client() - - # Check if the current client instance is still valid - # This handles the case where test fixtures reset the Client singleton - # Only check if we have a real Client instance (not a mock) - if hasattr(_client, "__class__") and _client.__class__.__name__ == "Client": - # The Client class uses __instance as its singleton, so we check if it's changed - current_singleton = Client() - - # If the singleton has changed (due to test resets), update our reference - if _client is not 
current_singleton: - _client = current_singleton + with _client_lock: + if _client is None: + _client = Client() return _client @@ -232,7 +227,9 @@ def start_trace( return tracing_core.start_trace(trace_name=trace_name, tags=tags) -def end_trace(trace_context: Optional[TraceContext] = None, end_state: str = "Success") -> None: +def end_trace( + trace_context: Optional[TraceContext] = None, end_state: Union[TraceState, StatusCode, str] = TraceState.SUCCESS +) -> None: """ Ends a trace (its root span) and finalizes it. If no trace_context is provided, ends all active session spans. @@ -272,4 +269,11 @@ def end_trace(trace_context: Optional[TraceContext] = None, end_state: str = "Su "workflow", "operation", "tool", + # Trace state enums + "TraceState", + "SUCCESS", + "ERROR", + "UNSET", + # OpenTelemetry status codes (for advanced users) + "StatusCode", ] diff --git a/agentops/enums.py b/agentops/enums.py new file mode 100644 index 000000000..c3dcfda38 --- /dev/null +++ b/agentops/enums.py @@ -0,0 +1,36 @@ +""" +AgentOps enums for user-friendly API. + +This module provides simple enums that users can import from agentops +without needing to know about OpenTelemetry internals. +""" + +from enum import Enum +from opentelemetry.trace.status import StatusCode + + +class TraceState(Enum): + """ + Enum for trace end states. + + This provides a user-friendly interface that maps to OpenTelemetry StatusCode internally. + Users can simply use agentops.TraceState.SUCCESS instead of importing OpenTelemetry. 
+ """ + + SUCCESS = StatusCode.OK + ERROR = StatusCode.ERROR + UNSET = StatusCode.UNSET + + def __str__(self) -> str: + """Return the name for string representation.""" + return self.name + + def to_status_code(self) -> StatusCode: + """Convert to OpenTelemetry StatusCode.""" + return self.value + + +# For backward compatibility, also provide these as module-level constants +SUCCESS = TraceState.SUCCESS +ERROR = TraceState.ERROR +UNSET = TraceState.UNSET diff --git a/agentops/sdk/__init__.py b/agentops/sdk/__init__.py index f1be1f718..58802e366 100644 --- a/agentops/sdk/__init__.py +++ b/agentops/sdk/__init__.py @@ -14,6 +14,8 @@ # from agentops.sdk.traced import TracedObject # Merged into TracedObject from agentops.sdk.types import TracingConfig +from opentelemetry.trace.status import StatusCode + __all__ = [ # Core components "TracingCore", @@ -24,4 +26,6 @@ "agent", "task", "workflow", + # OpenTelemetry status codes + "StatusCode", ] diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index 25ede14dd..189a03959 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -6,7 +6,7 @@ import sys import os import psutil -from typing import Optional, Any, Dict +from typing import Optional, Any, Dict, Union from opentelemetry import metrics, trace from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter @@ -24,6 +24,7 @@ from agentops.sdk.types import TracingConfig from agentops.semconv import ResourceAttributes, SpanKind, SpanAttributes, CoreAttributes from agentops.helpers.dashboard import log_trace_url +from opentelemetry.trace.status import StatusCode # No need to create shortcuts since we're using our own ResourceAttributes class now @@ -34,7 +35,7 @@ def __init__(self, span: Span, token: Optional[context_api.Token] = None, is_ini self.span = span self.token = token self.is_init_trace = is_init_trace # Flag to identify the auto-started trace - self._end_state = "Indeterminate" # Default end state because we don't know yet + 
self._end_state = StatusCode.UNSET # Default end state because we don't know yet def __enter__(self) -> "TraceContext": """Enter the trace context.""" @@ -43,12 +44,12 @@ def __enter__(self) -> "TraceContext": def __exit__(self, exc_type: Optional[type], exc_val: Optional[Exception], exc_tb: Optional[Any]) -> bool: """Exit the trace context and end the trace.""" if exc_type is not None: - self._end_state = "Error" + self._end_state = StatusCode.ERROR if exc_val: logger.debug(f"Trace exiting with exception: {exc_val}") else: - # No exception occurred, set to Success - self._end_state = "Success" + # No exception occurred, set to OK + self._end_state = StatusCode.OK try: TracingCore.get_instance().end_trace(self, self._end_state) @@ -476,7 +477,9 @@ def start_trace( return trace_context - def end_trace(self, trace_context: Optional[TraceContext] = None, end_state: str = "Success") -> None: + def end_trace( + self, trace_context: Optional[TraceContext] = None, end_state: Union[Any, StatusCode, str] = None + ) -> None: """ Ends a trace (its root span) and finalizes it. If no trace_context is provided, ends all active session spans. @@ -489,6 +492,12 @@ def end_trace(self, trace_context: Optional[TraceContext] = None, end_state: str logger.warning("TracingCore not initialized. Cannot end trace.") return + # Set default if not provided + if end_state is None: + from agentops.enums import TraceState + + end_state = TraceState.SUCCESS + # If no specific trace_context provided, end all active traces if trace_context is None: with self._traces_lock: @@ -502,7 +511,7 @@ def end_trace(self, trace_context: Optional[TraceContext] = None, end_state: str # End specific trace self._end_single_trace(trace_context, end_state) - def _end_single_trace(self, trace_context: TraceContext, end_state: str) -> None: + def _end_single_trace(self, trace_context: TraceContext, end_state: Union[Any, StatusCode, str]) -> None: """ Internal method to end a single trace. 
@@ -524,10 +533,21 @@ def _end_single_trace(self, trace_context: TraceContext, end_state: str) -> None # Handle case where span is mocked or trace_id is not a valid integer trace_id = str(span.get_span_context().trace_id) - logger.debug(f"Ending trace with span ID: {span.get_span_context().span_id}, end_state: {end_state}") + # Convert TraceState enum to StatusCode if needed + if hasattr(end_state, "to_status_code"): + # It's a TraceState enum + state_str = str(end_state) + elif isinstance(end_state, StatusCode): + # It's already a StatusCode + state_str = str(end_state) + else: + # It's a string (legacy) + state_str = str(end_state) + + logger.debug(f"Ending trace with span ID: {span.get_span_context().span_id}, end_state: {state_str}") try: - span.set_attribute(SpanAttributes.AGENTOPS_SESSION_END_STATE, end_state) + span.set_attribute(SpanAttributes.AGENTOPS_SESSION_END_STATE, state_str) _finalize_span(span, token=token) # Remove from active traces From 1bf7033639f9ebf15bf1ffc3e1212cdd2f40a3fd Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Fri, 30 May 2025 01:35:32 +0530 Subject: [PATCH 07/13] modify to use updated status codes --- tests/unit/test_context_manager.py | 52 ++++++++++++++++-------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/tests/unit/test_context_manager.py b/tests/unit/test_context_manager.py index 89ad85b49..082399441 100644 --- a/tests/unit/test_context_manager.py +++ b/tests/unit/test_context_manager.py @@ -11,6 +11,8 @@ import asyncio from unittest.mock import Mock, patch, call from agentops.sdk.core import TraceContext +from agentops.enums import TraceState +from opentelemetry.trace.status import StatusCode from agentops import start_trace, end_trace @@ -51,8 +53,8 @@ def test_trace_context_exit_calls_end_trace(self, mock_get_instance): # Call __exit__ without exception result = trace_context.__exit__(None, None, None) - # Verify end_trace was called with Success state - 
mock_tracing_core.end_trace.assert_called_once_with(trace_context, "Success") + # Verify end_trace was called with SUCCESS state + mock_tracing_core.end_trace.assert_called_once_with(trace_context, StatusCode.OK) assert result is False # Should not suppress exceptions @patch("agentops.sdk.core.TracingCore.get_instance") @@ -71,8 +73,8 @@ def test_trace_context_exit_with_exception_sets_error_state(self, mock_get_insta result = trace_context.__exit__(exc_type, exc_val, exc_tb) - # Verify end_trace was called with Error state - mock_tracing_core.end_trace.assert_called_once_with(trace_context, "Error") + # Verify end_trace was called with ERROR state + mock_tracing_core.end_trace.assert_called_once_with(trace_context, StatusCode.ERROR) assert result is False # Should not suppress exceptions @patch("agentops.sdk.core.TracingCore.get_instance") @@ -98,7 +100,7 @@ def test_context_manager_usage_pattern(self, mock_get_instance): # Verify start_trace and end_trace were called mock_tracing_core.start_trace.assert_called_once_with(trace_name="test_trace", tags=None) - mock_tracing_core.end_trace.assert_called_once_with(mock_trace_context, "Success") + mock_tracing_core.end_trace.assert_called_once_with(mock_trace_context, StatusCode.OK) @patch("agentops.sdk.core.TracingCore.get_instance") def test_context_manager_with_exception(self, mock_get_instance): @@ -117,7 +119,7 @@ def test_context_manager_with_exception(self, mock_get_instance): raise ValueError("test error") # Verify end_trace was called with Error state - mock_tracing_core.end_trace.assert_called_once_with(mock_trace_context, "Error") + mock_tracing_core.end_trace.assert_called_once_with(mock_trace_context, StatusCode.ERROR) @patch("agentops.init") @patch("agentops.sdk.core.TracingCore.get_instance") @@ -224,8 +226,8 @@ def test_nested_context_managers_create_parallel_traces(self, mock_get_instance) # Verify the order of end_trace calls (inner first, then outer) mock_tracing_core.end_trace.assert_has_calls( [ - 
call(mock_trace2, "Success"), # inner trace ends first - call(mock_trace1, "Success"), # outer trace ends second + call(mock_trace2, StatusCode.OK), # inner trace ends first + call(mock_trace1, StatusCode.OK), # outer trace ends second ] ) @@ -252,8 +254,8 @@ def test_exception_in_nested_traces(self, mock_get_instance): # Verify both traces ended with appropriate states mock_tracing_core.end_trace.assert_has_calls( [ - call(mock_trace2, "Error"), # inner trace ends with Error - call(mock_trace1, "Error"), # outer trace also ends with Error due to exception propagation + call(mock_trace2, StatusCode.ERROR), # inner trace ends with Error + call(mock_trace1, StatusCode.ERROR), # outer trace also ends with Error due to exception propagation ] ) @@ -271,7 +273,7 @@ def test_trace_context_attributes_access(self): assert trace_context.span is mock_span assert trace_context.token is mock_token assert trace_context.is_init_trace is True - assert trace_context._end_state == "Indeterminate" # Default state before exit + assert trace_context._end_state == StatusCode.UNSET # Default state before exit @patch("agentops.sdk.core.TracingCore.get_instance") def test_multiple_exceptions_in_sequence(self, mock_get_instance): @@ -293,7 +295,7 @@ def test_multiple_exceptions_in_sequence(self, mock_get_instance): # Verify all traces ended with Error state assert mock_tracing_core.end_trace.call_count == 3 for i, mock_trace in enumerate(mock_traces): - mock_tracing_core.end_trace.assert_any_call(mock_trace, "Error") + mock_tracing_core.end_trace.assert_any_call(mock_trace, StatusCode.ERROR) @patch("agentops.sdk.core.TracingCore.get_instance") def test_trace_with_tags_dict(self, mock_get_instance): @@ -413,7 +415,7 @@ def function_with_early_return(): # Verify trace was still properly ended mock_tracing_core.start_trace.assert_called_once() - mock_tracing_core.end_trace.assert_called_once_with(mock_trace, "Success") + mock_tracing_core.end_trace.assert_called_once_with(mock_trace, 
StatusCode.OK) @patch("agentops.sdk.core.TracingCore.get_instance") def test_context_manager_with_finally_block(self, mock_get_instance): @@ -441,7 +443,7 @@ def test_context_manager_with_finally_block(self, mock_get_instance): assert finally_executed == ["finally_block"] # Verify trace was ended with Error state - mock_tracing_core.end_trace.assert_called_once_with(mock_trace, "Error") + mock_tracing_core.end_trace.assert_called_once_with(mock_trace, StatusCode.ERROR) @patch("agentops.sdk.core.TracingCore.get_instance") def test_backwards_compatibility_existing_patterns(self, mock_get_instance): @@ -472,7 +474,7 @@ def test_backwards_compatibility_existing_patterns(self, mock_get_instance): # Pattern 4: Manual end_trace (should still work) trace = start_trace("manual") - end_trace(trace, "Success") + end_trace(trace, StatusCode.OK) # Verify all calls were made correctly assert mock_tracing_core.start_trace.call_count == 4 @@ -574,11 +576,11 @@ def test_trace_context_state_management(self): trace_context = TraceContext(mock_span) # Test initial state - assert trace_context._end_state == "Indeterminate" + assert trace_context._end_state == StatusCode.UNSET # Test state change on exception trace_context.__exit__(ValueError, ValueError("test"), None) - assert trace_context._end_state == "Error" + assert trace_context._end_state == StatusCode.ERROR @patch("agentops.sdk.core.TracingCore.get_instance") def test_context_manager_with_async_context(self, mock_get_instance): @@ -602,7 +604,7 @@ async def async_function(): # Verify trace was handled correctly assert result is mock_trace mock_tracing_core.start_trace.assert_called_once() - mock_tracing_core.end_trace.assert_called_once_with(mock_trace, "Success") + mock_tracing_core.end_trace.assert_called_once_with(mock_trace, StatusCode.OK) class TestContextManagerBackwardCompatibility: @@ -675,7 +677,7 @@ def test_api_compatibility(self, mock_get_instance): [call(trace_name="test", tags=["tag"]), call(trace_name="test2", 
tags=None)] ) mock_tracing_core.end_trace.assert_has_calls( - [call(trace_context=trace1, end_state="Success"), call(trace_context=trace2, end_state="Success")] + [call(trace_context=trace1, end_state="Success"), call(trace_context=trace2, end_state=TraceState.SUCCESS)] ) def test_return_type_compatibility(self): @@ -713,7 +715,7 @@ def test_context_manager_with_keyboard_interrupt(self, mock_get_instance): raise KeyboardInterrupt("User interrupted") # Verify trace was ended with Error state - mock_tracing_core.end_trace.assert_called_once_with(mock_trace, "Error") + mock_tracing_core.end_trace.assert_called_once_with(mock_trace, StatusCode.ERROR) @patch("agentops.sdk.core.TracingCore.get_instance") def test_context_manager_with_system_exit(self, mock_get_instance): @@ -732,7 +734,7 @@ def test_context_manager_with_system_exit(self, mock_get_instance): raise SystemExit(1) # Verify trace was ended with Error state - mock_tracing_core.end_trace.assert_called_once_with(mock_trace, "Error") + mock_tracing_core.end_trace.assert_called_once_with(mock_trace, StatusCode.ERROR) @patch("agentops.sdk.core.TracingCore.get_instance") def test_context_manager_in_generator_function(self, mock_get_instance): @@ -760,7 +762,7 @@ def trace_generator(): # Verify trace was properly managed mock_tracing_core.start_trace.assert_called_once() - mock_tracing_core.end_trace.assert_called_once_with(mock_trace, "Success") + mock_tracing_core.end_trace.assert_called_once_with(mock_trace, StatusCode.OK) def test_context_manager_exit_return_value(self): """Test that __exit__ always returns False (doesn't suppress exceptions).""" @@ -800,7 +802,7 @@ def test_context_manager_with_very_large_data(self, mock_get_instance): # Verify the large data was passed correctly mock_tracing_core.start_trace.assert_called_once_with(trace_name=large_trace_name, tags=large_tags) - mock_tracing_core.end_trace.assert_called_once_with(mock_trace, "Success") + 
mock_tracing_core.end_trace.assert_called_once_with(mock_trace, StatusCode.OK) @patch("agentops.sdk.core.TracingCore.get_instance") def test_context_manager_with_asyncio_tasks(self, mock_get_instance): @@ -862,7 +864,7 @@ def test_context_manager_resource_cleanup_on_exit_failure(self, mock_get_instanc # Normal execution # Verify end_trace was attempted despite the failure - mock_tracing_core.end_trace.assert_called_once_with(mock_trace, "Success") + mock_tracing_core.end_trace.assert_called_once_with(mock_trace, StatusCode.OK) # Verify the trace state was still updated - assert mock_trace._end_state == "Success" + assert mock_trace._end_state == StatusCode.OK From 84670a809a727af53f7774079474d753c771b74d Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Fri, 30 May 2025 01:37:44 +0530 Subject: [PATCH 08/13] heckin' ruff --- agentops/sdk/decorators/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/agentops/sdk/decorators/__init__.py b/agentops/sdk/decorators/__init__.py index f775b45d5..608b23908 100644 --- a/agentops/sdk/decorators/__init__.py +++ b/agentops/sdk/decorators/__init__.py @@ -20,6 +20,7 @@ tool = create_entity_decorator(SpanKind.TOOL) operation = task + # For backward compatibility: @session decorator calls @trace decorator @functools.wraps(trace) def session(*args, **kwargs): From 426b8b40e17fc28a11dc5adbddd441dfb88ea899 Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Fri, 30 May 2025 02:06:10 +0530 Subject: [PATCH 09/13] gh copilot said do it so i did --- agentops/sdk/core.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index 189a03959..e287dc235 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -42,7 +42,17 @@ def __enter__(self) -> "TraceContext": return self def __exit__(self, exc_type: Optional[type], exc_val: Optional[Exception], exc_tb: Optional[Any]) -> bool: - """Exit the trace context and end the trace.""" + """Exit the trace 
context and end the trace. + + Automatically sets the trace status based on whether an exception occurred: + - If an exception is present, sets status to ERROR + - If no exception occurred, sets status to OK + + Returns: + False: Always returns False to propagate any exceptions that occurred + within the context manager block, following Python's + context manager protocol for proper exception handling. + """ if exc_type is not None: self._end_state = StatusCode.ERROR if exc_val: @@ -534,7 +544,9 @@ def _end_single_trace(self, trace_context: TraceContext, end_state: Union[Any, S trace_id = str(span.get_span_context().trace_id) # Convert TraceState enum to StatusCode if needed - if hasattr(end_state, "to_status_code"): + from agentops.enums import TraceState + + if isinstance(end_state, TraceState): # It's a TraceState enum state_str = str(end_state) elif isinstance(end_state, StatusCode): From de0e410c071f09578092bff96e62f729f6962684 Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Sat, 7 Jun 2025 00:41:20 +0530 Subject: [PATCH 10/13] global tracer everywher --- agentops/client/client.py | 8 ++++---- agentops/legacy/__init__.py | 10 +++++----- agentops/sdk/__init__.py | 4 ---- agentops/sdk/core.py | 6 +++--- tests/integration/test_session_concurrency.py | 2 +- tests/unit/sdk/test_internal_span_processor.py | 14 +++++++------- tests/unit/test_context_manager.py | 2 +- tests/unit/test_session.py | 16 ++++++++-------- tests/unit/test_session_legacy.py | 2 +- 9 files changed, 30 insertions(+), 34 deletions(-) diff --git a/agentops/client/client.py b/agentops/client/client.py index 1979e6c6e..0fe95b95c 100644 --- a/agentops/client/client.py +++ b/agentops/client/client.py @@ -24,7 +24,7 @@ def _end_init_trace_atexit(): if _client_init_trace_context is not None: logger.debug("Auto-ending client's init trace during shutdown.") try: - # Use TracingCore to end the trace directly + # Use global tracer to end the trace directly if tracer.initialized and 
_client_init_trace_context.span.is_recording(): tracer.end_trace(_client_init_trace_context, end_state="Shutdown") except Exception as e: @@ -104,7 +104,7 @@ def init(self, **kwargs: Any) -> None: # Return type updated to None try: response = self.api.v3.fetch_auth_token(self.config.api_key) if response is None: - # If auth fails, we cannot proceed with TracingCore initialization that depends on project_id + # If auth fails, we cannot proceed with tracer initialization that depends on project_id logger.error("Failed to fetch auth token. AgentOps SDK will not be initialized.") return None # Explicitly return None if auth fails except Exception as e: @@ -161,8 +161,8 @@ def init(self, **kwargs: Any) -> None: # Return type updated to None else: logger.error("Failed to start the auto-init trace.") - # Even if auto-start fails, core services up to TracingCore might be initialized. - # Set self.initialized to True if TracingCore is up, but return None. + # Even if auto-start fails, core services up to the tracer might be initialized. + # Set self.initialized to True if tracer is up, but return None. self._initialized = tracer.initialized return None # Failed to start trace diff --git a/agentops/legacy/__init__.py b/agentops/legacy/__init__.py index 7c7f787b6..d835c0877 100644 --- a/agentops/legacy/__init__.py +++ b/agentops/legacy/__init__.py @@ -65,7 +65,7 @@ def start_session( ) -> Session: """ @deprecated Use agentops.start_trace() instead. - Starts a legacy AgentOps session. Calls TracingCore.start_trace internally. + Starts a legacy AgentOps session. Calls tracer.start_trace internally. """ global _current_session, _current_trace_context @@ -89,7 +89,7 @@ def start_session( trace_context = tracer.start_trace(trace_name="session", tags=tags) if trace_context is None: - logger.error("Failed to start trace via TracingCore. Returning dummy session.") + logger.error("Failed to start trace via global tracer. 
Returning dummy session.") dummy_session = Session(None) _current_session = dummy_session _current_trace_context = None @@ -124,13 +124,13 @@ def _set_span_attributes(span: Any, attributes: Dict[str, Any]) -> None: def end_session(session_or_status: Any = None, **kwargs: Any) -> None: """ @deprecated Use agentops.end_trace() instead. - Ends a legacy AgentOps session. Calls TracingCore.end_trace internally. + Ends a legacy AgentOps session. Calls tracer.end_trace internally. Supports multiple calling patterns for backward compatibility. """ global _current_session, _current_trace_context if not tracer.initialized: - logger.debug("Ignoring end_session: TracingCore not initialized.") + logger.debug("Ignoring end_session: global tracer not initialized.") return target_trace_context: Optional[TraceContext] = None @@ -189,7 +189,7 @@ def end_session(session_or_status: Any = None, **kwargs: Any) -> None: def end_all_sessions() -> None: """@deprecated Ends all active sessions/traces.""" if not tracer.initialized: - logger.debug("Ignoring end_all_sessions: TracingCore not initialized.") + logger.debug("Ignoring end_all_sessions: global tracer not initialized.") return # Use the new end_trace functionality to end all active traces diff --git a/agentops/sdk/__init__.py b/agentops/sdk/__init__.py index 58802e366..bc2f6b6f9 100644 --- a/agentops/sdk/__init__.py +++ b/agentops/sdk/__init__.py @@ -5,9 +5,6 @@ for different types of operations in AI agent workflows. 
""" -# Import core components -from agentops.sdk.core import TracingCore - # Import decorators from agentops.sdk.decorators import agent, operation, session, task, workflow @@ -18,7 +15,6 @@ __all__ = [ # Core components - "TracingCore", "TracingConfig", # Decorators "session", diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index 6fac7e766..8874fd2b6 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -234,7 +234,7 @@ def config(self) -> TracingConfig: """Get the tracing configuration.""" if self._config is None: # This case should ideally not be reached if initialized properly - raise AgentOpsClientNotInitializedException("TracingCore config accessed before initialization.") + raise AgentOpsClientNotInitializedException("Tracer config accessed before initialization.") return self._config def shutdown(self) -> None: @@ -349,7 +349,7 @@ def start_trace( A TraceContext object containing the span and context token, or None if not initialized. """ if not self.initialized: - logger.warning("TracingCore not initialized. Cannot start trace.") + logger.warning("Global tracer not initialized. Cannot start trace.") return None # Build trace attributes @@ -392,7 +392,7 @@ def end_trace( end_state: The final state of the trace (e.g., "Success", "Failure", "Error"). """ if not self.initialized: - logger.warning("TracingCore not initialized. Cannot end trace.") + logger.warning("Global tracer not initialized. 
Cannot end trace.") return # Set default if not provided diff --git a/tests/integration/test_session_concurrency.py b/tests/integration/test_session_concurrency.py index c89203021..928dd972a 100644 --- a/tests/integration/test_session_concurrency.py +++ b/tests/integration/test_session_concurrency.py @@ -40,7 +40,7 @@ def setup_agentops(mock_api_key): mock_api.v3.fetch_auth_token.return_value = {"token": "mock_token", "project_id": "mock_project_id"} mock_api_client.return_value = mock_api - # Mock TracingCore to avoid actual initialization + # Mock global tracer to avoid actual initialization with patch("agentops.tracer") as mock_tracer: mock_tracer.initialized = True diff --git a/tests/unit/sdk/test_internal_span_processor.py b/tests/unit/sdk/test_internal_span_processor.py index f53e1ae12..ed76a3ff5 100644 --- a/tests/unit/sdk/test_internal_span_processor.py +++ b/tests/unit/sdk/test_internal_span_processor.py @@ -12,7 +12,7 @@ class TestURLLogging(unittest.TestCase): - """Tests for URL logging functionality in TracingCore.""" + """Tests for URL logging functionality in global tracer.""" def setUp(self): self.tracing_core = tracer @@ -21,7 +21,7 @@ def setUp(self): self.tracing_core._config = {"project_id": "test_project"} @patch("agentops.sdk.core.log_trace_url") - @patch("agentops.sdk.core.TracingCore.make_span") + @patch("agentops.sdk.core.tracer.make_span") def test_start_trace_logs_url(self, mock_make_span, mock_log_trace_url): """Test that start_trace logs the trace URL.""" # Create a mock span @@ -57,7 +57,7 @@ def test_end_trace_logs_url(self, mock_finalize_span, mock_log_trace_url): mock_log_trace_url.assert_called_once_with(mock_span, title="test_trace") @patch("agentops.sdk.core.log_trace_url") - @patch("agentops.sdk.core.TracingCore.make_span") + @patch("agentops.sdk.core.tracer.make_span") def test_start_trace_url_logging_failure_does_not_break_trace(self, mock_make_span, mock_log_trace_url): """Test that URL logging failure doesn't break trace 
creation.""" # Create a mock span @@ -100,7 +100,7 @@ def test_end_trace_url_logging_failure_does_not_break_trace(self, mock_finalize_ mock_log_trace_url.assert_called_once_with(mock_span, title="test_trace") @patch("agentops.sdk.core.log_trace_url") - @patch("agentops.sdk.core.TracingCore.make_span") + @patch("agentops.sdk.core.tracer.make_span") def test_start_trace_with_tags_logs_url(self, mock_make_span, mock_log_trace_url): """Test that start_trace with tags logs the trace URL.""" # Create a mock span @@ -128,7 +128,7 @@ def setUp(self): self.tracing_core._config = {"project_id": "test_project"} @patch("agentops.sdk.core.log_trace_url") - @patch("agentops.sdk.core.TracingCore.make_span") + @patch("agentops.sdk.core.tracer.make_span") @patch("agentops.sdk.core.tracer.finalize_span") def test_session_decorator_logs_url_on_start_and_end(self, mock_finalize_span, mock_make_span, mock_log_trace_url): """Test that session decorator logs URLs on both start and end.""" @@ -160,7 +160,7 @@ def test_function(): self.assertEqual(result, "test_result") @patch("agentops.sdk.core.log_trace_url") - @patch("agentops.sdk.core.TracingCore.make_span") + @patch("agentops.sdk.core.tracer.make_span") @patch("agentops.sdk.core.tracer.finalize_span") def test_session_decorator_with_default_name_logs_url(self, mock_finalize_span, mock_make_span, mock_log_trace_url): """Test that session decorator with default name logs URLs.""" @@ -191,7 +191,7 @@ def my_function(): self.assertEqual(result, "result") @patch("agentops.sdk.core.log_trace_url") - @patch("agentops.sdk.core.TracingCore.make_span") + @patch("agentops.sdk.core.tracer.make_span") @patch("agentops.sdk.core.tracer.finalize_span") def test_session_decorator_handles_url_logging_failure( self, mock_finalize_span, mock_make_span, mock_log_trace_url diff --git a/tests/unit/test_context_manager.py b/tests/unit/test_context_manager.py index db80bf977..3f2e612be 100644 --- a/tests/unit/test_context_manager.py +++ 
b/tests/unit/test_context_manager.py @@ -435,7 +435,7 @@ def test_edge_case_none_trace_context(self, mock_agentops_tracer, mock_core_trac @patch("agentops.sdk.core.tracer") @patch("agentops.tracer") def test_edge_case_tracing_core_not_initialized(self, mock_agentops_tracer, mock_core_tracer): - """Test behavior when TracingCore is not initialized""" + """Test behavior when global tracer is not initialized""" mock_agentops_tracer.initialized = False # Mock init to succeed but tracer still not initialized diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index cfcb2b903..4b40bf770 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -8,7 +8,7 @@ @pytest.fixture(scope="function") def mock_tracing_core(): - """Mock the TracingCore to avoid actual initialization""" + """Mock the global tracer to avoid actual initialization""" # Patch both the main location and where it's imported in client with ( patch("agentops.tracer") as mock_tracer, @@ -137,7 +137,7 @@ def test_start_trace_without_init(): # Reset client for test agentops._client = agentops.Client() - # Mock TracingCore to be uninitialized initially, then initialized after init + # Mock global tracer to be uninitialized initially, then initialized after init with ( patch("agentops.tracer") as mock_tracer, patch("agentops.client.client.tracer", mock_tracer), @@ -150,7 +150,7 @@ def test_start_trace_without_init(): with patch("agentops.init") as mock_init: def side_effect(): - # After init is called, mark TracingCore as initialized + # After init is called, mark global tracer as initialized mock_tracer.initialized = True mock_init.side_effect = side_effect @@ -172,7 +172,7 @@ def test_end_trace(mock_tracing_core, mock_trace_context): # End the trace agentops.end_trace(mock_trace_context, end_state="Success") - # Verify end_trace was called on TracingCore + # Verify end_trace was called on global tracer 
mock_tracing_core.end_trace.assert_called_once_with(trace_context=mock_trace_context, end_state="Success") @@ -233,7 +233,7 @@ def failing_function(): def test_legacy_start_session_compatibility(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): - """Test that legacy start_session still works and calls TracingCore.start_trace""" + """Test that legacy start_session still works and calls tracer.start_trace""" import agentops from agentops.legacy import Session @@ -253,13 +253,13 @@ def test_legacy_start_session_compatibility(mock_tracing_core, mock_api_client, # Check that the session's trace_context has the expected properties assert session.trace_context is not None - # Verify that TracingCore.start_trace was called + # Verify that tracer.start_trace was called # Note: May be called multiple times due to initialization assert mock_tracing_core.start_trace.call_count >= 1 def test_legacy_end_session_compatibility(mock_tracing_core, mock_api_client, mock_trace_context, reset_client): - """Test that legacy end_session still works and calls TracingCore.end_trace""" + """Test that legacy end_session still works and calls tracer.end_trace""" import agentops from agentops.legacy import Session @@ -275,7 +275,7 @@ def test_legacy_end_session_compatibility(mock_tracing_core, mock_api_client, mo # End the session agentops.end_session(session) - # Verify that TracingCore.end_trace was called + # Verify that tracer.end_trace was called mock_tracing_core.end_trace.assert_called_once_with(mock_trace_context, end_state="Success") diff --git a/tests/unit/test_session_legacy.py b/tests/unit/test_session_legacy.py index 4b4b21aa9..e3b587f24 100644 --- a/tests/unit/test_session_legacy.py +++ b/tests/unit/test_session_legacy.py @@ -143,7 +143,7 @@ def test_crewai_kwargs_force_flush(): agentops.end_session(end_state="Success", end_state_reason="Test Finished", is_auto_end=True) # Explicitly ensure the core isn't already shut down for the test - assert 
tracer._initialized, "TracingCore should still be initialized" + assert tracer._initialized, "Global tracer should still be initialized" def test_crewai_task_instrumentation(instrumentation): From da8fac04a5bf357edd3223e164393984b9cea1e6 Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Sat, 7 Jun 2025 01:17:53 +0530 Subject: [PATCH 11/13] should fix linting --- agentops/instrumentation/__init__.py | 2 +- agentops/sdk/README.md | 8 ++++---- tests/benchmark/benchmark_init.py | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index b48475af9..96966a8ad 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -444,7 +444,7 @@ def instrument_one(loader: InstrumentorLoader) -> Optional[BaseInstrumentor]: instrumentor = loader.get_instance() try: - instrumentor.instrument(tracer_provider=TracingCore.get_instance()._provider) + instrumentor.instrument(tracer_provider=tracer._provider) logger.info( f"AgentOps: Successfully instrumented '{loader.class_name}' for package '{loader.package_name or loader.module_name}'." 
) diff --git a/agentops/sdk/README.md b/agentops/sdk/README.md index b39eb52c0..e13fe6c87 100644 --- a/agentops/sdk/README.md +++ b/agentops/sdk/README.md @@ -176,11 +176,11 @@ flowchart TD ## Example Usage ```python -from agentops import Session, agent, tool -from agentops.sdk import TracingCore, TracingConfig +from agentops import Session, agent, tool, tracer +from agentops.sdk import TracingConfig -# Initialize the tracing core with a dedicated configuration -TracingCore.get_instance().initialize( +# Initialize the global tracer with a dedicated configuration +tracer.initialize( service_name="my-service", exporter_endpoint="https://my-exporter-endpoint.com", max_queue_size=1000, diff --git a/tests/benchmark/benchmark_init.py b/tests/benchmark/benchmark_init.py index 94dad2660..743df9e5d 100644 --- a/tests/benchmark/benchmark_init.py +++ b/tests/benchmark/benchmark_init.py @@ -2,13 +2,13 @@ """ -Benchmark script for measuring TracingCore initialization time. +Benchmark script for measuring global tracer initialization time. """ def run_benchmark(): """ - Run a benchmark of TracingCore initialization. + Run a benchmark of global tracer initialization. 
Returns: Dictionary with timing results @@ -41,6 +41,6 @@ def print_results(results): if __name__ == "__main__": - print("Running TracingCore benchmark...") + print("Running global tracer benchmark...") results = run_benchmark() print_results(results) From 01d6bf265fda1fc8cd58f8187da6c69c4cb12702 Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Sat, 7 Jun 2025 01:22:58 +0530 Subject: [PATCH 12/13] Convert end_state to string using helper function Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- agentops/sdk/core.py | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index 8874fd2b6..dcad5b6b3 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -434,19 +434,8 @@ def _end_single_trace(self, trace_context: TraceContext, end_state: Union[Any, S # Handle case where span is mocked or trace_id is not a valid integer trace_id = str(span.get_span_context().trace_id) - # Convert TraceState enum to StatusCode if needed - from agentops.enums import TraceState - - if isinstance(end_state, TraceState): - # It's a TraceState enum - state_str = str(end_state) - elif isinstance(end_state, StatusCode): - # It's already a StatusCode - state_str = str(end_state) - else: - # It's a string (legacy) - state_str = str(end_state) - + # Convert end_state to string using helper function + state_str = self._convert_end_state_to_string(end_state) logger.debug(f"Ending trace with span ID: {span.get_span_context().span_id}, end_state: {state_str}") try: From c045f76e02bcffa0ee326179eeb5b23388aff87d Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Sat, 7 Jun 2025 01:24:50 +0530 Subject: [PATCH 13/13] Revert "Convert end_state to string using helper function" --- agentops/sdk/core.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index dcad5b6b3..8874fd2b6 100644 --- a/agentops/sdk/core.py +++ 
b/agentops/sdk/core.py @@ -434,8 +434,19 @@ def _end_single_trace(self, trace_context: TraceContext, end_state: Union[Any, S # Handle case where span is mocked or trace_id is not a valid integer trace_id = str(span.get_span_context().trace_id) - # Convert end_state to string using helper function - state_str = self._convert_end_state_to_string(end_state) + # Convert TraceState enum to StatusCode if needed + from agentops.enums import TraceState + + if isinstance(end_state, TraceState): + # It's a TraceState enum + state_str = str(end_state) + elif isinstance(end_state, StatusCode): + # It's already a StatusCode + state_str = str(end_state) + else: + # It's a string (legacy) + state_str = str(end_state) + logger.debug(f"Ending trace with span ID: {span.get_span_context().span_id}, end_state: {state_str}") try: