Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cx-agent-backend/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ RUN uv sync --no-dev
EXPOSE 8080

# Start the CX Agent service
CMD ["opentelemetry-instrument", "python", "-m", "cx_agent_backend"]
CMD ["python", "-m", "cx_agent_backend"]
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
"""Domain service for conversation business logic."""

import logging
import os
from uuid import UUID

from langfuse import get_client, Langfuse

from cx_agent_backend.domain.entities.conversation import Conversation, Message
from cx_agent_backend.domain.repositories.conversation_repository import ConversationRepository
from cx_agent_backend.domain.services.agent_service import AgentRequest, AgentService, AgentType
Expand All @@ -21,12 +18,10 @@ def __init__(
conversation_repo: ConversationRepository,
agent_service: AgentService,
guardrail_service: GuardrailService | None = None,
langfuse_config: dict | None = None,
):
self._conversation_repo = conversation_repo
self._agent_service = agent_service
self._guardrail_service = guardrail_service
self._langfuse_config = langfuse_config or {}

async def start_conversation(self, user_id: str) -> Conversation:
"""Start a new conversation."""
Expand Down Expand Up @@ -130,41 +125,5 @@ async def get_user_conversations(self, user_id: str) -> list[Conversation]:
return await self._conversation_repo.get_by_user_id(user_id)

async def log_feedback(self, user_id: str, session_id: str, message_id: str, score: int, comment: str = "") -> None:
"""Log user feedback to Langfuse."""

# Log feedback attempt
feedback_msg = f"[FEEDBACK] Attempting to log feedback - user_id: {user_id}, session_id: {session_id}, message_id: {message_id}, score: {score}"
logger.info(feedback_msg)

try:

logger.info("[FEEDBACK] Langfuse config - enabled: %s, host: %s",
self._langfuse_config.get("enabled"),
self._langfuse_config.get("host"))

if self._langfuse_config.get("enabled"):
logger.info("[FEEDBACK] Langfuse is enabled, setting environment variables")
os.environ["LANGFUSE_SECRET_KEY"] = self._langfuse_config.get("secret_key")
os.environ["LANGFUSE_PUBLIC_KEY"] = self._langfuse_config.get("public_key")
os.environ["LANGFUSE_HOST"] = self._langfuse_config.get("host")

langfuse = get_client()
predefined_trace_id = Langfuse.create_trace_id(seed=session_id)

logger.info("[FEEDBACK] Calling span.score_trace")
with langfuse.start_as_current_span(
name="langchain-request",
trace_context={"trace_id": predefined_trace_id}
) as span:
result = span.score_trace(
name="user-feedback",
value=score,
data_type="NUMERIC",
comment=comment
)

logger.info("[FEEDBACK] Successfully created score: %s", result)
else:
logger.info("[FEEDBACK] Langfuse is not enabled in config")
except Exception as e:
logger.error(f"[FEEDBACK] Failed to log feedback to Langfuse: {e}")
"""Log user feedback."""
logger.info(f"[FEEDBACK] Received feedback - user_id: {user_id}, session_id: {session_id}, message_id: {message_id}, score: {score}, comment: {comment}")
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
"""LangGraph implementation of agent service."""

import base64
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool
import os
import logging
from langgraph.prebuilt import create_react_agent
from langfuse import get_client, Langfuse
from langfuse.langchain import CallbackHandler
from bedrock_agentcore.memory import MemoryClient

logger = logging.getLogger(__name__)
Expand All @@ -33,11 +32,9 @@ class LangGraphAgentService(AgentService):

def __init__(
self,
langfuse_config: dict | None = None,
guardrail_service: GuardrailService | None = None,
llm_service: LLMService | None = None,
):
self._langfuse_config = langfuse_config or {}
self._guardrail_service = guardrail_service
self._llm_service = llm_service

Expand Down Expand Up @@ -108,13 +105,7 @@ async def process_request(self, request: AgentRequest) -> AgentResponse:
request.agent_type,
)

# Use trace_id from request if provided, otherwise create one
langfuse = None
predefined_trace_id = getattr(request, 'trace_id', None)
if self._langfuse_config.get("enabled"):
langfuse = get_client()
if not predefined_trace_id:
predefined_trace_id = Langfuse.create_trace_id(seed=request.session_id)


# Check input guardrails if enabled
if self._guardrail_service and request.messages:
Expand All @@ -138,7 +129,7 @@ async def process_request(self, request: AgentRequest) -> AgentResponse:
input_result.blocked_categories
)
},
trace_id=predefined_trace_id,
trace_id=None,
)

# Get memory parameters from environment or request
Expand All @@ -160,94 +151,35 @@ async def process_request(self, request: AgentRequest) -> AgentResponse:
elif msg.role == MessageRole.ASSISTANT:
lc_messages.append(AIMessage(content=msg.content))

# Create config with Langfuse callback if enabled
trace_id = None
response = None

if self._langfuse_config.get("enabled"):
os.environ["LANGFUSE_SECRET_KEY"] = self._langfuse_config.get("secret_key")
os.environ["LANGFUSE_PUBLIC_KEY"] = self._langfuse_config.get("public_key")
os.environ["LANGFUSE_HOST"] = self._langfuse_config.get("host")

trace_id = predefined_trace_id

langfuse_handler = CallbackHandler()

with langfuse.start_as_current_span(
name="langchain-request",
trace_context={"trace_id": predefined_trace_id}
) as span:
trace_update_params = {
"user_id": request.user_id,
"input": {"messages": [msg.content for msg in request.messages]}
}
# Add default tag and any additional tags
tags = ["langgraph-cx-agent"]
if request.langfuse_tags:
logger.info(f"Adding langfuse_tags: {request.langfuse_tags}")
tags.extend(request.langfuse_tags)
else:
logger.info("No langfuse_tags provided")
logger.info(f"Final tags for trace: {tags}")
trace_update_params["tags"] = tags
span.update_trace(**trace_update_params)

config = RunnableConfig(
configurable={
"thread_id": f"{request.session_id}",
"user_id": request.user_id,
},
callbacks=[langfuse_handler],
)

# Invoke agent
logger.debug("Invoking agent with %s messages", len(lc_messages))
response = await agent.ainvoke({"messages": lc_messages}, config=config)

# Save conversation to memory if available
if memory_client and lc_messages:
try:
last_user_msg = next((msg.content for msg in reversed(lc_messages) if isinstance(msg, HumanMessage)), None)
assistant_response = response["messages"][-1].content if response["messages"] else ""

if last_user_msg and assistant_response:
memory_client.create_event(
memory_id=stm_memory_id,
actor_id=actor_id,
session_id=session_id,
messages=[(last_user_msg, "USER"), (assistant_response, "ASSISTANT")]
)
except Exception as e:
logger.warning(f"Failed to save conversation to memory: {e}")

# Create config
config = RunnableConfig(
configurable={
"thread_id": f"{request.session_id}",
"user_id": request.user_id,
},
)

# Invoke agent
logger.debug("Invoking agent with %s messages", len(lc_messages))
response = await agent.ainvoke({"messages": lc_messages}, config=config)

# Save conversation to memory if available
if memory_client and lc_messages:
try:
last_user_msg = next((msg.content for msg in reversed(lc_messages) if isinstance(msg, HumanMessage)), None)
assistant_response = response["messages"][-1].content if response["messages"] else ""

span.update_trace(output={"response": response["messages"][-1].content if response["messages"] else ""})
else:
config = RunnableConfig(
configurable={
"thread_id": f"{request.session_id}",
"user_id": request.user_id,
},
)

# Invoke agent
logger.debug("Invoking agent with %s messages", len(lc_messages))
response = await agent.ainvoke({"messages": lc_messages}, config=config)

# Save conversation to memory if available
if memory_client and lc_messages:
try:
last_user_msg = next((msg.content for msg in reversed(lc_messages) if isinstance(msg, HumanMessage)), None)
assistant_response = response["messages"][-1].content if response["messages"] else ""

if last_user_msg and assistant_response:
memory_client.create_event(
memory_id=stm_memory_id,
actor_id=actor_id,
session_id=session_id,
messages=[(last_user_msg, "USER"), (assistant_response, "ASSISTANT")]
)
except Exception as e:
logger.warning(f"Failed to save conversation to memory: {e}")
if last_user_msg and assistant_response:
memory_client.create_event(
memory_id=stm_memory_id,
actor_id=actor_id,
session_id=session_id,
messages=[(last_user_msg, "USER"), (assistant_response, "ASSISTANT")]
)
except Exception as e:
logger.warning(f"Failed to save conversation to memory: {e}")
# Extract response
last_message = response["messages"][-1]
tools_used = []
Expand Down Expand Up @@ -294,7 +226,7 @@ async def process_request(self, request: AgentRequest) -> AgentResponse:
metadata={
"blocked_categories": ",".join(output_result.blocked_categories)
},
trace_id=trace_id,
trace_id=None,
)

# Add trace metadata
Expand All @@ -313,7 +245,7 @@ async def process_request(self, request: AgentRequest) -> AgentResponse:
agent_type=request.agent_type,
tools_used=tools_used,
metadata=metadata,
trace_id=trace_id
trace_id=None
)

async def stream_response(self, request: AgentRequest):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ class Container(containers.DeclarativeContainer):
parameter_store_reader = AWSParameterStoreReader()

gateway_secret = json.loads(secret_reader.read_secret("gateway_credentials"))
langfuse_secret = json.loads(secret_reader.read_secret("langfuse_credentials"))

# Repositories
conversation_repository = providers.Singleton(MemoryConversationRepository)
Expand All @@ -49,12 +48,6 @@ class Container(containers.DeclarativeContainer):

agent_service = providers.Singleton(
LangGraphAgentService,
langfuse_config={
"enabled": settings.langfuse_enabled,
"secret_key": langfuse_secret["langfuse_secret_key"],
"public_key": langfuse_secret["langfuse_public_key"],
"host": langfuse_secret["langfuse_host"],
},
guardrail_service=guardrail_service,
llm_service=llm_service,
)
Expand All @@ -64,10 +57,4 @@ class Container(containers.DeclarativeContainer):
conversation_repo=conversation_repository,
agent_service=agent_service,
guardrail_service=guardrail_service,
langfuse_config={
"enabled": settings.langfuse_enabled,
"secret_key": langfuse_secret["langfuse_secret_key"],
"public_key": langfuse_secret["langfuse_public_key"],
"host": langfuse_secret["langfuse_host"],
},
)
6 changes: 3 additions & 3 deletions cx-agent-backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@ version = "0.1.0"
description = "Customer service demo agent compatible with Bedrock AgentCore"
requires-python = ">=3.11"
dependencies = [
"aws-opentelemetry-distro>=0.1.0",
"bedrock-agentcore>=1.0.3",
"boto3>=1.34.0",
"dependency-injector>=4.41.0",
"fastapi>=0.104.0",
"langchain>=0.1.0",
"langchain-aws>=0.1.0",
"langchain[aws]",
"langchain-core>=0.1.0",
"langchain-openai>=0.1.0",
"langfuse>=2.0.0",
"langgraph>=0.1.0",
"pydantic>=2.5.0",
"pydantic-settings>=2.1.0",
"structlog>=23.2.0",
"tavily-python>=0.3.0",
"uvicorn[standard]>=0.24.0",
"opentelemetry-instrumentation-langchain>=0.48.1",
"langsmith[otel]"
]

[project.optional-dependencies]
Expand Down
10 changes: 10 additions & 0 deletions infra/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,14 @@ resource "aws_bedrockagentcore_agent_runtime" "agent_runtime" {
protocol_configuration {
server_protocol = "HTTP"
}
environment_variables = {
"LOG_LEVEL" = "INFO"
"OTEL_EXPORTER_OTLP_ENDPOINT" = "${var.langfuse_host}/api/public/otel"
"OTEL_EXPORTER_OTLP_HEADERS" = "Authorization=Basic ${base64encode("${var.langfuse_public_key}:${var.langfuse_secret_key}")}"
"DISABLE_ADOT_OBSERVABILITY" = "true",
"LANGSMITH_OTEL_ENABLED" = "true",
"LANGSMITH_TRACING" = "true"

}

}
Loading