33from langchain_core .messages import AIMessage , HumanMessage
44from langchain_core .runnables import RunnableConfig
55import os
6- import base64
76import logging
87from langgraph .prebuilt import create_react_agent
8+ from langfuse import get_client , Langfuse
99from langfuse .langchain import CallbackHandler
1010
1111logger = logging .getLogger (__name__ )
@@ -76,7 +76,20 @@ def _create_agent(self, agent_type: AgentType, model: str) -> any:
7676
7777 async def process_request (self , request : AgentRequest ) -> AgentResponse :
7878 """Process request through appropriate agent."""
79- logger .info (f"Processing request for user { request .user_id } , session { request .session_id } , agent type { request .agent_type } " )
79+ logger .info (
80+ "Processing request for user %s, session %s, agent type %s" ,
81+ request .user_id ,
82+ request .session_id ,
83+ request .agent_type ,
84+ )
85+
86+ # Use trace_id from request if provided, otherwise create one
87+ langfuse = None
88+ predefined_trace_id = getattr (request , 'trace_id' , None )
89+ if self ._langfuse_config .get ("enabled" ):
90+ langfuse = get_client ()
91+ if not predefined_trace_id :
92+ predefined_trace_id = Langfuse .create_trace_id (seed = request .session_id )
8093
8194 # Check input guardrails if enabled
8295 if self ._guardrail_service and request .messages :
@@ -115,28 +128,51 @@ async def process_request(self, request: AgentRequest) -> AgentResponse:
115128 # Create config with Langfuse callback if enabled
116129 callbacks = []
117130 trace_id = None
131+ response = None
118132
119133 if self ._langfuse_config .get ("enabled" ):
120134 os .environ ["LANGFUSE_SECRET_KEY" ] = self ._langfuse_config .get ("secret_key" )
121135 os .environ ["LANGFUSE_PUBLIC_KEY" ] = self ._langfuse_config .get ("public_key" )
122136 os .environ ["LANGFUSE_HOST" ] = self ._langfuse_config .get ("host" )
123137
138+ trace_id = predefined_trace_id
139+
124140 langfuse_handler = CallbackHandler ()
125- callbacks .append (langfuse_handler )
126- trace_id = str (f"{ request .user_id } _{ request .session_id } " )
127-
128- config = RunnableConfig (
129- configurable = {
130- "thread_id" : f"{ request .user_id } _{ request .session_id } " ,
131- "user_id" : request .user_id ,
132- },
133- callbacks = callbacks ,
134- )
135-
136- # Invoke agent
137- logger .debug (f"Invoking agent with { len (lc_messages )} messages" )
138- response = await agent .ainvoke ({"messages" : lc_messages }, config = config )
139- logger .debug (f"Agent response contains { len (response ['messages' ])} messages" )
141+
142+ with langfuse .start_as_current_span (
143+ name = "langchain-request" ,
144+ trace_context = {"trace_id" : predefined_trace_id }
145+ ) as span :
146+ span .update_trace (
147+ user_id = request .user_id ,
148+ input = {"messages" : [msg .content for msg in request .messages ]}
149+ )
150+
151+ config = RunnableConfig (
152+ configurable = {
153+ "thread_id" : f"{ request .session_id } " ,
154+ "user_id" : request .user_id ,
155+ },
156+ callbacks = [langfuse_handler ],
157+ )
158+
159+ # Invoke agent
160+ logger .debug ("Invoking agent with %s messages" , len (lc_messages ))
161+ response = await agent .ainvoke ({"messages" : lc_messages }, config = config )
162+
163+ span .update_trace (output = {"response" : response ["messages" ][- 1 ].content if response ["messages" ] else "" })
164+ else :
165+ config = RunnableConfig (
166+ configurable = {
167+ "thread_id" : f"{ request .session_id } " ,
168+ "user_id" : request .user_id ,
169+ },
170+ )
171+
172+ # Invoke agent
173+ logger .debug ("Invoking agent with %s messages" , len (lc_messages ))
174+ response = await agent .ainvoke ({"messages" : lc_messages }, config = config )
175+ logger .debug ("Agent response contains %s messages" , len (response ["messages" ]))
140176 # Extract response
141177 last_message = response ["messages" ][- 1 ]
142178 tools_used = []
@@ -152,7 +188,7 @@ async def process_request(self, request: AgentRequest) -> AgentResponse:
152188 tools_used .append (tool_call ["name" ])
153189 # Remove duplicates
154190 tools_used = list (set (tools_used ))
155- logger .info (f "Agent completed. Tools used: { tools_used } " )
191+ logger .info ("Agent completed. Tools used: %s" , tools_used )
156192
157193 # Check output guardrails if enabled
158194 if self ._guardrail_service :
@@ -173,12 +209,12 @@ async def process_request(self, request: AgentRequest) -> AgentResponse:
173209 "model" : request .model ,
174210 "agent_type" : request .agent_type .value ,
175211 "trace_id" : trace_id ,
176- "debug_message_count" : len (response ["messages" ]),
212+ "debug_message_count" : len (response ["messages" ]) if response else 0 ,
177213 "debug_message_types" : message_types ,
178214 "debug_tools_found" : len (tools_used ) > 0 ,
179215 }
180216
181- logger .info (f "Returning response for session { request .session_id } " )
217+ logger .info ("Returning response for session %s" , request .session_id )
182218 return AgentResponse (
183219 content = last_message .content ,
184220 agent_type = request .agent_type ,
@@ -201,7 +237,7 @@ async def stream_response(self, request: AgentRequest):
201237 # Create config
202238 config = RunnableConfig (
203239 configurable = {
204- "thread_id" : f"{ request .user_id } _ { request . session_id } " ,
240+ "thread_id" : f"{ request .session_id } " ,
205241 "user_id" : request .user_id ,
206242 }
207243 )
0 commit comments