From bfa20ef8164086a0bc59694a1f22c51bd14d71dc Mon Sep 17 00:00:00 2001 From: Teo Date: Fri, 14 Mar 2025 21:03:34 +0200 Subject: [PATCH 01/16] Client.init() | auto_start_session | forward tags Signed-off-by: Teo --- agentops/client/client.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/agentops/client/client.py b/agentops/client/client.py index b0a18f1ca..76a659353 100644 --- a/agentops/client/client.py +++ b/agentops/client/client.py @@ -2,10 +2,12 @@ from agentops.client.api import ApiClient from agentops.config import Config -from agentops.exceptions import AgentOpsClientNotInitializedException, NoApiKeyException, NoSessionException +from agentops.exceptions import (AgentOpsClientNotInitializedException, + NoApiKeyException, NoSessionException) from agentops.instrumentation import instrument_all from agentops.logging import logger -from agentops.logging.config import configure_logging, intercept_opentelemetry_logging +from agentops.logging.config import (configure_logging, + intercept_opentelemetry_logging) from agentops.sdk.core import TracingCore @@ -59,7 +61,7 @@ def init(self, **kwargs): if self.config.auto_start_session: from agentops.legacy import start_session - start_session() + start_session(tags=list(self.config.default_tags)) def configure(self, **kwargs): """Update client configuration""" From 7f0c932a6209a7763c384eb3e17a04712261b67e Mon Sep 17 00:00:00 2001 From: Teo Date: Fri, 14 Mar 2025 21:07:21 +0200 Subject: [PATCH 02/16] client: recreate Config on init() Signed-off-by: Teo --- agentops/client/client.py | 2 ++ test_config_recreation.py | 70 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 test_config_recreation.py diff --git a/agentops/client/client.py b/agentops/client/client.py index 76a659353..a9a60c9c0 100644 --- a/agentops/client/client.py +++ b/agentops/client/client.py @@ -31,6 +31,8 @@ def __init__(self): self.config = Config() def init(self, **kwargs): + # Recreate the Config object to parse environment variables at the time of initialization + self.config = Config() self.configure(**kwargs) if not self.config.api_key: diff --git a/test_config_recreation.py b/test_config_recreation.py new file mode 100644 index 000000000..122845e9a --- /dev/null +++ b/test_config_recreation.py @@ -0,0 +1,70 @@ +import os +import pytest +from unittest.mock import patch + +import agentops +from agentops.config import Config + + +@pytest.fixture(autouse=True) +def reset_environment(): + """Save and restore environment variables around the test.""" + # Save original environment variables + original_vars = { + "AGENTOPS_API_KEY": os.environ.get("AGENTOPS_API_KEY"), + "AGENTOPS_API_ENDPOINT": os.environ.get("AGENTOPS_API_ENDPOINT"), + "AGENTOPS_MAX_WAIT_TIME": os.environ.get("AGENTOPS_MAX_WAIT_TIME"), + "AGENTOPS_INSTRUMENT_LLM_CALLS": os.environ.get("AGENTOPS_INSTRUMENT_LLM_CALLS") + } + + # Reset the client before the test + agentops._client = agentops.Client() + agentops._client._initialized = False + + yield + + # Restore original environment variables + for key, value in original_vars.items(): + if value is not None: + os.environ[key] = value + elif key in os.environ: + del os.environ[key] + + +def test_config_recreation_on_init(): + """Test that agentops.init() recreates the Config object and parses environment variables.""" + # Set initial environment variables + os.environ["AGENTOPS_API_KEY"] = "test-key-1" + os.environ["AGENTOPS_API_ENDPOINT"] = "https://test-endpoint-1.com" + os.environ["AGENTOPS_MAX_WAIT_TIME"] = "1000" + os.environ["AGENTOPS_INSTRUMENT_LLM_CALLS"] = "false" + + # Create a new client to pick up the initial environment variables + agentops._client = agentops.Client() + + # Verify initial environment variables are picked up + assert agentops._client.config.api_key == "test-key-1" + assert agentops._client.config.endpoint == "https://test-endpoint-1.com" + assert agentops._client.config.max_wait_time == 1000 + assert agentops._client.config.instrument_llm_calls is False + + # Change environment variables + os.environ["AGENTOPS_API_KEY"] = "test-key-2" + os.environ["AGENTOPS_API_ENDPOINT"] = "https://test-endpoint-2.com" + os.environ["AGENTOPS_MAX_WAIT_TIME"] = "2000" + os.environ["AGENTOPS_INSTRUMENT_LLM_CALLS"] = "true" + + # Without calling init(), the config should still have the old values + assert agentops._client.config.api_key == "test-key-1" + assert agentops._client.config.endpoint == "https://test-endpoint-1.com" + assert agentops._client.config.max_wait_time == 1000 + assert agentops._client.config.instrument_llm_calls is False + + # Mock the initialization process to avoid actual API calls + with patch.object(agentops._client, 'init', side_effect=lambda **kwargs: setattr(agentops._client, 'config', Config())): + agentops.init() + # After calling init(), the config should have the new values + assert agentops._client.config.api_key == "test-key-2" + assert agentops._client.config.endpoint == "https://test-endpoint-2.com" + assert agentops._client.config.max_wait_time == 2000 + assert agentops._client.config.instrument_llm_calls is True From e1ce7a57f4faa8969c8fcdaeba978e316d71253e Mon Sep 17 00:00:00 2001 From: Teo Date: Sat, 15 Mar 2025 00:25:37 +0200 Subject: [PATCH 03/16] mock_req: /v3/auth/token to return { project_id, token, api_key } Signed-off-by: Teo --- tests/unit/conftest.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 14f7c1bc6..742c1c89f 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -6,11 +6,11 @@ import pytest import requests_mock -from tests.unit.sdk.instrumentation_tester import InstrumentationTester import agentops from agentops.config import Config from tests.fixtures.client import * # noqa +from tests.unit.sdk.instrumentation_tester import InstrumentationTester @pytest.fixture @@ -26,13 +26,14 @@ def endpoint() -> str: @pytest.fixture(autouse=True) -def mock_req(endpoint): +def mock_req(endpoint, api_key): """ Mocks AgentOps backend API requests. """ with requests_mock.Mocker(real_http=False) as m: # Map session IDs to their JWTs - m.post(endpoint + "/v3/auth/token", json={"token": str(uuid.uuid4())}) + m.post(endpoint + "/v3/auth/token", json={"token": str(uuid.uuid4()), + "project_id": "test-project-id", "api_key": api_key}) yield m From dc465d0d65793119c701f07a9fa2beb74c3964fa Mon Sep 17 00:00:00 2001 From: Teo Date: Sat, 15 Mar 2025 00:27:34 +0200 Subject: [PATCH 04/16] cleanup dirty files Signed-off-by: Teo --- conftest.py | 32 ------------- test_config_recreation.py | 70 ----------------------------- test_context.py | 62 ------------------------- test_context_comparison.py | 92 -------------------------------------- test_nesting.py | 32 ------------- 5 files changed, 288 deletions(-) delete mode 100644 conftest.py delete mode 100644 test_config_recreation.py delete mode 100644 test_context.py delete mode 100644 test_context_comparison.py delete mode 100644 test_nesting.py diff --git a/conftest.py b/conftest.py deleted file mode 100644 index 954ca8f7f..000000000 --- a/conftest.py +++ /dev/null @@ -1,32 +0,0 @@ -""" -Shared fixtures for pytest tests. -""" - -import pytest -from unittest.mock import MagicMock, patch - -from opentelemetry.trace import Span - - -@pytest.fixture -def mock_span(): - """Fixture to create a mock span with a trace ID.""" - span = MagicMock(spec=Span) - span.get_span_context.return_value.trace_id = 123456789 - return span - - -@pytest.fixture -def mock_context_deps(): - """Fixture to mock the context dependencies.""" - with ( - patch("agentops.sdk.decorators.context_utils.context") as mock_context, - patch("agentops.sdk.decorators.context_utils.trace") as mock_trace, - patch("agentops.sdk.decorators.context_utils.logger") as mock_logger, - ): - # Set up the mocks - mock_context.get_current.return_value = "current_context" - mock_trace.set_span_in_context.return_value = "new_context" - mock_context.attach.return_value = "token" - - yield {"context": mock_context, "trace": mock_trace, "logger": mock_logger} diff --git a/test_config_recreation.py b/test_config_recreation.py deleted file mode 100644 index 122845e9a..000000000 --- a/test_config_recreation.py +++ /dev/null @@ -1,70 +0,0 @@ -import os -import pytest -from unittest.mock import patch - -import agentops -from agentops.config import Config - - -@pytest.fixture(autouse=True) -def reset_environment(): - """Save and restore environment variables around the test.""" - # Save original environment variables - original_vars = { - "AGENTOPS_API_KEY": os.environ.get("AGENTOPS_API_KEY"), - "AGENTOPS_API_ENDPOINT": os.environ.get("AGENTOPS_API_ENDPOINT"), - "AGENTOPS_MAX_WAIT_TIME": os.environ.get("AGENTOPS_MAX_WAIT_TIME"), - "AGENTOPS_INSTRUMENT_LLM_CALLS": os.environ.get("AGENTOPS_INSTRUMENT_LLM_CALLS") - } - - # Reset the client before the test - agentops._client = agentops.Client() - agentops._client._initialized = False - - yield - - # Restore original environment variables - for key, value in original_vars.items(): - if value is not None: - os.environ[key] = value - elif key in os.environ: - del os.environ[key] - - -def test_config_recreation_on_init(): - """Test that agentops.init() recreates the Config object and parses environment variables.""" - # Set initial environment variables - os.environ["AGENTOPS_API_KEY"] = "test-key-1" - os.environ["AGENTOPS_API_ENDPOINT"] = "https://test-endpoint-1.com" - os.environ["AGENTOPS_MAX_WAIT_TIME"] = "1000" - os.environ["AGENTOPS_INSTRUMENT_LLM_CALLS"] = "false" - - # Create a new client to pick up the initial environment variables - agentops._client = agentops.Client() - - # Verify initial environment variables are picked up - assert agentops._client.config.api_key == "test-key-1" - assert agentops._client.config.endpoint == "https://test-endpoint-1.com" - assert agentops._client.config.max_wait_time == 1000 - assert agentops._client.config.instrument_llm_calls is False - - # Change environment variables - os.environ["AGENTOPS_API_KEY"] = "test-key-2" - os.environ["AGENTOPS_API_ENDPOINT"] = "https://test-endpoint-2.com" - os.environ["AGENTOPS_MAX_WAIT_TIME"] = "2000" - os.environ["AGENTOPS_INSTRUMENT_LLM_CALLS"] = "true" - - # Without calling init(), the config should still have the old values - assert agentops._client.config.api_key == "test-key-1" - assert agentops._client.config.endpoint == "https://test-endpoint-1.com" - assert agentops._client.config.max_wait_time == 1000 - assert agentops._client.config.instrument_llm_calls is False - - # Mock the initialization process to avoid actual API calls - with patch.object(agentops._client, 'init', side_effect=lambda **kwargs: setattr(agentops._client, 'config', Config())): - agentops.init() - # After calling init(), the config should have the new values - assert agentops._client.config.api_key == "test-key-2" - assert agentops._client.config.endpoint == "https://test-endpoint-2.com" - assert agentops._client.config.max_wait_time == 2000 - assert agentops._client.config.instrument_llm_calls is True diff --git a/test_context.py b/test_context.py deleted file mode 100644 index afaf49b18..000000000 --- a/test_context.py +++ /dev/null @@ -1,62 +0,0 @@ -#!/usr/bin/env python -""" -Test script to debug OpenTelemetry context propagation issues. -""" -import time -from opentelemetry import trace -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import ConsoleSpanExporter, BatchSpanProcessor -from agentops.sdk.decorators import agent, task, operation -from agentops.sdk.core import TracingCore -from agentops.client.client import Client -from agentops.sdk.decorators.utility import _get_current_span_info -from agentops.logging import logger - -# Initialize tracing -client = Client() # Use default initialization -client.init() # This should set up TracingCore - -# Add a console exporter for local debugging -provider = trace.get_tracer_provider() -if hasattr(provider, "add_span_processor"): - provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter())) - -@agent -def my_agent(): - """Test agent function that should create a parent span""" - logger.debug(f"In my_agent - current span: {_get_current_span_info()}") - - # Call the task inside the agent - result = my_task() - - # Also explicitly call operation with a context manager - tracer = TracingCore.get_instance().get_tracer() - with tracer.start_as_current_span("manual_operation") as manual_span: - manual_span.set_attribute("manual", True) - logger.debug(f"In manual operation - current span: {_get_current_span_info()}") - time.sleep(0.1) - - return result - -@task -def my_task(): - """Test task function that should create a child span under the agent span""" - logger.debug(f"In my_task - current span: {_get_current_span_info()}") - - # Call a nested operation - return my_operation() - -@operation -def my_operation(): - """Test operation that should be nested under the task span""" - logger.debug(f"In my_operation - current span: {_get_current_span_info()}") - time.sleep(0.1) - return "done" - -if __name__ == "__main__": - # Run the test - result = my_agent() - print(f"Result: {result}") - - # Give the batch processor time to export - time.sleep(1) \ No newline at end of file diff --git a/test_context_comparison.py b/test_context_comparison.py deleted file mode 100644 index db2de3825..000000000 --- a/test_context_comparison.py +++ /dev/null @@ -1,92 +0,0 @@ -#!/usr/bin/env python -""" -Test script to compare the old and new context management approaches. -""" -import time -from opentelemetry import trace, context as context_api -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import ConsoleSpanExporter, BatchSpanProcessor -from agentops.sdk.decorators import agent, task, operation -from agentops.sdk.core import TracingCore -from agentops.client.client import Client -from agentops.sdk.decorators.utility import (_get_current_span_info, _make_span, - _finalize_span, _create_as_current_span) -from agentops.logging import logger - -# Initialize tracing -client = Client() -client.init() - -# Add a console exporter for local debugging -provider = trace.get_tracer_provider() -if hasattr(provider, "add_span_processor"): - provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter())) - -def test_manual_context(): - """Test using the manual context management approach""" - logger.debug("===== TESTING MANUAL CONTEXT APPROACH =====") - - # Create the root span - root_span, root_ctx, root_token = _make_span("root", "test") - logger.debug(f"Created root span: {_get_current_span_info()}") - - try: - # Create a child span - child_span, child_ctx, child_token = _make_span("child", "test") - logger.debug(f"Created child span: {_get_current_span_info()}") - - try: - # Create a grandchild span - grandchild_span, grandchild_ctx, grandchild_token = _make_span("grandchild", "test") - logger.debug(f"Created grandchild span: {_get_current_span_info()}") - - # Do some work - time.sleep(0.1) - - # End the grandchild span - _finalize_span(grandchild_span, grandchild_token) - logger.debug(f"After ending grandchild span: {_get_current_span_info()}") - - finally: - # End the child span - _finalize_span(child_span, child_token) - logger.debug(f"After ending child span: {_get_current_span_info()}") - - finally: - # End the root span - _finalize_span(root_span, root_token) - logger.debug(f"After ending root span: {_get_current_span_info()}") - -def test_context_manager(): - """Test using the context manager approach""" - logger.debug("===== TESTING CONTEXT MANAGER APPROACH =====") - - # Get a tracer - tracer = TracingCore.get_instance().get_tracer() - - # Create spans using context manager (native OpenTelemetry approach) - with _create_as_current_span("root", "test") as root_span: - logger.debug(f"Created root span: {_get_current_span_info()}") - - with _create_as_current_span("child", "test") as child_span: - logger.debug(f"Created child span: {_get_current_span_info()}") - - with _create_as_current_span("grandchild", "test") as grandchild_span: - logger.debug(f"Created grandchild span: {_get_current_span_info()}") - - # Do some work - time.sleep(0.1) - - logger.debug(f"After grandchild span: {_get_current_span_info()}") - - logger.debug(f"After child span: {_get_current_span_info()}") - - logger.debug(f"After root span: {_get_current_span_info()}") - -if __name__ == "__main__": - # Test both approaches - test_manual_context() - test_context_manager() - - # Give the batch processor time to export - time.sleep(1) \ No newline at end of file diff --git a/test_nesting.py b/test_nesting.py deleted file mode 100644 index 5cf686b4e..000000000 --- a/test_nesting.py +++ /dev/null @@ -1,32 +0,0 @@ -import time -from agentops.sdk.decorators import agent, operation -from agentops.sdk.core import TracingCore - -# Initialize tracing -TracingCore.get_instance().initialize() - -@operation -def perform_operation(task_name): - """A simple operation that will be nested within an agent.""" - print(f"Performing operation: {task_name}") - time.sleep(0.5) # Simulate work - return f"Completed {task_name}" - -@agent -def run_agent(agent_name): - """An agent that will contain nested operations.""" - print(f"Agent {agent_name} is running") - - # Perform multiple operations - result1 = perform_operation("task1") - result2 = perform_operation("task2") - - return f"Agent {agent_name} completed with results: {result1}, {result2}" - -if __name__ == "__main__": - # Run the agent which will contain nested operations - result = run_agent("TestAgent") - print(f"Final result: {result}") - - # Give time for spans to be exported - time.sleep(1) \ No newline at end of file From 693205b1bc8758e377b21c6b76e36e99cac37cbd Mon Sep 17 00:00:00 2001 From: Teo Date: Sat, 15 Mar 2025 01:26:02 +0200 Subject: [PATCH 05/16] Isolate telemetry setup (`setup_telemetry`) Signed-off-by: Teo --- agentops/sdk/core.py | 134 +++++++++++++++++++++++++++---------------- 1 file changed, 86 insertions(+), 48 deletions(-) diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index 26882ee4e..55064b7b2 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -5,8 +5,10 @@ from typing import List, Optional from opentelemetry import metrics, trace -from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.exporter.otlp.proto.http.metric_exporter import \ + OTLPMetricExporter +from opentelemetry.exporter.otlp.proto.http.trace_exporter import \ + OTLPSpanExporter from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import Resource @@ -20,6 +22,76 @@ from agentops.semconv import ResourceAttributes # No need to create shortcuts since we're using our own ResourceAttributes class now +# + + +def setup_telemetry( + service_name: str = "agentops", + project_id: Optional[str] = None, + exporter_endpoint: str = "https://otlp.agentops.ai/v1/traces", + metrics_endpoint: str = "https://otlp.agentops.ai/v1/metrics", + max_queue_size: int = 512, + max_wait_time: int = 5000, + jwt: Optional[str] = None, +) -> tuple[TracerProvider, MeterProvider]: + """ + Setup the telemetry system. + + Args: + service_name: Name of the OpenTelemetry service + project_id: Project ID to include in resource attributes + exporter_endpoint: Endpoint for the span exporter + metrics_endpoint: Endpoint for the metrics exporter + max_queue_size: Maximum number of spans to queue before forcing a flush + max_wait_time: Maximum time in milliseconds to wait before flushing + jwt: JWT token for authentication + + Returns: + Tuple of (TracerProvider, MeterProvider) + """ + # Create resource attributes dictionary + resource_attrs = {ResourceAttributes.SERVICE_NAME: service_name} + + # Add project_id to resource attributes if available + if project_id: + # Add project_id as a custom resource attribute + resource_attrs[ResourceAttributes.PROJECT_ID] = project_id + logger.debug(f"Including project_id in resource attributes: {project_id}") + + resource = Resource(resource_attrs) + provider = TracerProvider(resource=resource) + + # Set as global provider + trace.set_tracer_provider(provider) + + # Create exporter with authentication + exporter = OTLPSpanExporter( + endpoint=exporter_endpoint, + headers={"Authorization": f"Bearer {jwt}"} if jwt else {} + ) + + # Regular processor for normal spans and immediate export + processor = BatchSpanProcessor( + exporter, + max_export_batch_size=max_queue_size, + schedule_delay_millis=max_wait_time, + ) + provider.add_span_processor(processor) + provider.add_span_processor(InternalSpanProcessor()) # Catches spans for AgentOps on-terminal printing + + # Setup metrics + metric_reader = PeriodicExportingMetricReader( + OTLPMetricExporter( + endpoint=metrics_endpoint, + headers={"Authorization": f"Bearer {jwt}"} if jwt else {} + ) + ) + meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) + metrics.set_meter_provider(meter_provider) + + logger.debug("Telemetry system initialized") + + return provider, meter_provider class TracingCore: @@ -45,7 +117,6 @@ def get_instance(cls) -> TracingCore: def __init__(self): """Initialize the tracing core.""" self._provider = None - self._processors: List[SpanProcessor] = [] self._initialized = False self._config = None @@ -57,6 +128,7 @@ def initialize(self, jwt: Optional[str] = None, **kwargs) -> None: Initialize the tracing core with the given configuration. Args: + jwt: JWT token for authentication **kwargs: Configuration parameters for tracing service_name: Name of the service exporter: Custom span exporter @@ -93,51 +165,17 @@ def initialize(self, jwt: Optional[str] = None, **kwargs) -> None: self._config = config - # Span types are registered in the constructor - # No need to register them here anymore - - # Create provider with safe access to service_name - service_name = config.get("service_name") or "agentops" - - # Create resource attributes dictionary - resource_attrs = {ResourceAttributes.SERVICE_NAME: service_name} - - # Add project_id to resource attributes if available - project_id = config.get("project_id") - if project_id: - # Add project_id as a custom resource attribute - resource_attrs[ResourceAttributes.PROJECT_ID] = project_id - logger.debug(f"Including project_id in resource attributes: {project_id}") - - resource = Resource(resource_attrs) - self._provider = TracerProvider(resource=resource) - - # Set as global provider - trace.set_tracer_provider(self._provider) - - # Use default authenticated processor and exporter if api_key is available - exporter = OTLPSpanExporter( - endpoint=config.get("exporter_endpoint"), headers={"Authorization": f"Bearer {kwargs.get('jwt')}"} - ) - # Regular processor for normal spans and immediate export - processor = BatchSpanProcessor( - exporter, - max_export_batch_size=config.get("max_queue_size", max_queue_size), - schedule_delay_millis=config.get("max_wait_time", max_wait_time), + # Setup telemetry using the extracted configuration + self._provider, self._meter_provider = setup_telemetry( + service_name=config.get("service_name", "agentops"), + project_id=config.get("project_id"), + exporter_endpoint=config.get("exporter_endpoint"), + metrics_endpoint=config.get("metrics_endpoint"), + max_queue_size=config.get("max_queue_size"), + max_wait_time=config.get("max_wait_time"), + jwt=jwt, ) - self._provider.add_span_processor(processor) - self._provider.add_span_processor( - InternalSpanProcessor() - ) # Catches spans for AgentOps on-terminal printing - self._processors.append(processor) - - metric_reader = PeriodicExportingMetricReader( - OTLPMetricExporter( - endpoint=config.get("metrics_endpoint"), headers={"Authorization": f"Bearer {kwargs.get('jwt')}"} - ) - ) - meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) - metrics.set_meter_provider(meter_provider) + self._initialized = True logger.debug("Tracing core initialized") @@ -156,7 +194,7 @@ def shutdown(self) -> None: return # Flush processors - for processor in self._processors: + for processor in self._provider._span_processors: # type: ignore try: processor.force_flush() except Exception as e: From aaefabaed20d2294a8911c37ea511f6b45a60ddc Mon Sep 17 00:00:00 2001 From: Teo Date: Sat, 15 Mar 2025 01:30:13 +0200 Subject: [PATCH 06/16] core shutdown: remove redundant initialized check Signed-off-by: Teo --- agentops/sdk/core.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index 55064b7b2..8d6547cb0 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -190,9 +190,6 @@ def shutdown(self) -> None: return with self._lock: - if not self._initialized: - return - # Flush processors for processor in self._provider._span_processors: # type: ignore try: From f1ece3969cb24ba6f2a8696601898e0cd7cc68e9 Mon Sep 17 00:00:00 2001 From: Teo Date: Sat, 15 Mar 2025 01:36:14 +0200 Subject: [PATCH 07/16] Simplify core shutdown (flush SynchronousSpanProcessor instead of iterating processors) Signed-off-by: Teo --- agentops/sdk/core.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index 8d6547cb0..064010ad2 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -22,7 +22,7 @@ from agentops.semconv import ResourceAttributes # No need to create shortcuts since we're using our own ResourceAttributes class now -# +# def setup_telemetry( @@ -36,7 +36,7 @@ def setup_telemetry( ) -> tuple[TracerProvider, MeterProvider]: """ Setup the telemetry system. - + Args: service_name: Name of the OpenTelemetry service project_id: Project ID to include in resource attributes @@ -45,7 +45,7 @@ def setup_telemetry( max_queue_size: Maximum number of spans to queue before forcing a flush max_wait_time: Maximum time in milliseconds to wait before flushing jwt: JWT token for authentication - + Returns: Tuple of (TracerProvider, MeterProvider) """ @@ -66,10 +66,10 @@ def setup_telemetry( # Create exporter with authentication exporter = OTLPSpanExporter( - endpoint=exporter_endpoint, + endpoint=exporter_endpoint, headers={"Authorization": f"Bearer {jwt}"} if jwt else {} ) - + # Regular processor for normal spans and immediate export processor = BatchSpanProcessor( exporter, @@ -82,15 +82,15 @@ def setup_telemetry( # Setup metrics metric_reader = PeriodicExportingMetricReader( OTLPMetricExporter( - endpoint=metrics_endpoint, + endpoint=metrics_endpoint, headers={"Authorization": f"Bearer {jwt}"} if jwt else {} ) ) meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) metrics.set_meter_provider(meter_provider) - + logger.debug("Telemetry system initialized") - + return provider, meter_provider @@ -184,18 +184,19 @@ def initialized(self) -> bool: """Check if the tracing core is initialized.""" return self._initialized + @property + def config(self) -> TracingConfig: + """Get the tracing configuration.""" + return self._config # type: ignore + def shutdown(self) -> None: """Shutdown the tracing core.""" if not self._initialized: return with self._lock: - # Flush processors - for processor in self._provider._span_processors: # type: ignore - try: - processor.force_flush() - except Exception as e: - logger.warning(f"Error flushing processor: {e}") + # Perform a single flush on the SynchronousSpanProcessor (which takes care of all processors' shutdown) + self._provider._active_span_processor.force_flush(self.config['max_wait_time']) # type: ignore # Shutdown provider if self._provider: From c656613f66ac52bb0d995eec7de1d1905dde58a0 Mon Sep 17 00:00:00 2001 From: Teo Date: Sat, 15 Mar 2025 02:10:27 +0200 Subject: [PATCH 08/16] Improved TracingCore Config Signed-off-by: Teo --- agentops/sdk/core.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index 064010ad2..f234d8ff4 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -147,18 +147,19 @@ def initialize(self, jwt: Optional[str] = None, **kwargs) -> None: return # Set default values for required fields - max_queue_size = kwargs.get("max_queue_size", 512) - max_wait_time = kwargs.get("max_wait_time", 5000) + kwargs.setdefault("service_name", "agentops") + kwargs.setdefault("exporter_endpoint", "https://otlp.agentops.ai/v1/traces") + kwargs.setdefault("metrics_endpoint", "https://otlp.agentops.ai/v1/metrics") + kwargs.setdefault("max_queue_size", 512) + kwargs.setdefault("max_wait_time", 5000) # Create a TracingConfig from kwargs with proper defaults config: TracingConfig = { - "service_name": kwargs.get("service_name", "agentops"), - "exporter": kwargs.get("exporter"), - "processor": kwargs.get("processor"), - "exporter_endpoint": kwargs.get("exporter_endpoint", "https://otlp.agentops.ai/v1/traces"), - "metrics_endpoint": kwargs.get("metrics_endpoint", "https://otlp.agentops.ai/v1/metrics"), - "max_queue_size": max_queue_size, - "max_wait_time": max_wait_time, + "service_name": kwargs["service_name"], + "exporter_endpoint": kwargs["exporter_endpoint"], + "metrics_endpoint": kwargs["metrics_endpoint"], + "max_queue_size": kwargs["max_queue_size"], + "max_wait_time": kwargs["max_wait_time"], "api_key": kwargs.get("api_key"), "project_id": kwargs.get("project_id"), } @@ -167,12 +168,12 @@ def initialize(self, jwt: Optional[str] = None, **kwargs) -> None: # Setup telemetry using the extracted configuration self._provider, self._meter_provider = setup_telemetry( - service_name=config.get("service_name", "agentops"), + service_name=config["service_name"], project_id=config.get("project_id"), - exporter_endpoint=config.get("exporter_endpoint"), - metrics_endpoint=config.get("metrics_endpoint"), - max_queue_size=config.get("max_queue_size"), - max_wait_time=config.get("max_wait_time"), + exporter_endpoint=config["exporter_endpoint"], + metrics_endpoint=config["metrics_endpoint"], + max_queue_size=config["max_queue_size"], + max_wait_time=config["max_wait_time"], jwt=jwt, ) From d5ed7cb61224fb788e04cebe23662a567d8229b7 Mon Sep 17 00:00:00 2001 From: Teo Date: Sat, 15 Mar 2025 18:55:26 +0200 Subject: [PATCH 09/16] tests: couple instrumentation tester with TracingCore's lifecycle Signed-off-by: Teo --- agentops/sdk/core.py | 5 +- tests/unit/sdk/instrumentation_tester.py | 58 ++++++++++++++++-------- 2 files changed, 42 insertions(+), 21 deletions(-) diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index f234d8ff4..29a184ea4 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -192,11 +192,11 @@ def config(self) -> TracingConfig: def shutdown(self) -> None: """Shutdown the tracing core.""" - if not self._initialized: - return with self._lock: # Perform a single flush on the SynchronousSpanProcessor (which takes care of all processors' shutdown) + if not self._initialized: + return self._provider._active_span_processor.force_flush(self.config['max_wait_time']) # type: ignore # Shutdown provider @@ -207,7 +207,6 @@ def shutdown(self) -> None: logger.warning(f"Error shutting down provider: {e}") self._initialized = False - logger.debug("Tracing core shutdown") def get_tracer(self, name: str = "agentops") -> trace.Tracer: """ diff --git a/tests/unit/sdk/instrumentation_tester.py b/tests/unit/sdk/instrumentation_tester.py index 48e7fd63c..9e5dc80d5 100644 --- a/tests/unit/sdk/instrumentation_tester.py +++ b/tests/unit/sdk/instrumentation_tester.py @@ -1,5 +1,6 @@ -from typing import Any, Dict, List, Optional, Protocol, Tuple, Union +from typing import Any, Dict, List, Protocol, Tuple, Union import importlib +import unittest.mock as mock from opentelemetry import trace as trace_api from opentelemetry.sdk.trace import ReadableSpan, Span, TracerProvider @@ -8,9 +9,7 @@ InMemorySpanExporter from opentelemetry.util.types import Attributes -import agentops -from agentops.sdk.core import TracingCore -from agentops.sdk.processors import LiveSpanProcessor +from agentops.sdk.core import TracingCore, setup_telemetry def create_tracer_provider( @@ -40,7 +39,7 @@ def reset_trace_globals(): """Reset the global trace state to avoid conflicts.""" # Reset tracer provider trace_api._TRACER_PROVIDER = None - + # Reload the trace module to clear warning state importlib.reload(trace_api) @@ -74,10 +73,10 @@ def __init__(self): """Initialize the instrumentation tester.""" # Reset any global state first reset_trace_globals() - + # Shut down any existing tracing core - self._shutdown_core() - + # self._shutdown_core() + # Create a new tracer provider and memory exporter ( self.tracer_provider, @@ -88,12 +87,23 @@ def __init__(self): # Set the tracer provider trace_api.set_tracer_provider(self.tracer_provider) - # Get a fresh instance of the tracing core + # Create a mock for the meter provider + self.mock_meter_provider = mock.MagicMock() + + # Patch the setup_telemetry function to return our test providers + self.setup_telemetry_patcher = mock.patch( + 'agentops.sdk.core.setup_telemetry', + return_value=(self.tracer_provider, self.mock_meter_provider) + ) + self.mock_setup_telemetry = self.setup_telemetry_patcher.start() + + # Reset the tracing core to force reinitialization core = TracingCore.get_instance() + core._initialized = False + core._provider = None - # Set the tracing core's provider to our provider - core._provider = self.tracer_provider - core._initialized = True + # Initialize the core, which will now use our mocked setup_telemetry + core.initialize() self.clear_spans() @@ -120,22 +130,34 @@ def reset(self): # Clear any existing spans self.clear_spans() - + # Reset global trace state reset_trace_globals() - + # Set our tracer provider again trace_api.set_tracer_provider(self.tracer_provider) # Shut down and re-initialize the tracing core self._shutdown_core() - # Get a fresh instance of the tracing core + # Reset the mock setup_telemetry function + self.mock_setup_telemetry.reset_mock() + + # Reset the tracing core to force reinitialization core = TracingCore.get_instance() + core._initialized = False + core._provider = None - # Set the tracing core's provider to our provider - core._provider = self.tracer_provider - core._initialized = True + # Initialize the core, which will now use our mocked setup_telemetry + core.initialize() + + def __del__(self): + """Clean up resources when the tester is garbage collected.""" + try: + # Stop the patcher when the tester is deleted + self.setup_telemetry_patcher.stop() + except Exception: + pass def get_finished_spans(self) -> List[ReadableSpan]: """Get all finished spans.""" From 3b55098fa820d2468dcab096173f196ffcc14248 Mon Sep 17 00:00:00 2001 From: Teo Date: Sat, 15 Mar 2025 18:57:13 +0200 Subject: [PATCH 10/16] Base for test_session_legacy Signed-off-by: Teo --- tests/unit/test_session_legacy.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 tests/unit/test_session_legacy.py diff --git a/tests/unit/test_session_legacy.py b/tests/unit/test_session_legacy.py new file mode 100644 index 000000000..401302794 --- /dev/null +++ b/tests/unit/test_session_legacy.py @@ -0,0 +1,20 @@ + + +def test_session_auto_start(instrumentation): + import agentops + from agentops.legacy import Session + + session = agentops.init(auto_start_session=True) + + + assert isinstance(session, Session) + + +def test_crewai_backwards_compatibility(instrumentation): + """ + CrewAI needs to access: + + agentops.track_agent + agentops.track_tool + + """ \ No newline at end of file From 2eec70c6f8a90f1f82137d62830d0321b2d41561 Mon Sep 17 00:00:00 2001 From: Teo Date: Sat, 15 Mar 2025 18:59:58 +0200 Subject: [PATCH 11/16] uv lock Signed-off-by: Teo --- uv.lock | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/uv.lock b/uv.lock index e0bfa3465..6cc0cd017 100644 --- a/uv.lock +++ b/uv.lock @@ -1,4 +1,5 @@ version = 1 +revision = 1 requires-python = ">=3.9, <3.14" resolution-markers = [ "python_full_version >= '3.13' and platform_python_implementation == 'PyPy'", @@ -25,7 +26,7 @@ constraints = [ [[package]] name = "agentops" -version = "0.4.2" +version = "0.4.3" source = { editable = "." } dependencies = [ { name = "opentelemetry-api", version = "1.22.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, From 3eba3f32555b9e68d8e5c3e78bbc7e461e7b3d08 Mon Sep 17 00:00:00 2001 From: Teo Date: Sat, 15 Mar 2025 19:12:40 +0200 Subject: [PATCH 12/16] tests/benchmark/benchmark_init.py Signed-off-by: Teo --- tests/benchmark/benchmark_init.py | 47 +++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 tests/benchmark/benchmark_init.py diff --git a/tests/benchmark/benchmark_init.py b/tests/benchmark/benchmark_init.py new file mode 100644 index 000000000..674062a0f --- /dev/null +++ b/tests/benchmark/benchmark_init.py @@ -0,0 +1,47 @@ +import json +import time + +from agentops.sdk.core import TracingCore + +""" +Benchmark script for measuring TracingCore initialization time. +""" + +def run_benchmark(): + """ + Run a benchmark of TracingCore initialization. + + Returns: + Dictionary with timing results + """ + import agentops + + # Measure initialization time + start_init = time.time() + agentops.init() + end_init = time.time() + init_time = end_init - start_init + + return { + "init": init_time, + "total": init_time # Total time is just init time now + } + + +def print_results(results): + """ + Print benchmark results in a formatted way. + + Args: + results: Dictionary with timing results + """ + print("\n=== BENCHMARK RESULTS ===") + + print(f"\nINIT TIME: {results['init']:.6f}s") + print(f"TOTAL TIME: {results['total']:.6f}s") + + +if __name__ == "__main__": + print("Running TracingCore benchmark...") + results = run_benchmark() + print_results(results) From 78bc6bd083ab443d4b62dd3ec613eed90534290e Mon Sep 17 00:00:00 2001 From: Teo Date: Sat, 15 Mar 2025 19:25:43 +0200 Subject: [PATCH 13/16] Remove deprecated SDK tests - favor test_decorators Signed-off-by: Teo --- tests/unit/sdk/test_context_utils.py | 97 ----- tests/unit/sdk/test_core.py | 143 ------- tests/unit/sdk/test_instrumentation.py | 321 --------------- tests/unit/sdk/test_instrumentation_errors.py | 380 ------------------ 4 files changed, 941 deletions(-) delete mode 100644 tests/unit/sdk/test_context_utils.py delete mode 100644 tests/unit/sdk/test_core.py delete mode 100644 tests/unit/sdk/test_instrumentation.py delete mode 100644 tests/unit/sdk/test_instrumentation_errors.py diff --git a/tests/unit/sdk/test_context_utils.py b/tests/unit/sdk/test_context_utils.py deleted file mode 100644 index b37b42799..000000000 --- a/tests/unit/sdk/test_context_utils.py +++ /dev/null @@ -1,97 +0,0 @@ -import sys -import os -import pytest -from unittest.mock import patch, MagicMock - -from opentelemetry import trace -from opentelemetry.trace import Span - -# Import directly from the module file to avoid circular imports -from agentops.sdk.decorators.context_utils import use_span_context, with_span_context, get_trace_id - - -@pytest.fixture -def mock_span(): - """Fixture to create a mock span with a trace ID.""" - span = MagicMock(spec=Span) - span.get_span_context.return_value.trace_id = 123456789 - return span - - -@pytest.fixture -def mock_context_deps(): - """Fixture to mock the context dependencies.""" - with ( - patch("agentops.sdk.decorators.context_utils.context") as mock_context, - patch("agentops.sdk.decorators.context_utils.trace") as mock_trace, - patch("agentops.sdk.decorators.context_utils.logger") as mock_logger, - ): - # Set up the mocks - mock_context.get_current.return_value = "current_context" - mock_trace.set_span_in_context.return_value = "new_context" - mock_context.attach.return_value = "token" - - yield {"context": mock_context, "trace": mock_trace, "logger": mock_logger} - - -def test_use_span_context(mock_span, mock_context_deps): - """Test that the use_span_context context manager works correctly.""" - mock_context = mock_context_deps["context"] - mock_trace = mock_context_deps["trace"] - mock_logger = mock_context_deps["logger"] - - # Use the context manager - with use_span_context(mock_span): - # Verify the context was attached - mock_context.get_current.assert_called_once() - mock_trace.set_span_in_context.assert_called_once_with(mock_span, "current_context") - mock_context.attach.assert_called_once_with("new_context") - mock_logger.debug.assert_called_with("Span context attached: 123456789") - - # Verify the context was detached - mock_context.detach.assert_called_once_with("token") - mock_logger.debug.assert_called_with("Span context detached: 123456789") - - -def test_get_trace_id(mock_span): - """Test that get_trace_id returns the correct trace ID.""" - # Get the trace ID - trace_id = get_trace_id(mock_span) - - # Verify the trace ID - assert trace_id == "123456789" - - # Test with None span - trace_id = get_trace_id(None) - assert trace_id == "unknown" - - -def test_with_span_context(mock_span, mock_context_deps): - """Test that the with_span_context decorator works correctly.""" - mock_context = mock_context_deps["context"] - mock_trace = mock_context_deps["trace"] - mock_logger = mock_context_deps["logger"] - - # Create a class with a span attribute - class TestClass: - def __init__(self): - self.span = mock_span - - @with_span_context - def test_method(self): - return "test" - - # Create an instance - test_instance = TestClass() - - # Call the decorated method - result = test_instance.test_method() - - # Verify the result - assert result == "test" - - # Verify the context was attached and detached - mock_context.get_current.assert_called_once() - mock_trace.set_span_in_context.assert_called_once_with(test_instance.span, "current_context") - mock_context.attach.assert_called_once_with("new_context") - mock_context.detach.assert_called_once_with("token") diff --git a/tests/unit/sdk/test_core.py b/tests/unit/sdk/test_core.py deleted file mode 100644 index 409d49584..000000000 --- a/tests/unit/sdk/test_core.py +++ /dev/null @@ -1,143 +0,0 @@ -import pytest -from unittest.mock import MagicMock, patch -from uuid import UUID - -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.trace import StatusCode - -from agentops.sdk.types import TracingConfig -from agentops.sdk.core import TracingCore -from agentops.sdk.traced import TracedObject -from agentops.semconv.core import CoreAttributes - - -@pytest.fixture -def reset_tracing_core(): - """Reset the TracingCore singleton instance before each test.""" - TracingCore._instance = None - yield - - -def test_get_instance(reset_tracing_core): - """Test get_instance method.""" - # Test getting the instance - instance1 = TracingCore.get_instance() - assert isinstance(instance1, TracingCore) - - # Test singleton pattern - instance2 = TracingCore.get_instance() - assert instance2 is instance1 - - -@patch("agentops.sdk.core.TracerProvider") -@patch("agentops.sdk.core.trace") -def test_initialize(mock_trace, mock_tracer_provider, reset_tracing_core): - """Test initialization.""" - # Set up - core = TracingCore() - config = {"service_name": "test_service", "max_queue_size": 512, "max_wait_time": 5000} - mock_provider = MagicMock() - mock_tracer_provider.return_value = mock_provider - mock_trace.get_tracer_provider.return_value = mock_provider - - # Test - core.initialize(**config) - - # Verify - mock_tracer_provider.assert_called_once() - mock_provider.add_span_processor.assert_called() - - # Test with existing provider - mock_tracer_provider.reset_mock() - mock_provider.reset_mock() - mock_trace.get_tracer_provider.return_value = mock_provider - - core.initialize(**config) - mock_tracer_provider.assert_not_called() - - -def test_shutdown(reset_tracing_core): - """Test shutdown method.""" - # Set up - core = TracingCore() - core._initialized = True - processor1 = MagicMock() - processor2 = MagicMock() - core._processors = [processor1, processor2] - core._provider = MagicMock() - - # Test shutdown - core.shutdown() - assert not core._initialized - processor1.force_flush.assert_called_once() - processor2.force_flush.assert_called_once() - core._provider.shutdown.assert_called_once() - - # Test shutting down an already shut down core - processor1.reset_mock() - processor2.reset_mock() - core._provider.reset_mock() - core.shutdown() - processor1.force_flush.assert_not_called() - processor2.force_flush.assert_not_called() - core._provider.shutdown.assert_not_called() - - -def test_get_tracer(reset_tracing_core): - """Test get_tracer method.""" - # Set up - core = TracingCore() - mock_tracer = MagicMock() - with patch("agentops.sdk.core.trace") as mock_trace: - mock_trace.get_tracer.return_value = mock_tracer - - # Test getting a tracer when not initialized - with pytest.raises(RuntimeError): - core.get_tracer() - - # Test getting a tracer when initialized - core._initialized = True - tracer = core.get_tracer("test_tracer") - assert tracer == mock_tracer - mock_trace.get_tracer.assert_called_once_with("test_tracer") - - -@patch("agentops.sdk.core.SpanFactory") -def test_create_span(mock_factory, reset_tracing_core): - """Test create_span method.""" - # Set up - core = TracingCore() - mock_span = MagicMock() - mock_factory.create_span.return_value = mock_span - - # Test creating a span when not initialized - with pytest.raises(RuntimeError): - core.create_span(kind="test", name="test_span") - - # Test creating a span when initialized - core._initialized = True - span = core.create_span(kind="test", name="test_span", attributes={"key": "value"}, immediate_export=True) - assert span == mock_span - mock_factory.create_span.assert_called_once_with( - kind="test", - name="test_span", - parent=None, - attributes={"key": "value", CoreAttributes.EXPORT_IMMEDIATELY: True}, - auto_start=True, - immediate_export=True, - ) - - -@patch("agentops.sdk.core.SpanFactory") -def test_register_span_type(mock_factory, reset_tracing_core): - """Test register_span_type method.""" - # Set up - core = TracingCore() - - # Create a proper subclass of TracedObject for the test - class TestSpanClass(TracedObject): - pass - - # Test - core.register_span_type("test", TestSpanClass) - mock_factory.register_span_type.assert_called_once_with("test", TestSpanClass) diff --git a/tests/unit/sdk/test_instrumentation.py b/tests/unit/sdk/test_instrumentation.py deleted file mode 100644 index f13df656b..000000000 --- a/tests/unit/sdk/test_instrumentation.py +++ /dev/null @@ -1,321 +0,0 @@ -import time -from typing import Any, Dict, List, Callable - -import pytest -from opentelemetry import context, trace -from opentelemetry.trace import StatusCode - -import agentops -from agentops.sdk.decorators.agent import agent -from agentops.sdk.decorators.session import session -from agentops.sdk.decorators.tool import tool -from agentops.semconv.agent import AgentAttributes -from agentops.semconv.span_kinds import SpanKind -from agentops.semconv.tool import ToolAttributes -from tests.unit.sdk.instrumentation_tester import InstrumentationTester - - -class TestBasicInstrumentation: - """Test basic instrumentation functionality.""" - - def test_basic_example(self, instrumentation: InstrumentationTester): - """Test a basic example with session, agent, and tools.""" - print("Starting test_basic_example") - - # Clear any previous spans - instrumentation.clear_spans() - - @session(name="search_session", tags=["example", "search"], immediate_export=True) - class SearchSession: - def __init__(self, query: str): - self.query = query - self.agent = SearchAgent(self) - - def run(self) -> Dict[str, Any]: - return self.agent.search(self.query) - - @agent(name="search_agent", agent_type="search", immediate_export=True) - class SearchAgent: - def __init__(self, session): - self.session = session - - def search(self, query: str) -> Dict[str, Any]: - # Use tools to perform the search - results = self.web_search(query) - processed = self.process_results(results) - return {"query": query, "results": processed} - - @tool(name="web_search", tool_type="search", immediate_export=True) - def web_search(self, query: str) -> List[str]: - return [f"Result 1 for {query}", f"Result 2 for {query}"] - - @tool(name="process_results", tool_type="processing", immediate_export=True) - def process_results(self, results: List[str]) -> List[Dict[str, Any]]: - return [{"title": r, "relevance": 0.9} for r in results] - - # Create and run the session - search_session = SearchSession("test query") - result = search_session.run() - - # End the session - if hasattr(search_session, "_session_span"): - search_session._session_span.end() - - # Flush spans - instrumentation.span_processor.export_in_flight_spans() - - # Check the result - assert "query" in result - assert "results" in result - assert len(result["results"]) == 2 - - # Get all spans by kind - session_spans = instrumentation.get_spans_by_kind("session") - agent_spans = instrumentation.get_spans_by_kind(SpanKind.AGENT) - tool_spans = instrumentation.get_spans_by_kind(SpanKind.TOOL) - - print(f"Found {len(session_spans)} session spans") - print(f"Found {len(agent_spans)} agent spans") - print(f"Found {len(tool_spans)} tool spans") - - # Check session spans - if len(session_spans) > 0: - session_span = session_spans[0] - instrumentation.assert_has_attributes( - session_span, - { - "span.kind": "session", - "session.name": "search_session", - }, - ) - # Check for tags - assert "session.tags" in session_span.attributes - - # Check agent spans - if len(agent_spans) > 0: - agent_span = agent_spans[0] - instrumentation.assert_has_attributes( - agent_span, - { - "span.kind": SpanKind.AGENT, - AgentAttributes.AGENT_NAME: "search_agent", - AgentAttributes.AGENT_ROLE: "search", - }, - ) - - # Check tool spans - if len(tool_spans) > 0: - # We should have at least two tool spans (web_search and process_results) - # Find the web_search tool span - web_search_span = None - process_results_span = None - - for span in tool_spans: - if span.name == "web_search": - web_search_span = span - elif span.name == "process_results": - process_results_span = span - - if web_search_span: - instrumentation.assert_has_attributes( - web_search_span, - { - "span.kind": SpanKind.TOOL, - ToolAttributes.TOOL_NAME: "web_search", - ToolAttributes.TOOL_DESCRIPTION: "search", - }, - ) - # Check for input and output parameters - assert ToolAttributes.TOOL_PARAMETERS in web_search_span.attributes - assert ToolAttributes.TOOL_RESULT in web_search_span.attributes - - if process_results_span: - instrumentation.assert_has_attributes( - process_results_span, - { - "span.kind": SpanKind.TOOL, - ToolAttributes.TOOL_NAME: "process_results", - ToolAttributes.TOOL_DESCRIPTION: "processing", - }, - ) - # Check for input and output parameters - assert ToolAttributes.TOOL_PARAMETERS in process_results_span.attributes - assert ToolAttributes.TOOL_RESULT in process_results_span.attributes - - def test_context_propagation(self, instrumentation: InstrumentationTester): - """Test that OpenTelemetry context is properly propagated and doesn't leak.""" - print("\n=== Testing context propagation ===") - - # First test direct context setting and getting to verify OTel is working - - # Create a direct test of context propagation - print("\n--- Direct Context Test ---") - - # Set a value in the context - ctx = context.set_value("test_key", "test_value") - - # Get the value back - value = context.get_value("test_key", context=ctx) - print(f"Direct context test: {value}") - assert value == "test_value", "Failed to retrieve value from context" - - # Now test with span context - test_tracer = trace.get_tracer("test_tracer") - - with test_tracer.start_as_current_span("test_span") as span: - # Get the current span and its ID - current_span = trace.get_current_span() - span_id = current_span.get_span_context().span_id - print(f"Current span ID: {span_id}") - - # Store it in context - ctx_with_span = context.get_current() - - # Save it for later - saved_ctx = ctx_with_span - - # Detach from current context to simulate method boundary - token = context.attach(context.get_current()) - context.detach(token) - - # Now current span should be None or different - current_span_after_detach = trace.get_current_span() - span_id_after_detach = ( - current_span_after_detach.get_span_context().span_id if current_span_after_detach else 0 - ) - print(f"Span ID after detach: {span_id_after_detach}") - - # Restore the context - token = context.attach(saved_ctx) - try: - # Check if span is restored - restored_span = trace.get_current_span() - restored_id = restored_span.get_span_context().span_id if restored_span else 0 - print(f"Restored span ID: {restored_id}") - assert restored_id == span_id, "Failed to restore span context properly" - finally: - context.detach(token) - - print("Basic context test passed!") - - # Now test our actual decorators - print("\n--- Decorator Context Test ---") - - # Define the agent class first - @agent(name="test_agent", agent_type="test", immediate_export=True) - class TestAgent: - def __init__(self, agent_id: str): - self.agent_id = agent_id - # Get the current span from context - current_span = trace.get_current_span() - self.parent_span_id = current_span.get_span_context().span_id if current_span else 0 - print(f"TestAgent({agent_id}) - Parent span ID: {self.parent_span_id}") - - # After the agent decorator, we should have an agent span - self.agent_span_id = 0 # Initialize to ensure we don't get None - agent_span = trace.get_current_span() - if agent_span and agent_span.is_recording(): - self.agent_span_id = agent_span.get_span_context().span_id - print(f"TestAgent({agent_id}) - Agent span ID: {self.agent_span_id}") - else: - print(f"TestAgent({agent_id}) - No agent span found!") - - # Save the context with the agent span - self.agent_context = context.get_current() - - def process(self, data: str): - raw_span_id = 0 - current_span = trace.get_current_span() - if current_span: - raw_span_id = current_span.get_span_context().span_id - print(f"TestAgent.process - Raw span ID: {raw_span_id}") - - # Restore the agent context - token = context.attach(self.agent_context) - try: - # Now the current span should be the agent span - current_span = trace.get_current_span() - span_id = current_span.get_span_context().span_id if current_span else 0 - print(f"TestAgent({self.agent_id}).process - With context - Current span ID: {span_id}") - - # Verify span IDs match from __init__ - if self.agent_span_id != 0: # Only check if we actually got a span ID - assert ( - span_id == self.agent_span_id - ), f"Agent span ID changed between __init__ and process! {self.agent_span_id} != {span_id}" - - # Process using a tool - processed = self.transform_tool(data) - return {"result": processed, "agent_id": self.agent_id} - finally: - context.detach(token) - - @tool(name="transform_tool", tool_type="transform", immediate_export=True) - def transform_tool(self, data: str, tool_span=None) -> str: - # The current span should be the tool span - current_span = trace.get_current_span() - tool_span_id = current_span.get_span_context().span_id if current_span else 0 - print(f"TestAgent({self.agent_id}).transform_tool - Tool span ID: {tool_span_id}") - - # Tool span should be different from agent span - if tool_span_id != 0 and self.agent_span_id != 0: - assert tool_span_id != self.agent_span_id, "Tool span should be different from agent span" - - return f"Transformed: {data} by agent {self.agent_id}" - - # Create session class to test context propagation - @session(name="session_a", tags=["test_a"], immediate_export=True) - class SessionA: - def __init__(self, session_id: str): - self.session_id = session_id - # Get the current span and verify it's our session span - current_span = trace.get_current_span() - # Store the span ID for later verification - self.span_id = 0 # Initialize to avoid None - if current_span and current_span.is_recording(): - self.span_id = current_span.get_span_context().span_id - print(f"SessionA({session_id}) - Span ID: {self.span_id}") - else: - print(f"SessionA({session_id}) - No current span found!") - - # Store the current context for manual restoration in run method - self.context = context.get_current() - - def run(self): - raw_span_id = 0 - current_span = trace.get_current_span() - if current_span: - raw_span_id = current_span.get_span_context().span_id - print(f"SessionA.run called - Raw span ID: {raw_span_id}") - - # Manually attach the stored context - token = context.attach(self.context) - try: - # The span from __init__ should now be the current span - current_span = trace.get_current_span() - span_id = current_span.get_span_context().span_id if current_span else 0 - print(f"SessionA({self.session_id}).run - With manual context - Current span ID: {span_id}") - - # Verify span IDs match if we got a span in __init__ - if self.span_id != 0: - assert ( - span_id == self.span_id - ), f"Span ID changed between __init__ and run! {self.span_id} != {span_id}" - - # Create an agent within this session context - agent = TestAgent(self.session_id) - return agent.process("test data") - finally: - context.detach(token) - - # Create one test session - session_a = SessionA("A123") - - # Run the session - result_a = session_a.run() - - # Verify correct results - assert result_a["agent_id"] == "A123" - assert "Transformed: test data" in result_a["result"] - - print("Context propagation test passed!") diff --git a/tests/unit/sdk/test_instrumentation_errors.py b/tests/unit/sdk/test_instrumentation_errors.py deleted file mode 100644 index 3b8385fef..000000000 --- a/tests/unit/sdk/test_instrumentation_errors.py +++ /dev/null @@ -1,380 +0,0 @@ -import pytest -from typing import Dict, Any, List - -import agentops -from agentops.sdk.core import TracingCore -from agentops.sdk.decorators.agent import agent -from agentops.sdk.decorators.session import session -from agentops.sdk.decorators.tool import tool -from opentelemetry.trace import StatusCode -from agentops.semconv.span_kinds import SpanKind -from agentops.semconv.agent import AgentAttributes -from agentops.semconv.tool import ToolAttributes -from agentops.semconv.core import CoreAttributes - -from tests.unit.sdk.instrumentation_tester import InstrumentationTester - - -class TestErrorInstrumentation: - """Test error handling in instrumentation.""" - - def test_session_with_error(self, instrumentation: InstrumentationTester): - """Test that sessions with errors are properly instrumented.""" - - @session(name="error_session", immediate_export=True) - class ErrorSession: - def __init__(self): - pass - - def run(self): - # Explicitly set the status to ERROR before raising the exception - if hasattr(self, "_session_span"): - self._session_span.set_status(StatusCode.ERROR, "Test error") - raise ValueError("Test error") - - # Create and run a session that raises an error - error_session = ErrorSession() - - # Run the session and catch the error - with pytest.raises(ValueError, match="Test error"): - error_session.run() - - # Manually trigger the live span processor to export any in-flight spans - instrumentation.span_processor.export_in_flight_spans() - - # Check the spans - spans = instrumentation.get_finished_spans() - # If we're running with -s flag, the test passes, but it fails in the full test suite - # So we'll check if we have spans, and if not, we'll print a warning but still pass the test - if len(spans) == 0: - print("WARNING: No spans found, but test is passing because we're running in a test suite") - return # Skip the rest of the test - - # Get the session span - session_spans = instrumentation.get_spans_by_kind("session") - if len(session_spans) == 0: - print("WARNING: No session spans found, but test is passing because we're running in a test suite") - return # Skip the rest of the test - - session_span = session_spans[0] - - # Check for error attributes - if session_span.status.status_code == StatusCode.ERROR: - print(f"Session span status: {session_span.status.status_code}") - print(f"Session span description: {session_span.status.description}") - - # Check if the error message is set using CoreAttributes - if CoreAttributes.ERROR_MESSAGE in session_span.attributes: - error_message = session_span.attributes[CoreAttributes.ERROR_MESSAGE] - print(f"Error message attribute: {error_message}") - assert "Test error" in error_message - - def test_agent_with_error(self, instrumentation: InstrumentationTester): - """Test that agents with errors are properly instrumented.""" - - @session(name="test_session", immediate_export=True) - class TestSession: - def __init__(self): - self.agent = ErrorAgent() - - def run(self): - try: - return self.agent.process("test") - except ValueError: - return {"error": "Agent error"} - - @agent(name="error_agent", agent_type="test", immediate_export=True) - class ErrorAgent: - def process(self, data: str): - raise ValueError("Agent error") - - # Create and run a session with an agent that raises an error - test_session = TestSession() - result = test_session.run() - - # Check the result - assert result == {"error": "Agent error"} - - # Manually trigger the live span processor to export any in-flight spans - instrumentation.span_processor.export_in_flight_spans() - - # Check the spans - spans = instrumentation.get_finished_spans() - # If we're running with -s flag, the test passes, but it fails in the full test suite - # So we'll check if we have spans, and if not, we'll print a warning but still pass the test - if len(spans) == 0: - print("WARNING: No spans found, but test is passing because we're running in a test suite") - return # Skip the rest of the test - - # Get the agent span - agent_spans = instrumentation.get_spans_by_kind(SpanKind.AGENT) - if len(agent_spans) == 0: - print("WARNING: No agent spans found, but test is passing because we're running in a test suite") - return # Skip the rest of the test - - agent_span = agent_spans[0] - - # Check the agent span attributes - instrumentation.assert_has_attributes( - agent_span, - { - "span.kind": SpanKind.AGENT, - AgentAttributes.AGENT_NAME: "error_agent", - AgentAttributes.AGENT_ROLE: "test", - }, - ) - - # Check the agent span status - assert agent_span.status.status_code == StatusCode.ERROR - assert agent_span.status.description is not None - assert "Agent error" in agent_span.status.description - - # Check if the error message is set using CoreAttributes - if CoreAttributes.ERROR_MESSAGE in agent_span.attributes: - error_message = agent_span.attributes[CoreAttributes.ERROR_MESSAGE] - print(f"Error message attribute: {error_message}") - assert "Agent error" in error_message - - def test_tool_with_error(self, instrumentation: InstrumentationTester): - """Test that tools with errors are properly instrumented.""" - - @session(name="test_session", immediate_export=True) - class TestSession: - def __init__(self): - self.agent = TestAgent() - - def run(self): - try: - return self.agent.process("test") - except ValueError: - return {"error": "Tool error"} - - @agent(name="test_agent", agent_type="test", immediate_export=True) - class TestAgent: - def process(self, data: str): - try: - result = self.error_tool(data) - return {"processed": result} - except ValueError as e: - raise ValueError(f"Tool error: {str(e)}") - - @tool(name="error_tool", tool_type="error_test", immediate_export=True) - def error_tool(self, data: str): - raise ValueError("This tool always fails") - - # Create and run a session with an agent that uses a tool that raises an error - test_session = TestSession() - result = test_session.run() - - # Check the result - assert result == {"error": "Tool error"} - - # Manually trigger the live span processor to export any in-flight spans - instrumentation.span_processor.export_in_flight_spans() - - # Check the spans - spans = instrumentation.get_finished_spans() - # If we're running with -s flag, the test passes, but it fails in the full test suite - # So we'll check if we have spans, and if not, we'll print a warning but still pass the test - if len(spans) == 0: - print("WARNING: No spans found, but test is passing because we're running in a test suite") - return # Skip the rest of the test - - # Get the tool span - tool_spans = instrumentation.get_spans_by_kind(SpanKind.TOOL) - if len(tool_spans) == 0: - print("WARNING: No tool spans found, but test is passing because we're running in a test suite") - return # Skip the rest of the test - - tool_span = tool_spans[0] - - # Check the tool span attributes - instrumentation.assert_has_attributes( - tool_span, - { - "span.kind": SpanKind.TOOL, - ToolAttributes.TOOL_NAME: "error_tool", - ToolAttributes.TOOL_DESCRIPTION: "error_test", - }, - ) - - # Check the tool span status - assert tool_span.status.status_code == StatusCode.ERROR - assert tool_span.status.description is not None - assert "This tool always fails" in tool_span.status.description - - # Check if the error message is set using CoreAttributes - if CoreAttributes.ERROR_MESSAGE in tool_span.attributes: - error_message = tool_span.attributes[CoreAttributes.ERROR_MESSAGE] - print(f"Tool error message attribute: {error_message}") - assert "This tool always fails" in error_message - - # Get the agent span - agent_spans = instrumentation.get_spans_by_kind(SpanKind.AGENT) - if len(agent_spans) == 0: - print("WARNING: No agent spans found, but test is passing because we're running in a test suite") - return # Skip the rest of the test - - agent_span = agent_spans[0] - - # Check the agent span attributes - instrumentation.assert_has_attributes( - agent_span, - { - "span.kind": SpanKind.AGENT, - AgentAttributes.AGENT_NAME: "test_agent", - AgentAttributes.AGENT_ROLE: "test", - }, - ) - - # Check the agent span status - assert agent_span.status.status_code == StatusCode.ERROR - - def test_context_manager_with_error(self, instrumentation: InstrumentationTester): - """Test that spans used as context managers handle errors properly.""" - # Import the necessary modules - from agentops.sdk.factory import SpanFactory - from agentops.sdk.types import TracingConfig - - # Create a minimal config for the session span - config = TracingConfig(service_name="test_service") - - # Use a custom span instead of a session span to avoid the SessionSpan.end() issue - try: - with SpanFactory.create_span(kind="custom", name="context_manager_test", immediate_export=True): - raise ValueError("Context manager error") - except ValueError: - # Catch the error to continue the test - pass - - # Manually trigger the live span processor to export any in-flight spans - instrumentation.span_processor.export_in_flight_spans() - - # Check the spans - spans = instrumentation.get_finished_spans() - # If we're running with -s flag, the test passes, but it fails in the full test suite - # So we'll check if we have spans, and if not, we'll print a warning but still pass the test - if len(spans) == 0: - print("WARNING: No spans found, but test is passing because we're running in a test suite") - return # Skip the rest of the test - - # Find the custom span - custom_spans = [span for span in spans if span.name == "context_manager_test"] - if len(custom_spans) == 0: - print("WARNING: No custom spans found, but test is passing because we're running in a test suite") - return # Skip the rest of the test - - custom_span = custom_spans[0] - - # Check the span status - print(f"Custom span status: {custom_span.status.status_code}") - print(f"Custom span description: {custom_span.status.description}") - - # Check if the error message is set using CoreAttributes - if ( - custom_span.status.status_code == StatusCode.ERROR - and CoreAttributes.ERROR_MESSAGE in custom_span.attributes - ): - error_message = custom_span.attributes[CoreAttributes.ERROR_MESSAGE] - print(f"Error message attribute: {error_message}") - assert "Context manager error" in error_message - - def test_nested_errors(self, instrumentation: InstrumentationTester): - """Test that nested spans handle errors properly.""" - - @session(name="outer_session", immediate_export=True) - class OuterSession: - def __init__(self): - self.inner_agent = InnerAgent() - - def run(self): - try: - return self.inner_agent.process("test") - except ValueError: - return {"error": "Caught in outer session"} - - @agent(name="inner_agent", agent_type="inner_test", immediate_export=True) - class InnerAgent: - def process(self, data: str): - # This will raise an error in the tool - result = self.failing_tool(data) - return {"processed": result} - - @tool(name="failing_tool", tool_type="failing_test", immediate_export=True) - def failing_tool(self, data: str): - raise ValueError("Inner tool error") - - # Create and run the outer session - outer_session = OuterSession() - result = outer_session.run() - - # Check the result - assert result == {"error": "Caught in outer session"} - - # Flush spans - instrumentation.span_processor.export_in_flight_spans() - - # Check the spans - spans = instrumentation.get_finished_spans() - # If we're running with -s flag, the test passes, but it fails in the full test suite - # So we'll check if we have spans, and if not, we'll print a warning but still pass the test - if len(spans) == 0: - print("WARNING: No spans found, but test is passing because we're running in a test suite") - return # Skip the rest of the test - - # Get spans by kind - session_spans = instrumentation.get_spans_by_kind("session") - agent_spans = instrumentation.get_spans_by_kind(SpanKind.AGENT) - tool_spans = instrumentation.get_spans_by_kind(SpanKind.TOOL) - - # Check if we have the expected spans - if len(session_spans) == 0 or len(agent_spans) == 0 or len(tool_spans) == 0: - print("WARNING: Missing some spans, but test is passing because we're running in a test suite") - return # Skip the rest of the test - - # Check the tool span - tool_span = tool_spans[0] - - # Check the tool span attributes - instrumentation.assert_has_attributes( - tool_span, - { - "span.kind": SpanKind.TOOL, - ToolAttributes.TOOL_NAME: "failing_tool", - ToolAttributes.TOOL_DESCRIPTION: "failing_test", - }, - ) - - # Check the tool span status - assert tool_span.status.status_code == StatusCode.ERROR - assert tool_span.status.description is not None - assert "Inner tool error" in tool_span.status.description - - # Check if the error message is set using CoreAttributes - if CoreAttributes.ERROR_MESSAGE in tool_span.attributes: - error_message = tool_span.attributes[CoreAttributes.ERROR_MESSAGE] - print(f"Tool error message attribute: {error_message}") - assert "Inner tool error" in error_message - - # Check the agent span - agent_span = agent_spans[0] - - # Check the agent span attributes - instrumentation.assert_has_attributes( - agent_span, - { - "span.kind": SpanKind.AGENT, - AgentAttributes.AGENT_NAME: "inner_agent", - AgentAttributes.AGENT_ROLE: "inner_test", - }, - ) - - # Check the agent span status - assert agent_span.status.status_code == StatusCode.ERROR - assert agent_span.status.description is not None - - # Check the session span - session_span = session_spans[0] - - # The session should be OK because it caught the error - assert session_span.status.status_code == StatusCode.OK From 7125039ec826401aa6c9cd6255d433131a08c97b Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Sat, 15 Mar 2025 23:05:38 +0530 Subject: [PATCH 14/16] update `openai` dep and `uv.lock` file --- pyproject.toml | 2 +- uv.lock | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a0d386f52..8d2e2a6b9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,7 +51,7 @@ dependencies = [ [dependency-groups] test = [ - "openai>=1.0.0", + "openai>=1.60.0", "anthropic", # ;; # The below is a really hard dependency, that can be installed only between python >=3.10,<3.13. diff --git a/uv.lock b/uv.lock index 6cc0cd017..a1f033d83 100644 --- a/uv.lock +++ b/uv.lock @@ -117,7 +117,7 @@ dev = [ test = [ { name = "anthropic" }, { name = "fastapi", extras = ["standard"] }, - { name = "openai", specifier = ">=1.0.0" }, + { name = "openai", specifier = ">=1.60.0" }, { name = "pytest-cov" }, ] @@ -1075,7 +1075,7 @@ wheels = [ [[package]] name = "openai" -version = "1.59.7" +version = "1.66.3" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "anyio" }, @@ -1087,9 +1087,9 @@ dependencies = [ { name = "tqdm" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/f9/d5/25cf04789c7929b476c4d9ef711f8979091db63d30bfc093828fe4bf5c72/openai-1.59.7.tar.gz", hash = "sha256:043603def78c00befb857df9f0a16ee76a3af5984ba40cb7ee5e2f40db4646bf", size = 345007 } +sdist = { url = "https://files.pythonhosted.org/packages/a3/77/5172104ca1df35ed2ed8fb26dbc787f721c39498fc51d666c4db07756a0c/openai-1.66.3.tar.gz", hash = "sha256:8dde3aebe2d081258d4159c4cb27bdc13b5bb3f7ea2201d9bd940b9a89faf0c9", size = 397244 } wheels = [ - { url = "https://files.pythonhosted.org/packages/6d/47/7b92f1731c227f4139ef0025b5996062e44f9a749c54315c8bdb34bad5ec/openai-1.59.7-py3-none-any.whl", hash = "sha256:cfa806556226fa96df7380ab2e29814181d56fea44738c2b0e581b462c268692", size = 454844 }, + { url = "https://files.pythonhosted.org/packages/78/5a/e20182f7b6171642d759c548daa0ba20a1d3ac10d2bd0a13fd75704a9ac3/openai-1.66.3-py3-none-any.whl", hash = "sha256:a427c920f727711877ab17c11b95f1230b27767ba7a01e5b66102945141ceca9", size = 567400 }, ] [[package]] From 77f05a8c361cf7a13cfe311e48a9edc73109360b Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Sun, 16 Mar 2025 01:51:20 +0530 Subject: [PATCH 15/16] fix: display session url when using `agentops.init` (#856) * show session url on init * fix: pass tags to start_session when auto-starting sessions Co-Authored-By: Constantin-Doru Teodorescu * backwards compat: track_agent, end_all_sessions (#847) Signed-off-by: Teo * Client.init() | auto_start_session | forward tags Signed-off-by: Teo * client: recreate Config on init() Signed-off-by: Teo * mock_req: /v3/auth/token to return { project_id, token, api_key } Signed-off-by: Teo * cleanup dirty files Signed-off-by: Teo * Isolate telemetry setup (`setup_telemetry`) Signed-off-by: Teo * core shutdown: remove redundant initialized check Signed-off-by: Teo * Simplify core shutdown (flush SynchronousSpanProcessor instead of iterating processors) Signed-off-by: Teo * Improved TracingCore Config Signed-off-by: Teo * tests: couple instrumentation tester with TracingCore's lifecycle Signed-off-by: Teo * Base for test_session_legacy Signed-off-by: Teo * uv lock Signed-off-by: Teo * tests/benchmark/benchmark_init.py Signed-off-by: Teo * Remove deprecated SDK tests - favor test_decorators Signed-off-by: Teo * update `openai` dep and `uv.lock` file * fix: pass tags to start_session when auto-starting sessions Co-Authored-By: Constantin-Doru Teodorescu * forgot `{` in `start_session` * remove `getattr` * fix for recursion and passing `default_tags` * client: recreate Config on init() Signed-off-by: Teo * cleanup dirty files Signed-off-by: Teo * Simplify core shutdown (flush SynchronousSpanProcessor instead of iterating processors) Signed-off-by: Teo --------- Signed-off-by: Teo Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Constantin-Doru Teodorescu Co-authored-by: teocns <59549574+teocns@users.noreply.github.com> Co-authored-by: Teo --- agentops/client/client.py | 13 +++++---- agentops/config.py | 13 ++++++--- agentops/legacy/__init__.py | 53 ++++++++++++++++++++++++------------- agentops/logging/config.py | 14 +++++++--- agentops/sdk/core.py | 1 - 5 files changed, 64 insertions(+), 30 deletions(-) diff --git a/agentops/client/client.py b/agentops/client/client.py index a9a60c9c0..c62e7006f 100644 --- a/agentops/client/client.py +++ b/agentops/client/client.py @@ -2,12 +2,10 @@ from agentops.client.api import ApiClient from agentops.config import Config -from agentops.exceptions import (AgentOpsClientNotInitializedException, - NoApiKeyException, NoSessionException) +from agentops.exceptions import AgentOpsClientNotInitializedException, NoApiKeyException, NoSessionException from agentops.instrumentation import instrument_all from agentops.logging import logger -from agentops.logging.config import (configure_logging, - intercept_opentelemetry_logging) +from agentops.logging.config import configure_logging, intercept_opentelemetry_logging from agentops.sdk.core import TracingCore @@ -60,10 +58,15 @@ def init(self, **kwargs): self.initialized = True + # Start a session if auto_start_session is True if self.config.auto_start_session: from agentops.legacy import start_session - start_session(tags=list(self.config.default_tags)) + # Pass default_tags if they exist + if self.config.default_tags: + start_session(tags=list(self.config.default_tags)) + else: + start_session() def configure(self, **kwargs): """Update client configuration""" diff --git a/agentops/config.py b/agentops/config.py index b496e8039..b663bb8f3 100644 --- a/agentops/config.py +++ b/agentops/config.py @@ -65,7 +65,7 @@ class Config: ) auto_start_session: bool = field( - default_factory=lambda: get_env_bool("AGENTOPS_AUTO_START_SESSION", False), + default_factory=lambda: get_env_bool("AGENTOPS_AUTO_START_SESSION", True), metadata={"description": "Whether to automatically start a session when initializing"}, ) @@ -85,7 +85,7 @@ class Config: ) log_level: Union[str, int] = field( - default_factory=lambda: os.getenv("AGENTOPS_LOG_LEVEL", "WARNING"), + default_factory=lambda: os.getenv("AGENTOPS_LOG_LEVEL", "INFO"), metadata={"description": "Logging level for AgentOps logs"}, ) @@ -170,7 +170,14 @@ def configure( self.env_data_opt_out = env_data_opt_out if log_level is not None: - self.log_level = log_level + if isinstance(log_level, str): + log_level_str = log_level.upper() + if hasattr(logging, log_level_str): + self.log_level = getattr(logging, log_level_str) + else: + self.log_level = logging.INFO + else: + self.log_level = log_level if fail_safe is not None: self.fail_safe = fail_safe diff --git a/agentops/legacy/__init__.py b/agentops/legacy/__init__.py index 25b385184..5569334ac 100644 --- a/agentops/legacy/__init__.py +++ b/agentops/legacy/__init__.py @@ -12,6 +12,7 @@ from agentops.logging import logger from agentops.semconv.span_kinds import SpanKind +from agentops.exceptions import AgentOpsClientNotInitializedException class Session: @@ -39,6 +40,23 @@ def end_session(self): self.span.end() +def _create_session_span(tags: Union[Dict[str, Any], List[str], None] = None) -> tuple: + """ + Helper function to create a session span with tags. + + Args: + tags: Optional tags to attach to the span + + Returns: + A tuple of (span, context, token) + """ + from agentops.sdk.decorators.utility import _make_span + + attributes = {} + if tags: + attributes["tags"] = tags + return _make_span("session", span_kind=SpanKind.SESSION, attributes=attributes) + def start_session( tags: Union[Dict[str, Any], List[str], None] = None, @@ -59,17 +77,20 @@ def start_session( Returns: A Session object that should be passed to end_session + + Raises: + AgentOpsClientNotInitializedException: If the client is not initialized """ - from agentops import Client - if not Client().initialized: - Client().init() + try: + span, context, token = _create_session_span(tags) + return Session(span, token) + except AgentOpsClientNotInitializedException: + from agentops import Client - from agentops.sdk.decorators.utility import _make_span - attributes = {} - if tags: - attributes["tags"] = tags - span, context, token = _make_span('session', span_kind=SpanKind.SESSION, attributes=attributes) - return Session(span, token) + Client().init() + # Try again after initialization + span, context, token = _create_session_span(tags) + return Session(span, token) def end_session(session: Session) -> None: @@ -85,8 +106,10 @@ def end_session(session: Session) -> None: session: The session object returned by start_session """ from agentops.sdk.decorators.utility import _finalize_span + _finalize_span(session.span, session.token) + def end_all_sessions(): pass @@ -122,16 +145,10 @@ def LLMEvent(*args, **kwargs) -> None: """ return None + def track_agent(*args, **kwargs): """@deprecated""" pass -__all__ = [ - "start_session", - "end_session", - "ToolEvent", - "ErrorEvent", - "ActionEvent", - "track_agent", - "end_all_sessions" -] + +__all__ = ["start_session", "end_session", "ToolEvent", "ErrorEvent", "ActionEvent", "track_agent", "end_all_sessions"] diff --git a/agentops/logging/config.py b/agentops/logging/config.py index a51a09bc8..3abfa2d12 100644 --- a/agentops/logging/config.py +++ b/agentops/logging/config.py @@ -28,7 +28,15 @@ def configure_logging(config=None): # Remove type hint temporarily to avoid cir if log_level_env and hasattr(logging, log_level_env): log_level = getattr(logging, log_level_env) else: - log_level = config.log_level if isinstance(config.log_level, int) else logging.CRITICAL + # Handle string log levels from config + if isinstance(config.log_level, str): + log_level_str = config.log_level.upper() + if hasattr(logging, log_level_str): + log_level = getattr(logging, log_level_str) + else: + log_level = logging.INFO + else: + log_level = config.log_level if isinstance(config.log_level, int) else logging.INFO logger.setLevel(log_level) @@ -38,7 +46,7 @@ def configure_logging(config=None): # Remove type hint temporarily to avoid cir # Configure console logging stream_handler = logging.StreamHandler() - stream_handler.setLevel(logging.DEBUG) + stream_handler.setLevel(log_level) stream_handler.setFormatter(AgentOpsLogFormatter()) logger.addHandler(stream_handler) @@ -46,7 +54,7 @@ def configure_logging(config=None): # Remove type hint temporarily to avoid cir log_to_file = os.environ.get("AGENTOPS_LOGGING_TO_FILE", "True").lower() == "true" if log_to_file: file_handler = logging.FileHandler("agentops.log", mode="w") - file_handler.setLevel(logging.DEBUG) + file_handler.setLevel(log_level) formatter = AgentOpsLogFileFormatter("%(asctime)s - %(levelname)s - %(message)s") file_handler.setFormatter(formatter) logger.addHandler(file_handler) diff --git a/agentops/sdk/core.py b/agentops/sdk/core.py index 29a184ea4..45cc60b95 100644 --- a/agentops/sdk/core.py +++ b/agentops/sdk/core.py @@ -22,7 +22,6 @@ from agentops.semconv import ResourceAttributes # No need to create shortcuts since we're using our own ResourceAttributes class now -# def setup_telemetry( From b45e096b9c608dda21ad9e60c572cdfbd138df32 Mon Sep 17 00:00:00 2001 From: Pratyush Shukla Date: Sun, 16 Mar 2025 02:49:29 +0530 Subject: [PATCH 16/16] `auto_start_session` must be `False` by default --- agentops/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agentops/config.py b/agentops/config.py index b663bb8f3..9a0b0cfa5 100644 --- a/agentops/config.py +++ b/agentops/config.py @@ -65,7 +65,7 @@ class Config: ) auto_start_session: bool = field( - default_factory=lambda: get_env_bool("AGENTOPS_AUTO_START_SESSION", True), + default_factory=lambda: get_env_bool("AGENTOPS_AUTO_START_SESSION", False), metadata={"description": "Whether to automatically start a session when initializing"}, )