Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions langfuse/langchain/CallbackHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

import pydantic
from opentelemetry import context, trace
from opentelemetry.context import _RUNTIME_CONTEXT

from langfuse._client.attributes import LangfuseOtelSpanAttributes
from langfuse._client.client import Langfuse
from langfuse._client.get_client import get_client
from langfuse._client.span import (
LangfuseAgent,
Expand Down Expand Up @@ -272,7 +274,7 @@ def on_chain_start(
serialized, "chain", **kwargs
)

span = self.client.start_observation(
span = self._get_parent_observation(parent_run_id).start_observation(
name=span_name,
as_type=observation_type,
metadata=span_metadata,
Expand Down Expand Up @@ -336,6 +338,22 @@ def _deregister_langfuse_prompt(self, run_id: Optional[UUID]) -> None:
if run_id is not None and run_id in self.prompt_to_parent_run_map:
del self.prompt_to_parent_run_map[run_id]

def _get_parent_observation(
self, parent_run_id: Optional[UUID]
) -> Union[
Langfuse,
LangfuseAgent,
LangfuseChain,
LangfuseGeneration,
LangfuseRetriever,
LangfuseSpan,
LangfuseTool,
]:
if parent_run_id and parent_run_id in self.runs:
return self.runs[parent_run_id]

return self.client

def _attach_observation(
self,
run_id: UUID,
Expand Down Expand Up @@ -369,7 +387,18 @@ def _detach_observation(
token = self.context_tokens.pop(run_id, None)

if token:
context.detach(token)
try:
# Directly detach from runtime context to avoid error logging
_RUNTIME_CONTEXT.detach(token)
except Exception:
# Context detach can fail in async scenarios - this is expected and safe to ignore
# The span itself was properly ended and tracing data is correctly captured
#
# Examples:
# 1. Token created in one async task/thread, detached in another
# 2. Context already detached by framework or other handlers
# 3. Runtime context state mismatch in concurrent execution
pass

return cast(
Union[
Expand Down Expand Up @@ -591,7 +620,7 @@ def on_tool_start(
serialized, "tool", **kwargs
)

span = self.client.start_observation(
span = self._get_parent_observation(parent_run_id).start_observation(
name=self.get_langchain_run_name(serialized, **kwargs),
as_type=observation_type,
input=input_str,
Expand Down Expand Up @@ -626,8 +655,7 @@ def on_retriever_start(
observation_type = self._get_observation_type_from_serialized(
serialized, "retriever", **kwargs
)

span = self.client.start_observation(
span = self._get_parent_observation(parent_run_id).start_observation(
name=span_name,
as_type=observation_type,
metadata=span_metadata,
Expand Down Expand Up @@ -753,7 +781,9 @@ def __on_llm_action(
"prompt": registered_prompt,
}

generation = self.client.start_observation(as_type="generation", **content) # type: ignore
generation = self._get_parent_observation(parent_run_id).start_observation(
as_type="generation", **content
) # type: ignore
self._attach_observation(run_id, generation)

self.last_trace_id = self.runs[run_id].trace_id
Expand Down