diff --git a/agentops/__init__.py b/agentops/__init__.py index 25e28dcf3..8fdf88625 100755 --- a/agentops/__init__.py +++ b/agentops/__init__.py @@ -28,6 +28,7 @@ def record(event): def init( api_key: Optional[str] = None, endpoint: Optional[str] = None, + app_url: Optional[str] = None, max_wait_time: Optional[int] = None, max_queue_size: Optional[int] = None, tags: Optional[List[str]] = None, @@ -50,6 +51,8 @@ def init( be read from the AGENTOPS_API_KEY environment variable. endpoint (str, optional): The endpoint for the AgentOps service. If none is provided, key will be read from the AGENTOPS_API_ENDPOINT environment variable. Defaults to 'https://api.agentops.ai'. + app_url (str, optional): The dashboard URL for the AgentOps app. If none is provided, key will + be read from the AGENTOPS_APP_URL environment variable. Defaults to 'https://app.agentops.ai'. max_wait_time (int, optional): The maximum time to wait in milliseconds before flushing the queue. Defaults to 5,000 (5 seconds) max_queue_size (int, optional): The maximum size of the event queue. Defaults to 512. @@ -79,6 +82,7 @@ def init( return _client.init( api_key=api_key, endpoint=endpoint, + app_url=app_url, max_wait_time=max_wait_time, max_queue_size=max_queue_size, default_tags=merged_tags, @@ -101,6 +105,7 @@ def configure(**kwargs): **kwargs: Configuration parameters. Supported parameters include: - api_key: API Key for AgentOps services - endpoint: The endpoint for the AgentOps service + - app_url: The dashboard URL for the AgentOps app - max_wait_time: Maximum time to wait in milliseconds before flushing the queue - max_queue_size: Maximum size of the event queue - default_tags: Default tags for the sessions @@ -118,6 +123,7 @@ def configure(**kwargs): valid_params = { "api_key", "endpoint", + "app_url", "max_wait_time", "max_queue_size", "default_tags", diff --git a/agentops/config.py b/agentops/config.py index 8ee08db22..a1097b6c7 100644 --- a/agentops/config.py +++ b/agentops/config.py @@ -19,6 +19,7 @@ class ConfigDict(TypedDict): api_key: Optional[str] endpoint: Optional[str] + app_url: Optional[str] max_wait_time: Optional[int] export_flush_interval: Optional[int] max_queue_size: Optional[int] @@ -45,6 +46,11 @@ class Config: metadata={"description": "Base URL for the AgentOps API"}, ) + app_url: str = field( + default_factory=lambda: os.getenv("AGENTOPS_APP_URL", "https://app.agentops.ai"), + metadata={"description": "Dashboard URL for the AgentOps application"}, + ) + max_wait_time: int = field( default_factory=lambda: get_env_int("AGENTOPS_MAX_WAIT_TIME", 5000), metadata={"description": "Maximum time in milliseconds to wait for API responses"}, @@ -124,6 +130,7 @@ def configure( self, api_key: Optional[str] = None, endpoint: Optional[str] = None, + app_url: Optional[str] = None, max_wait_time: Optional[int] = None, export_flush_interval: Optional[int] = None, max_queue_size: Optional[int] = None, @@ -151,6 +158,9 @@ def configure( if endpoint is not None: self.endpoint = endpoint + + if app_url is not None: + self.app_url = app_url if max_wait_time is not None: self.max_wait_time = max_wait_time @@ -211,6 +221,7 @@ def dict(self): return { "api_key": self.api_key, "endpoint": self.endpoint, + "app_url": self.app_url, "max_wait_time": self.max_wait_time, "export_flush_interval": self.export_flush_interval, "max_queue_size": self.max_queue_size, diff --git a/agentops/helpers/dashboard.py b/agentops/helpers/dashboard.py new file mode 100644 index 000000000..a72033df6 --- /dev/null +++ b/agentops/helpers/dashboard.py @@ -0,0 +1,43 @@ +""" +Helpers for interacting with the AgentOps dashboard. +""" +from typing import Union +from termcolor import colored +from opentelemetry.sdk.trace import Span, ReadableSpan +from agentops.logging import logger + + +def get_trace_url(span: Union[Span, ReadableSpan]) -> str: + """ + Generate a trace URL for a direct link to the session on the AgentOps dashboard. + + Args: + span: The span to generate the URL for. + + Returns: + The session URL. + """ + trace_id: Union[int, str] = span.context.trace_id + + # Convert trace_id to hex string if it's not already + # We don't add dashes to this to format it as a UUID since the dashboard doesn't either + if isinstance(trace_id, int): + trace_id = format(trace_id, "032x") + + # Get the app_url from the config - import here to avoid circular imports + from agentops import get_client + app_url = get_client().config.app_url + + return f"{app_url}/sessions?trace_id={trace_id}" + + +def log_trace_url(span: Union[Span, ReadableSpan]) -> None: + """ + Log the trace URL for the AgentOps dashboard. + + Args: + span: The span to log the URL for. + """ + session_url = get_trace_url(span) + logger.info(colored(f"\x1b[34mSession Replay: {session_url}\x1b[0m", "blue")) + diff --git a/agentops/sdk/processors.py b/agentops/sdk/processors.py index 0c6b6fe71..135614978 100644 --- a/agentops/sdk/processors.py +++ b/agentops/sdk/processors.py @@ -4,20 +4,17 @@ This module contains processors for OpenTelemetry spans. """ -import copy -import threading import time from threading import Event, Lock, Thread -from typing import Any, Dict, List, Optional +from typing import Dict, Optional from opentelemetry.context import Context from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor from opentelemetry.sdk.trace.export import SpanExporter -from termcolor import colored import agentops.semconv as semconv from agentops.logging import logger -from agentops.sdk.converters import trace_id_to_uuid, uuid_to_int16 +from agentops.helpers.dashboard import log_trace_url from agentops.semconv.core import CoreAttributes @@ -89,14 +86,7 @@ class InternalSpanProcessor(SpanProcessor): For session spans, it prints a URL to the AgentOps dashboard. """ - def __init__(self, app_url: str = "https://app.agentops.ai"): - """ - Initialize the PrintSpanProcessor. - - Args: - app_url: The base URL for the AgentOps dashboard. - """ - self.app_url = app_url + _root_span_id: Optional[Span] = None def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: """ @@ -110,29 +100,10 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None if not span.context or not span.context.trace_flags.sampled: return - # Get the span kind from attributes - span_kind = ( - span.attributes.get(semconv.SpanAttributes.AGENTOPS_SPAN_KIND, "unknown") if span.attributes else "unknown" - ) - - # Print basic information about the span - logger.debug(f"Started span: {span.name} (kind: {span_kind})") - - # Special handling for session spans - if span_kind == semconv.SpanKind.SESSION: - trace_id = span.context.trace_id - # Convert trace_id to hex string if it's not already - if isinstance(trace_id, int): - session_url = f"{self.app_url}/drilldown?session_id={trace_id_to_uuid(trace_id)}" - logger.info( - colored( - f"\x1b[34mSession started: {session_url}\x1b[0m", - "light_green", - ) - ) - else: - # Print basic information for other span kinds - logger.debug(f"Ended span: {span.name} (kind: {span_kind})") + if not self._root_span_id: + self._root_span_id = span.context.span_id + logger.debug(f"[agentops.InternalSpanProcessor] Found root span: {span.name}") + log_trace_url(span) def on_end(self, span: ReadableSpan) -> None: """ @@ -145,30 +116,13 @@ def on_end(self, span: ReadableSpan) -> None: if not span.context or not span.context.trace_flags.sampled: return - # Get the span kind from attributes - span_kind = ( - span.attributes.get(semconv.SpanAttributes.AGENTOPS_SPAN_KIND, "unknown") if span.attributes else "unknown" - ) - - # Special handling for session spans - if span_kind == semconv.SpanKind.SESSION: - trace_id = span.context.trace_id - # Convert trace_id to hex string if it's not already - if isinstance(trace_id, int): - session_url = f"{self.app_url}/drilldown?session_id={trace_id_to_uuid(trace_id)}" - logger.info( - colored( - f"\x1b[34mSession Replay: {session_url}\x1b[0m", - "blue", - ) - ) - else: - # Print basic information for other span kinds - logger.debug(f"Ended span: {span.name} (kind: {span_kind})") + if self._root_span_id and (span.context.span_id is self._root_span_id): + logger.debug(f"[agentops.InternalSpanProcessor] Ending root span: {span.name}") + log_trace_url(span) def shutdown(self) -> None: """Shutdown the processor.""" - pass + self._root_span_id = None def force_flush(self, timeout_millis: int = 30000) -> bool: """Force flush the processor.""" diff --git a/tests/unit/helpers/test_dashboard.py b/tests/unit/helpers/test_dashboard.py new file mode 100644 index 000000000..46df3aa37 --- /dev/null +++ b/tests/unit/helpers/test_dashboard.py @@ -0,0 +1,80 @@ +""" +Unit tests for dashboard URL generation and logging. +""" + +import unittest +from unittest.mock import patch, MagicMock + +from agentops.helpers.dashboard import get_trace_url, log_trace_url + + +class TestDashboardHelpers(unittest.TestCase): + """Tests for dashboard URL generation and logging functions.""" + + @patch('agentops.get_client') + def test_get_trace_url_with_hex_trace_id(self, mock_get_client): + """Test get_trace_url with a hexadecimal trace ID.""" + # Mock the config's app_url + mock_client = MagicMock() + mock_client.config.app_url = "https://test-app.agentops.ai" + mock_get_client.return_value = mock_client + + # Create a mock span with a hex string trace ID (using a full 32-character trace ID) + mock_span = MagicMock() + mock_span.context.trace_id = "1234567890abcdef1234567890abcdef" + + # Call get_trace_url + url = get_trace_url(mock_span) + + # Assert that the URL is correctly formed with the config's app_url + self.assertEqual(url, "https://test-app.agentops.ai/sessions?trace_id=1234567890abcdef1234567890abcdef") + + @patch('agentops.get_client') + def test_get_trace_url_with_int_trace_id(self, mock_get_client): + """Test get_trace_url with an integer trace ID.""" + # Mock the config's app_url + mock_client = MagicMock() + mock_client.config.app_url = "https://test-app.agentops.ai" + mock_get_client.return_value = mock_client + + # Create a mock span with an int trace ID + mock_span = MagicMock() + mock_span.context.trace_id = 12345 + + # Call get_trace_url + url = get_trace_url(mock_span) + + # Assert that the URL follows the expected format with a 32-character hex string + self.assertTrue(url.startswith("https://test-app.agentops.ai/sessions?trace_id=")) + + # Verify the format is a 32-character hex string (no dashes) + hex_part = url.split("trace_id=")[1] + self.assertRegex(hex_part, r"^[0-9a-f]{32}$") + + # Verify the value is correctly formatted from the integer 12345 + expected_hex = format(12345, "032x") + self.assertEqual(hex_part, expected_hex) + + @patch('agentops.helpers.dashboard.logger') + @patch('agentops.get_client') + def test_log_trace_url(self, mock_get_client, mock_logger): + """Test log_trace_url includes the session URL in the log message.""" + # Mock the config's app_url + mock_client = MagicMock() + mock_client.config.app_url = "https://test-app.agentops.ai" + mock_get_client.return_value = mock_client + + # Create a mock span + mock_span = MagicMock() + mock_span.context.trace_id = "test-trace-id" + + # Mock get_trace_url to return a known value that uses the app_url + expected_url = "https://test-app.agentops.ai/sessions?trace_id=test-trace-id" + with patch('agentops.helpers.dashboard.get_trace_url', return_value=expected_url): + # Call log_trace_url + log_trace_url(mock_span) + + # Assert that logger.info was called with a message containing the URL + mock_logger.info.assert_called_once() + log_message = mock_logger.info.call_args[0][0] + self.assertIn(expected_url, log_message) \ No newline at end of file diff --git a/tests/unit/sdk/test_internal_span_processor.py b/tests/unit/sdk/test_internal_span_processor.py new file mode 100644 index 000000000..7aaf7f198 --- /dev/null +++ b/tests/unit/sdk/test_internal_span_processor.py @@ -0,0 +1,165 @@ +""" +Unit tests for the InternalSpanProcessor. +""" + +import unittest +from unittest.mock import patch, MagicMock, call + +from opentelemetry.sdk.trace import Span, ReadableSpan + +from agentops.sdk.processors import InternalSpanProcessor + + +class TestInternalSpanProcessor(unittest.TestCase): + """Tests for InternalSpanProcessor.""" + + def setUp(self): + self.processor = InternalSpanProcessor() + + # Reset the root span ID before each test + self.processor._root_span_id = None + + @patch('agentops.sdk.processors.log_trace_url') + def test_logs_url_for_first_span(self, mock_log_trace_url): + """Test that the first span triggers a log_trace_url call.""" + # Create a mock span + mock_span = MagicMock(spec=Span) + mock_context = MagicMock() + mock_context.trace_flags.sampled = True + mock_context.span_id = 12345 + mock_span.context = mock_context + + # Call on_start + self.processor.on_start(mock_span) + + # Assert that log_trace_url was called once + mock_log_trace_url.assert_called_once_with(mock_span) + + @patch('agentops.sdk.processors.log_trace_url') + def test_logs_url_only_for_root_span(self, mock_log_trace_url): + """Test that log_trace_url is only called for the root span.""" + # First, create and start the root span + mock_root_span = MagicMock(spec=Span) + mock_root_context = MagicMock() + mock_root_context.trace_flags.sampled = True + mock_root_context.span_id = 12345 + mock_root_span.context = mock_root_context + + self.processor.on_start(mock_root_span) + + # Reset the mock after root span creation + mock_log_trace_url.reset_mock() + + # Now create and start a non-root span + mock_non_root_span = MagicMock(spec=Span) + mock_non_root_context = MagicMock() + mock_non_root_context.trace_flags.sampled = True + mock_non_root_context.span_id = 67890 # Different from root span ID + mock_non_root_span.context = mock_non_root_context + + self.processor.on_start(mock_non_root_span) + + # Assert that log_trace_url was not called for the non-root span + mock_log_trace_url.assert_not_called() + + # End the non-root span + mock_non_root_readable = MagicMock(spec=ReadableSpan) + mock_non_root_readable.context = mock_non_root_context + + self.processor.on_end(mock_non_root_readable) + + # Assert that log_trace_url was still not called + mock_log_trace_url.assert_not_called() + + # Now end the root span + mock_root_readable = MagicMock(spec=ReadableSpan) + mock_root_readable.context = mock_root_context + + self.processor.on_end(mock_root_readable) + + # Assert that log_trace_url was called for the root span end + mock_log_trace_url.assert_called_once_with(mock_root_readable) + + @patch('agentops.sdk.processors.log_trace_url') + def test_logs_url_exactly_twice_for_root_span(self, mock_log_trace_url): + """Test that log_trace_url is called exactly twice for the root span (start and end).""" + # Create a mock root span + mock_root_span = MagicMock(spec=Span) + mock_root_context = MagicMock() + mock_root_context.trace_flags.sampled = True + mock_root_context.span_id = 12345 + mock_root_span.context = mock_root_context + + # Start the root span + self.processor.on_start(mock_root_span) + + # Create a mock readable span for the end event + mock_root_readable = MagicMock(spec=ReadableSpan) + mock_root_readable.context = mock_root_context + + # End the root span + self.processor.on_end(mock_root_readable) + + # Assert that log_trace_url was called exactly twice + self.assertEqual(mock_log_trace_url.call_count, 2) + mock_log_trace_url.assert_has_calls([ + call(mock_root_span), + call(mock_root_readable) + ]) + + @patch('agentops.sdk.processors.log_trace_url') + def test_ignores_unsampled_spans(self, mock_log_trace_url): + """Test that unsampled spans are ignored.""" + # Create a mock unsampled span + mock_span = MagicMock(spec=Span) + mock_context = MagicMock() + mock_context.trace_flags.sampled = False + mock_span.context = mock_context + + # Start and end the span + self.processor.on_start(mock_span) + self.processor.on_end(mock_span) + + # Assert that log_trace_url was not called + mock_log_trace_url.assert_not_called() + + # Assert that root_span_id was not set + self.assertIsNone(self.processor._root_span_id) + + @patch('agentops.sdk.processors.log_trace_url') + def test_shutdown_resets_root_span_id(self, mock_log_trace_url): + """Test that shutdown resets the root span ID.""" + # First set a root span + mock_root_span = MagicMock(spec=Span) + mock_root_context = MagicMock() + mock_root_context.trace_flags.sampled = True + mock_root_context.span_id = 12345 + mock_root_span.context = mock_root_context + + self.processor.on_start(mock_root_span) + + # Verify root span ID was set + self.assertEqual(self.processor._root_span_id, 12345) + + # Call shutdown + self.processor.shutdown() + + # Verify root span ID was reset + self.assertIsNone(self.processor._root_span_id) + + # Create another span after shutdown + mock_span = MagicMock(spec=Span) + mock_context = MagicMock() + mock_context.trace_flags.sampled = True + mock_context.span_id = 67890 + mock_span.context = mock_context + + # Reset mocks + mock_log_trace_url.reset_mock() + + # Start the span, it should be treated as a new root span + self.processor.on_start(mock_span) + + # Verify new root span was identified + self.assertEqual(self.processor._root_span_id, 67890) + mock_log_trace_url.assert_called_once_with(mock_span) \ No newline at end of file diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 1847d8033..10ededf29 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -18,6 +18,7 @@ def mock_env(): env_vars = { "AGENTOPS_API_KEY": "test-api-key", "AGENTOPS_API_ENDPOINT": "https://test.agentops.ai", + "AGENTOPS_APP_URL": "https://test-app.agentops.ai", "AGENTOPS_MAX_WAIT_TIME": "1000", "AGENTOPS_MAX_QUEUE_SIZE": "256", "AGENTOPS_DEFAULT_TAGS": "tag1,tag2,tag3", @@ -43,6 +44,7 @@ def test_config_from_env(mock_env): assert config.api_key == "test-api-key" assert config.endpoint == "https://test.agentops.ai" + assert config.app_url == "https://test-app.agentops.ai" assert config.max_wait_time == 1000 assert config.max_queue_size == 256 assert config.default_tags == {"tag1", "tag2", "tag3"} @@ -63,6 +65,7 @@ def test_config_override_env(mock_env, valid_uuid): config.configure( api_key=valid_uuid, endpoint="https://override.agentops.ai", + app_url="https://override-app.agentops.ai", max_wait_time=2000, default_tags=["new-tag"], instrument_llm_calls=True, @@ -71,6 +74,7 @@ def test_config_override_env(mock_env, valid_uuid): assert config.api_key == valid_uuid assert config.endpoint == "https://override.agentops.ai" + assert config.app_url == "https://override-app.agentops.ai" assert config.max_wait_time == 2000 assert config.default_tags == {"new-tag"} assert config.instrument_llm_calls is True