From 012fbac95bdb460bc10decd13ea8160e5d32fa69 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Fri, 19 Dec 2025 14:47:47 +0100 Subject: [PATCH 1/2] fix(langchain): allow prompt linking with langchain v1 create_agent --- langfuse/langchain/CallbackHandler.py | 57 ++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 10 deletions(-) diff --git a/langfuse/langchain/CallbackHandler.py b/langfuse/langchain/CallbackHandler.py index 7f6479867..e3b0df2ec 100644 --- a/langfuse/langchain/CallbackHandler.py +++ b/langfuse/langchain/CallbackHandler.py @@ -1,3 +1,4 @@ +from collections import defaultdict from contextvars import Token from typing import ( Any, @@ -28,10 +29,10 @@ LangfuseSpan, LangfuseTool, ) -from langfuse.types import TraceContext from langfuse._utils import _get_timestamp from langfuse.langchain.utils import _extract_model_name from langfuse.logger import langfuse_logger +from langfuse.types import TraceContext try: import langchain @@ -132,6 +133,7 @@ def __init__( LangfuseRetriever, ], ] = {} + self._child_to_parent_run_id_map: Dict[UUID, Optional[UUID]] = defaultdict(None) self.context_tokens: Dict[UUID, Token] = {} self.prompt_to_parent_run_map: Dict[UUID, Any] = {} self.updated_completion_start_time_memo: Set[UUID] = set() @@ -302,6 +304,8 @@ def on_chain_start( metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> Any: + self._child_to_parent_run_id_map[run_id] = parent_run_id + try: self._log_debug_event( "on_chain_start", run_id, parent_run_id, inputs=inputs @@ -480,6 +484,8 @@ def on_agent_action( **kwargs: Any, ) -> Any: """Run on agent action.""" + self._child_to_parent_run_id_map[run_id] = parent_run_id + try: self._log_debug_event( "on_agent_action", run_id, parent_run_id, action=action @@ -560,6 +566,10 @@ def on_chain_end( except Exception as e: langfuse_logger.exception(e) + finally: + if parent_run_id is None: + self._reset() + def on_chain_error( self, error: BaseException, @@ -603,6 +613,8 @@ def on_chat_model_start( metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> Any: + self._child_to_parent_run_id_map[run_id] = parent_run_id + try: self._log_debug_event( "on_chat_model_start", run_id, parent_run_id, messages=messages @@ -635,6 +647,8 @@ def on_llm_start( metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> Any: + self._child_to_parent_run_id_map[run_id] = parent_run_id + try: self._log_debug_event( "on_llm_start", run_id, parent_run_id, prompts=prompts @@ -662,6 +676,8 @@ def on_tool_start( metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> Any: + self._child_to_parent_run_id_map[run_id] = parent_run_id + try: self._log_debug_event( "on_tool_start", run_id, parent_run_id, input_str=input_str @@ -704,6 +720,8 @@ def on_retriever_start( metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> Any: + self._child_to_parent_run_id_map[run_id] = parent_run_id + try: self._log_debug_event( "on_retriever_start", run_id, parent_run_id, query=query @@ -809,6 +827,8 @@ def __on_llm_action( metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> None: + self._child_to_parent_run_id_map[run_id] = parent_run_id + try: tools = kwargs.get("invocation_params", {}).get("tools", None) if tools and isinstance(tools, list): @@ -817,14 +837,23 @@ def __on_llm_action( model_name = self._parse_model_and_log_errors( serialized=serialized, metadata=metadata, kwargs=kwargs ) - registered_prompt = ( - self.prompt_to_parent_run_map.get(parent_run_id) - if parent_run_id is not None - else None - ) - if registered_prompt: - self._deregister_langfuse_prompt(parent_run_id) + registered_prompt = None + current_parent_run_id = parent_run_id + + # Check all parents for registered prompt + while current_parent_run_id is not None: + registered_prompt = self.prompt_to_parent_run_map.get( + current_parent_run_id + ) + + if registered_prompt: + self._deregister_langfuse_prompt(current_parent_run_id) + break + else: + current_parent_run_id = self._child_to_parent_run_id_map.get( + current_parent_run_id + ) content = { "name": self.get_langchain_run_name(serialized, **kwargs), @@ -956,6 +985,9 @@ def on_llm_end( finally: self.updated_completion_start_time_memo.discard(run_id) + if parent_run_id is None: + self._reset() + def on_llm_error( self, error: BaseException, @@ -980,6 +1012,9 @@ def on_llm_error( except Exception as e: langfuse_logger.exception(e) + def _reset(self) -> None: + self._child_to_parent_run_id_map = defaultdict(None) + def __join_tags_and_metadata( self, tags: Optional[List[str]] = None, @@ -1047,7 +1082,7 @@ def _log_debug_event( **kwargs: Any, ) -> None: langfuse_logger.debug( - f"Event: {event_name}, run_id: {str(run_id)[:5]}, parent_run_id: {str(parent_run_id)[:5]}" + f"Event: {event_name}, run_id: {run_id}, parent_run_id: {parent_run_id}" ) @@ -1210,7 +1245,9 @@ def _parse_usage_model(usage: Union[pydantic.BaseModel, dict]) -> Any: usage_model["input"] = max(0, usage_model["input"] - value) if f"input_modality_{item['modality']}" in usage_model: - usage_model[f"input_modality_{item['modality']}"] = max(0, usage_model[f"input_modality_{item['modality']}"] - value) + usage_model[f"input_modality_{item['modality']}"] = max( + 0, usage_model[f"input_modality_{item['modality']}"] - value + ) usage_model = {k: v for k, v in usage_model.items() if isinstance(v, int)} From e61d204fb8a43dd925a87d741953226426389d31 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Fri, 19 Dec 2025 14:53:38 +0100 Subject: [PATCH 2/2] push --- langfuse/langchain/CallbackHandler.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/langfuse/langchain/CallbackHandler.py b/langfuse/langchain/CallbackHandler.py index e3b0df2ec..a2e9816da 100644 --- a/langfuse/langchain/CallbackHandler.py +++ b/langfuse/langchain/CallbackHandler.py @@ -1,4 +1,3 @@ -from collections import defaultdict from contextvars import Token from typing import ( Any, @@ -133,7 +132,7 @@ def __init__( LangfuseRetriever, ], ] = {} - self._child_to_parent_run_id_map: Dict[UUID, Optional[UUID]] = defaultdict(None) + self._child_to_parent_run_id_map: Dict[UUID, Optional[UUID]] = {} self.context_tokens: Dict[UUID, Token] = {} self.prompt_to_parent_run_map: Dict[UUID, Any] = {} self.updated_completion_start_time_memo: Set[UUID] = set() @@ -852,7 +851,7 @@ def __on_llm_action( break else: current_parent_run_id = self._child_to_parent_run_id_map.get( - current_parent_run_id + current_parent_run_id, None ) content = { @@ -1013,7 +1012,7 @@ def on_llm_error( langfuse_logger.exception(e) def _reset(self) -> None: - self._child_to_parent_run_id_map = defaultdict(None) + self._child_to_parent_run_id_map = {} def __join_tags_and_metadata( self,