From c73e0589c8968df9e241f02dce8510cb2a427189 Mon Sep 17 00:00:00 2001 From: fenilfaldu Date: Fri, 20 Jun 2025 04:37:15 +0530 Subject: [PATCH 1/3] feat: Introduce utility dependencies management for improved instrumentation handling --- agentops/instrumentation/__init__.py | 175 +++++++++++++++++++-------- 1 file changed, 125 insertions(+), 50 deletions(-) diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index 2df5c7a9a..fd3804ff1 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -85,6 +85,14 @@ class InstrumentorConfig(TypedDict): }, } +# Define which packages require which utility instrumentors +# This maps package names to the list of utility instrumentors they depend on +UTILITY_DEPENDENCIES: dict[str, list[str]] = { + "mem0": ["concurrent.futures"], # mem0 uses concurrent.futures for parallel processing + # Add more dependencies as needed in the future + # "langchain": ["concurrent.futures", "asyncio"], +} + # Configuration for supported agentic libraries AGENTIC_LIBRARIES: dict[str, InstrumentorConfig] = { "crewai": { @@ -130,6 +138,7 @@ class InstrumentorConfig(TypedDict): _original_builtins_import = builtins.__import__ # Store original import _instrumenting_packages: Set[str] = set() _has_agentic_library: bool = False +_pending_utility_instrumentation: Set[str] = set() # Track packages that need utility instrumentation # New helper function to check module origin @@ -250,10 +259,20 @@ def _should_instrument_package(package_name: str) -> bool: logger.debug(f"_should_instrument_package: '{package_name}' already instrumented by AgentOps. Skipping.") return False - # Utility instrumentors should always be instrumented regardless of agentic library state + # Utility instrumentors should only be instrumented when their dependent packages are active if package_name in UTILITY_INSTRUMENTORS: - logger.debug(f"_should_instrument_package: '{package_name}' is a utility instrumentor. Always allowing.") - return True + # Check if any package that depends on this utility is instrumented + for dependent_package, utilities in UTILITY_DEPENDENCIES.items(): + if package_name in utilities and _is_package_instrumented(dependent_package): + logger.debug( + f"_should_instrument_package: '{package_name}' is a utility instrumentor needed by '{dependent_package}'. Allowing." + ) + return True + + logger.debug( + f"_should_instrument_package: '{package_name}' is a utility instrumentor but no dependent packages are active. Skipping." + ) + return False # Only apply agentic/provider logic if it's NOT a utility instrumentor is_target_agentic = package_name in AGENTIC_LIBRARIES @@ -297,9 +316,37 @@ def _should_instrument_package(package_name: str) -> bool: return False +def _instrument_utility_dependencies(package_name: str): + """ + Instrument any utility dependencies required by the given package. + + Args: + package_name: The package that was just instrumented + """ + if package_name in UTILITY_DEPENDENCIES: + utilities_needed = UTILITY_DEPENDENCIES[package_name] + logger.debug(f"_instrument_utility_dependencies: Package '{package_name}' requires utilities: {utilities_needed}") + + for utility_name in utilities_needed: + if utility_name in UTILITY_INSTRUMENTORS and not _is_package_instrumented(utility_name): + logger.info(f"AgentOps: Instrumenting utility '{utility_name}' required by '{package_name}'") + + # Check if the utility module is available + if utility_name in sys.modules: + _perform_instrumentation(utility_name) + else: + logger.debug(f"_instrument_utility_dependencies: Utility '{utility_name}' not yet imported, will instrument when imported") + + def _perform_instrumentation(package_name: str): """Helper function to perform instrumentation for a given package.""" global _instrumenting_packages, _active_instrumentors, _has_agentic_library + + # Check if we're already instrumenting this package (prevent circular instrumentation) + if package_name in _instrumenting_packages: + logger.debug(f"_perform_instrumentation: Already instrumenting '{package_name}', skipping to prevent circular instrumentation") + return + if not _should_instrument_package(package_name): return @@ -318,47 +365,79 @@ def _perform_instrumentation(package_name: str): config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES.get(package_name) or UTILITY_INSTRUMENTORS[package_name] loader = InstrumentorLoader(**config) - # instrument_one already checks loader.should_activate - instrumentor_instance = instrument_one(loader) - if instrumentor_instance is not None: - # Check if it was *actually* instrumented by instrument_one by seeing if the instrument method was called successfully. - # This relies on instrument_one returning None if its internal .instrument() call failed (if we revert that, this needs adjustment) - # For now, assuming instrument_one returns instance only on full success. - # User request was to return instrumentor even if .instrument() fails. So, we check if _agentops_instrumented_package_key was set by us. - - # Let's assume instrument_one might return an instance whose .instrument() failed. - # The key is set before _active_instrumentors.append, so if it's already there and matches, it means it's a re-attempt on the same package. - # The _is_package_instrumented check at the start of _should_instrument_package should prevent most re-entry for the same package_name. - - # Store the package key this instrumentor is for, to aid _is_package_instrumented - instrumentor_instance._agentops_instrumented_package_key = package_name - - # Add to active_instrumentors only if it's not a duplicate in terms of package_key being instrumented - # This is a safeguard, _is_package_instrumented should catch this earlier. - is_newly_added = True - for existing_inst in _active_instrumentors: + # Add to _instrumenting_packages to prevent circular instrumentation + _instrumenting_packages.add(package_name) + + try: + # instrument_one already checks loader.should_activate + instrumentor_instance = instrument_one(loader) + if instrumentor_instance is not None: + # Check if it was *actually* instrumented by instrument_one by seeing if the instrument method was called successfully. + # This relies on instrument_one returning None if its internal .instrument() call failed (if we revert that, this needs adjustment) + # For now, assuming instrument_one returns instance only on full success. + # User request was to return instrumentor even if .instrument() fails. So, we check if _agentops_instrumented_package_key was set by us. + + # Let's assume instrument_one might return an instance whose .instrument() failed. + # The key is set before _active_instrumentors.append, so if it's already there and matches, it means it's a re-attempt on the same package. + # The _is_package_instrumented check at the start of _should_instrument_package should prevent most re-entry for the same package_name. + + # Store the package key this instrumentor is for, to aid _is_package_instrumented + instrumentor_instance._agentops_instrumented_package_key = package_name + + # Add to active_instrumentors only if it's not a duplicate in terms of package_key being instrumented + # This is a safeguard, _is_package_instrumented should catch this earlier. + is_newly_added = True + for existing_inst in _active_instrumentors: + if ( + hasattr(existing_inst, "_agentops_instrumented_package_key") + and existing_inst._agentops_instrumented_package_key == package_name + ): + is_newly_added = False + logger.debug( + f"_perform_instrumentation: Instrumentor for '{package_name}' already in _active_instrumentors. Not adding again." + ) + break + if is_newly_added: + _active_instrumentors.append(instrumentor_instance) + + # If this was an agentic library AND it's newly effectively instrumented. if ( - hasattr(existing_inst, "_agentops_instrumented_package_key") - and existing_inst._agentops_instrumented_package_key == package_name - ): - is_newly_added = False - logger.debug( - f"_perform_instrumentation: Instrumentor for '{package_name}' already in _active_instrumentors. Not adding again." - ) - break - if is_newly_added: - _active_instrumentors.append(instrumentor_instance) + package_name in AGENTIC_LIBRARIES and not _has_agentic_library + ): # Check _has_agentic_library to ensure this is the *first* one. + # _uninstrument_providers() was already called in _should_instrument_package for the first agentic library. + _has_agentic_library = True + + # Mark package for utility dependency instrumentation + # We defer this to avoid circular imports during package initialization + if package_name not in UTILITY_INSTRUMENTORS and is_newly_added: # Don't recursively instrument utilities + if package_name in UTILITY_DEPENDENCIES: + _pending_utility_instrumentation.add(package_name) + logger.debug(f"_perform_instrumentation: Marked '{package_name}' for deferred utility instrumentation") + else: + logger.debug( + f"_perform_instrumentation: instrument_one for '{package_name}' returned None. Not added to active instrumentors." + ) + finally: + # Always remove from _instrumenting_packages when done + _instrumenting_packages.discard(package_name) - # If this was an agentic library AND it's newly effectively instrumented. - if ( - package_name in AGENTIC_LIBRARIES and not _has_agentic_library - ): # Check _has_agentic_library to ensure this is the *first* one. - # _uninstrument_providers() was already called in _should_instrument_package for the first agentic library. - _has_agentic_library = True - else: - logger.debug( - f"_perform_instrumentation: instrument_one for '{package_name}' returned None. Not added to active instrumentors." - ) + +def _process_pending_utility_instrumentation(): + """Process any pending utility instrumentations.""" + global _pending_utility_instrumentation + + if not _pending_utility_instrumentation: + return + + # Copy and clear to avoid modifying during iteration + pending = _pending_utility_instrumentation.copy() + _pending_utility_instrumentation.clear() + + for package_name in pending: + try: + _instrument_utility_dependencies(package_name) + except Exception as e: + logger.debug(f"Error instrumenting utility dependencies for {package_name}: {e}") def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), level=0): @@ -368,6 +447,9 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), """ global _instrumenting_packages, _has_agentic_library + # Process any pending utility instrumentations before handling new imports + _process_pending_utility_instrumentation() + # If an agentic library is already instrumented, skip all further instrumentation if _has_agentic_library: return _original_builtins_import(name, globals_dict, locals_dict, fromlist, level) @@ -408,7 +490,7 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), # Instrument all matching packages for package_to_check in packages_to_check: - if package_to_check not in _instrumenting_packages and not _is_package_instrumented(package_to_check): + if not _is_package_instrumented(package_to_check): target_module_obj = sys.modules.get(package_to_check) if target_module_obj: @@ -423,7 +505,6 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), f"_import_monitor: No module object found in sys.modules for '{package_to_check}', proceeding with SDK instrumentation attempt." ) - _instrumenting_packages.add(package_to_check) try: _perform_instrumentation(package_to_check) # If we just instrumented an agentic library, stop @@ -431,8 +512,6 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), break except Exception as e: logger.error(f"Error instrumenting {package_to_check}: {str(e)}") - finally: - _instrumenting_packages.discard(package_to_check) return module @@ -542,7 +621,6 @@ def instrument_all(): if ( package_to_check - and package_to_check not in _instrumenting_packages and not _is_package_instrumented(package_to_check) ): target_module_obj = sys.modules.get(package_to_check) @@ -556,13 +634,10 @@ def instrument_all(): f"instrument_all: No module object found for '{package_to_check}' in sys.modules during startup scan. Proceeding cautiously." ) - _instrumenting_packages.add(package_to_check) try: _perform_instrumentation(package_to_check) except Exception as e: logger.error(f"Error instrumenting {package_to_check}: {str(e)}") - finally: - _instrumenting_packages.discard(package_to_check) def uninstrument_all(): From 3d513b3f26f87114f2bfcf47913d056961c64b5d Mon Sep 17 00:00:00 2001 From: fenilfaldu Date: Mon, 23 Jun 2025 20:53:10 +0530 Subject: [PATCH 2/3] refactor concurrent.futures instrumentation --- agentops/instrumentation/__init__.py | 43 ++++++++++++++++------------ 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index fd3804ff1..f0a02ab4a 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -268,7 +268,7 @@ def _should_instrument_package(package_name: str) -> bool: f"_should_instrument_package: '{package_name}' is a utility instrumentor needed by '{dependent_package}'. Allowing." ) return True - + logger.debug( f"_should_instrument_package: '{package_name}' is a utility instrumentor but no dependent packages are active. Skipping." ) @@ -319,34 +319,40 @@ def _should_instrument_package(package_name: str) -> bool: def _instrument_utility_dependencies(package_name: str): """ Instrument any utility dependencies required by the given package. - + Args: package_name: The package that was just instrumented """ if package_name in UTILITY_DEPENDENCIES: utilities_needed = UTILITY_DEPENDENCIES[package_name] - logger.debug(f"_instrument_utility_dependencies: Package '{package_name}' requires utilities: {utilities_needed}") - + logger.debug( + f"_instrument_utility_dependencies: Package '{package_name}' requires utilities: {utilities_needed}" + ) + for utility_name in utilities_needed: if utility_name in UTILITY_INSTRUMENTORS and not _is_package_instrumented(utility_name): logger.info(f"AgentOps: Instrumenting utility '{utility_name}' required by '{package_name}'") - + # Check if the utility module is available if utility_name in sys.modules: _perform_instrumentation(utility_name) else: - logger.debug(f"_instrument_utility_dependencies: Utility '{utility_name}' not yet imported, will instrument when imported") + logger.debug( + f"_instrument_utility_dependencies: Utility '{utility_name}' not yet imported, will instrument when imported" + ) def _perform_instrumentation(package_name: str): """Helper function to perform instrumentation for a given package.""" global _instrumenting_packages, _active_instrumentors, _has_agentic_library - + # Check if we're already instrumenting this package (prevent circular instrumentation) if package_name in _instrumenting_packages: - logger.debug(f"_perform_instrumentation: Already instrumenting '{package_name}', skipping to prevent circular instrumentation") + logger.debug( + f"_perform_instrumentation: Already instrumenting '{package_name}', skipping to prevent circular instrumentation" + ) return - + if not _should_instrument_package(package_name): return @@ -367,7 +373,7 @@ def _perform_instrumentation(package_name: str): # Add to _instrumenting_packages to prevent circular instrumentation _instrumenting_packages.add(package_name) - + try: # instrument_one already checks loader.should_activate instrumentor_instance = instrument_one(loader) @@ -406,13 +412,15 @@ def _perform_instrumentation(package_name: str): ): # Check _has_agentic_library to ensure this is the *first* one. # _uninstrument_providers() was already called in _should_instrument_package for the first agentic library. _has_agentic_library = True - + # Mark package for utility dependency instrumentation # We defer this to avoid circular imports during package initialization if package_name not in UTILITY_INSTRUMENTORS and is_newly_added: # Don't recursively instrument utilities if package_name in UTILITY_DEPENDENCIES: _pending_utility_instrumentation.add(package_name) - logger.debug(f"_perform_instrumentation: Marked '{package_name}' for deferred utility instrumentation") + logger.debug( + f"_perform_instrumentation: Marked '{package_name}' for deferred utility instrumentation" + ) else: logger.debug( f"_perform_instrumentation: instrument_one for '{package_name}' returned None. Not added to active instrumentors." @@ -425,14 +433,14 @@ def _perform_instrumentation(package_name: str): def _process_pending_utility_instrumentation(): """Process any pending utility instrumentations.""" global _pending_utility_instrumentation - + if not _pending_utility_instrumentation: return - + # Copy and clear to avoid modifying during iteration pending = _pending_utility_instrumentation.copy() _pending_utility_instrumentation.clear() - + for package_name in pending: try: _instrument_utility_dependencies(package_name) @@ -619,10 +627,7 @@ def instrument_all(): package_to_check = target break - if ( - package_to_check - and not _is_package_instrumented(package_to_check) - ): + if package_to_check and not _is_package_instrumented(package_to_check): target_module_obj = sys.modules.get(package_to_check) if target_module_obj: From 623004d446522ec4fbacb3f1ca4cb5a84710f5ae Mon Sep 17 00:00:00 2001 From: fenilfaldu Date: Fri, 27 Jun 2025 17:45:42 +0530 Subject: [PATCH 3/3] code cleanup --- .../utilities/concurrent_futures/instrumentation.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/agentops/instrumentation/utilities/concurrent_futures/instrumentation.py b/agentops/instrumentation/utilities/concurrent_futures/instrumentation.py index 594cd0420..0cd99ce6b 100644 --- a/agentops/instrumentation/utilities/concurrent_futures/instrumentation.py +++ b/agentops/instrumentation/utilities/concurrent_futures/instrumentation.py @@ -40,7 +40,6 @@ def wrapped_init( def context_aware_initializer() -> None: """Initializer that sets up the captured context in each worker thread.""" - logger.debug("[ConcurrentFuturesInstrumentor] Setting up context in worker thread") # Set the main context variables in this thread for var, value in main_context.items(): @@ -60,8 +59,6 @@ def context_aware_initializer() -> None: logger.error(f"[ConcurrentFuturesInstrumentor] Error in user initializer: {e}") raise - logger.debug("[ConcurrentFuturesInstrumentor] Worker thread context setup complete") - # Create executor with context-aware initializer prefix = f"AgentOps-{thread_name_prefix}" if thread_name_prefix else "AgentOps-Thread" @@ -74,8 +71,6 @@ def context_aware_initializer() -> None: initargs=(), # We handle initargs in our wrapper ) - logger.debug("[ConcurrentFuturesInstrumentor] ThreadPoolExecutor initialized with context propagation") - return wrapped_init @@ -85,8 +80,7 @@ def _context_propagating_submit(original_submit: Callable) -> Callable: @functools.wraps(original_submit) def wrapped_submit(self: ThreadPoolExecutor, func: Callable[..., R], *args: Any, **kwargs: Any) -> Future[R]: # Log the submission - func_name = getattr(func, "__name__", str(func)) - logger.debug(f"[ConcurrentFuturesInstrumentor] Submitting function: {func_name}") + func_name = getattr(func, "__name__", str(func)) # noqa: F841 # The context propagation is handled by the initializer, so we can submit normally # But we can add additional logging or monitoring here if needed