Skip to content

Commit 62ab2aa

Browse files
cristipufuclaude
andcommitted
fix: prevent duplicate message events during sequential breakpoint resumes
Promote `executors_with_messages` from a local variable to instance state (`self._executors_with_messages`) so it persists across `_stream_workflow()` calls within the debug runtime's breakpoint resume loop. Without this, the final "end" executor's output event would re-emit all messages that were already shown in earlier breakpoint iterations. Includes a regression test for sequential workflows with breakpoints on all nodes. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent ef6f42a commit 62ab2aa

File tree

5 files changed

+195
-10
lines changed

5 files changed

+195
-10
lines changed

packages/uipath-agent-framework/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath-agent-framework"
3-
version = "0.0.9"
3+
version = "0.0.10"
44
description = "Python SDK that enables developers to build and deploy Microsoft Agent Framework agents to the UiPath Cloud Platform"
55
readme = "README.md"
66
requires-python = ">=3.11"

packages/uipath-agent-framework/samples/sequential-structured-output/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ dev = [
1919

2020
[tool.uv]
2121
prerelease = "allow"
22+

packages/uipath-agent-framework/src/uipath_agent_framework/runtime/runtime.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,13 @@ def __init__(
7878
# synthetic COMPLETED events on HITL resume when the framework
7979
# doesn't surface function_result in output/executor_completed.
8080
self._pending_tool_nodes: set[str] = set()
81+
# Track executors that already emitted message events so we don't
82+
# duplicate when the same data appears across breakpoint resume
83+
# cycles. Persists across _stream_workflow() calls for the same
84+
# reason as _pending_tool_nodes — the debug runtime's while loop
85+
# calls stream() (and thus _stream_workflow()) once per breakpoint.
86+
# Reset on fresh (non-resume) runs to avoid stale state.
87+
self._executors_with_messages: set[str] = set()
8188

8289
# ------------------------------------------------------------------
8390
# Checkpoint helpers
@@ -625,10 +632,13 @@ async def _stream_workflow(
625632
output_executor_ids.add(ex.id)
626633
except Exception:
627634
pass
628-
# Track executors that already emitted message events so we don't
629-
# duplicate when the same data appears in both executor_completed
630-
# and "output" events.
631-
executors_with_messages: set[str] = set()
635+
# Reset per-executor message tracking on fresh runs so stale state
636+
# from a previous turn doesn't suppress new messages. On breakpoint
637+
# resume runs the set is preserved so messages emitted in an earlier
638+
# cycle are not duplicated when the "output" event contains the full
639+
# conversation.
640+
if not is_resuming:
641+
self._executors_with_messages = set()
632642

633643
# Emit an early STARTED event for the start executor so the graph
634644
# visualization shows it immediately rather than after it finishes.
@@ -708,7 +718,8 @@ async def _stream_workflow(
708718
if (
709719
isinstance(executor, AgentExecutor)
710720
and event.executor_id not in output_executor_ids
711-
and event.executor_id not in executors_with_messages
721+
and event.executor_id
722+
not in self._executors_with_messages
712723
):
713724
completed_msg_events = self._extract_workflow_messages(
714725
self._filter_completed_data(event.data)
@@ -720,7 +731,9 @@ async def _stream_workflow(
720731
yield UiPathRuntimeMessageEvent(payload=close_evt)
721732
for msg_event in completed_msg_events:
722733
yield UiPathRuntimeMessageEvent(payload=msg_event)
723-
executors_with_messages.add(event.executor_id)
734+
self._executors_with_messages.add(
735+
event.executor_id
736+
)
724737

725738
yield UiPathRuntimeStateEvent(
726739
payload=self._serialize_event_data(
@@ -749,7 +762,7 @@ async def _stream_workflow(
749762
# When intermediate agents already emitted message
750763
# events via executor_completed, skip the final
751764
# orchestration output to avoid duplicating text.
752-
if not executors_with_messages:
765+
if not self._executors_with_messages:
753766
for msg_event in self._extract_workflow_messages(
754767
event.data, assistant_only=True
755768
):

packages/uipath-agent-framework/tests/test_breakpoints_e2e.py

Lines changed: 172 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
- concurrent (ConcurrentBuilder): dispatcher → parallel agents → aggregator
1313
- handoff (HandoffBuilder): triage → specialists
1414
- hitl-workflow (HandoffBuilder): triage → specialists with HITL tools
15+
- sequential (SequentialBuilder): researcher → editor chain
1516
"""
1617

1718
import json
@@ -26,6 +27,7 @@
2627
ConcurrentBuilder,
2728
GroupChatBuilder,
2829
HandoffBuilder,
30+
SequentialBuilder,
2931
)
3032
from conftest import (
3133
extract_system_text,
@@ -36,7 +38,9 @@
3638
UiPathDebugProtocol,
3739
UiPathDebugRuntime,
3840
)
39-
from uipath.runtime.result import UiPathRuntimeStatus
41+
from uipath.runtime.events import UiPathRuntimeEvent
42+
from uipath.runtime.events.state import UiPathRuntimeMessageEvent
43+
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus
4044

4145
from uipath_agent_framework.runtime.resumable_storage import (
4246
ScopedCheckpointStorage,
@@ -1045,3 +1049,170 @@ async def mock_wait_for_resume(*args: Any, **kwargs: Any) -> None:
10451049
finally:
10461050
await storage.dispose()
10471051
os.unlink(tmp_path)
1052+
1053+
1054+
class TestSequentialBreakpoints:
1055+
"""Integration test: sequential workflow with breakpoints on all nodes.
1056+
1057+
Topology: input-conversation → researcher → editor → end
1058+
(SequentialBuilder chain).
1059+
Breakpoints="*" → breaks on each executor, then completes.
1060+
1061+
Verifies that chat message events are NOT duplicated across breakpoint
1062+
resume cycles. This reproduces the bug where the "end" executor's
1063+
output event re-emitted the full conversation because the per-executor
1064+
message tracking was reset on each resume.
1065+
"""
1066+
1067+
async def test_sequential_breakall_no_duplicate_messages(self):
1068+
"""Sequential workflow with breakpoints="*" must not duplicate messages.
1069+
1070+
Each agent's text should appear exactly once in the streamed message
1071+
events. Before the fix, the "end" executor's output event contained
1072+
the full conversation and was emitted because executors_with_messages
1073+
was a local variable reset on each resume cycle.
1074+
"""
1075+
researcher_text = "Tokyo is the capital of Japan."
1076+
editor_text = '{"city": "Tokyo", "country": "Japan"}'
1077+
1078+
async def mock_create(**kwargs: Any):
1079+
messages = kwargs.get("messages", [])
1080+
is_stream = kwargs.get("stream", False)
1081+
system_msg = extract_system_text(messages)
1082+
1083+
if "researcher" in system_msg.lower():
1084+
return make_mock_response(researcher_text, stream=is_stream)
1085+
elif "editor" in system_msg.lower():
1086+
return make_mock_response(editor_text, stream=is_stream)
1087+
else:
1088+
return make_mock_response("OK", stream=is_stream)
1089+
1090+
mock_openai = AsyncMock()
1091+
mock_openai.chat.completions.create = mock_create
1092+
1093+
client = OpenAIChatClient(
1094+
model_id="mock-model", async_client=mock_openai
1095+
)
1096+
researcher = client.as_agent(
1097+
name="researcher",
1098+
description="Researches factual information about a city.",
1099+
instructions="You are a researcher.",
1100+
)
1101+
editor = client.as_agent(
1102+
name="editor",
1103+
description="Edits research into a structured profile.",
1104+
instructions="You are an editor.",
1105+
)
1106+
1107+
workflow = SequentialBuilder(
1108+
participants=[researcher, editor],
1109+
).build()
1110+
agent = workflow.as_agent(name="sequential_test")
1111+
1112+
tmp_fd, tmp_path = tempfile.mkstemp(suffix=".db")
1113+
os.close(tmp_fd)
1114+
try:
1115+
storage = SqliteResumableStorage(tmp_path)
1116+
await storage.setup()
1117+
assert storage.checkpoint_storage is not None
1118+
1119+
scoped_cs = ScopedCheckpointStorage(
1120+
storage.checkpoint_storage, "test-seq-bp"
1121+
)
1122+
1123+
runtime = UiPathAgentFrameworkRuntime(
1124+
agent=agent,
1125+
runtime_id="test-seq-bp",
1126+
checkpoint_storage=scoped_cs,
1127+
resumable_storage=storage,
1128+
)
1129+
# Use the REAL chat mapper so message events are generated.
1130+
# Only mock map_messages_to_input to provide the user prompt.
1131+
# Use object.__setattr__ to avoid mypy [method-assign] error.
1132+
object.__setattr__(
1133+
runtime.chat,
1134+
"map_messages_to_input",
1135+
MagicMock(return_value="Tell me about Tokyo"),
1136+
)
1137+
1138+
resume_count = [0]
1139+
1140+
async def mock_wait_for_resume(*args: Any, **kwargs: Any) -> None:
1141+
resume_count[0] += 1
1142+
if resume_count[0] > MAX_RESUME_CALLS:
1143+
raise AssertionError(
1144+
f"Loop detected: {resume_count[0]} resumes"
1145+
)
1146+
return None
1147+
1148+
bridge = _make_debug_bridge()
1149+
cast(Mock, bridge.get_breakpoints).return_value = "*"
1150+
cast(
1151+
AsyncMock, bridge.wait_for_resume
1152+
).side_effect = mock_wait_for_resume
1153+
1154+
debug_runtime = UiPathDebugRuntime(
1155+
delegate=runtime, debug_bridge=bridge
1156+
)
1157+
1158+
# ---- Stream and collect all events ----
1159+
all_events: list[UiPathRuntimeEvent] = []
1160+
async for event in debug_runtime.stream({"messages": []}):
1161+
all_events.append(event)
1162+
1163+
# ---- Verify execution completed successfully ----
1164+
results = [
1165+
e for e in all_events if isinstance(e, UiPathRuntimeResult)
1166+
]
1167+
assert len(results) >= 1, "No UiPathRuntimeResult events found"
1168+
final_result = results[-1]
1169+
assert final_result.status == UiPathRuntimeStatus.SUCCESSFUL, (
1170+
f"Expected SUCCESSFUL, got {final_result.status}. "
1171+
f"Resumes: {resume_count[0]}"
1172+
)
1173+
1174+
# ---- Verify breakpoints fired ----
1175+
bp_count = cast(
1176+
AsyncMock, bridge.emit_breakpoint_hit
1177+
).await_count
1178+
assert bp_count >= 2, (
1179+
f"Expected at least 2 breakpoints (researcher + editor), "
1180+
f"got {bp_count}"
1181+
)
1182+
1183+
# ---- Collect all text chunks from message events ----
1184+
text_chunks: list[str] = []
1185+
for event in all_events:
1186+
if not isinstance(event, UiPathRuntimeMessageEvent):
1187+
continue
1188+
payload = event.payload
1189+
cp = getattr(payload, "content_part", None)
1190+
if cp is None:
1191+
continue
1192+
chunk = getattr(cp, "chunk", None)
1193+
if chunk is None:
1194+
continue
1195+
data = getattr(chunk, "data", None)
1196+
if data:
1197+
text_chunks.append(data)
1198+
1199+
all_text = "".join(text_chunks)
1200+
1201+
# ---- Assert no duplicate messages ----
1202+
researcher_count = all_text.count(researcher_text)
1203+
editor_count = all_text.count(editor_text)
1204+
1205+
assert researcher_count == 1, (
1206+
f"Researcher text should appear exactly once, "
1207+
f"but appeared {researcher_count} times. "
1208+
f"This means breakpoint resume duplicated messages."
1209+
)
1210+
assert editor_count == 1, (
1211+
f"Editor text should appear exactly once, "
1212+
f"but appeared {editor_count} times. "
1213+
f"This means breakpoint resume duplicated messages."
1214+
)
1215+
1216+
finally:
1217+
await storage.dispose()
1218+
os.unlink(tmp_path)

packages/uipath-agent-framework/uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)