Skip to content
Merged
302 changes: 219 additions & 83 deletions agentops/instrumentation/__init__.py
Original file line number Diff line number Diff line change
@@ -1,140 +1,276 @@
from typing import Optional
"""
AgentOps Instrumentation Module

This module provides automatic instrumentation for various LLM providers and agentic libraries.
It works by monitoring Python imports and automatically instrumenting packages as they are imported.

Key Features:
- Automatic detection and instrumentation of LLM providers (OpenAI, Anthropic, etc.)
- Support for agentic libraries (CrewAI, AutoGen, etc.)
- Version-aware instrumentation (only activates for supported versions)
- Smart handling of provider vs agentic library conflicts
- Non-intrusive monitoring using Python's import system
"""

from typing import Optional, Set, TypedDict
from types import ModuleType
from dataclasses import dataclass
import importlib
import sys
from importlib.metadata import version
from packaging.version import Version, parse
import builtins

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore

from agentops.logging import logger
from agentops.sdk.core import TracingCore


# references to all active instrumentors
# Module-level state variables
_active_instrumentors: list[BaseInstrumentor] = []
_original_builtins_import = builtins.__import__ # Store original import
_instrumenting_packages: Set[str] = set()
_has_agentic_library: bool = False


@dataclass
class InstrumentorLoader:
def _is_package_instrumented(package_name: str) -> bool:
"""Check if a package is already instrumented by looking at active instrumentors."""
return any(
instrumentor.__class__.__name__.lower().startswith(package_name.lower())
for instrumentor in _active_instrumentors
)


def _uninstrument_providers():
"""Uninstrument all provider instrumentors while keeping agentic libraries active."""
global _active_instrumentors
providers_to_remove = []
for instrumentor in _active_instrumentors:
if any(instrumentor.__class__.__name__.lower().startswith(provider.lower()) for provider in PROVIDERS.keys()):
instrumentor.uninstrument()
logger.debug(f"Uninstrumented provider {instrumentor.__class__.__name__}")
providers_to_remove.append(instrumentor)

_active_instrumentors = [i for i in _active_instrumentors if i not in providers_to_remove]


def _should_instrument_package(package_name: str) -> bool:
"""
Represents a dynamically-loadable instrumentor.
Determine if a package should be instrumented based on current state.
Handles special cases for agentic libraries and providers.
"""
global _has_agentic_library
# If this is an agentic library, uninstrument all providers first
if package_name in AGENTIC_LIBRARIES:
_uninstrument_providers()
_has_agentic_library = True
logger.debug(f"Uninstrumented all providers due to agentic library {package_name} detection")
return True

# Skip providers if an agentic library is already instrumented
if package_name in PROVIDERS and _has_agentic_library:
logger.debug(f"Skipping provider {package_name} instrumentation as an agentic library is already instrumented")
return False

Check warning on line 74 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L73-L74

Added lines #L73 - L74 were not covered by tests

# Skip if already instrumented
if _is_package_instrumented(package_name):
logger.debug(f"Package {package_name} is already instrumented")
return False

Check warning on line 79 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L78-L79

Added lines #L78 - L79 were not covered by tests

return True


def _perform_instrumentation(package_name: str):
"""Helper function to perform instrumentation for a given package."""
global _instrumenting_packages, _active_instrumentors
if not _should_instrument_package(package_name):
return

Check warning on line 88 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L88

Added line #L88 was not covered by tests

# Get the appropriate configuration for the package
config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES[package_name]
loader = InstrumentorLoader(**config)

if loader.should_activate:
instrumentor = instrument_one(loader) # instrument_one is already a module function
if instrumentor is not None:
_active_instrumentors.append(instrumentor)


def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), level=0):
"""
Monitor imports and instrument packages as they are imported.
This replaces the built-in import function to intercept package imports.
"""
global _instrumenting_packages
root = name.split(".", 1)[0]

# Skip providers if an agentic library is already instrumented
if _has_agentic_library and root in PROVIDERS:
return _original_builtins_import(name, globals_dict, locals_dict, fromlist, level)

Check warning on line 110 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L110

Added line #L110 was not covered by tests

# Check if this is a package we should instrument
if (
root in TARGET_PACKAGES
and root not in _instrumenting_packages
and not _is_package_instrumented(root) # Check if already instrumented before adding
):
logger.debug(f"Detected import of {root}")
_instrumenting_packages.add(root)
try:
_perform_instrumentation(root)
except Exception as e:
logger.error(f"Error instrumenting {root}: {str(e)}")

Check warning on line 123 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L118-L123

Added lines #L118 - L123 were not covered by tests
finally:
_instrumenting_packages.discard(root)

Check warning on line 125 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L125

Added line #L125 was not covered by tests

return _original_builtins_import(name, globals_dict, locals_dict, fromlist, level)

This class is used to load and activate instrumentors based on their module
and class names.
We use the `provider_import_name` to determine if the library is installed i
n the environment.

`module_name` is the name of the module to import from.
`class_name` is the name of the class to instantiate from the module.
`provider_import_name` is the name of the package to check for availability.
# Define the structure for instrumentor configurations
class InstrumentorConfig(TypedDict):
module_name: str
class_name: str
min_version: str


# Configuration for supported LLM providers
PROVIDERS: dict[str, InstrumentorConfig] = {
"openai": {
"module_name": "agentops.instrumentation.openai",
"class_name": "OpenAIInstrumentor",
"min_version": "1.0.0",
},
"anthropic": {
"module_name": "agentops.instrumentation.anthropic",
"class_name": "AnthropicInstrumentor",
"min_version": "0.32.0",
},
"google.genai": {
"module_name": "agentops.instrumentation.google_generativeai",
"class_name": "GoogleGenerativeAIInstrumentor",
"min_version": "0.1.0",
},
"ibm_watsonx_ai": {
"module_name": "agentops.instrumentation.ibm_watsonx_ai",
"class_name": "IBMWatsonXInstrumentor",
"min_version": "0.1.0",
},
}

# Configuration for supported agentic libraries
AGENTIC_LIBRARIES: dict[str, InstrumentorConfig] = {
"crewai": {
"module_name": "agentops.instrumentation.crewai",
"class_name": "CrewAIInstrumentor",
"min_version": "0.56.0",
},
"autogen": {"module_name": "agentops.instrumentation.ag2", "class_name": "AG2Instrumentor", "min_version": "0.1.0"},
"agents": {
"module_name": "agentops.instrumentation.openai_agents",
"class_name": "OpenAIAgentsInstrumentor",
"min_version": "0.1.0",
},
}

# Combine all target packages for monitoring
TARGET_PACKAGES = set(PROVIDERS.keys()) | set(AGENTIC_LIBRARIES.keys())

# Create a single instance of the manager
# _manager = InstrumentationManager() # Removed


@dataclass
class InstrumentorLoader:
"""
Represents a dynamically-loadable instrumentor.
Handles version checking and instantiation of instrumentors.
"""

module_name: str
class_name: str
provider_import_name: str
min_version: str

@property
def module(self) -> ModuleType:
"""Reference to the instrumentor module."""
"""Get the instrumentor module."""
return importlib.import_module(self.module_name)

@property
def should_activate(self) -> bool:
"""Is the provider import available in the environment?"""
"""Check if the package is available and meets version requirements."""
try:
importlib.import_module(self.provider_import_name)
return True
provider_name = self.module_name.split(".")[-1]
module_version = version(provider_name)
return module_version is not None and Version(module_version) >= parse(self.min_version)
except ImportError:
return False

def get_instance(self) -> BaseInstrumentor:
"""Return a new instance of the instrumentor."""
"""Create and return a new instance of the instrumentor."""
return getattr(self.module, self.class_name)()


available_instrumentors: list[InstrumentorLoader] = [
InstrumentorLoader(
module_name="agentops.instrumentation.openai",
class_name="OpenAIInstrumentor",
provider_import_name="openai",
),
InstrumentorLoader(
module_name="agentops.instrumentation.anthropic",
class_name="AnthropicInstrumentor",
provider_import_name="anthropic",
),
InstrumentorLoader(
module_name="agentops.instrumentation.crewai",
class_name="CrewAIInstrumentor",
provider_import_name="crewai",
),
InstrumentorLoader(
module_name="agentops.instrumentation.openai_agents",
class_name="OpenAIAgentsInstrumentor",
provider_import_name="agents",
),
InstrumentorLoader(
module_name="agentops.instrumentation.google_generativeai",
class_name="GoogleGenerativeAIInstrumentor",
provider_import_name="google.genai",
),
InstrumentorLoader(
module_name="agentops.instrumentation.ibm_watsonx_ai",
class_name="IBMWatsonXInstrumentor",
provider_import_name="ibm_watsonx_ai",
),
InstrumentorLoader(
module_name="agentops.instrumentation.ag2",
class_name="AG2Instrumentor",
provider_import_name="autogen",
),
]


def instrument_one(loader: InstrumentorLoader) -> Optional[BaseInstrumentor]:
"""Instrument a single instrumentor."""
"""
Instrument a single package using the provided loader.
Returns the instrumentor instance if successful, None otherwise.
"""
if not loader.should_activate:
# this package is not in the environment; skip
logger.debug(
f"Package {loader.provider_import_name} not found; skipping instrumentation of {loader.class_name}"
f"Package {loader.module_name} not found or version < {loader.min_version}; skipping instrumentation"
)
return None

instrumentor = loader.get_instance()
instrumentor.instrument(tracer_provider=TracingCore.get_instance()._provider)
logger.debug(f"Instrumented {loader.class_name}")

return instrumentor


def instrument_all():
"""
Instrument all available instrumentors.
This function is called when `instrument_llm_calls` is enabled.
"""
global _active_instrumentors
"""Start monitoring and instrumenting packages if not already started."""
# Check if active_instrumentors is empty, as a proxy for not started.
if not _active_instrumentors:
builtins.__import__ = _import_monitor
global _instrumenting_packages
for name in list(sys.modules.keys()):
module = sys.modules.get(name)
if not isinstance(module, ModuleType):
continue

if len(_active_instrumentors):
logger.debug("Instrumentors have already been populated.")
return

for loader in available_instrumentors:
if loader.class_name in _active_instrumentors:
# already instrumented
logger.debug(f"Instrumentor {loader.class_name} has already been instrumented.")
return None
root = name.split(".", 1)[0]
if _has_agentic_library and root in PROVIDERS:
continue

instrumentor = instrument_one(loader)
if instrumentor is not None:
_active_instrumentors.append(instrumentor)
if root in TARGET_PACKAGES and root not in _instrumenting_packages and not _is_package_instrumented(root):
_instrumenting_packages.add(root)
try:
_perform_instrumentation(root)
except Exception as e:
logger.error(f"Error instrumenting {root}: {str(e)}")

Check warning on line 251 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L250-L251

Added lines #L250 - L251 were not covered by tests
finally:
_instrumenting_packages.discard(root)


def uninstrument_all():
"""
Uninstrument all available instrumentors.
This can be called to disable instrumentation.
"""
global _active_instrumentors
"""Stop monitoring and uninstrument all packages."""
global _active_instrumentors, _has_agentic_library
builtins.__import__ = _original_builtins_import

Check warning on line 259 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L259

Added line #L259 was not covered by tests
for instrumentor in _active_instrumentors:
instrumentor.uninstrument()
logger.debug(f"Uninstrumented {instrumentor.__class__.__name__}")
_active_instrumentors = []
_has_agentic_library = False

Check warning on line 264 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L264

Added line #L264 was not covered by tests


def get_active_libraries() -> set[str]:
"""
Get all actively used libraries in the current execution context.
Returns a set of package names that are currently imported and being monitored.
"""
return {

Check warning on line 272 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L272

Added line #L272 was not covered by tests
name.split(".")[0]
for name, module in sys.modules.items()
if isinstance(module, ModuleType) and name.split(".")[0] in TARGET_PACKAGES
}
Loading