From 0e35c4d9348dd3c67813ebe8e3fe313817974175 Mon Sep 17 00:00:00 2001 From: Aamna Najmi Date: Tue, 18 Nov 2025 17:34:55 +0100 Subject: [PATCH 1/2] Fix agentcore langfuse integration --- cx-agent-backend/Dockerfile | 2 +- .../domain/services/conversation_service.py | 45 +----- .../adapters/langgraph_agent_service.py | 132 +++++------------- .../infrastructure/config/container.py | 13 -- .../infrastructure/config/settings.py | 2 +- cx-agent-backend/pyproject.toml | 6 +- infra/main.tf | 10 ++ 7 files changed, 49 insertions(+), 161 deletions(-) diff --git a/cx-agent-backend/Dockerfile b/cx-agent-backend/Dockerfile index 26a2fde..b3367d6 100644 --- a/cx-agent-backend/Dockerfile +++ b/cx-agent-backend/Dockerfile @@ -20,4 +20,4 @@ RUN uv sync --no-dev EXPOSE 8080 # Start the CX Agent service -CMD ["opentelemetry-instrument", "python", "-m", "cx_agent_backend"] +CMD ["python", "-m", "cx_agent_backend"] diff --git a/cx-agent-backend/cx_agent_backend/domain/services/conversation_service.py b/cx-agent-backend/cx_agent_backend/domain/services/conversation_service.py index 311d1b0..590af43 100644 --- a/cx-agent-backend/cx_agent_backend/domain/services/conversation_service.py +++ b/cx-agent-backend/cx_agent_backend/domain/services/conversation_service.py @@ -1,11 +1,8 @@ """Domain service for conversation business logic.""" import logging -import os from uuid import UUID -from langfuse import get_client, Langfuse - from cx_agent_backend.domain.entities.conversation import Conversation, Message from cx_agent_backend.domain.repositories.conversation_repository import ConversationRepository from cx_agent_backend.domain.services.agent_service import AgentRequest, AgentService, AgentType @@ -21,12 +18,10 @@ def __init__( conversation_repo: ConversationRepository, agent_service: AgentService, guardrail_service: GuardrailService | None = None, - langfuse_config: dict | None = None, ): self._conversation_repo = conversation_repo self._agent_service = agent_service self._guardrail_service = guardrail_service - self._langfuse_config = langfuse_config or {} async def start_conversation(self, user_id: str) -> Conversation: """Start a new conversation.""" @@ -130,41 +125,5 @@ async def get_user_conversations(self, user_id: str) -> list[Conversation]: return await self._conversation_repo.get_by_user_id(user_id) async def log_feedback(self, user_id: str, session_id: str, message_id: str, score: int, comment: str = "") -> None: - """Log user feedback to Langfuse.""" - - # Log feedback attempt - feedback_msg = f"[FEEDBACK] Attempting to log feedback - user_id: {user_id}, session_id: {session_id}, message_id: {message_id}, score: {score}" - logger.info(feedback_msg) - - try: - - logger.info("[FEEDBACK] Langfuse config - enabled: %s, host: %s", - self._langfuse_config.get("enabled"), - self._langfuse_config.get("host")) - - if self._langfuse_config.get("enabled"): - logger.info("[FEEDBACK] Langfuse is enabled, setting environment variables") - os.environ["LANGFUSE_SECRET_KEY"] = self._langfuse_config.get("secret_key") - os.environ["LANGFUSE_PUBLIC_KEY"] = self._langfuse_config.get("public_key") - os.environ["LANGFUSE_HOST"] = self._langfuse_config.get("host") - - langfuse = get_client() - predefined_trace_id = Langfuse.create_trace_id(seed=session_id) - - logger.info("[FEEDBACK] Calling span.score_trace") - with langfuse.start_as_current_span( - name="langchain-request", - trace_context={"trace_id": predefined_trace_id} - ) as span: - result = span.score_trace( - name="user-feedback", - value=score, - data_type="NUMERIC", - comment=comment - ) - - logger.info("[FEEDBACK] Successfully created score: %s", result) - else: - logger.info("[FEEDBACK] Langfuse is not enabled in config") - except Exception as e: - logger.error(f"[FEEDBACK] Failed to log feedback to Langfuse: {e}") + """Log user feedback.""" + logger.info(f"[FEEDBACK] Received feedback - user_id: {user_id}, session_id: {session_id}, message_id: {message_id}, score: {score}, comment: {comment}") diff --git a/cx-agent-backend/cx_agent_backend/infrastructure/adapters/langgraph_agent_service.py b/cx-agent-backend/cx_agent_backend/infrastructure/adapters/langgraph_agent_service.py index 807c1d6..45c7bdd 100644 --- a/cx-agent-backend/cx_agent_backend/infrastructure/adapters/langgraph_agent_service.py +++ b/cx-agent-backend/cx_agent_backend/infrastructure/adapters/langgraph_agent_service.py @@ -1,13 +1,12 @@ """LangGraph implementation of agent service.""" +import base64 from langchain_core.messages import AIMessage, HumanMessage from langchain_core.runnables import RunnableConfig from langchain_core.tools import tool import os import logging from langgraph.prebuilt import create_react_agent -from langfuse import get_client, Langfuse -from langfuse.langchain import CallbackHandler from bedrock_agentcore.memory import MemoryClient logger = logging.getLogger(__name__) @@ -33,11 +32,9 @@ class LangGraphAgentService(AgentService): def __init__( self, - langfuse_config: dict | None = None, guardrail_service: GuardrailService | None = None, llm_service: LLMService | None = None, ): - self._langfuse_config = langfuse_config or {} self._guardrail_service = guardrail_service self._llm_service = llm_service @@ -108,13 +105,7 @@ async def process_request(self, request: AgentRequest) -> AgentResponse: request.agent_type, ) - # Use trace_id from request if provided, otherwise create one - langfuse = None - predefined_trace_id = getattr(request, 'trace_id', None) - if self._langfuse_config.get("enabled"): - langfuse = get_client() - if not predefined_trace_id: - predefined_trace_id = Langfuse.create_trace_id(seed=request.session_id) + # Check input guardrails if enabled if self._guardrail_service and request.messages: @@ -138,7 +129,7 @@ async def process_request(self, request: AgentRequest) -> AgentResponse: input_result.blocked_categories ) }, - trace_id=predefined_trace_id, + trace_id=None, ) # Get memory parameters from environment or request @@ -160,94 +151,35 @@ async def process_request(self, request: AgentRequest) -> AgentResponse: elif msg.role == MessageRole.ASSISTANT: lc_messages.append(AIMessage(content=msg.content)) - # Create config with Langfuse callback if enabled - trace_id = None - response = None - if self._langfuse_config.get("enabled"): - os.environ["LANGFUSE_SECRET_KEY"] = self._langfuse_config.get("secret_key") - os.environ["LANGFUSE_PUBLIC_KEY"] = self._langfuse_config.get("public_key") - os.environ["LANGFUSE_HOST"] = self._langfuse_config.get("host") - - trace_id = predefined_trace_id - - langfuse_handler = CallbackHandler() - - with langfuse.start_as_current_span( - name="langchain-request", - trace_context={"trace_id": predefined_trace_id} - ) as span: - trace_update_params = { - "user_id": request.user_id, - "input": {"messages": [msg.content for msg in request.messages]} - } - # Add default tag and any additional tags - tags = ["langgraph-cx-agent"] - if request.langfuse_tags: - logger.info(f"Adding langfuse_tags: {request.langfuse_tags}") - tags.extend(request.langfuse_tags) - else: - logger.info("No langfuse_tags provided") - logger.info(f"Final tags for trace: {tags}") - trace_update_params["tags"] = tags - span.update_trace(**trace_update_params) - - config = RunnableConfig( - configurable={ - "thread_id": f"{request.session_id}", - "user_id": request.user_id, - }, - callbacks=[langfuse_handler], - ) - - # Invoke agent - logger.debug("Invoking agent with %s messages", len(lc_messages)) - response = await agent.ainvoke({"messages": lc_messages}, config=config) - - # Save conversation to memory if available - if memory_client and lc_messages: - try: - last_user_msg = next((msg.content for msg in reversed(lc_messages) if isinstance(msg, HumanMessage)), None) - assistant_response = response["messages"][-1].content if response["messages"] else "" - - if last_user_msg and assistant_response: - memory_client.create_event( - memory_id=stm_memory_id, - actor_id=actor_id, - session_id=session_id, - messages=[(last_user_msg, "USER"), (assistant_response, "ASSISTANT")] - ) - except Exception as e: - logger.warning(f"Failed to save conversation to memory: {e}") + + # Create config + config = RunnableConfig( + configurable={ + "thread_id": f"{request.session_id}", + "user_id": request.user_id, + }, + ) + + # Invoke agent + logger.debug("Invoking agent with %s messages", len(lc_messages)) + response = await agent.ainvoke({"messages": lc_messages}, config=config) + + # Save conversation to memory if available + if memory_client and lc_messages: + try: + last_user_msg = next((msg.content for msg in reversed(lc_messages) if isinstance(msg, HumanMessage)), None) + assistant_response = response["messages"][-1].content if response["messages"] else "" - span.update_trace(output={"response": response["messages"][-1].content if response["messages"] else ""}) - else: - config = RunnableConfig( - configurable={ - "thread_id": f"{request.session_id}", - "user_id": request.user_id, - }, - ) - - # Invoke agent - logger.debug("Invoking agent with %s messages", len(lc_messages)) - response = await agent.ainvoke({"messages": lc_messages}, config=config) - - # Save conversation to memory if available - if memory_client and lc_messages: - try: - last_user_msg = next((msg.content for msg in reversed(lc_messages) if isinstance(msg, HumanMessage)), None) - assistant_response = response["messages"][-1].content if response["messages"] else "" - - if last_user_msg and assistant_response: - memory_client.create_event( - memory_id=stm_memory_id, - actor_id=actor_id, - session_id=session_id, - messages=[(last_user_msg, "USER"), (assistant_response, "ASSISTANT")] - ) - except Exception as e: - logger.warning(f"Failed to save conversation to memory: {e}") + if last_user_msg and assistant_response: + memory_client.create_event( + memory_id=stm_memory_id, + actor_id=actor_id, + session_id=session_id, + messages=[(last_user_msg, "USER"), (assistant_response, "ASSISTANT")] + ) + except Exception as e: + logger.warning(f"Failed to save conversation to memory: {e}") # Extract response last_message = response["messages"][-1] tools_used = [] @@ -294,7 +226,7 @@ async def process_request(self, request: AgentRequest) -> AgentResponse: metadata={ "blocked_categories": ",".join(output_result.blocked_categories) }, - trace_id=trace_id, + trace_id=None, ) # Add trace metadata @@ -313,7 +245,7 @@ async def process_request(self, request: AgentRequest) -> AgentResponse: agent_type=request.agent_type, tools_used=tools_used, metadata=metadata, - trace_id=trace_id + trace_id=None ) async def stream_response(self, request: AgentRequest): diff --git a/cx-agent-backend/cx_agent_backend/infrastructure/config/container.py b/cx-agent-backend/cx_agent_backend/infrastructure/config/container.py index 6b29874..ba49e5b 100644 --- a/cx-agent-backend/cx_agent_backend/infrastructure/config/container.py +++ b/cx-agent-backend/cx_agent_backend/infrastructure/config/container.py @@ -24,7 +24,6 @@ class Container(containers.DeclarativeContainer): parameter_store_reader = AWSParameterStoreReader() gateway_secret = json.loads(secret_reader.read_secret("gateway_credentials")) - langfuse_secret = json.loads(secret_reader.read_secret("langfuse_credentials")) # Repositories conversation_repository = providers.Singleton(MemoryConversationRepository) @@ -49,12 +48,6 @@ class Container(containers.DeclarativeContainer): agent_service = providers.Singleton( LangGraphAgentService, - langfuse_config={ - "enabled": settings.langfuse_enabled, - "secret_key": langfuse_secret["langfuse_secret_key"], - "public_key": langfuse_secret["langfuse_public_key"], - "host": langfuse_secret["langfuse_host"], - }, guardrail_service=guardrail_service, llm_service=llm_service, ) @@ -64,10 +57,4 @@ class Container(containers.DeclarativeContainer): conversation_repo=conversation_repository, agent_service=agent_service, guardrail_service=guardrail_service, - langfuse_config={ - "enabled": settings.langfuse_enabled, - "secret_key": langfuse_secret["langfuse_secret_key"], - "public_key": langfuse_secret["langfuse_public_key"], - "host": langfuse_secret["langfuse_host"], - }, ) diff --git a/cx-agent-backend/cx_agent_backend/infrastructure/config/settings.py b/cx-agent-backend/cx_agent_backend/infrastructure/config/settings.py index a00fe89..94f9351 100644 --- a/cx-agent-backend/cx_agent_backend/infrastructure/config/settings.py +++ b/cx-agent-backend/cx_agent_backend/infrastructure/config/settings.py @@ -30,7 +30,7 @@ class Settings(BaseSettings): # AWS Settings aws_region: str = Field( - default=environ.get("AWS_REGION", environ.get("AWS_DEFAULT_REGION", "us-east-1")), + default=environ.get("AWS_REGION", environ.get("AWS_DEFAULT_REGION", "eu-central-1")), description="AWS region", ) diff --git a/cx-agent-backend/pyproject.toml b/cx-agent-backend/pyproject.toml index 1a27cac..79cfc14 100644 --- a/cx-agent-backend/pyproject.toml +++ b/cx-agent-backend/pyproject.toml @@ -4,22 +4,22 @@ version = "0.1.0" description = "Customer service demo agent compatible with Bedrock AgentCore" requires-python = ">=3.11" dependencies = [ - "aws-opentelemetry-distro>=0.1.0", "bedrock-agentcore>=1.0.3", "boto3>=1.34.0", "dependency-injector>=4.41.0", "fastapi>=0.104.0", "langchain>=0.1.0", - "langchain-aws>=0.1.0", + "langchain[aws]", "langchain-core>=0.1.0", "langchain-openai>=0.1.0", - "langfuse>=2.0.0", "langgraph>=0.1.0", "pydantic>=2.5.0", "pydantic-settings>=2.1.0", "structlog>=23.2.0", "tavily-python>=0.3.0", "uvicorn[standard]>=0.24.0", + "opentelemetry-instrumentation-langchain>=0.48.1", + "langsmith[otel]" ] [project.optional-dependencies] diff --git a/infra/main.tf b/infra/main.tf index 726a88a..fde47e3 100644 --- a/infra/main.tf +++ b/infra/main.tf @@ -103,4 +103,14 @@ resource "aws_bedrockagentcore_agent_runtime" "agent_runtime" { protocol_configuration { server_protocol = "HTTP" } + environment_variables = { + "LOG_LEVEL" = "INFO" + "OTEL_EXPORTER_OTLP_ENDPOINT" = "${var.langfuse_host}/api/public/otel" + "OTEL_EXPORTER_OTLP_HEADERS" = "Authorization=Basic ${base64encode("${var.langfuse_public_key}:${var.langfuse_secret_key}")}" + "DISABLE_ADOT_OBSERVABILITY" = "true", + "LANGSMITH_OTEL_ENABLED" = "true", + "LANGSMITH_TRACING" = "true" + + } + } From f256659ada96a65b28f0de4939607d2ee3ce845a Mon Sep 17 00:00:00 2001 From: Aamna Najmi <56640566+najmia@users.noreply.github.com> Date: Tue, 18 Nov 2025 17:39:50 +0100 Subject: [PATCH 2/2] Update settings.py --- .../cx_agent_backend/infrastructure/config/settings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cx-agent-backend/cx_agent_backend/infrastructure/config/settings.py b/cx-agent-backend/cx_agent_backend/infrastructure/config/settings.py index 94f9351..a00fe89 100644 --- a/cx-agent-backend/cx_agent_backend/infrastructure/config/settings.py +++ b/cx-agent-backend/cx_agent_backend/infrastructure/config/settings.py @@ -30,7 +30,7 @@ class Settings(BaseSettings): # AWS Settings aws_region: str = Field( - default=environ.get("AWS_REGION", environ.get("AWS_DEFAULT_REGION", "eu-central-1")), + default=environ.get("AWS_REGION", environ.get("AWS_DEFAULT_REGION", "us-east-1")), description="AWS region", )