Skip to content

Commit 63d41ce

Browse files
committed
fix: resolve message duplication in LangChain integration
- Convert AgentRunConverter.to_agui_events() from static to instance method calls in tests - Add comprehensive integration tests for LangChain/LangGraph invoke methods - Add conversion tests for Python 3.10 and 3.12 compatibility - Ensure proper state management to prevent duplicate message output in streaming scenarios Change-Id: I113237eeb2cb73bf0e7b00e9e55882f2f32e6c10 Signed-off-by: OhYee <oyohyee@oyohyee.com>
1 parent 0590ae2 commit 63d41ce

File tree

9 files changed

+3668
-232
lines changed

9 files changed

+3668
-232
lines changed

agentrun/integration/langgraph/agent_converter.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ def __init__(self) -> None:
8282
# 用于在 on_tool_end 中查找对应的 tool_call_id
8383
self._run_id_to_tool_call_id: Dict[str, str] = {}
8484

85+
self.has_on_chat_model_stream = False
86+
8587
def convert(
8688
self,
8789
event: Union[Dict[str, Any], Any],
@@ -436,8 +438,8 @@ def is_stream_values_format(event_dict: Dict[str, Any]) -> bool:
436438
# 事件转换器(静态方法)
437439
# =========================================================================
438440

439-
@staticmethod
440441
def _convert_stream_updates_event(
442+
self,
441443
event_dict: Dict[str, Any],
442444
messages_key: str = "messages",
443445
) -> Iterator[Union[AgentResult, str]]:
@@ -525,8 +527,8 @@ def _convert_stream_updates_event(
525527
},
526528
)
527529

528-
@staticmethod
529530
def _convert_stream_values_event(
531+
self,
530532
event_dict: Dict[str, Any],
531533
messages_key: str = "messages",
532534
) -> Iterator[Union[AgentResult, str]]:
@@ -595,8 +597,8 @@ def _convert_stream_values_event(
595597
},
596598
)
597599

598-
@staticmethod
599600
def _convert_astream_events_event(
601+
self,
600602
event_dict: Dict[str, Any],
601603
tool_call_id_map: Optional[Dict[int, str]] = None,
602604
tool_call_started_set: Optional[set] = None,
@@ -625,6 +627,9 @@ def _convert_astream_events_event(
625627

626628
# 1. LangGraph 格式: on_chat_model_stream
627629
if event_type == "on_chat_model_stream":
630+
631+
self.has_on_chat_model_stream = True
632+
628633
chunk = data.get("chunk")
629634
if chunk:
630635
# 文本内容
@@ -722,6 +727,7 @@ def _convert_astream_events_event(
722727
elif (
723728
event_type == "on_chain_stream"
724729
and event_dict.get("name") == "model"
730+
and not self.has_on_chat_model_stream
725731
):
726732
chunk_data = data.get("chunk", {})
727733
if isinstance(chunk_data, dict):
@@ -997,8 +1003,8 @@ def _convert_astream_events_event(
9971003
# 主要 API(静态方法)
9981004
# =========================================================================
9991005

1000-
@staticmethod
10011006
def to_agui_events(
1007+
self,
10021008
event: Union[Dict[str, Any], Any],
10031009
messages_key: str = "messages",
10041010
tool_call_id_map: Optional[Dict[int, str]] = None,
@@ -1052,7 +1058,7 @@ def to_agui_events(
10521058
# 根据事件格式选择对应的转换器
10531059
if AgentRunConverter.is_astream_events_format(event_dict):
10541060
# astream_events 格式:{"event": "on_xxx", "data": {...}}
1055-
yield from AgentRunConverter._convert_astream_events_event(
1061+
yield from self._convert_astream_events_event(
10561062
event_dict,
10571063
tool_call_id_map,
10581064
tool_call_started_set,
@@ -1062,12 +1068,12 @@ def to_agui_events(
10621068

10631069
elif AgentRunConverter.is_stream_updates_format(event_dict):
10641070
# stream/astream(stream_mode="updates") 格式:{node_name: state_update}
1065-
yield from AgentRunConverter._convert_stream_updates_event(
1071+
yield from self._convert_stream_updates_event(
10661072
event_dict, messages_key
10671073
)
10681074

10691075
elif AgentRunConverter.is_stream_values_format(event_dict):
10701076
# stream/astream(stream_mode="values") 格式:完整 state dict
1071-
yield from AgentRunConverter._convert_stream_values_event(
1077+
yield from self._convert_stream_values_event(
10721078
event_dict, messages_key
10731079
)

0 commit comments

Comments
 (0)