Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 28 additions & 41 deletions langfuse/langchain/CallbackHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,6 @@ def on_retriever_error(
self._log_debug_event(
"on_retriever_error", run_id, parent_run_id, error=error
)

if run_id is None or run_id not in self.runs:
raise Exception("run not found")

observation = self._detach_observation(run_id)

if observation is not None:
Expand Down Expand Up @@ -401,19 +397,17 @@ def on_agent_action(
"on_agent_action", run_id, parent_run_id, action=action
)

if run_id not in self.runs:
raise Exception("run not found")
agent_run = self.runs.get(run_id, None)

agent_run = self.runs[run_id]
if hasattr(agent_run, "_otel_span"):
if agent_run is not None:
agent_run._otel_span.set_attribute(
LangfuseOtelSpanAttributes.OBSERVATION_TYPE, "agent"
)

agent_run.update(
output=action,
input=kwargs.get("inputs"),
).end()
agent_run.update(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @hassiebp , I have a question. Should we update the input and output during the agent action or finish process? It’s an intermediate step and may not accurately represent the true input and output of the span.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please try with the latest SDK version and report whether you see unexpected results?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please try with the latest SDK version and report whether you see unexpected results?

This appears to be a different issue that I didn't explain clearly before (or it might not be an issue at all).

  • In the agent action callback when setting output, if the tool execution fails, the current output is the action's output, but I expect the output to be empty in such cases.

output=action,
input=kwargs.get("inputs"),
)

except Exception as e:
langfuse_logger.exception(e)
Expand All @@ -430,10 +424,9 @@ def on_agent_finish(
self._log_debug_event(
"on_agent_finish", run_id, parent_run_id, finish=finish
)
if run_id not in self.runs:
raise Exception("run not found")

agent_run = self._detach_observation(run_id)
# Langchain is sending same run ID for both agent finish and chain end
# handle cleanup of observation in the chain end callback
agent_run = self.runs.get(run_id, None)

if agent_run is not None:
agent_run._otel_span.set_attribute(
Expand All @@ -443,7 +436,7 @@ def on_agent_finish(
agent_run.update(
output=finish,
input=kwargs.get("inputs"),
).end()
)

except Exception as e:
langfuse_logger.exception(e)
Expand All @@ -461,9 +454,6 @@ def on_chain_end(
"on_chain_end", run_id, parent_run_id, outputs=outputs
)

if run_id not in self.runs:
raise Exception("run not found")

span = self._detach_observation(run_id)

if span is not None:
Expand Down Expand Up @@ -846,31 +836,28 @@ def on_llm_end(
self._log_debug_event(
"on_llm_end", run_id, parent_run_id, response=response, kwargs=kwargs
)
if run_id not in self.runs:
raise Exception("Run not found, see docs what to do in this case.")
else:
response_generation = response.generations[-1][-1]
extracted_response = (
self._convert_message_to_dict(response_generation.message)
if isinstance(response_generation, ChatGeneration)
else _extract_raw_response(response_generation)
)
response_generation = response.generations[-1][-1]
extracted_response = (
self._convert_message_to_dict(response_generation.message)
if isinstance(response_generation, ChatGeneration)
else _extract_raw_response(response_generation)
)

llm_usage = _parse_usage(response)
llm_usage = _parse_usage(response)

# e.g. azure returns the model name in the response
model = _parse_model(response)
# e.g. azure returns the model name in the response
model = _parse_model(response)

generation = self._detach_observation(run_id)
generation = self._detach_observation(run_id)

if generation is not None:
generation.update(
output=extracted_response,
usage=llm_usage,
usage_details=llm_usage,
input=kwargs.get("inputs"),
model=model,
).end()
if generation is not None:
generation.update(
output=extracted_response,
usage=llm_usage,
usage_details=llm_usage,
input=kwargs.get("inputs"),
model=model,
).end()

except Exception as e:
langfuse_logger.exception(e)
Expand Down