diff --git a/langfuse/langchain/CallbackHandler.py b/langfuse/langchain/CallbackHandler.py index 431529ac0..a1bf51bd7 100644 --- a/langfuse/langchain/CallbackHandler.py +++ b/langfuse/langchain/CallbackHandler.py @@ -209,10 +209,6 @@ def on_retriever_error( self._log_debug_event( "on_retriever_error", run_id, parent_run_id, error=error ) - - if run_id is None or run_id not in self.runs: - raise Exception("run not found") - observation = self._detach_observation(run_id) if observation is not None: @@ -401,19 +397,17 @@ def on_agent_action( "on_agent_action", run_id, parent_run_id, action=action ) - if run_id not in self.runs: - raise Exception("run not found") + agent_run = self.runs.get(run_id, None) - agent_run = self.runs[run_id] - if hasattr(agent_run, "_otel_span"): + if agent_run is not None: agent_run._otel_span.set_attribute( LangfuseOtelSpanAttributes.OBSERVATION_TYPE, "agent" ) - agent_run.update( - output=action, - input=kwargs.get("inputs"), - ).end() + agent_run.update( + output=action, + input=kwargs.get("inputs"), + ) except Exception as e: langfuse_logger.exception(e) @@ -430,10 +424,9 @@ def on_agent_finish( self._log_debug_event( "on_agent_finish", run_id, parent_run_id, finish=finish ) - if run_id not in self.runs: - raise Exception("run not found") - - agent_run = self._detach_observation(run_id) + # Langchain is sending same run ID for both agent finish and chain end + # handle cleanup of observation in the chain end callback + agent_run = self.runs.get(run_id, None) if agent_run is not None: agent_run._otel_span.set_attribute( @@ -443,7 +436,7 @@ def on_agent_finish( agent_run.update( output=finish, input=kwargs.get("inputs"), - ).end() + ) except Exception as e: langfuse_logger.exception(e) @@ -461,9 +454,6 @@ def on_chain_end( "on_chain_end", run_id, parent_run_id, outputs=outputs ) - if run_id not in self.runs: - raise Exception("run not found") - span = self._detach_observation(run_id) if span is not None: @@ -846,31 +836,28 @@ def on_llm_end( self._log_debug_event( "on_llm_end", run_id, parent_run_id, response=response, kwargs=kwargs ) - if run_id not in self.runs: - raise Exception("Run not found, see docs what to do in this case.") - else: - response_generation = response.generations[-1][-1] - extracted_response = ( - self._convert_message_to_dict(response_generation.message) - if isinstance(response_generation, ChatGeneration) - else _extract_raw_response(response_generation) - ) + response_generation = response.generations[-1][-1] + extracted_response = ( + self._convert_message_to_dict(response_generation.message) + if isinstance(response_generation, ChatGeneration) + else _extract_raw_response(response_generation) + ) - llm_usage = _parse_usage(response) + llm_usage = _parse_usage(response) - # e.g. azure returns the model name in the response - model = _parse_model(response) + # e.g. azure returns the model name in the response + model = _parse_model(response) - generation = self._detach_observation(run_id) + generation = self._detach_observation(run_id) - if generation is not None: - generation.update( - output=extracted_response, - usage=llm_usage, - usage_details=llm_usage, - input=kwargs.get("inputs"), - model=model, - ).end() + if generation is not None: + generation.update( + output=extracted_response, + usage=llm_usage, + usage_details=llm_usage, + input=kwargs.get("inputs"), + model=model, + ).end() except Exception as e: langfuse_logger.exception(e)