From 7b2df20d13b8214993d94b22656a444864ee2cc1 Mon Sep 17 00:00:00 2001 From: Dwij Patel Date: Mon, 19 May 2025 11:51:53 +0530 Subject: [PATCH 1/6] Enhance instrumentation module with dynamic library detection and version checks. Introduced PROVIDERS and AGENTIC_LIBRARIES dictionaries for managing instrumentors, and implemented get_active_libraries function to identify currently used libraries. Updated InstrumentorLoader to validate library versions before activation. Refactored instrument_all function to prioritize agentic libraries before standard providers. --- agentops/instrumentation/__init__.py | 158 ++++++++++++++++----------- 1 file changed, 92 insertions(+), 66 deletions(-) diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index 79ce59981..702349a13 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -2,35 +2,81 @@ from types import ModuleType from dataclasses import dataclass import importlib +import sys +from importlib.metadata import version +from packaging.version import Version, parse from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore from agentops.logging import logger from agentops.sdk.core import TracingCore - -# references to all active instrumentors _active_instrumentors: list[BaseInstrumentor] = [] +PROVIDERS = { + "openai": { + "module_name": "agentops.instrumentation.openai", + "class_name": "OpenAIInstrumentor", + "min_version": "1.0.0", + }, + "anthropic": { + "module_name": "agentops.instrumentation.anthropic", + "class_name": "AnthropicInstrumentor", + "min_version": "0.32.0", + }, + "google.genai": { + "module_name": "agentops.instrumentation.google_generativeai", + "class_name": "GoogleGenerativeAIInstrumentor", + "min_version": "0.1.0", + }, + "ibm_watsonx_ai": { + "module_name": "agentops.instrumentation.ibm_watsonx_ai", + "class_name": "IBMWatsonXInstrumentor", + "min_version": "0.1.0", + }, +} + +AGENTIC_LIBRARIES = { + "crewai": { + "module_name": "agentops.instrumentation.crewai", + "class_name": "CrewAIInstrumentor", + "min_version": "0.56.0", + }, + "autogen": {"module_name": "agentops.instrumentation.ag2", "class_name": "AG2Instrumentor", "min_version": "0.1.0"}, + "agents": { + "module_name": "agentops.instrumentation.openai_agents", + "class_name": "OpenAIAgentsInstrumentor", + "min_version": "0.1.0", + }, +} + + +def get_active_libraries() -> set[str]: + """ + Get all actively used libraries in the current execution context. + This includes both directly imported and indirectly imported libraries. + """ + active_libs = set() + + # Check sys.modules for direct imports + for name, module in sys.modules.items(): + if isinstance(module, ModuleType): + root_package = name.split(".")[0] + if root_package in PROVIDERS or root_package in AGENTIC_LIBRARIES: + active_libs.add(root_package) + + return active_libs + @dataclass class InstrumentorLoader: """ - Represents a dynamically-loadable instrumentor. - - This class is used to load and activate instrumentors based on their module - and class names. - We use the `provider_import_name` to determine if the library is installed i - n the environment. - - `module_name` is the name of the module to import from. - `class_name` is the name of the class to instantiate from the module. - `provider_import_name` is the name of the package to check for availability. + Represents a dynamically-loadable instrumentor. """ module_name: str class_name: str - provider_import_name: str + min_version: str @property def module(self) -> ModuleType: @@ -39,10 +85,16 @@ def module(self) -> ModuleType: @property def should_activate(self) -> bool: - """Is the provider import available in the environment?""" + """Is the provider/library available in the environment with correct version?""" try: - importlib.import_module(self.provider_import_name) - return True + provider_name = self.module_name.split(".")[-1] + module_version = version(provider_name) + + if module_version is None: + logger.warning(f"Cannot determine {provider_name} version.") + return False + + return Version(module_version) >= parse(self.min_version) except ImportError: return False @@ -51,51 +103,11 @@ def get_instance(self) -> BaseInstrumentor: return getattr(self.module, self.class_name)() -available_instrumentors: list[InstrumentorLoader] = [ - InstrumentorLoader( - module_name="agentops.instrumentation.openai", - class_name="OpenAIInstrumentor", - provider_import_name="openai", - ), - InstrumentorLoader( - module_name="agentops.instrumentation.anthropic", - class_name="AnthropicInstrumentor", - provider_import_name="anthropic", - ), - InstrumentorLoader( - module_name="agentops.instrumentation.crewai", - class_name="CrewAIInstrumentor", - provider_import_name="crewai", - ), - InstrumentorLoader( - module_name="agentops.instrumentation.openai_agents", - class_name="OpenAIAgentsInstrumentor", - provider_import_name="agents", - ), - InstrumentorLoader( - module_name="agentops.instrumentation.google_generativeai", - class_name="GoogleGenerativeAIInstrumentor", - provider_import_name="google.genai", - ), - InstrumentorLoader( - module_name="agentops.instrumentation.ibm_watsonx_ai", - class_name="IBMWatsonXInstrumentor", - provider_import_name="ibm_watsonx_ai", - ), - InstrumentorLoader( - module_name="agentops.instrumentation.ag2", - class_name="AG2Instrumentor", - provider_import_name="autogen", - ), -] - - def instrument_one(loader: InstrumentorLoader) -> Optional[BaseInstrumentor]: """Instrument a single instrumentor.""" if not loader.should_activate: - # this package is not in the environment; skip logger.debug( - f"Package {loader.provider_import_name} not found; skipping instrumentation of {loader.class_name}" + f"Package {loader.module_name} not found or version < {loader.min_version}; skipping instrumentation" ) return None @@ -117,15 +129,29 @@ def instrument_all(): logger.debug("Instrumentors have already been populated.") return - for loader in available_instrumentors: - if loader.class_name in _active_instrumentors: - # already instrumented - logger.debug(f"Instrumentor {loader.class_name} has already been instrumented.") - return None - - instrumentor = instrument_one(loader) - if instrumentor is not None: - _active_instrumentors.append(instrumentor) + # Get all active libraries + active_libs = get_active_libraries() + logger.debug(f"Active libraries detected: {active_libs}") + + # First check for agentic libraries + agentic_detected = False + for lib_name, lib_config in AGENTIC_LIBRARIES.items(): + if lib_name in active_libs: + loader = InstrumentorLoader(**lib_config) + if loader.should_activate: + agentic_detected = True + instrumentor = instrument_one(loader) + if instrumentor is not None: + _active_instrumentors.append(instrumentor) + + # If no agentic libraries are detected, instrument active providers + if not agentic_detected: + for provider_name, provider_config in PROVIDERS.items(): + if provider_name in active_libs: + loader = InstrumentorLoader(**provider_config) + instrumentor = instrument_one(loader) + if instrumentor is not None: + _active_instrumentors.append(instrumentor) def uninstrument_all(): From bc0496d7c5ac6497fa0a8e8f8a895232ceff392e Mon Sep 17 00:00:00 2001 From: Dwij Patel Date: Tue, 20 May 2025 01:35:45 +0530 Subject: [PATCH 2/6] Refactor instrumentation module to implement InstrumentationManager for improved package monitoring and dynamic instrumentation. Added methods for managing active instrumentors, handling conflicts between agentic libraries and providers, and monitoring imports. Updated configuration for supported libraries and combined target packages for streamlined instrumentation. --- agentops/instrumentation/__init__.py | 266 ++++++++++++++++++++------- 1 file changed, 198 insertions(+), 68 deletions(-) diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index 702349a13..9f92a6d47 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -1,18 +1,185 @@ -from typing import Optional +""" +AgentOps Instrumentation Module + +This module provides automatic instrumentation for various LLM providers and agentic libraries. +It works by monitoring Python imports and automatically instrumenting packages as they are imported. + +Key Features: +- Automatic detection and instrumentation of LLM providers (OpenAI, Anthropic, etc.) +- Support for agentic libraries (CrewAI, AutoGen, etc.) +- Version-aware instrumentation (only activates for supported versions) +- Smart handling of provider vs agentic library conflicts +- Non-intrusive monitoring using Python's import system +""" + +from typing import Optional, Set from types import ModuleType from dataclasses import dataclass import importlib import sys from importlib.metadata import version from packaging.version import Version, parse +import builtins from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore from agentops.logging import logger from agentops.sdk.core import TracingCore -_active_instrumentors: list[BaseInstrumentor] = [] +class InstrumentationManager: + """ + Manages the instrumentation state and provides methods for monitoring and instrumenting packages. + This is implemented as a singleton to maintain consistent state across the application. + """ + + def __init__(self): + # List of currently active instrumentors + self._active_instrumentors: list[BaseInstrumentor] = [] + # Store the original import function to restore it later + self._original_import = builtins.__import__ + # Track packages currently being instrumented to prevent recursion + self._instrumenting_packages: Set[str] = set() + # Flag to track if an agentic library is currently instrumented + self._has_agentic_library: bool = False + + def is_package_instrumented(self, package_name: str) -> bool: + """Check if a package is already instrumented by looking at active instrumentors.""" + return any( + instrumentor.__class__.__name__.lower().startswith(package_name.lower()) + for instrumentor in self._active_instrumentors + ) + + def should_instrument_package(self, package_name: str) -> bool: + """ + Determine if a package should be instrumented based on current state. + Handles special cases for agentic libraries and providers. + """ + # If this is an agentic library, uninstrument all providers first + if package_name in AGENTIC_LIBRARIES: + self.uninstrument_providers() + self._has_agentic_library = True + logger.debug(f"Uninstrumented all providers due to agentic library {package_name} detection") + return True + + # Skip providers if an agentic library is already instrumented + if package_name in PROVIDERS and self._has_agentic_library: + logger.debug( + f"Skipping provider {package_name} instrumentation as an agentic library is already instrumented" + ) + return False + + # Skip if already instrumented + if self.is_package_instrumented(package_name): + logger.debug(f"Package {package_name} is already instrumented") + return False + + return True + + def uninstrument_providers(self): + """Uninstrument all provider instrumentors while keeping agentic libraries active.""" + providers_to_remove = [] + for instrumentor in self._active_instrumentors: + if any( + instrumentor.__class__.__name__.lower().startswith(provider.lower()) for provider in PROVIDERS.keys() + ): + instrumentor.uninstrument() + logger.debug(f"Uninstrumented provider {instrumentor.__class__.__name__}") + providers_to_remove.append(instrumentor) + + self._active_instrumentors = [i for i in self._active_instrumentors if i not in providers_to_remove] + + def _import_monitor(self, name: str, globals=None, locals=None, fromlist=(), level=0): + """ + Monitor imports and instrument packages as they are imported. + This replaces the built-in import function to intercept package imports. + """ + root = name.split(".", 1)[0] + + # Skip providers if an agentic library is already instrumented + if self._has_agentic_library and root in PROVIDERS: + return self._original_import(name, globals, locals, fromlist, level) + + # Check if this is a package we should instrument + if ( + root in TARGET_PACKAGES + and root not in self._instrumenting_packages + and not self.is_package_instrumented(root) + ): + logger.debug(f"Detected import of {root}") + self._instrumenting_packages.add(root) + try: + if not self.should_instrument_package(root): + return self._original_import(name, globals, locals, fromlist, level) + + # Get the appropriate configuration for the package + config = PROVIDERS.get(root) or AGENTIC_LIBRARIES[root] + loader = InstrumentorLoader(**config) + + if loader.should_activate: + instrumentor = instrument_one(loader) + if instrumentor is not None: + self._active_instrumentors.append(instrumentor) + except Exception as e: + logger.error(f"Error instrumenting {root}: {str(e)}") + finally: + self._instrumenting_packages.discard(root) + + return self._original_import(name, globals, locals, fromlist, level) + + def start_monitoring(self): + """Start monitoring imports and check already imported packages.""" + builtins.__import__ = self._import_monitor + self._check_existing_imports() + + def stop_monitoring(self): + """Stop monitoring imports and restore the original import function.""" + builtins.__import__ = self._original_import + + def _check_existing_imports(self): + """Check and instrument packages that were already imported before monitoring started.""" + for name in list(sys.modules.keys()): + module = sys.modules.get(name) + if not isinstance(module, ModuleType): + continue + + root = name.split(".", 1)[0] + if self._has_agentic_library and root in PROVIDERS: + continue + + if ( + root in TARGET_PACKAGES + and root not in self._instrumenting_packages + and not self.is_package_instrumented(root) + ): + self._instrumenting_packages.add(root) + try: + if not self.should_instrument_package(root): + continue + + config = PROVIDERS.get(root) or AGENTIC_LIBRARIES[root] + loader = InstrumentorLoader(**config) + + if loader.should_activate: + instrumentor = instrument_one(loader) + if instrumentor is not None: + self._active_instrumentors.append(instrumentor) + except Exception as e: + logger.error(f"Error instrumenting {root}: {str(e)}") + finally: + self._instrumenting_packages.discard(root) + + def uninstrument_all(self): + """Stop monitoring and uninstrument all packages.""" + self.stop_monitoring() + for instrumentor in self._active_instrumentors: + instrumentor.uninstrument() + logger.debug(f"Uninstrumented {instrumentor.__class__.__name__}") + self._active_instrumentors = [] + self._has_agentic_library = False + + +# Configuration for supported LLM providers PROVIDERS = { "openai": { "module_name": "agentops.instrumentation.openai", @@ -36,6 +203,7 @@ }, } +# Configuration for supported agentic libraries AGENTIC_LIBRARIES = { "crewai": { "module_name": "agentops.instrumentation.crewai", @@ -50,28 +218,18 @@ }, } +# Combine all target packages for monitoring +TARGET_PACKAGES = set(PROVIDERS.keys()) | set(AGENTIC_LIBRARIES.keys()) -def get_active_libraries() -> set[str]: - """ - Get all actively used libraries in the current execution context. - This includes both directly imported and indirectly imported libraries. - """ - active_libs = set() - - # Check sys.modules for direct imports - for name, module in sys.modules.items(): - if isinstance(module, ModuleType): - root_package = name.split(".")[0] - if root_package in PROVIDERS or root_package in AGENTIC_LIBRARIES: - active_libs.add(root_package) - - return active_libs +# Create a single instance of the manager +_manager = InstrumentationManager() @dataclass class InstrumentorLoader: """ Represents a dynamically-loadable instrumentor. + Handles version checking and instantiation of instrumentors. """ module_name: str @@ -80,31 +238,29 @@ class InstrumentorLoader: @property def module(self) -> ModuleType: - """Reference to the instrumentor module.""" + """Get the instrumentor module.""" return importlib.import_module(self.module_name) @property def should_activate(self) -> bool: - """Is the provider/library available in the environment with correct version?""" + """Check if the package is available and meets version requirements.""" try: provider_name = self.module_name.split(".")[-1] module_version = version(provider_name) - - if module_version is None: - logger.warning(f"Cannot determine {provider_name} version.") - return False - - return Version(module_version) >= parse(self.min_version) + return module_version is not None and Version(module_version) >= parse(self.min_version) except ImportError: return False def get_instance(self) -> BaseInstrumentor: - """Return a new instance of the instrumentor.""" + """Create and return a new instance of the instrumentor.""" return getattr(self.module, self.class_name)() def instrument_one(loader: InstrumentorLoader) -> Optional[BaseInstrumentor]: - """Instrument a single instrumentor.""" + """ + Instrument a single package using the provided loader. + Returns the instrumentor instance if successful, None otherwise. + """ if not loader.should_activate: logger.debug( f"Package {loader.module_name} not found or version < {loader.min_version}; skipping instrumentation" @@ -114,53 +270,27 @@ def instrument_one(loader: InstrumentorLoader) -> Optional[BaseInstrumentor]: instrumentor = loader.get_instance() instrumentor.instrument(tracer_provider=TracingCore.get_instance()._provider) logger.debug(f"Instrumented {loader.class_name}") - return instrumentor def instrument_all(): - """ - Instrument all available instrumentors. - This function is called when `instrument_llm_calls` is enabled. - """ - global _active_instrumentors - - if len(_active_instrumentors): - logger.debug("Instrumentors have already been populated.") - return - - # Get all active libraries - active_libs = get_active_libraries() - logger.debug(f"Active libraries detected: {active_libs}") - - # First check for agentic libraries - agentic_detected = False - for lib_name, lib_config in AGENTIC_LIBRARIES.items(): - if lib_name in active_libs: - loader = InstrumentorLoader(**lib_config) - if loader.should_activate: - agentic_detected = True - instrumentor = instrument_one(loader) - if instrumentor is not None: - _active_instrumentors.append(instrumentor) - - # If no agentic libraries are detected, instrument active providers - if not agentic_detected: - for provider_name, provider_config in PROVIDERS.items(): - if provider_name in active_libs: - loader = InstrumentorLoader(**provider_config) - instrumentor = instrument_one(loader) - if instrumentor is not None: - _active_instrumentors.append(instrumentor) + """Start monitoring and instrumenting packages if not already started.""" + if not _manager._active_instrumentors: + _manager.start_monitoring() def uninstrument_all(): + """Stop monitoring and uninstrument all packages.""" + _manager.uninstrument_all() + + +def get_active_libraries() -> set[str]: """ - Uninstrument all available instrumentors. - This can be called to disable instrumentation. + Get all actively used libraries in the current execution context. + Returns a set of package names that are currently imported and being monitored. """ - global _active_instrumentors - for instrumentor in _active_instrumentors: - instrumentor.uninstrument() - logger.debug(f"Uninstrumented {instrumentor.__class__.__name__}") - _active_instrumentors = [] + return { + name.split(".")[0] + for name, module in sys.modules.items() + if isinstance(module, ModuleType) and name.split(".")[0] in TARGET_PACKAGES + } From ebfde4b59aa7c137ae96042c1cfec1f707cdb01a Mon Sep 17 00:00:00 2001 From: Dwij Patel Date: Wed, 21 May 2025 00:07:08 +0530 Subject: [PATCH 3/6] Enhance instrumentation module by introducing TypedDict for instrumentor configurations, improving type safety for PROVIDERS and AGENTIC_LIBRARIES dictionaries. This change facilitates better structure and clarity in managing instrumentor settings. --- agentops/instrumentation/__init__.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index 9f92a6d47..b822d6317 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -12,7 +12,7 @@ - Non-intrusive monitoring using Python's import system """ -from typing import Optional, Set +from typing import Optional, Set, TypedDict from types import ModuleType from dataclasses import dataclass import importlib @@ -179,8 +179,15 @@ def uninstrument_all(self): self._has_agentic_library = False +# Define the structure for instrumentor configurations +class InstrumentorConfig(TypedDict): + module_name: str + class_name: str + min_version: str + + # Configuration for supported LLM providers -PROVIDERS = { +PROVIDERS: dict[str, InstrumentorConfig] = { "openai": { "module_name": "agentops.instrumentation.openai", "class_name": "OpenAIInstrumentor", @@ -204,7 +211,7 @@ def uninstrument_all(self): } # Configuration for supported agentic libraries -AGENTIC_LIBRARIES = { +AGENTIC_LIBRARIES: dict[str, InstrumentorConfig] = { "crewai": { "module_name": "agentops.instrumentation.crewai", "class_name": "CrewAIInstrumentor", From 5f6c36afbfa4ee9316e0f97f5e1c5a1b459f35e7 Mon Sep 17 00:00:00 2001 From: Dwij Patel Date: Wed, 21 May 2025 11:44:25 +0530 Subject: [PATCH 4/6] Refactor instrumentation module to remove InstrumentationManager and implement module-level state management for active instrumentors. Introduced helper functions for package instrumentation, monitoring imports, and uninstrumenting providers, enhancing clarity and maintainability of the code. Updated instrumentation logic to handle conflicts between agentic libraries and providers more effectively. --- agentops/instrumentation/__init__.py | 275 +++++++++++++-------------- 1 file changed, 134 insertions(+), 141 deletions(-) diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index b822d6317..5f7eac54f 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -27,156 +27,148 @@ from agentops.sdk.core import TracingCore -class InstrumentationManager: +# Module-level state variables +_active_instrumentors: list[BaseInstrumentor] = [] +_original_builtins_import = builtins.__import__ # Store original import +_instrumenting_packages: Set[str] = set() +_has_agentic_library: bool = False + + +def _is_package_instrumented(package_name: str) -> bool: + """Check if a package is already instrumented by looking at active instrumentors.""" + return any( + instrumentor.__class__.__name__.lower().startswith(package_name.lower()) + for instrumentor in _active_instrumentors + ) + + +def _uninstrument_providers(): + """Uninstrument all provider instrumentors while keeping agentic libraries active.""" + global _active_instrumentors + providers_to_remove = [] + for instrumentor in _active_instrumentors: + if any(instrumentor.__class__.__name__.lower().startswith(provider.lower()) for provider in PROVIDERS.keys()): + instrumentor.uninstrument() + logger.debug(f"Uninstrumented provider {instrumentor.__class__.__name__}") + providers_to_remove.append(instrumentor) + + _active_instrumentors = [i for i in _active_instrumentors if i not in providers_to_remove] + + +def _should_instrument_package(package_name: str) -> bool: """ - Manages the instrumentation state and provides methods for monitoring and instrumenting packages. - This is implemented as a singleton to maintain consistent state across the application. + Determine if a package should be instrumented based on current state. + Handles special cases for agentic libraries and providers. """ + global _has_agentic_library + # If this is an agentic library, uninstrument all providers first + if package_name in AGENTIC_LIBRARIES: + _uninstrument_providers() + _has_agentic_library = True + logger.debug(f"Uninstrumented all providers due to agentic library {package_name} detection") + return True - def __init__(self): - # List of currently active instrumentors - self._active_instrumentors: list[BaseInstrumentor] = [] - # Store the original import function to restore it later - self._original_import = builtins.__import__ - # Track packages currently being instrumented to prevent recursion - self._instrumenting_packages: Set[str] = set() - # Flag to track if an agentic library is currently instrumented - self._has_agentic_library: bool = False - - def is_package_instrumented(self, package_name: str) -> bool: - """Check if a package is already instrumented by looking at active instrumentors.""" - return any( - instrumentor.__class__.__name__.lower().startswith(package_name.lower()) - for instrumentor in self._active_instrumentors - ) + # Skip providers if an agentic library is already instrumented + if package_name in PROVIDERS and _has_agentic_library: + logger.debug(f"Skipping provider {package_name} instrumentation as an agentic library is already instrumented") + return False - def should_instrument_package(self, package_name: str) -> bool: - """ - Determine if a package should be instrumented based on current state. - Handles special cases for agentic libraries and providers. - """ - # If this is an agentic library, uninstrument all providers first - if package_name in AGENTIC_LIBRARIES: - self.uninstrument_providers() - self._has_agentic_library = True - logger.debug(f"Uninstrumented all providers due to agentic library {package_name} detection") - return True - - # Skip providers if an agentic library is already instrumented - if package_name in PROVIDERS and self._has_agentic_library: - logger.debug( - f"Skipping provider {package_name} instrumentation as an agentic library is already instrumented" - ) - return False + # Skip if already instrumented + if _is_package_instrumented(package_name): + logger.debug(f"Package {package_name} is already instrumented") + return False - # Skip if already instrumented - if self.is_package_instrumented(package_name): - logger.debug(f"Package {package_name} is already instrumented") - return False + return True - return True - def uninstrument_providers(self): - """Uninstrument all provider instrumentors while keeping agentic libraries active.""" - providers_to_remove = [] - for instrumentor in self._active_instrumentors: - if any( - instrumentor.__class__.__name__.lower().startswith(provider.lower()) for provider in PROVIDERS.keys() - ): - instrumentor.uninstrument() - logger.debug(f"Uninstrumented provider {instrumentor.__class__.__name__}") - providers_to_remove.append(instrumentor) - - self._active_instrumentors = [i for i in self._active_instrumentors if i not in providers_to_remove] - - def _import_monitor(self, name: str, globals=None, locals=None, fromlist=(), level=0): - """ - Monitor imports and instrument packages as they are imported. - This replaces the built-in import function to intercept package imports. - """ - root = name.split(".", 1)[0] +def _perform_instrumentation(package_name: str): + """Helper function to perform instrumentation for a given package.""" + global _instrumenting_packages, _active_instrumentors + if not _should_instrument_package(package_name): + return + + # Get the appropriate configuration for the package + config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES[package_name] + loader = InstrumentorLoader(**config) + + if loader.should_activate: + instrumentor = instrument_one(loader) # instrument_one is already a module function + if instrumentor is not None: + _active_instrumentors.append(instrumentor) + + +def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), level=0): + """ + Monitor imports and instrument packages as they are imported. + This replaces the built-in import function to intercept package imports. + """ + global _instrumenting_packages + root = name.split(".", 1)[0] + + # Skip providers if an agentic library is already instrumented + if _has_agentic_library and root in PROVIDERS: + return _original_builtins_import(name, globals_dict, locals_dict, fromlist, level) + + # Check if this is a package we should instrument + if ( + root in TARGET_PACKAGES + and root not in _instrumenting_packages + and not _is_package_instrumented(root) # Check if already instrumented before adding + ): + logger.debug(f"Detected import of {root}") + _instrumenting_packages.add(root) + try: + _perform_instrumentation(root) + except Exception as e: + logger.error(f"Error instrumenting {root}: {str(e)}") + finally: + _instrumenting_packages.discard(root) + + return _original_builtins_import(name, globals_dict, locals_dict, fromlist, level) - # Skip providers if an agentic library is already instrumented - if self._has_agentic_library and root in PROVIDERS: - return self._original_import(name, globals, locals, fromlist, level) - - # Check if this is a package we should instrument - if ( - root in TARGET_PACKAGES - and root not in self._instrumenting_packages - and not self.is_package_instrumented(root) - ): - logger.debug(f"Detected import of {root}") - self._instrumenting_packages.add(root) - try: - if not self.should_instrument_package(root): - return self._original_import(name, globals, locals, fromlist, level) - # Get the appropriate configuration for the package - config = PROVIDERS.get(root) or AGENTIC_LIBRARIES[root] - loader = InstrumentorLoader(**config) +def _check_existing_imports(): + """Check and instrument packages that were already imported before monitoring started.""" + global _instrumenting_packages + for name in list(sys.modules.keys()): + module = sys.modules.get(name) + if not isinstance(module, ModuleType): + continue - if loader.should_activate: - instrumentor = instrument_one(loader) - if instrumentor is not None: - self._active_instrumentors.append(instrumentor) + root = name.split(".", 1)[0] + if _has_agentic_library and root in PROVIDERS: + continue + + if root in TARGET_PACKAGES and root not in _instrumenting_packages and not _is_package_instrumented(root): + _instrumenting_packages.add(root) + try: + _perform_instrumentation(root) except Exception as e: logger.error(f"Error instrumenting {root}: {str(e)}") finally: - self._instrumenting_packages.discard(root) - - return self._original_import(name, globals, locals, fromlist, level) - - def start_monitoring(self): - """Start monitoring imports and check already imported packages.""" - builtins.__import__ = self._import_monitor - self._check_existing_imports() - - def stop_monitoring(self): - """Stop monitoring imports and restore the original import function.""" - builtins.__import__ = self._original_import - - def _check_existing_imports(self): - """Check and instrument packages that were already imported before monitoring started.""" - for name in list(sys.modules.keys()): - module = sys.modules.get(name) - if not isinstance(module, ModuleType): - continue - - root = name.split(".", 1)[0] - if self._has_agentic_library and root in PROVIDERS: - continue - - if ( - root in TARGET_PACKAGES - and root not in self._instrumenting_packages - and not self.is_package_instrumented(root) - ): - self._instrumenting_packages.add(root) - try: - if not self.should_instrument_package(root): - continue - - config = PROVIDERS.get(root) or AGENTIC_LIBRARIES[root] - loader = InstrumentorLoader(**config) - - if loader.should_activate: - instrumentor = instrument_one(loader) - if instrumentor is not None: - self._active_instrumentors.append(instrumentor) - except Exception as e: - logger.error(f"Error instrumenting {root}: {str(e)}") - finally: - self._instrumenting_packages.discard(root) - - def uninstrument_all(self): - """Stop monitoring and uninstrument all packages.""" - self.stop_monitoring() - for instrumentor in self._active_instrumentors: - instrumentor.uninstrument() - logger.debug(f"Uninstrumented {instrumentor.__class__.__name__}") - self._active_instrumentors = [] - self._has_agentic_library = False + _instrumenting_packages.discard(root) + + +def _start_monitoring_internal(): + """Start monitoring imports and check already imported packages.""" + builtins.__import__ = _import_monitor + _check_existing_imports() + + +def _stop_monitoring_internal(): + """Stop monitoring imports and restore the original import function.""" + builtins.__import__ = _original_builtins_import + + +def _uninstrument_all_internal(): + """Stop monitoring and uninstrument all packages.""" + global _active_instrumentors, _has_agentic_library + _stop_monitoring_internal() + for instrumentor in _active_instrumentors: + instrumentor.uninstrument() + logger.debug(f"Uninstrumented {instrumentor.__class__.__name__}") + _active_instrumentors = [] + _has_agentic_library = False # Define the structure for instrumentor configurations @@ -229,7 +221,7 @@ class InstrumentorConfig(TypedDict): TARGET_PACKAGES = set(PROVIDERS.keys()) | set(AGENTIC_LIBRARIES.keys()) # Create a single instance of the manager -_manager = InstrumentationManager() +# _manager = InstrumentationManager() # Removed @dataclass @@ -282,13 +274,14 @@ def instrument_one(loader: InstrumentorLoader) -> Optional[BaseInstrumentor]: def instrument_all(): """Start monitoring and instrumenting packages if not already started.""" - if not _manager._active_instrumentors: - _manager.start_monitoring() + # Check if active_instrumentors is empty, as a proxy for not started. + if not _active_instrumentors: + _start_monitoring_internal() def uninstrument_all(): """Stop monitoring and uninstrument all packages.""" - _manager.uninstrument_all() + _uninstrument_all_internal() def get_active_libraries() -> set[str]: From ed1abebd848026d1a00138b685772a87383ff798 Mon Sep 17 00:00:00 2001 From: Dwij Patel Date: Wed, 21 May 2025 12:01:21 +0530 Subject: [PATCH 5/6] Refactor instrumentation module to streamline monitoring and uninstrumenting processes. Removed internal helper functions for starting and stopping monitoring, integrating their logic directly into the `instrument_all` and `uninstrument_all` functions for improved clarity and maintainability. --- agentops/instrumentation/__init__.py | 33 ++++++++-------------------- 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index 5f7eac54f..4e4e9c99e 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -149,28 +149,6 @@ def _check_existing_imports(): _instrumenting_packages.discard(root) -def _start_monitoring_internal(): - """Start monitoring imports and check already imported packages.""" - builtins.__import__ = _import_monitor - _check_existing_imports() - - -def _stop_monitoring_internal(): - """Stop monitoring imports and restore the original import function.""" - builtins.__import__ = _original_builtins_import - - -def _uninstrument_all_internal(): - """Stop monitoring and uninstrument all packages.""" - global _active_instrumentors, _has_agentic_library - _stop_monitoring_internal() - for instrumentor in _active_instrumentors: - instrumentor.uninstrument() - logger.debug(f"Uninstrumented {instrumentor.__class__.__name__}") - _active_instrumentors = [] - _has_agentic_library = False - - # Define the structure for instrumentor configurations class InstrumentorConfig(TypedDict): module_name: str @@ -276,12 +254,19 @@ def instrument_all(): """Start monitoring and instrumenting packages if not already started.""" # Check if active_instrumentors is empty, as a proxy for not started. if not _active_instrumentors: - _start_monitoring_internal() + builtins.__import__ = _import_monitor + _check_existing_imports() def uninstrument_all(): """Stop monitoring and uninstrument all packages.""" - _uninstrument_all_internal() + global _active_instrumentors, _has_agentic_library + builtins.__import__ = _original_builtins_import + for instrumentor in _active_instrumentors: + instrumentor.uninstrument() + logger.debug(f"Uninstrumented {instrumentor.__class__.__name__}") + _active_instrumentors = [] + _has_agentic_library = False def get_active_libraries() -> set[str]: From 9fdbdb5e5fad78ed4dfa8023d57e1942f79c9323 Mon Sep 17 00:00:00 2001 From: Dwij Patel Date: Wed, 21 May 2025 12:05:28 +0530 Subject: [PATCH 6/6] Refactor instrumentation module by integrating the logic for checking existing imports directly into the `instrument_all` function. This change removes the now-unnecessary `_check_existing_imports` helper function, enhancing code clarity and maintainability while preserving the functionality of monitoring already imported packages. --- agentops/instrumentation/__init__.py | 41 ++++++++++++---------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index 4e4e9c99e..70017743b 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -127,28 +127,6 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), return _original_builtins_import(name, globals_dict, locals_dict, fromlist, level) -def _check_existing_imports(): - """Check and instrument packages that were already imported before monitoring started.""" - global _instrumenting_packages - for name in list(sys.modules.keys()): - module = sys.modules.get(name) - if not isinstance(module, ModuleType): - continue - - root = name.split(".", 1)[0] - if _has_agentic_library and root in PROVIDERS: - continue - - if root in TARGET_PACKAGES and root not in _instrumenting_packages and not _is_package_instrumented(root): - _instrumenting_packages.add(root) - try: - _perform_instrumentation(root) - except Exception as e: - logger.error(f"Error instrumenting {root}: {str(e)}") - finally: - _instrumenting_packages.discard(root) - - # Define the structure for instrumentor configurations class InstrumentorConfig(TypedDict): module_name: str @@ -255,7 +233,24 @@ def instrument_all(): # Check if active_instrumentors is empty, as a proxy for not started. if not _active_instrumentors: builtins.__import__ = _import_monitor - _check_existing_imports() + global _instrumenting_packages + for name in list(sys.modules.keys()): + module = sys.modules.get(name) + if not isinstance(module, ModuleType): + continue + + root = name.split(".", 1)[0] + if _has_agentic_library and root in PROVIDERS: + continue + + if root in TARGET_PACKAGES and root not in _instrumenting_packages and not _is_package_instrumented(root): + _instrumenting_packages.add(root) + try: + _perform_instrumentation(root) + except Exception as e: + logger.error(f"Error instrumenting {root}: {str(e)}") + finally: + _instrumenting_packages.discard(root) def uninstrument_all():