From bd804fa805e1e590f95300f01ad1edf49a05eb16 Mon Sep 17 00:00:00 2001 From: fenilfaldu Date: Thu, 29 May 2025 19:17:23 +0530 Subject: [PATCH 1/8] Implement concurrent.futures instrumentation for OpenTelemetry context propagation --- agentops/instrumentation/__init__.py | 37 +- .../concurrent_futures/__init__.py | 10 + .../concurrent_futures/instrumentation.py | 159 ++++++ .../sdk/test_concurrent_instrumentation.py | 475 ++++++++++++++++++ 4 files changed, 678 insertions(+), 3 deletions(-) create mode 100644 agentops/instrumentation/concurrent_futures/__init__.py create mode 100644 agentops/instrumentation/concurrent_futures/instrumentation.py create mode 100644 tests/unit/sdk/test_concurrent_instrumentation.py diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index d4e271f3d..121754933 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -66,9 +66,10 @@ def _uninstrument_providers(): def _should_instrument_package(package_name: str) -> bool: """ Determine if a package should be instrumented based on current state. - Handles special cases for agentic libraries and providers. + Handles special cases for agentic libraries, providers, and utility instrumentors. """ global _has_agentic_library + # If this is an agentic library, uninstrument all providers first if package_name in AGENTIC_LIBRARIES: _uninstrument_providers() @@ -78,6 +79,10 @@ def _should_instrument_package(package_name: str) -> bool: # Skip providers if an agentic library is already instrumented if package_name in PROVIDERS and _has_agentic_library: return False + + # Utility instrumentors are always enabled regardless of agentic library state + if package_name in UTILITY_INSTRUMENTORS: + return not _is_package_instrumented(package_name) # Skip if already instrumented if _is_package_instrumented(package_name): @@ -93,7 +98,10 @@ def _perform_instrumentation(package_name: str): return # Get the appropriate configuration for the package - config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES[package_name] + config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES.get(package_name) or UTILITY_INSTRUMENTORS.get(package_name) + if not config: + return + loader = InstrumentorLoader(**config) if loader.should_activate: @@ -143,6 +151,7 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), # Instrument all matching packages for package_to_check in packages_to_check: if package_to_check not in _instrumenting_packages and not _is_package_instrumented(package_to_check): + _instrumenting_packages.add(package_to_check) try: _perform_instrumentation(package_to_check) @@ -188,6 +197,22 @@ class InstrumentorConfig(TypedDict): "min_version": "0.1.0", "package_name": "google-genai", # Actual pip package name }, + # "mem0": { + # "module_name": "agentops.instrumentation.mem0", + # "class_name": "Mem0Instrumentor", + # "min_version": "0.1.10", + # "package_name": "mem0ai", # Actual pip package name + # }, +} + +# Configuration for utility instrumentors +UTILITY_INSTRUMENTORS: dict[str, InstrumentorConfig] = { + "concurrent.futures": { + "module_name": "agentops.instrumentation.concurrent_futures", + "class_name": "ConcurrentFuturesInstrumentor", + "min_version": "3.7.0", # Python 3.7+ (concurrent.futures is stdlib) + "package_name": "python", # Special case for stdlib modules + }, } # Configuration for supported agentic libraries @@ -211,7 +236,7 @@ class InstrumentorConfig(TypedDict): } # Combine all target packages for monitoring 
-TARGET_PACKAGES = set(PROVIDERS.keys()) | set(AGENTIC_LIBRARIES.keys()) +TARGET_PACKAGES = set(PROVIDERS.keys()) | set(AGENTIC_LIBRARIES.keys()) | set(UTILITY_INSTRUMENTORS.keys()) # Create a single instance of the manager # _manager = InstrumentationManager() # Removed @@ -238,6 +263,12 @@ def module(self) -> ModuleType: def should_activate(self) -> bool: """Check if the package is available and meets version requirements.""" try: + # Special case for stdlib modules (like concurrent.futures) + if self.package_name == "python": + import sys + python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" + return Version(python_version) >= parse(self.min_version) + # Use explicit package_name if provided, otherwise derive from module_name if self.package_name: provider_name = self.package_name diff --git a/agentops/instrumentation/concurrent_futures/__init__.py b/agentops/instrumentation/concurrent_futures/__init__.py new file mode 100644 index 000000000..716b95f3f --- /dev/null +++ b/agentops/instrumentation/concurrent_futures/__init__.py @@ -0,0 +1,10 @@ +""" +Instrumentation for concurrent.futures module. + +This module provides automatic instrumentation for ThreadPoolExecutor to ensure +proper OpenTelemetry context propagation across thread boundaries. +""" + +from .instrumentation import ConcurrentFuturesInstrumentor + +__all__ = ["ConcurrentFuturesInstrumentor"] \ No newline at end of file diff --git a/agentops/instrumentation/concurrent_futures/instrumentation.py b/agentops/instrumentation/concurrent_futures/instrumentation.py new file mode 100644 index 000000000..bbd468e31 --- /dev/null +++ b/agentops/instrumentation/concurrent_futures/instrumentation.py @@ -0,0 +1,159 @@ +""" +OpenTelemetry Instrumentation for concurrent.futures module. + +This instrumentation automatically patches ThreadPoolExecutor to ensure proper +context propagation across thread boundaries, preventing "NEW TRACE DETECTED" issues. 
+""" + +import contextvars +import functools +import sys +from typing import Collection, Optional, Any, Callable +from concurrent.futures import ThreadPoolExecutor + +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.util._importlib_metadata import version + +from agentops.logging import logger + +# Store original methods to restore during uninstrumentation +_original_init = None +_original_submit = None + + +def _context_propagating_init(original_init): + """Wrap ThreadPoolExecutor.__init__ to set up context-aware initializer.""" + + @functools.wraps(original_init) + def wrapped_init(self, max_workers=None, thread_name_prefix='', initializer=None, initargs=()): + # Capture the current context when the executor is created + main_context = contextvars.copy_context() + + def context_aware_initializer(): + """Initializer that sets up the captured context in each worker thread.""" + logger.debug("[ConcurrentFuturesInstrumentor] Setting up context in worker thread") + + # Set the main context variables in this thread + for var, value in main_context.items(): + try: + var.set(value) + except Exception as e: + logger.debug(f"[ConcurrentFuturesInstrumentor] Could not set context var {var}: {e}") + + # Run user's initializer if provided + if initializer and callable(initializer): + try: + if initargs: + initializer(*initargs) + else: + initializer() + except Exception as e: + logger.error(f"[ConcurrentFuturesInstrumentor] Error in user initializer: {e}") + raise + + logger.debug("[ConcurrentFuturesInstrumentor] Worker thread context setup complete") + + # Create executor with context-aware initializer + prefix = f'AgentOps-{thread_name_prefix}' if thread_name_prefix else 'AgentOps-Thread' + + # Call original init with our context-aware initializer + original_init( + self, + max_workers=max_workers, + thread_name_prefix=prefix, + initializer=context_aware_initializer, + initargs=() # We handle initargs in our wrapper + ) + + logger.debug(f"[ConcurrentFuturesInstrumentor] ThreadPoolExecutor initialized with context propagation") + + return wrapped_init + + +def _context_propagating_submit(original_submit): + """Wrap ThreadPoolExecutor.submit to ensure context propagation.""" + + @functools.wraps(original_submit) + def wrapped_submit(self, func, *args, **kwargs): + # Log the submission + func_name = getattr(func, '__name__', str(func)) + logger.debug(f"[ConcurrentFuturesInstrumentor] Submitting function: {func_name}") + + # The context propagation is handled by the initializer, so we can submit normally + # But we can add additional logging or monitoring here if needed + return original_submit(self, func, *args, **kwargs) + + return wrapped_submit + + +class ConcurrentFuturesInstrumentor(BaseInstrumentor): + """ + Instrumentor for concurrent.futures module. + + This instrumentor patches ThreadPoolExecutor to automatically propagate + OpenTelemetry context to worker threads, ensuring all LLM calls and other + instrumented operations maintain proper trace context. 
+ """ + + def instrumentation_dependencies(self) -> Collection[str]: + """Return a list of instrumentation dependencies.""" + return [] + + def _instrument(self, **kwargs): + """Instrument the concurrent.futures module.""" + global _original_init, _original_submit + + logger.debug("[ConcurrentFuturesInstrumentor] Starting instrumentation") + + # Store original methods + _original_init = ThreadPoolExecutor.__init__ + _original_submit = ThreadPoolExecutor.submit + + # Patch ThreadPoolExecutor methods + ThreadPoolExecutor.__init__ = _context_propagating_init(_original_init) + ThreadPoolExecutor.submit = _context_propagating_submit(_original_submit) + + logger.info("[ConcurrentFuturesInstrumentor] Successfully instrumented concurrent.futures.ThreadPoolExecutor") + + def _uninstrument(self, **kwargs): + """Uninstrument the concurrent.futures module.""" + global _original_init, _original_submit + + logger.debug("[ConcurrentFuturesInstrumentor] Starting uninstrumentation") + + # Restore original methods + if _original_init: + ThreadPoolExecutor.__init__ = _original_init + _original_init = None + + if _original_submit: + ThreadPoolExecutor.submit = _original_submit + _original_submit = None + + logger.info("[ConcurrentFuturesInstrumentor] Successfully uninstrumented concurrent.futures.ThreadPoolExecutor") + + @staticmethod + def instrument_module_directly(): + """ + Directly instrument the module without using the standard instrumentor interface. + + This can be called manually if automatic instrumentation is not desired. + """ + instrumentor = ConcurrentFuturesInstrumentor() + if not instrumentor.is_instrumented_by_opentelemetry: + instrumentor.instrument() + return True + return False + + @staticmethod + def uninstrument_module_directly(): + """ + Directly uninstrument the module. + + This can be called manually to remove instrumentation. + """ + instrumentor = ConcurrentFuturesInstrumentor() + if instrumentor.is_instrumented_by_opentelemetry: + instrumentor.uninstrument() + return True + return False \ No newline at end of file diff --git a/tests/unit/sdk/test_concurrent_instrumentation.py b/tests/unit/sdk/test_concurrent_instrumentation.py new file mode 100644 index 000000000..0dd2ac93b --- /dev/null +++ b/tests/unit/sdk/test_concurrent_instrumentation.py @@ -0,0 +1,475 @@ +""" +Unit tests for concurrent instrumentation and context propagation. + +This module tests the behavior of OpenTelemetry spans when using concurrent.futures.ThreadPoolExecutor, +specifically testing context propagation across thread boundaries. +""" + +import concurrent.futures +import contextvars +import time +import unittest +from unittest.mock import patch, MagicMock +import threading + +from opentelemetry import context, trace +from opentelemetry.sdk.trace import ReadableSpan, TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + +from agentops.sdk.processors import InternalSpanProcessor + + +class IsolatedInstrumentationTester: + """ + A lighter-weight instrumentation tester that doesn't affect global state. + + This version creates an isolated tracer provider and doesn't shut down + the global tracing core, making it safer for use alongside other tests. 
+ """ + + def __init__(self): + """Initialize with isolated tracer provider.""" + # Create isolated tracer provider and exporter + self.tracer_provider = TracerProvider() + self.memory_exporter = InMemorySpanExporter() + self.span_processor = SimpleSpanProcessor(self.memory_exporter) + self.tracer_provider.add_span_processor(self.span_processor) + + # Don't set as global provider - keep isolated + self.tracer = self.tracer_provider.get_tracer(__name__) + + def get_tracer(self): + """Get the isolated tracer.""" + return self.tracer + + def clear_spans(self): + """Clear all spans from the memory exporter.""" + self.span_processor.force_flush() + self.memory_exporter.clear() + + def get_finished_spans(self): + """Get all finished spans.""" + self.span_processor.force_flush() + return list(self.memory_exporter.get_finished_spans()) + + +class TestConcurrentInstrumentation(unittest.TestCase): + """Tests for concurrent instrumentation and context propagation.""" + + def setUp(self): + """Set up test environment with isolated instrumentation tester.""" + self.tester = IsolatedInstrumentationTester() + self.tracer = self.tester.get_tracer() + + def tearDown(self): + """Clean up test environment without affecting global state.""" + # Only clear our isolated spans + self.tester.clear_spans() + + def _create_simple_span(self, name: str, sleep_duration: float = 0.01) -> str: + """Helper to create a simple span and return its trace_id.""" + with self.tracer.start_as_current_span(name) as span: + time.sleep(sleep_duration) # Simulate work + return span.get_span_context().trace_id + + def _create_nested_spans(self, parent_name: str, child_name: str) -> tuple: + """Helper to create nested spans and return their trace_ids.""" + with self.tracer.start_as_current_span(parent_name) as parent_span: + parent_trace_id = parent_span.get_span_context().trace_id + time.sleep(0.01) + + with self.tracer.start_as_current_span(child_name) as child_span: + child_trace_id = child_span.get_span_context().trace_id + time.sleep(0.01) + + return parent_trace_id, child_trace_id + + def test_sequential_spans_same_trace(self): + """Test that sequential spans in the same thread share the same trace.""" + trace_id1 = self._create_simple_span("span1") + trace_id2 = self._create_simple_span("span2") + + # In sequential execution, spans should be independent (different traces) + spans = self.tester.get_finished_spans() + self.assertEqual(len(spans), 2) + + # Each span should be a root span (no parent) + for span in spans: + self.assertIsNone(span.parent) + + def test_nested_spans_same_trace(self): + """Test that nested spans share the same trace.""" + parent_trace_id, child_trace_id = self._create_nested_spans("parent", "child") + + # Nested spans should share the same trace + self.assertEqual(parent_trace_id, child_trace_id) + + spans = self.tester.get_finished_spans() + self.assertEqual(len(spans), 2) + + # Find parent and child spans + parent_spans = [s for s in spans if s.name == "parent"] + child_spans = [s for s in spans if s.name == "child"] + + self.assertEqual(len(parent_spans), 1) + self.assertEqual(len(child_spans), 1) + + parent_span = parent_spans[0] + child_span = child_spans[0] + + # Child should have parent as its parent + self.assertEqual(child_span.parent.span_id, parent_span.context.span_id) + + def test_threadpool_without_context_propagation_creates_separate_traces(self): + """Test that ThreadPoolExecutor without context propagation creates separate traces.""" + def worker_task(task_id: str) -> dict: + """Worker 
task that creates a span without context propagation.""" + with self.tracer.start_as_current_span(f"worker_task_{task_id}") as span: + time.sleep(0.01) # Simulate work + return { + "task_id": task_id, + "trace_id": span.get_span_context().trace_id, + "span_id": span.get_span_context().span_id, + "thread_id": threading.get_ident() + } + + # Create a parent span + with self.tracer.start_as_current_span("main_task") as main_span: + main_trace_id = main_span.get_span_context().trace_id + + # Execute tasks in thread pool WITHOUT context propagation + with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: + futures = [ + executor.submit(worker_task, f"task_{i}") + for i in range(3) + ] + results = [future.result() for future in concurrent.futures.as_completed(futures)] + + spans = self.tester.get_finished_spans() + self.assertEqual(len(spans), 4) # 1 main + 3 worker spans + + # Extract trace IDs from results + worker_trace_ids = [result["trace_id"] for result in results] + + # Each worker should have a different trace ID from the main span + for worker_trace_id in worker_trace_ids: + self.assertNotEqual(worker_trace_id, main_trace_id, + "Worker span should NOT share trace with main span (no context propagation)") + + # Worker spans should also be different from each other (separate traces) + unique_trace_ids = set(worker_trace_ids) + self.assertEqual(len(unique_trace_ids), 3, + "Each worker should create a separate trace") + + # Verify that worker spans have no parent (they are root spans) + worker_spans = [s for s in spans if s.name.startswith("worker_task_")] + for worker_span in worker_spans: + self.assertIsNone(worker_span.parent, + "Worker spans should be root spans without parent") + + def test_threadpool_with_manual_context_propagation_shares_trace(self): + """Test that ThreadPoolExecutor with manual context propagation shares the same trace.""" + def worker_task_with_context(task_info: tuple) -> dict: + """Worker task that restores context before creating spans.""" + task_id, ctx = task_info + + # Restore the context in this thread + token = context.attach(ctx) + try: + with self.tracer.start_as_current_span(f"worker_task_{task_id}") as span: + time.sleep(0.01) # Simulate work + return { + "task_id": task_id, + "trace_id": span.get_span_context().trace_id, + "span_id": span.get_span_context().span_id, + "thread_id": threading.get_ident(), + "parent_span_id": span.parent.span_id if span.parent else None + } + finally: + context.detach(token) + + # Create a parent span and capture its context + with self.tracer.start_as_current_span("main_task") as main_span: + main_trace_id = main_span.get_span_context().trace_id + main_span_id = main_span.get_span_context().span_id + current_context = context.get_current() + + # Execute tasks in thread pool WITH manual context propagation + with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: + futures = [ + executor.submit(worker_task_with_context, (f"task_{i}", current_context)) + for i in range(3) + ] + results = [future.result() for future in concurrent.futures.as_completed(futures)] + + spans = self.tester.get_finished_spans() + self.assertEqual(len(spans), 4) # 1 main + 3 worker spans + + # Extract trace IDs from results + worker_trace_ids = [result["trace_id"] for result in results] + + # All workers should share the same trace ID as the main span + for result in results: + self.assertEqual(result["trace_id"], main_trace_id, + f"Worker task {result['task_id']} should share trace with main span") + 
self.assertEqual(result["parent_span_id"], main_span_id, + f"Worker task {result['task_id']} should have main span as parent") + + # All worker trace IDs should be the same + unique_trace_ids = set(worker_trace_ids) + self.assertEqual(len(unique_trace_ids), 1, + "All workers should share the same trace") + + def test_threadpool_with_contextvars_copy_context_shares_trace(self): + """Test ThreadPoolExecutor with proper context propagation using attach/detach.""" + def worker_task_with_context_management(args) -> dict: + """Worker task that manages context properly.""" + task_id, ctx = args + # Use attach/detach for better control over context + token = context.attach(ctx) + try: + with self.tracer.start_as_current_span(f"worker_task_{task_id}") as span: + time.sleep(0.01) # Simulate work + return { + "task_id": task_id, + "trace_id": span.get_span_context().trace_id, + "span_id": span.get_span_context().span_id, + "thread_id": threading.get_ident(), + "parent_span_id": span.parent.span_id if span.parent else None + } + finally: + context.detach(token) + + # Create a parent span and capture context properly + with self.tracer.start_as_current_span("main_task") as main_span: + main_trace_id = main_span.get_span_context().trace_id + main_span_id = main_span.get_span_context().span_id + + # Get current context to propagate + current_context = context.get_current() + + with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: + futures = [ + executor.submit(worker_task_with_context_management, (f"task_{i}", current_context)) + for i in range(3) + ] + results = [future.result() for future in concurrent.futures.as_completed(futures)] + + spans = self.tester.get_finished_spans() + self.assertEqual(len(spans), 4) # 1 main + 3 worker spans + + # All workers should share the same trace ID as the main span + for result in results: + self.assertEqual(result["trace_id"], main_trace_id, + f"Worker task {result['task_id']} should share trace with main span") + self.assertEqual(result["parent_span_id"], main_span_id, + f"Worker task {result['task_id']} should have main span as parent") + + def test_mixed_sequential_and_concurrent_spans(self): + """Test a complex scenario with both sequential and concurrent spans.""" + results = [] + + # Sequential span 1 + trace_id1 = self._create_simple_span("sequential_1") + results.append(("sequential_1", trace_id1)) + + # Concurrent spans with context propagation + with self.tracer.start_as_current_span("concurrent_parent") as parent_span: + parent_trace_id = parent_span.get_span_context().trace_id + results.append(("concurrent_parent", parent_trace_id)) + + def worker_task_with_context(args) -> tuple: + task_id, ctx = args + token = context.attach(ctx) + try: + with self.tracer.start_as_current_span(f"concurrent_{task_id}") as span: + time.sleep(0.01) + return (f"concurrent_{task_id}", span.get_span_context().trace_id) + finally: + context.detach(token) + + current_context = context.get_current() + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + futures = [ + executor.submit(worker_task_with_context, (f"task_{i}", current_context)) + for i in range(2) + ] + concurrent_results = [future.result() for future in concurrent.futures.as_completed(futures)] + results.extend(concurrent_results) + + # Sequential span 2 + trace_id2 = self._create_simple_span("sequential_2") + results.append(("sequential_2", trace_id2)) + + spans = self.tester.get_finished_spans() + self.assertEqual(len(spans), 5) # 2 sequential + 1 parent + 2 concurrent + + # 
Verify trace relationships + sequential_spans = [r for r in results if r[0].startswith("sequential_")] + concurrent_spans = [r for r in results if r[0].startswith("concurrent_")] + + # Sequential spans should have different traces + sequential_trace_ids = [r[1] for r in sequential_spans] + self.assertEqual(len(set(sequential_trace_ids)), 2, + "Sequential spans should have different traces") + + # Concurrent spans should share the same trace + concurrent_trace_ids = [r[1] for r in concurrent_spans] + unique_concurrent_traces = set(concurrent_trace_ids) + self.assertEqual(len(unique_concurrent_traces), 1, + "All concurrent spans should share the same trace") + + def test_error_handling_in_concurrent_spans(self): + """Test error handling and span status in concurrent execution.""" + def worker_task_with_error_and_context(args) -> dict: + """Worker task that may raise an error.""" + task_id, ctx = args + token = context.attach(ctx) + try: + with self.tracer.start_as_current_span(f"worker_task_{task_id}") as span: + if task_id == "error_task": + span.set_status(trace.Status(trace.StatusCode.ERROR, "Simulated error")) + raise ValueError("Simulated error") + + time.sleep(0.01) + return { + "task_id": task_id, + "trace_id": span.get_span_context().trace_id, + "status": "success" + } + finally: + context.detach(token) + + with self.tracer.start_as_current_span("main_task") as main_span: + main_trace_id = main_span.get_span_context().trace_id + current_context = context.get_current() + + with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: + futures = [ + executor.submit(worker_task_with_error_and_context, ("success_task_1", current_context)), + executor.submit(worker_task_with_error_and_context, ("error_task", current_context)), + executor.submit(worker_task_with_error_and_context, ("success_task_2", current_context)), + ] + + results = [] + errors = [] + for future in concurrent.futures.as_completed(futures): + try: + results.append(future.result()) + except Exception as e: + errors.append(str(e)) + + spans = self.tester.get_finished_spans() + self.assertEqual(len(spans), 4) # 1 main + 3 worker spans + + # Should have 2 successful results and 1 error + self.assertEqual(len(results), 2) + self.assertEqual(len(errors), 1) + self.assertIn("Simulated error", errors[0]) + + # All spans should share the same trace + for result in results: + self.assertEqual(result["trace_id"], main_trace_id) + + # Find the error span and verify its status + error_spans = [s for s in spans if s.name == "worker_task_error_task"] + self.assertEqual(len(error_spans), 1) + + error_span = error_spans[0] + self.assertEqual(error_span.status.status_code, trace.StatusCode.ERROR) + + @patch('agentops.sdk.processors.logger') + def test_internal_span_processor_with_concurrent_spans(self, mock_logger): + """Test InternalSpanProcessor behavior with concurrent spans.""" + # Create an InternalSpanProcessor to test + processor = InternalSpanProcessor() + + # Add the processor to the tracer provider + self.tester.tracer_provider.add_span_processor(processor) + + try: + def worker_task_with_context(args) -> str: + task_id, ctx = args + token = context.attach(ctx) + try: + with self.tracer.start_as_current_span(f"openai.chat.completion_{task_id}") as span: + time.sleep(0.01) + return f"result_{task_id}" + finally: + context.detach(token) + + # Execute concurrent tasks + with self.tracer.start_as_current_span("main_session") as main_span: + current_context = context.get_current() + + with 
concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + futures = [ + executor.submit(worker_task_with_context, (f"task_{i}", current_context)) + for i in range(2) + ] + results = [future.result() for future in concurrent.futures.as_completed(futures)] + + # Verify results + self.assertEqual(len(results), 2) + + # The processor should have tracked root spans + # Note: With proper context propagation, all spans should belong to the same trace + spans = self.tester.get_finished_spans() + + # Verify that debug logging would have been called + # (The processor tracks root spans and logs when they end) + self.assertTrue(mock_logger.debug.called) + + finally: + # Clean up the processor to avoid affecting other tests + try: + processor.shutdown() + except Exception: + pass + + def test_performance_impact_of_context_propagation(self): + """Test the performance impact of different context propagation methods.""" + import timeit + + def without_context_propagation(): + def worker(): + with self.tracer.start_as_current_span("test_span"): + time.sleep(0.001) + + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + futures = [executor.submit(worker) for _ in range(4)] + [f.result() for f in futures] + + def with_context_propagation(): + def worker_with_context(ctx): + token = context.attach(ctx) + try: + with self.tracer.start_as_current_span("test_span"): + time.sleep(0.001) + finally: + context.detach(token) + + current_context = context.get_current() + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + futures = [executor.submit(worker_with_context, current_context) for _ in range(4)] + [f.result() for f in futures] + + # Clear spans before performance test + self.tester.clear_spans() + + # Measure timing (just to ensure context propagation doesn't break anything) + time_without = timeit.timeit(without_context_propagation, number=1) + self.tester.clear_spans() + + time_with = timeit.timeit(with_context_propagation, number=1) + self.tester.clear_spans() + + # Context propagation should not cause significant performance degradation + # This is a sanity check rather than a strict performance requirement + self.assertGreater(time_with * 10, time_without, + "Context propagation should not cause extreme performance degradation") + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file From cfb3619d6bea3a08d7da1499179b727e9f3f431f Mon Sep 17 00:00:00 2001 From: fenilfaldu Date: Thu, 29 May 2025 19:39:37 +0530 Subject: [PATCH 2/8] ruff checks :) --- agentops/instrumentation/__init__.py | 14 ++-- .../concurrent_futures/__init__.py | 2 +- .../concurrent_futures/instrumentation.py | 72 +++++++++---------- agentops/sdk/decorators/__init__.py | 1 + 4 files changed, 45 insertions(+), 44 deletions(-) diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index 121754933..54e4b1c10 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -69,7 +69,7 @@ def _should_instrument_package(package_name: str) -> bool: Handles special cases for agentic libraries, providers, and utility instrumentors. 
""" global _has_agentic_library - + # If this is an agentic library, uninstrument all providers first if package_name in AGENTIC_LIBRARIES: _uninstrument_providers() @@ -79,7 +79,7 @@ def _should_instrument_package(package_name: str) -> bool: # Skip providers if an agentic library is already instrumented if package_name in PROVIDERS and _has_agentic_library: return False - + # Utility instrumentors are always enabled regardless of agentic library state if package_name in UTILITY_INSTRUMENTORS: return not _is_package_instrumented(package_name) @@ -98,10 +98,12 @@ def _perform_instrumentation(package_name: str): return # Get the appropriate configuration for the package - config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES.get(package_name) or UTILITY_INSTRUMENTORS.get(package_name) + config = ( + PROVIDERS.get(package_name) or AGENTIC_LIBRARIES.get(package_name) or UTILITY_INSTRUMENTORS.get(package_name) + ) if not config: return - + loader = InstrumentorLoader(**config) if loader.should_activate: @@ -151,7 +153,6 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), # Instrument all matching packages for package_to_check in packages_to_check: if package_to_check not in _instrumenting_packages and not _is_package_instrumented(package_to_check): - _instrumenting_packages.add(package_to_check) try: _perform_instrumentation(package_to_check) @@ -266,9 +267,10 @@ def should_activate(self) -> bool: # Special case for stdlib modules (like concurrent.futures) if self.package_name == "python": import sys + python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" return Version(python_version) >= parse(self.min_version) - + # Use explicit package_name if provided, otherwise derive from module_name if self.package_name: provider_name = self.package_name diff --git a/agentops/instrumentation/concurrent_futures/__init__.py b/agentops/instrumentation/concurrent_futures/__init__.py index 716b95f3f..943fd5b0b 100644 --- a/agentops/instrumentation/concurrent_futures/__init__.py +++ b/agentops/instrumentation/concurrent_futures/__init__.py @@ -7,4 +7,4 @@ from .instrumentation import ConcurrentFuturesInstrumentor -__all__ = ["ConcurrentFuturesInstrumentor"] \ No newline at end of file +__all__ = ["ConcurrentFuturesInstrumentor"] diff --git a/agentops/instrumentation/concurrent_futures/instrumentation.py b/agentops/instrumentation/concurrent_futures/instrumentation.py index bbd468e31..3ed4caee3 100644 --- a/agentops/instrumentation/concurrent_futures/instrumentation.py +++ b/agentops/instrumentation/concurrent_futures/instrumentation.py @@ -7,12 +7,10 @@ import contextvars import functools -import sys -from typing import Collection, Optional, Any, Callable +from typing import Collection from concurrent.futures import ThreadPoolExecutor from opentelemetry.instrumentation.instrumentor import BaseInstrumentor -from opentelemetry.util._importlib_metadata import version from agentops.logging import logger @@ -23,23 +21,23 @@ def _context_propagating_init(original_init): """Wrap ThreadPoolExecutor.__init__ to set up context-aware initializer.""" - + @functools.wraps(original_init) - def wrapped_init(self, max_workers=None, thread_name_prefix='', initializer=None, initargs=()): + def wrapped_init(self, max_workers=None, thread_name_prefix="", initializer=None, initargs=()): # Capture the current context when the executor is created main_context = contextvars.copy_context() - + def context_aware_initializer(): """Initializer that sets up the 
captured context in each worker thread.""" logger.debug("[ConcurrentFuturesInstrumentor] Setting up context in worker thread") - + # Set the main context variables in this thread for var, value in main_context.items(): try: var.set(value) except Exception as e: logger.debug(f"[ConcurrentFuturesInstrumentor] Could not set context var {var}: {e}") - + # Run user's initializer if provided if initializer and callable(initializer): try: @@ -50,93 +48,93 @@ def context_aware_initializer(): except Exception as e: logger.error(f"[ConcurrentFuturesInstrumentor] Error in user initializer: {e}") raise - + logger.debug("[ConcurrentFuturesInstrumentor] Worker thread context setup complete") - + # Create executor with context-aware initializer - prefix = f'AgentOps-{thread_name_prefix}' if thread_name_prefix else 'AgentOps-Thread' - + prefix = f"AgentOps-{thread_name_prefix}" if thread_name_prefix else "AgentOps-Thread" + # Call original init with our context-aware initializer original_init( self, max_workers=max_workers, thread_name_prefix=prefix, initializer=context_aware_initializer, - initargs=() # We handle initargs in our wrapper + initargs=(), # We handle initargs in our wrapper ) - - logger.debug(f"[ConcurrentFuturesInstrumentor] ThreadPoolExecutor initialized with context propagation") - + + logger.debug("[ConcurrentFuturesInstrumentor] ThreadPoolExecutor initialized with context propagation") + return wrapped_init def _context_propagating_submit(original_submit): """Wrap ThreadPoolExecutor.submit to ensure context propagation.""" - + @functools.wraps(original_submit) def wrapped_submit(self, func, *args, **kwargs): # Log the submission - func_name = getattr(func, '__name__', str(func)) + func_name = getattr(func, "__name__", str(func)) logger.debug(f"[ConcurrentFuturesInstrumentor] Submitting function: {func_name}") - + # The context propagation is handled by the initializer, so we can submit normally # But we can add additional logging or monitoring here if needed return original_submit(self, func, *args, **kwargs) - + return wrapped_submit class ConcurrentFuturesInstrumentor(BaseInstrumentor): """ Instrumentor for concurrent.futures module. - + This instrumentor patches ThreadPoolExecutor to automatically propagate OpenTelemetry context to worker threads, ensuring all LLM calls and other instrumented operations maintain proper trace context. 
""" - + def instrumentation_dependencies(self) -> Collection[str]: """Return a list of instrumentation dependencies.""" return [] - + def _instrument(self, **kwargs): """Instrument the concurrent.futures module.""" global _original_init, _original_submit - + logger.debug("[ConcurrentFuturesInstrumentor] Starting instrumentation") - + # Store original methods _original_init = ThreadPoolExecutor.__init__ _original_submit = ThreadPoolExecutor.submit - + # Patch ThreadPoolExecutor methods ThreadPoolExecutor.__init__ = _context_propagating_init(_original_init) ThreadPoolExecutor.submit = _context_propagating_submit(_original_submit) - + logger.info("[ConcurrentFuturesInstrumentor] Successfully instrumented concurrent.futures.ThreadPoolExecutor") - + def _uninstrument(self, **kwargs): """Uninstrument the concurrent.futures module.""" global _original_init, _original_submit - + logger.debug("[ConcurrentFuturesInstrumentor] Starting uninstrumentation") - + # Restore original methods if _original_init: ThreadPoolExecutor.__init__ = _original_init _original_init = None - + if _original_submit: ThreadPoolExecutor.submit = _original_submit _original_submit = None - + logger.info("[ConcurrentFuturesInstrumentor] Successfully uninstrumented concurrent.futures.ThreadPoolExecutor") - + @staticmethod def instrument_module_directly(): """ Directly instrument the module without using the standard instrumentor interface. - + This can be called manually if automatic instrumentation is not desired. """ instrumentor = ConcurrentFuturesInstrumentor() @@ -144,16 +142,16 @@ def instrument_module_directly(): instrumentor.instrument() return True return False - + @staticmethod def uninstrument_module_directly(): """ Directly uninstrument the module. - + This can be called manually to remove instrumentation. 
""" instrumentor = ConcurrentFuturesInstrumentor() if instrumentor.is_instrumented_by_opentelemetry: instrumentor.uninstrument() return True - return False \ No newline at end of file + return False diff --git a/agentops/sdk/decorators/__init__.py b/agentops/sdk/decorators/__init__.py index f775b45d5..608b23908 100644 --- a/agentops/sdk/decorators/__init__.py +++ b/agentops/sdk/decorators/__init__.py @@ -20,6 +20,7 @@ tool = create_entity_decorator(SpanKind.TOOL) operation = task + # For backward compatibility: @session decorator calls @trace decorator @functools.wraps(trace) def session(*args, **kwargs): From ea8401fffc1ac204c500cf1711a52a48a1550ba5 Mon Sep 17 00:00:00 2001 From: fenilfaldu Date: Thu, 29 May 2025 19:40:49 +0530 Subject: [PATCH 3/8] ruff check again :( --- .../sdk/test_concurrent_instrumentation.py | 179 +++++++++--------- 1 file changed, 85 insertions(+), 94 deletions(-) diff --git a/tests/unit/sdk/test_concurrent_instrumentation.py b/tests/unit/sdk/test_concurrent_instrumentation.py index 0dd2ac93b..d2f2359ba 100644 --- a/tests/unit/sdk/test_concurrent_instrumentation.py +++ b/tests/unit/sdk/test_concurrent_instrumentation.py @@ -6,14 +6,13 @@ """ import concurrent.futures -import contextvars import time import unittest -from unittest.mock import patch, MagicMock +from unittest.mock import patch import threading from opentelemetry import context, trace -from opentelemetry.sdk.trace import ReadableSpan, TracerProvider +from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter @@ -23,11 +22,11 @@ class IsolatedInstrumentationTester: """ A lighter-weight instrumentation tester that doesn't affect global state. - + This version creates an isolated tracer provider and doesn't shut down the global tracing core, making it safer for use alongside other tests. 
""" - + def __init__(self): """Initialize with isolated tracer provider.""" # Create isolated tracer provider and exporter @@ -35,19 +34,19 @@ def __init__(self): self.memory_exporter = InMemorySpanExporter() self.span_processor = SimpleSpanProcessor(self.memory_exporter) self.tracer_provider.add_span_processor(self.span_processor) - + # Don't set as global provider - keep isolated self.tracer = self.tracer_provider.get_tracer(__name__) - + def get_tracer(self): """Get the isolated tracer.""" return self.tracer - + def clear_spans(self): """Clear all spans from the memory exporter.""" self.span_processor.force_flush() self.memory_exporter.clear() - + def get_finished_spans(self): """Get all finished spans.""" self.span_processor.force_flush() @@ -78,22 +77,20 @@ def _create_nested_spans(self, parent_name: str, child_name: str) -> tuple: with self.tracer.start_as_current_span(parent_name) as parent_span: parent_trace_id = parent_span.get_span_context().trace_id time.sleep(0.01) - + with self.tracer.start_as_current_span(child_name) as child_span: child_trace_id = child_span.get_span_context().trace_id time.sleep(0.01) - + return parent_trace_id, child_trace_id def test_sequential_spans_same_trace(self): """Test that sequential spans in the same thread share the same trace.""" - trace_id1 = self._create_simple_span("span1") - trace_id2 = self._create_simple_span("span2") - + # In sequential execution, spans should be independent (different traces) spans = self.tester.get_finished_spans() self.assertEqual(len(spans), 2) - + # Each span should be a root span (no parent) for span in spans: self.assertIsNone(span.parent) @@ -101,28 +98,29 @@ def test_sequential_spans_same_trace(self): def test_nested_spans_same_trace(self): """Test that nested spans share the same trace.""" parent_trace_id, child_trace_id = self._create_nested_spans("parent", "child") - + # Nested spans should share the same trace self.assertEqual(parent_trace_id, child_trace_id) - + spans = self.tester.get_finished_spans() self.assertEqual(len(spans), 2) - + # Find parent and child spans parent_spans = [s for s in spans if s.name == "parent"] child_spans = [s for s in spans if s.name == "child"] - + self.assertEqual(len(parent_spans), 1) self.assertEqual(len(child_spans), 1) - + parent_span = parent_spans[0] child_span = child_spans[0] - + # Child should have parent as its parent self.assertEqual(child_span.parent.span_id, parent_span.context.span_id) def test_threadpool_without_context_propagation_creates_separate_traces(self): """Test that ThreadPoolExecutor without context propagation creates separate traces.""" + def worker_task(task_id: str) -> dict: """Worker task that creates a span without context propagation.""" with self.tracer.start_as_current_span(f"worker_task_{task_id}") as span: @@ -131,19 +129,16 @@ def worker_task(task_id: str) -> dict: "task_id": task_id, "trace_id": span.get_span_context().trace_id, "span_id": span.get_span_context().span_id, - "thread_id": threading.get_ident() + "thread_id": threading.get_ident(), } # Create a parent span with self.tracer.start_as_current_span("main_task") as main_span: main_trace_id = main_span.get_span_context().trace_id - + # Execute tasks in thread pool WITHOUT context propagation with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: - futures = [ - executor.submit(worker_task, f"task_{i}") - for i in range(3) - ] + futures = [executor.submit(worker_task, f"task_{i}") for i in range(3)] results = [future.result() for future in 
concurrent.futures.as_completed(futures)] spans = self.tester.get_finished_spans() @@ -151,29 +146,31 @@ def worker_task(task_id: str) -> dict: # Extract trace IDs from results worker_trace_ids = [result["trace_id"] for result in results] - + # Each worker should have a different trace ID from the main span for worker_trace_id in worker_trace_ids: - self.assertNotEqual(worker_trace_id, main_trace_id, - "Worker span should NOT share trace with main span (no context propagation)") + self.assertNotEqual( + worker_trace_id, + main_trace_id, + "Worker span should NOT share trace with main span (no context propagation)", + ) # Worker spans should also be different from each other (separate traces) unique_trace_ids = set(worker_trace_ids) - self.assertEqual(len(unique_trace_ids), 3, - "Each worker should create a separate trace") + self.assertEqual(len(unique_trace_ids), 3, "Each worker should create a separate trace") # Verify that worker spans have no parent (they are root spans) worker_spans = [s for s in spans if s.name.startswith("worker_task_")] for worker_span in worker_spans: - self.assertIsNone(worker_span.parent, - "Worker spans should be root spans without parent") + self.assertIsNone(worker_span.parent, "Worker spans should be root spans without parent") def test_threadpool_with_manual_context_propagation_shares_trace(self): """Test that ThreadPoolExecutor with manual context propagation shares the same trace.""" + def worker_task_with_context(task_info: tuple) -> dict: """Worker task that restores context before creating spans.""" task_id, ctx = task_info - + # Restore the context in this thread token = context.attach(ctx) try: @@ -184,7 +181,7 @@ def worker_task_with_context(task_info: tuple) -> dict: "trace_id": span.get_span_context().trace_id, "span_id": span.get_span_context().span_id, "thread_id": threading.get_ident(), - "parent_span_id": span.parent.span_id if span.parent else None + "parent_span_id": span.parent.span_id if span.parent else None, } finally: context.detach(token) @@ -194,13 +191,10 @@ def worker_task_with_context(task_info: tuple) -> dict: main_trace_id = main_span.get_span_context().trace_id main_span_id = main_span.get_span_context().span_id current_context = context.get_current() - + # Execute tasks in thread pool WITH manual context propagation with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: - futures = [ - executor.submit(worker_task_with_context, (f"task_{i}", current_context)) - for i in range(3) - ] + futures = [executor.submit(worker_task_with_context, (f"task_{i}", current_context)) for i in range(3)] results = [future.result() for future in concurrent.futures.as_completed(futures)] spans = self.tester.get_finished_spans() @@ -208,21 +202,25 @@ def worker_task_with_context(task_info: tuple) -> dict: # Extract trace IDs from results worker_trace_ids = [result["trace_id"] for result in results] - + # All workers should share the same trace ID as the main span for result in results: - self.assertEqual(result["trace_id"], main_trace_id, - f"Worker task {result['task_id']} should share trace with main span") - self.assertEqual(result["parent_span_id"], main_span_id, - f"Worker task {result['task_id']} should have main span as parent") + self.assertEqual( + result["trace_id"], main_trace_id, f"Worker task {result['task_id']} should share trace with main span" + ) + self.assertEqual( + result["parent_span_id"], + main_span_id, + f"Worker task {result['task_id']} should have main span as parent", + ) # All worker trace IDs should be 
the same unique_trace_ids = set(worker_trace_ids) - self.assertEqual(len(unique_trace_ids), 1, - "All workers should share the same trace") + self.assertEqual(len(unique_trace_ids), 1, "All workers should share the same trace") def test_threadpool_with_contextvars_copy_context_shares_trace(self): """Test ThreadPoolExecutor with proper context propagation using attach/detach.""" + def worker_task_with_context_management(args) -> dict: """Worker task that manages context properly.""" task_id, ctx = args @@ -236,7 +234,7 @@ def worker_task_with_context_management(args) -> dict: "trace_id": span.get_span_context().trace_id, "span_id": span.get_span_context().span_id, "thread_id": threading.get_ident(), - "parent_span_id": span.parent.span_id if span.parent else None + "parent_span_id": span.parent.span_id if span.parent else None, } finally: context.detach(token) @@ -245,10 +243,10 @@ def worker_task_with_context_management(args) -> dict: with self.tracer.start_as_current_span("main_task") as main_span: main_trace_id = main_span.get_span_context().trace_id main_span_id = main_span.get_span_context().span_id - + # Get current context to propagate current_context = context.get_current() - + with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: futures = [ executor.submit(worker_task_with_context_management, (f"task_{i}", current_context)) @@ -261,10 +259,14 @@ def worker_task_with_context_management(args) -> dict: # All workers should share the same trace ID as the main span for result in results: - self.assertEqual(result["trace_id"], main_trace_id, - f"Worker task {result['task_id']} should share trace with main span") - self.assertEqual(result["parent_span_id"], main_span_id, - f"Worker task {result['task_id']} should have main span as parent") + self.assertEqual( + result["trace_id"], main_trace_id, f"Worker task {result['task_id']} should share trace with main span" + ) + self.assertEqual( + result["parent_span_id"], + main_span_id, + f"Worker task {result['task_id']} should have main span as parent", + ) def test_mixed_sequential_and_concurrent_spans(self): """Test a complex scenario with both sequential and concurrent spans.""" @@ -291,10 +293,7 @@ def worker_task_with_context(args) -> tuple: current_context = context.get_current() with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: - futures = [ - executor.submit(worker_task_with_context, (f"task_{i}", current_context)) - for i in range(2) - ] + futures = [executor.submit(worker_task_with_context, (f"task_{i}", current_context)) for i in range(2)] concurrent_results = [future.result() for future in concurrent.futures.as_completed(futures)] results.extend(concurrent_results) @@ -311,17 +310,16 @@ def worker_task_with_context(args) -> tuple: # Sequential spans should have different traces sequential_trace_ids = [r[1] for r in sequential_spans] - self.assertEqual(len(set(sequential_trace_ids)), 2, - "Sequential spans should have different traces") + self.assertEqual(len(set(sequential_trace_ids)), 2, "Sequential spans should have different traces") # Concurrent spans should share the same trace concurrent_trace_ids = [r[1] for r in concurrent_spans] unique_concurrent_traces = set(concurrent_trace_ids) - self.assertEqual(len(unique_concurrent_traces), 1, - "All concurrent spans should share the same trace") + self.assertEqual(len(unique_concurrent_traces), 1, "All concurrent spans should share the same trace") def test_error_handling_in_concurrent_spans(self): """Test error handling and span status in 
concurrent execution.""" + def worker_task_with_error_and_context(args) -> dict: """Worker task that may raise an error.""" task_id, ctx = args @@ -331,13 +329,9 @@ def worker_task_with_error_and_context(args) -> dict: if task_id == "error_task": span.set_status(trace.Status(trace.StatusCode.ERROR, "Simulated error")) raise ValueError("Simulated error") - + time.sleep(0.01) - return { - "task_id": task_id, - "trace_id": span.get_span_context().trace_id, - "status": "success" - } + return {"task_id": task_id, "trace_id": span.get_span_context().trace_id, "status": "success"} finally: context.detach(token) @@ -351,7 +345,7 @@ def worker_task_with_error_and_context(args) -> dict: executor.submit(worker_task_with_error_and_context, ("error_task", current_context)), executor.submit(worker_task_with_error_and_context, ("success_task_2", current_context)), ] - + results = [] errors = [] for future in concurrent.futures.as_completed(futures): @@ -375,52 +369,48 @@ def worker_task_with_error_and_context(args) -> dict: # Find the error span and verify its status error_spans = [s for s in spans if s.name == "worker_task_error_task"] self.assertEqual(len(error_spans), 1) - + error_span = error_spans[0] self.assertEqual(error_span.status.status_code, trace.StatusCode.ERROR) - @patch('agentops.sdk.processors.logger') + @patch("agentops.sdk.processors.logger") def test_internal_span_processor_with_concurrent_spans(self, mock_logger): """Test InternalSpanProcessor behavior with concurrent spans.""" # Create an InternalSpanProcessor to test processor = InternalSpanProcessor() - + # Add the processor to the tracer provider self.tester.tracer_provider.add_span_processor(processor) - + try: + def worker_task_with_context(args) -> str: task_id, ctx = args token = context.attach(ctx) try: - with self.tracer.start_as_current_span(f"openai.chat.completion_{task_id}") as span: + with self.tracer.start_as_current_span(f"openai.chat.completion_{task_id}"): time.sleep(0.01) return f"result_{task_id}" finally: context.detach(token) # Execute concurrent tasks - with self.tracer.start_as_current_span("main_session") as main_span: + with self.tracer.start_as_current_span("main_session"): current_context = context.get_current() - + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: futures = [ - executor.submit(worker_task_with_context, (f"task_{i}", current_context)) - for i in range(2) + executor.submit(worker_task_with_context, (f"task_{i}", current_context)) for i in range(2) ] results = [future.result() for future in concurrent.futures.as_completed(futures)] # Verify results self.assertEqual(len(results), 2) - - # The processor should have tracked root spans - # Note: With proper context propagation, all spans should belong to the same trace - spans = self.tester.get_finished_spans() - + # Verify that debug logging would have been called # (The processor tracks root spans and logs when they end) self.assertTrue(mock_logger.debug.called) - + finally: # Clean up the processor to avoid affecting other tests try: @@ -431,12 +421,12 @@ def worker_task_with_context(args) -> str: def test_performance_impact_of_context_propagation(self): """Test the performance impact of different context propagation methods.""" import timeit - + def without_context_propagation(): def worker(): with self.tracer.start_as_current_span("test_span"): time.sleep(0.001) - + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: futures = [executor.submit(worker) for _ in range(4)] [f.result() for f in futures] 
@@ -449,7 +439,7 @@ def worker_with_context(ctx): time.sleep(0.001) finally: context.detach(token) - + current_context = context.get_current() with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: futures = [executor.submit(worker_with_context, current_context) for _ in range(4)] @@ -457,19 +447,20 @@ def worker_with_context(ctx): # Clear spans before performance test self.tester.clear_spans() - + # Measure timing (just to ensure context propagation doesn't break anything) time_without = timeit.timeit(without_context_propagation, number=1) self.tester.clear_spans() - + time_with = timeit.timeit(with_context_propagation, number=1) self.tester.clear_spans() - + # Context propagation should not cause significant performance degradation # This is a sanity check rather than a strict performance requirement - self.assertGreater(time_with * 10, time_without, - "Context propagation should not cause extreme performance degradation") + self.assertGreater( + time_with * 10, time_without, "Context propagation should not cause extreme performance degradation" + ) if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main() From d17f14e77356b289bbdc34ebaf4507d88564f429 Mon Sep 17 00:00:00 2001 From: fenilfaldu Date: Thu, 29 May 2025 19:46:15 +0530 Subject: [PATCH 4/8] damn ruff +_+ --- tests/unit/sdk/test_concurrent_instrumentation.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/unit/sdk/test_concurrent_instrumentation.py b/tests/unit/sdk/test_concurrent_instrumentation.py index d2f2359ba..a702a1307 100644 --- a/tests/unit/sdk/test_concurrent_instrumentation.py +++ b/tests/unit/sdk/test_concurrent_instrumentation.py @@ -86,6 +86,9 @@ def _create_nested_spans(self, parent_name: str, child_name: str) -> tuple: def test_sequential_spans_same_trace(self): """Test that sequential spans in the same thread share the same trace.""" + self._create_simple_span("span1") + self._create_simple_span("span2") + # In sequential execution, spans should be independent (different traces) spans = self.tester.get_finished_spans() From 4c2faa2d74b73faa70f13837dec0405d7cc44788 Mon Sep 17 00:00:00 2001 From: fenilfaldu Date: Thu, 29 May 2025 19:48:16 +0530 Subject: [PATCH 5/8] heck ruff again --- tests/unit/sdk/test_concurrent_instrumentation.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/sdk/test_concurrent_instrumentation.py b/tests/unit/sdk/test_concurrent_instrumentation.py index a702a1307..14f4114f4 100644 --- a/tests/unit/sdk/test_concurrent_instrumentation.py +++ b/tests/unit/sdk/test_concurrent_instrumentation.py @@ -88,7 +88,6 @@ def test_sequential_spans_same_trace(self): """Test that sequential spans in the same thread share the same trace.""" self._create_simple_span("span1") self._create_simple_span("span2") - # In sequential execution, spans should be independent (different traces) spans = self.tester.get_finished_spans() From 0f09692d3c414b5093f032636c6c0f6bb9b0da45 Mon Sep 17 00:00:00 2001 From: fenilfaldu Date: Tue, 3 Jun 2025 04:49:03 +0530 Subject: [PATCH 6/8] constants update --- agentops/instrumentation/__init__.py | 145 +++++++++++++-------------- 1 file changed, 69 insertions(+), 76 deletions(-) diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index 54e4b1c10..c3619b3c6 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -32,6 +32,75 @@ from agentops.sdk.core import TracingCore +# Define the structure for instrumentor configurations +class 
InstrumentorConfig(TypedDict): + module_name: str + class_name: str + min_version: str + package_name: NotRequired[str] # Optional: actual pip package name if different from module + + +# Configuration for supported LLM providers +PROVIDERS: dict[str, InstrumentorConfig] = { + "openai": { + "module_name": "agentops.instrumentation.openai", + "class_name": "OpenAIInstrumentor", + "min_version": "1.0.0", + }, + "anthropic": { + "module_name": "agentops.instrumentation.anthropic", + "class_name": "AnthropicInstrumentor", + "min_version": "0.32.0", + }, + "ibm_watsonx_ai": { + "module_name": "agentops.instrumentation.ibm_watsonx_ai", + "class_name": "IBMWatsonXInstrumentor", + "min_version": "0.1.0", + }, + "google.genai": { + "module_name": "agentops.instrumentation.google_genai", + "class_name": "GoogleGenAIInstrumentor", + "min_version": "0.1.0", + "package_name": "google-genai", # Actual pip package name + }, +} + +# Configuration for utility instrumentors +UTILITY_INSTRUMENTORS: dict[str, InstrumentorConfig] = { + "concurrent.futures": { + "module_name": "agentops.instrumentation.concurrent_futures", + "class_name": "ConcurrentFuturesInstrumentor", + "min_version": "3.7.0", # Python 3.7+ (concurrent.futures is stdlib) + "package_name": "python", # Special case for stdlib modules + }, +} + +# Configuration for supported agentic libraries +AGENTIC_LIBRARIES: dict[str, InstrumentorConfig] = { + "crewai": { + "module_name": "agentops.instrumentation.crewai", + "class_name": "CrewAIInstrumentor", + "min_version": "0.56.0", + }, + "autogen": {"module_name": "agentops.instrumentation.ag2", "class_name": "AG2Instrumentor", "min_version": "0.1.0"}, + "agents": { + "module_name": "agentops.instrumentation.openai_agents", + "class_name": "OpenAIAgentsInstrumentor", + "min_version": "0.0.1", + }, + "google.adk": { + "module_name": "agentops.instrumentation.google_adk", + "class_name": "GoogleADKInstrumentor", + "min_version": "0.1.0", + }, +} + +# Combine all target packages for monitoring +TARGET_PACKAGES = set(PROVIDERS.keys()) | set(AGENTIC_LIBRARIES.keys()) | set(UTILITY_INSTRUMENTORS.keys()) + +# Create a single instance of the manager +# _manager = InstrumentationManager() # Removed + # Module-level state variables _active_instrumentors: list[BaseInstrumentor] = [] _original_builtins_import = builtins.__import__ # Store original import @@ -167,82 +236,6 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), return module -# Define the structure for instrumentor configurations -class InstrumentorConfig(TypedDict): - module_name: str - class_name: str - min_version: str - package_name: NotRequired[str] # Optional: actual pip package name if different from module - - -# Configuration for supported LLM providers -PROVIDERS: dict[str, InstrumentorConfig] = { - "openai": { - "module_name": "agentops.instrumentation.openai", - "class_name": "OpenAIInstrumentor", - "min_version": "1.0.0", - }, - "anthropic": { - "module_name": "agentops.instrumentation.anthropic", - "class_name": "AnthropicInstrumentor", - "min_version": "0.32.0", - }, - "ibm_watsonx_ai": { - "module_name": "agentops.instrumentation.ibm_watsonx_ai", - "class_name": "IBMWatsonXInstrumentor", - "min_version": "0.1.0", - }, - "google.genai": { - "module_name": "agentops.instrumentation.google_genai", - "class_name": "GoogleGenAIInstrumentor", - "min_version": "0.1.0", - "package_name": "google-genai", # Actual pip package name - }, - # "mem0": { - # "module_name": "agentops.instrumentation.mem0", - # 
"class_name": "Mem0Instrumentor", - # "min_version": "0.1.10", - # "package_name": "mem0ai", # Actual pip package name - # }, -} - -# Configuration for utility instrumentors -UTILITY_INSTRUMENTORS: dict[str, InstrumentorConfig] = { - "concurrent.futures": { - "module_name": "agentops.instrumentation.concurrent_futures", - "class_name": "ConcurrentFuturesInstrumentor", - "min_version": "3.7.0", # Python 3.7+ (concurrent.futures is stdlib) - "package_name": "python", # Special case for stdlib modules - }, -} - -# Configuration for supported agentic libraries -AGENTIC_LIBRARIES: dict[str, InstrumentorConfig] = { - "crewai": { - "module_name": "agentops.instrumentation.crewai", - "class_name": "CrewAIInstrumentor", - "min_version": "0.56.0", - }, - "autogen": {"module_name": "agentops.instrumentation.ag2", "class_name": "AG2Instrumentor", "min_version": "0.1.0"}, - "agents": { - "module_name": "agentops.instrumentation.openai_agents", - "class_name": "OpenAIAgentsInstrumentor", - "min_version": "0.0.1", - }, - "google.adk": { - "module_name": "agentops.instrumentation.google_adk", - "class_name": "GoogleADKInstrumentor", - "min_version": "0.1.0", - }, -} - -# Combine all target packages for monitoring -TARGET_PACKAGES = set(PROVIDERS.keys()) | set(AGENTIC_LIBRARIES.keys()) | set(UTILITY_INSTRUMENTORS.keys()) - -# Create a single instance of the manager -# _manager = InstrumentationManager() # Removed - - @dataclass class InstrumentorLoader: """ From cd53e9d5af0a613792a089bce7f39bca08b63fb6 Mon Sep 17 00:00:00 2001 From: fenilfaldu Date: Sun, 8 Jun 2025 00:55:48 +0530 Subject: [PATCH 7/8] refactor the instrumentation with utlity instrumentation --- agentops/instrumentation/__init__.py | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index e515cae94..a86189fea 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -119,6 +119,16 @@ def _is_installed_package(module_obj: ModuleType, package_name_key: str) -> bool rather than a local module, especially when names might collide. `package_name_key` is the key from TARGET_PACKAGES (e.g., 'agents', 'google.adk'). """ + # Special case for stdlib modules (marked with package_name="python" in UTILITY_INSTRUMENTORS) + if ( + package_name_key in UTILITY_INSTRUMENTORS + and UTILITY_INSTRUMENTORS[package_name_key].get("package_name") == "python" + ): + logger.debug( + f"_is_installed_package: Module '{package_name_key}' is a Python standard library module. Considering it an installed package." + ) + return True + if not hasattr(module_obj, "__file__") or not module_obj.__file__: logger.debug( f"_is_installed_package: Module '{package_name_key}' has no __file__, assuming it might be an SDK namespace package. Returning True." @@ -220,6 +230,12 @@ def _should_instrument_package(package_name: str) -> bool: logger.debug(f"_should_instrument_package: '{package_name}' already instrumented by AgentOps. Skipping.") return False + # Utility instrumentors should always be instrumented regardless of agentic library state + if package_name in UTILITY_INSTRUMENTORS: + logger.debug(f"_should_instrument_package: '{package_name}' is a utility instrumentor. 
Always allowing.") + return True + + # Only apply agentic/provider logic if it's NOT a utility instrumentor is_target_agentic = package_name in AGENTIC_LIBRARIES is_target_provider = package_name in PROVIDERS @@ -268,14 +284,18 @@ def _perform_instrumentation(package_name: str): return # Get the appropriate configuration for the package - # Ensure package_name is a key in either PROVIDERS or AGENTIC_LIBRARIES - if package_name not in PROVIDERS and package_name not in AGENTIC_LIBRARIES: + # Ensure package_name is a key in either PROVIDERS, AGENTIC_LIBRARIES, or UTILITY_INSTRUMENTORS + if ( + package_name not in PROVIDERS + and package_name not in AGENTIC_LIBRARIES + and package_name not in UTILITY_INSTRUMENTORS + ): logger.debug( - f"_perform_instrumentation: Package '{package_name}' not found in PROVIDERS or AGENTIC_LIBRARIES. Skipping." + f"_perform_instrumentation: Package '{package_name}' not found in PROVIDERS, AGENTIC_LIBRARIES, or UTILITY_INSTRUMENTORS. Skipping." ) return - config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES[package_name] + config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES.get(package_name) or UTILITY_INSTRUMENTORS[package_name] loader = InstrumentorLoader(**config) # instrument_one already checks loader.should_activate From 0f9749f1c782ab91c9b6a7bea959c57571e64cc4 Mon Sep 17 00:00:00 2001 From: fenilfaldu Date: Sun, 8 Jun 2025 02:50:50 +0530 Subject: [PATCH 8/8] added types to the function/methods defs --- .../concurrent_futures/instrumentation.py | 39 +++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/agentops/instrumentation/concurrent_futures/instrumentation.py b/agentops/instrumentation/concurrent_futures/instrumentation.py index 3ed4caee3..71c9b50f0 100644 --- a/agentops/instrumentation/concurrent_futures/instrumentation.py +++ b/agentops/instrumentation/concurrent_futures/instrumentation.py @@ -7,8 +7,9 @@ import contextvars import functools -from typing import Collection -from concurrent.futures import ThreadPoolExecutor +from typing import Any, Callable, Collection, Optional, Tuple, TypeVar + +from concurrent.futures import ThreadPoolExecutor, Future from opentelemetry.instrumentation.instrumentor import BaseInstrumentor @@ -18,16 +19,26 @@ _original_init = None _original_submit = None +# Type variables for better typing +T = TypeVar("T") +R = TypeVar("R") + -def _context_propagating_init(original_init): +def _context_propagating_init(original_init: Callable) -> Callable: """Wrap ThreadPoolExecutor.__init__ to set up context-aware initializer.""" @functools.wraps(original_init) - def wrapped_init(self, max_workers=None, thread_name_prefix="", initializer=None, initargs=()): + def wrapped_init( + self: ThreadPoolExecutor, + max_workers: Optional[int] = None, + thread_name_prefix: str = "", + initializer: Optional[Callable] = None, + initargs: Tuple = (), + ) -> None: # Capture the current context when the executor is created main_context = contextvars.copy_context() - def context_aware_initializer(): + def context_aware_initializer() -> None: """Initializer that sets up the captured context in each worker thread.""" logger.debug("[ConcurrentFuturesInstrumentor] Setting up context in worker thread") @@ -68,11 +79,11 @@ def context_aware_initializer(): return wrapped_init -def _context_propagating_submit(original_submit): +def _context_propagating_submit(original_submit: Callable) -> Callable: """Wrap ThreadPoolExecutor.submit to ensure context propagation.""" @functools.wraps(original_submit) - def 
wrapped_submit(self, func, *args, **kwargs): + def wrapped_submit(self: ThreadPoolExecutor, func: Callable[..., R], *args: Any, **kwargs: Any) -> Future[R]: # Log the submission func_name = getattr(func, "__name__", str(func)) logger.debug(f"[ConcurrentFuturesInstrumentor] Submitting function: {func_name}") @@ -97,7 +108,7 @@ def instrumentation_dependencies(self) -> Collection[str]: """Return a list of instrumentation dependencies.""" return [] - def _instrument(self, **kwargs): + def _instrument(self, **kwargs: Any) -> None: """Instrument the concurrent.futures module.""" global _original_init, _original_submit @@ -113,7 +124,7 @@ def _instrument(self, **kwargs): logger.info("[ConcurrentFuturesInstrumentor] Successfully instrumented concurrent.futures.ThreadPoolExecutor") - def _uninstrument(self, **kwargs): + def _uninstrument(self, **kwargs: Any) -> None: """Uninstrument the concurrent.futures module.""" global _original_init, _original_submit @@ -131,11 +142,14 @@ def _uninstrument(self, **kwargs): logger.info("[ConcurrentFuturesInstrumentor] Successfully uninstrumented concurrent.futures.ThreadPoolExecutor") @staticmethod - def instrument_module_directly(): + def instrument_module_directly() -> bool: """ Directly instrument the module without using the standard instrumentor interface. This can be called manually if automatic instrumentation is not desired. + + Returns: + bool: True if instrumentation was applied, False if already instrumented """ instrumentor = ConcurrentFuturesInstrumentor() if not instrumentor.is_instrumented_by_opentelemetry: @@ -144,11 +158,14 @@ def instrument_module_directly(): return False @staticmethod - def uninstrument_module_directly(): + def uninstrument_module_directly() -> bool: """ Directly uninstrument the module. This can be called manually to remove instrumentation. + + Returns: + bool: True if uninstrumentation was applied, False if already uninstrumented """ instrumentor = ConcurrentFuturesInstrumentor() if instrumentor.is_instrumented_by_opentelemetry:
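
For reviewers who want to see the end-to-end behaviour this series is aiming for, here is a minimal, self-contained sketch of the context-propagation pattern the ThreadPoolExecutor wrappers are built around: copy the submitting thread's contextvars.Context (which carries the active OpenTelemetry span) and run the task inside that copy. The helper submit_with_context, the worker function, and the tracer name are illustrative only and do not appear in this diff; with ConcurrentFuturesInstrumentor active, a plain executor.submit(...) is expected to produce the same effect with no manual wrapper. Assumes opentelemetry-api is installed.

    import contextvars
    from concurrent.futures import ThreadPoolExecutor

    from opentelemetry import trace

    tracer = trace.get_tracer("concurrent-futures-propagation-demo")  # illustrative name


    def submit_with_context(executor: ThreadPoolExecutor, fn, *args, **kwargs):
        """Submit fn so that it runs inside a copy of the caller's context."""
        # Each task gets its own Context copy; a single Context object cannot be
        # entered by two threads at once, so copying per submit keeps this safe.
        ctx = contextvars.copy_context()
        return executor.submit(ctx.run, fn, *args, **kwargs)


    def worker(item: int) -> int:
        # With a configured TracerProvider, this span is recorded as a child of
        # "parent" rather than the root of a new trace, because the copied
        # context carries the parent span into the worker thread.
        with tracer.start_as_current_span(f"worker-{item}"):
            return item * 2


    if __name__ == "__main__":
        with tracer.start_as_current_span("parent"):
            with ThreadPoolExecutor(max_workers=2) as pool:
                futures = [submit_with_context(pool, worker, i) for i in range(4)]
                results = [f.result() for f in futures]
        print(results)  # [0, 2, 4, 6]

The per-submit copy shown here complements the initializer-based approach in _context_propagating_init: the initializer seeds each worker thread with the context captured when the executor was created, while wrapping submit is intended to cover work scheduled later, when a different span may be active.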
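
A note on manual opt-in: the diff also adds instrument_module_directly()/uninstrument_module_directly() static methods for callers who do not want the import monitor to patch ThreadPoolExecutor automatically. A rough usage sketch, assuming the concurrent_futures package __init__ (not shown in this part of the series) re-exports the class:

    from agentops.instrumentation.concurrent_futures import ConcurrentFuturesInstrumentor

    # Apply the ThreadPoolExecutor patches explicitly; per the docstring this
    # returns True only if they were not already in place.
    applied = ConcurrentFuturesInstrumentor.instrument_module_directly()

    # ... run workloads that submit tasks to ThreadPoolExecutor ...

    # Remove the patches again; returns True only if instrumentation was active.
    removed = ConcurrentFuturesInstrumentor.uninstrument_module_directly()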