Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/uipath-agent-framework/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-agent-framework"
version = "0.0.9"
version = "0.0.10"
description = "Python SDK that enables developers to build and deploy Microsoft Agent Framework agents to the UiPath Cloud Platform"
readme = "README.md"
requires-python = ">=3.11"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ dev = [

[tool.uv]
prerelease = "allow"

Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ def __init__(
# synthetic COMPLETED events on HITL resume when the framework
# doesn't surface function_result in output/executor_completed.
self._pending_tool_nodes: set[str] = set()
# Track executors that already emitted message events so we don't
# duplicate when the same data appears across breakpoint resume
# cycles. Persists across _stream_workflow() calls for the same
# reason as _pending_tool_nodes — the debug runtime's while loop
# calls stream() (and thus _stream_workflow()) once per breakpoint.
# Reset on fresh (non-resume) runs to avoid stale state.
self._executors_with_messages: set[str] = set()

# ------------------------------------------------------------------
# Checkpoint helpers
Expand Down Expand Up @@ -625,10 +632,13 @@ async def _stream_workflow(
output_executor_ids.add(ex.id)
except Exception:
pass
# Track executors that already emitted message events so we don't
# duplicate when the same data appears in both executor_completed
# and "output" events.
executors_with_messages: set[str] = set()
# Reset per-executor message tracking on fresh runs so stale state
# from a previous turn doesn't suppress new messages. On breakpoint
# resume runs the set is preserved so messages emitted in an earlier
# cycle are not duplicated when the "output" event contains the full
# conversation.
if not is_resuming:
self._executors_with_messages = set()

# Emit an early STARTED event for the start executor so the graph
# visualization shows it immediately rather than after it finishes.
Expand Down Expand Up @@ -708,7 +718,7 @@ async def _stream_workflow(
if (
isinstance(executor, AgentExecutor)
and event.executor_id not in output_executor_ids
and event.executor_id not in executors_with_messages
and event.executor_id not in self._executors_with_messages
):
completed_msg_events = self._extract_workflow_messages(
self._filter_completed_data(event.data)
Expand All @@ -720,7 +730,7 @@ async def _stream_workflow(
yield UiPathRuntimeMessageEvent(payload=close_evt)
for msg_event in completed_msg_events:
yield UiPathRuntimeMessageEvent(payload=msg_event)
executors_with_messages.add(event.executor_id)
self._executors_with_messages.add(event.executor_id)

yield UiPathRuntimeStateEvent(
payload=self._serialize_event_data(
Expand Down Expand Up @@ -749,7 +759,7 @@ async def _stream_workflow(
# When intermediate agents already emitted message
# events via executor_completed, skip the final
# orchestration output to avoid duplicating text.
if not executors_with_messages:
if not self._executors_with_messages:
for msg_event in self._extract_workflow_messages(
event.data, assistant_only=True
):
Expand Down
160 changes: 159 additions & 1 deletion packages/uipath-agent-framework/tests/test_breakpoints_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- concurrent (ConcurrentBuilder): dispatcher → parallel agents → aggregator
- handoff (HandoffBuilder): triage → specialists
- hitl-workflow (HandoffBuilder): triage → specialists with HITL tools
- sequential (SequentialBuilder): researcher → editor chain
"""

import json
Expand All @@ -26,6 +27,7 @@
ConcurrentBuilder,
GroupChatBuilder,
HandoffBuilder,
SequentialBuilder,
)
from conftest import (
extract_system_text,
Expand All @@ -36,7 +38,9 @@
UiPathDebugProtocol,
UiPathDebugRuntime,
)
from uipath.runtime.result import UiPathRuntimeStatus
from uipath.runtime.events import UiPathRuntimeEvent
from uipath.runtime.events.state import UiPathRuntimeMessageEvent
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus

from uipath_agent_framework.runtime.resumable_storage import (
ScopedCheckpointStorage,
Expand Down Expand Up @@ -1045,3 +1049,157 @@ async def mock_wait_for_resume(*args: Any, **kwargs: Any) -> None:
finally:
await storage.dispose()
os.unlink(tmp_path)


class TestSequentialBreakpoints:
    """Integration test: sequential workflow with breakpoints on all nodes.

    Topology: input-conversation → researcher → editor → end
    (SequentialBuilder chain).
    Breakpoints="*" → breaks on each executor, then completes.

    Verifies that chat message events are NOT duplicated across breakpoint
    resume cycles. This reproduces the bug where the "end" executor's
    output event re-emitted the full conversation because the per-executor
    message tracking was reset on each resume.
    """

    async def test_sequential_breakall_no_duplicate_messages(self):
        """Sequential workflow with breakpoints="*" must not duplicate messages.

        Each agent's text should appear exactly once in the streamed message
        events. Before the fix, the "end" executor's output event contained
        the full conversation and was emitted because executors_with_messages
        was a local variable reset on each resume cycle.
        """
        researcher_text = "Tokyo is the capital of Japan."
        editor_text = '{"city": "Tokyo", "country": "Japan"}'

        async def mock_create(**kwargs: Any):
            # Route the canned reply by the agent's system prompt so each
            # participant produces a distinct, countable piece of text.
            messages = kwargs.get("messages", [])
            is_stream = kwargs.get("stream", False)
            system_msg = extract_system_text(messages).lower()

            if "researcher" in system_msg:
                return make_mock_response(researcher_text, stream=is_stream)
            if "editor" in system_msg:
                return make_mock_response(editor_text, stream=is_stream)
            return make_mock_response("OK", stream=is_stream)

        mock_openai = AsyncMock()
        mock_openai.chat.completions.create = mock_create

        client = OpenAIChatClient(model_id="mock-model", async_client=mock_openai)
        researcher = client.as_agent(
            name="researcher",
            description="Researches factual information about a city.",
            instructions="You are a researcher.",
        )
        editor = client.as_agent(
            name="editor",
            description="Edits research into a structured profile.",
            instructions="You are an editor.",
        )

        workflow = SequentialBuilder(
            participants=[researcher, editor],
        ).build()
        agent = workflow.as_agent(name="sequential_test")

        tmp_fd, tmp_path = tempfile.mkstemp(suffix=".db")
        os.close(tmp_fd)
        # Bind the name before entering ``try`` so the cleanup in ``finally``
        # never hits an unbound local (which would mask the real failure)
        # when constructing the storage itself raises.
        storage: SqliteResumableStorage | None = None
        try:
            storage = SqliteResumableStorage(tmp_path)
            await storage.setup()
            assert storage.checkpoint_storage is not None

            scoped_cs = ScopedCheckpointStorage(
                storage.checkpoint_storage, "test-seq-bp"
            )

            runtime = UiPathAgentFrameworkRuntime(
                agent=agent,
                runtime_id="test-seq-bp",
                checkpoint_storage=scoped_cs,
                resumable_storage=storage,
            )
            # Use the REAL chat mapper so message events are generated.
            # Only mock map_messages_to_input to provide the user prompt.
            # Use object.__setattr__ to avoid mypy [method-assign] error.
            object.__setattr__(
                runtime.chat,
                "map_messages_to_input",
                MagicMock(return_value="Tell me about Tokyo"),
            )

            # One-element list so the nested coroutine can mutate the counter.
            resume_count = [0]

            async def mock_wait_for_resume(*args: Any, **kwargs: Any) -> None:
                # Resume immediately, but fail fast if the debug loop never
                # terminates instead of hanging the test run.
                resume_count[0] += 1
                if resume_count[0] > MAX_RESUME_CALLS:
                    raise AssertionError(f"Loop detected: {resume_count[0]} resumes")
                return None

            bridge = _make_debug_bridge()
            cast(Mock, bridge.get_breakpoints).return_value = "*"
            cast(AsyncMock, bridge.wait_for_resume).side_effect = mock_wait_for_resume

            debug_runtime = UiPathDebugRuntime(delegate=runtime, debug_bridge=bridge)

            # ---- Stream and collect all events ----
            all_events: list[UiPathRuntimeEvent] = []
            async for event in debug_runtime.stream({"messages": []}):
                all_events.append(event)

            # ---- Verify execution completed successfully ----
            results = [e for e in all_events if isinstance(e, UiPathRuntimeResult)]
            assert len(results) >= 1, "No UiPathRuntimeResult events found"
            final_result = results[-1]
            assert final_result.status == UiPathRuntimeStatus.SUCCESSFUL, (
                f"Expected SUCCESSFUL, got {final_result.status}. "
                f"Resumes: {resume_count[0]}"
            )

            # ---- Verify breakpoints fired ----
            bp_count = cast(AsyncMock, bridge.emit_breakpoint_hit).await_count
            assert bp_count >= 2, (
                f"Expected at least 2 breakpoints (researcher + editor), got {bp_count}"
            )

            # ---- Collect all text chunks from message events ----
            # Walk payload.content_part.chunk.data defensively: any message
            # event lacking one of these attributes (or carrying empty data)
            # contributes no text.
            text_chunks: list[str] = []
            for event in all_events:
                if not isinstance(event, UiPathRuntimeMessageEvent):
                    continue
                payload = event.payload
                cp = getattr(payload, "content_part", None)
                if cp is None:
                    continue
                chunk = getattr(cp, "chunk", None)
                if chunk is None:
                    continue
                data = getattr(chunk, "data", None)
                if data:
                    text_chunks.append(data)

            all_text = "".join(text_chunks)

            # ---- Assert no duplicate messages ----
            researcher_count = all_text.count(researcher_text)
            editor_count = all_text.count(editor_text)

            assert researcher_count == 1, (
                "Researcher text should appear exactly once, "
                f"but appeared {researcher_count} times. "
                "This means breakpoint resume duplicated messages."
            )
            assert editor_count == 1, (
                "Editor text should appear exactly once, "
                f"but appeared {editor_count} times. "
                "This means breakpoint resume duplicated messages."
            )

        finally:
            # Always remove the temp database, even if disposing the storage
            # fails or the storage was never created.
            try:
                if storage is not None:
                    await storage.dispose()
            finally:
                os.unlink(tmp_path)
2 changes: 1 addition & 1 deletion packages/uipath-agent-framework/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.