From 2122c918494a952627a3b3107ce465e8f8fcfa02 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Mon, 1 Sep 2025 16:55:00 +0200 Subject: [PATCH] fix(langchain): token detach exceptions in async contexts --- langfuse/langchain/CallbackHandler.py | 42 +++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/langfuse/langchain/CallbackHandler.py b/langfuse/langchain/CallbackHandler.py index a1bf51bd7..0c4fb5c2d 100644 --- a/langfuse/langchain/CallbackHandler.py +++ b/langfuse/langchain/CallbackHandler.py @@ -3,8 +3,10 @@ import pydantic from opentelemetry import context, trace +from opentelemetry.context import _RUNTIME_CONTEXT from langfuse._client.attributes import LangfuseOtelSpanAttributes +from langfuse._client.client import Langfuse from langfuse._client.get_client import get_client from langfuse._client.span import ( LangfuseAgent, @@ -272,7 +274,7 @@ def on_chain_start( serialized, "chain", **kwargs ) - span = self.client.start_observation( + span = self._get_parent_observation(parent_run_id).start_observation( name=span_name, as_type=observation_type, metadata=span_metadata, @@ -336,6 +338,22 @@ def _deregister_langfuse_prompt(self, run_id: Optional[UUID]) -> None: if run_id is not None and run_id in self.prompt_to_parent_run_map: del self.prompt_to_parent_run_map[run_id] + def _get_parent_observation( + self, parent_run_id: Optional[UUID] + ) -> Union[ + Langfuse, + LangfuseAgent, + LangfuseChain, + LangfuseGeneration, + LangfuseRetriever, + LangfuseSpan, + LangfuseTool, + ]: + if parent_run_id and parent_run_id in self.runs: + return self.runs[parent_run_id] + + return self.client + def _attach_observation( self, run_id: UUID, @@ -369,7 +387,18 @@ def _detach_observation( token = self.context_tokens.pop(run_id, None) if token: - context.detach(token) + try: + # Directly detach from runtime context to avoid error logging + _RUNTIME_CONTEXT.detach(token) + except Exception: + # Context detach can fail in async scenarios - this is expected and safe to ignore + # The span itself was properly ended and tracing data is correctly captured + # + # Examples: + # 1. Token created in one async task/thread, detached in another + # 2. Context already detached by framework or other handlers + # 3. Runtime context state mismatch in concurrent execution + pass return cast( Union[ @@ -591,7 +620,7 @@ def on_tool_start( serialized, "tool", **kwargs ) - span = self.client.start_observation( + span = self._get_parent_observation(parent_run_id).start_observation( name=self.get_langchain_run_name(serialized, **kwargs), as_type=observation_type, input=input_str, @@ -626,8 +655,7 @@ def on_retriever_start( observation_type = self._get_observation_type_from_serialized( serialized, "retriever", **kwargs ) - - span = self.client.start_observation( + span = self._get_parent_observation(parent_run_id).start_observation( name=span_name, as_type=observation_type, metadata=span_metadata, @@ -753,7 +781,9 @@ def __on_llm_action( "prompt": registered_prompt, } - generation = self.client.start_observation(as_type="generation", **content) # type: ignore + generation = self._get_parent_observation(parent_run_id).start_observation( + as_type="generation", **content + ) # type: ignore self._attach_observation(run_id, generation) self.last_trace_id = self.runs[run_id].trace_id