diff --git a/packages/uipath-agent-framework/pyproject.toml b/packages/uipath-agent-framework/pyproject.toml index 695ad3c..94be407 100644 --- a/packages/uipath-agent-framework/pyproject.toml +++ b/packages/uipath-agent-framework/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath-agent-framework" -version = "0.0.9" +version = "0.0.10" description = "Python SDK that enables developers to build and deploy Microsoft Agent Framework agents to the UiPath Cloud Platform" readme = "README.md" requires-python = ">=3.11" diff --git a/packages/uipath-agent-framework/samples/sequential-structured-output/pyproject.toml b/packages/uipath-agent-framework/samples/sequential-structured-output/pyproject.toml index 1b37467..fbe0c17 100644 --- a/packages/uipath-agent-framework/samples/sequential-structured-output/pyproject.toml +++ b/packages/uipath-agent-framework/samples/sequential-structured-output/pyproject.toml @@ -19,3 +19,4 @@ dev = [ [tool.uv] prerelease = "allow" + diff --git a/packages/uipath-agent-framework/src/uipath_agent_framework/runtime/runtime.py b/packages/uipath-agent-framework/src/uipath_agent_framework/runtime/runtime.py index d4f8a12..567d819 100644 --- a/packages/uipath-agent-framework/src/uipath_agent_framework/runtime/runtime.py +++ b/packages/uipath-agent-framework/src/uipath_agent_framework/runtime/runtime.py @@ -78,6 +78,13 @@ def __init__( # synthetic COMPLETED events on HITL resume when the framework # doesn't surface function_result in output/executor_completed. self._pending_tool_nodes: set[str] = set() + # Track executors that already emitted message events so we don't + # duplicate when the same data appears across breakpoint resume + # cycles. Persists across _stream_workflow() calls for the same + # reason as _pending_tool_nodes — the debug runtime's while loop + # calls stream() (and thus _stream_workflow()) once per breakpoint. + # Reset on fresh (non-resume) runs to avoid stale state. + self._executors_with_messages: set[str] = set() # ------------------------------------------------------------------ # Checkpoint helpers @@ -625,10 +632,13 @@ async def _stream_workflow( output_executor_ids.add(ex.id) except Exception: pass - # Track executors that already emitted message events so we don't - # duplicate when the same data appears in both executor_completed - # and "output" events. - executors_with_messages: set[str] = set() + # Reset per-executor message tracking on fresh runs so stale state + # from a previous turn doesn't suppress new messages. On breakpoint + # resume runs the set is preserved so messages emitted in an earlier + # cycle are not duplicated when the "output" event contains the full + # conversation. + if not is_resuming: + self._executors_with_messages = set() # Emit an early STARTED event for the start executor so the graph # visualization shows it immediately rather than after it finishes. @@ -708,7 +718,7 @@ async def _stream_workflow( if ( isinstance(executor, AgentExecutor) and event.executor_id not in output_executor_ids - and event.executor_id not in executors_with_messages + and event.executor_id not in self._executors_with_messages ): completed_msg_events = self._extract_workflow_messages( self._filter_completed_data(event.data) @@ -720,7 +730,7 @@ async def _stream_workflow( yield UiPathRuntimeMessageEvent(payload=close_evt) for msg_event in completed_msg_events: yield UiPathRuntimeMessageEvent(payload=msg_event) - executors_with_messages.add(event.executor_id) + self._executors_with_messages.add(event.executor_id) yield UiPathRuntimeStateEvent( payload=self._serialize_event_data( @@ -749,7 +759,7 @@ async def _stream_workflow( # When intermediate agents already emitted message # events via executor_completed, skip the final # orchestration output to avoid duplicating text. - if not executors_with_messages: + if not self._executors_with_messages: for msg_event in self._extract_workflow_messages( event.data, assistant_only=True ): diff --git a/packages/uipath-agent-framework/tests/test_breakpoints_e2e.py b/packages/uipath-agent-framework/tests/test_breakpoints_e2e.py index b5845e7..4584446 100644 --- a/packages/uipath-agent-framework/tests/test_breakpoints_e2e.py +++ b/packages/uipath-agent-framework/tests/test_breakpoints_e2e.py @@ -12,6 +12,7 @@ - concurrent (ConcurrentBuilder): dispatcher → parallel agents → aggregator - handoff (HandoffBuilder): triage → specialists - hitl-workflow (HandoffBuilder): triage → specialists with HITL tools +- sequential (SequentialBuilder): researcher → editor chain """ import json @@ -26,6 +27,7 @@ ConcurrentBuilder, GroupChatBuilder, HandoffBuilder, + SequentialBuilder, ) from conftest import ( extract_system_text, @@ -36,7 +38,9 @@ UiPathDebugProtocol, UiPathDebugRuntime, ) -from uipath.runtime.result import UiPathRuntimeStatus +from uipath.runtime.events import UiPathRuntimeEvent +from uipath.runtime.events.state import UiPathRuntimeMessageEvent +from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus from uipath_agent_framework.runtime.resumable_storage import ( ScopedCheckpointStorage, @@ -1045,3 +1049,157 @@ async def mock_wait_for_resume(*args: Any, **kwargs: Any) -> None: finally: await storage.dispose() os.unlink(tmp_path) + + +class TestSequentialBreakpoints: + """Integration test: sequential workflow with breakpoints on all nodes. + + Topology: input-conversation → researcher → editor → end + (SequentialBuilder chain). + Breakpoints="*" → breaks on each executor, then completes. + + Verifies that chat message events are NOT duplicated across breakpoint + resume cycles. This reproduces the bug where the "end" executor's + output event re-emitted the full conversation because the per-executor + message tracking was reset on each resume. + """ + + async def test_sequential_breakall_no_duplicate_messages(self): + """Sequential workflow with breakpoints="*" must not duplicate messages. + + Each agent's text should appear exactly once in the streamed message + events. Before the fix, the "end" executor's output event contained + the full conversation and was emitted because executors_with_messages + was a local variable reset on each resume cycle. + """ + researcher_text = "Tokyo is the capital of Japan." + editor_text = '{"city": "Tokyo", "country": "Japan"}' + + async def mock_create(**kwargs: Any): + messages = kwargs.get("messages", []) + is_stream = kwargs.get("stream", False) + system_msg = extract_system_text(messages) + + if "researcher" in system_msg.lower(): + return make_mock_response(researcher_text, stream=is_stream) + elif "editor" in system_msg.lower(): + return make_mock_response(editor_text, stream=is_stream) + else: + return make_mock_response("OK", stream=is_stream) + + mock_openai = AsyncMock() + mock_openai.chat.completions.create = mock_create + + client = OpenAIChatClient(model_id="mock-model", async_client=mock_openai) + researcher = client.as_agent( + name="researcher", + description="Researches factual information about a city.", + instructions="You are a researcher.", + ) + editor = client.as_agent( + name="editor", + description="Edits research into a structured profile.", + instructions="You are an editor.", + ) + + workflow = SequentialBuilder( + participants=[researcher, editor], + ).build() + agent = workflow.as_agent(name="sequential_test") + + tmp_fd, tmp_path = tempfile.mkstemp(suffix=".db") + os.close(tmp_fd) + try: + storage = SqliteResumableStorage(tmp_path) + await storage.setup() + assert storage.checkpoint_storage is not None + + scoped_cs = ScopedCheckpointStorage( + storage.checkpoint_storage, "test-seq-bp" + ) + + runtime = UiPathAgentFrameworkRuntime( + agent=agent, + runtime_id="test-seq-bp", + checkpoint_storage=scoped_cs, + resumable_storage=storage, + ) + # Use the REAL chat mapper so message events are generated. + # Only mock map_messages_to_input to provide the user prompt. + # Use object.__setattr__ to avoid mypy [method-assign] error. + object.__setattr__( + runtime.chat, + "map_messages_to_input", + MagicMock(return_value="Tell me about Tokyo"), + ) + + resume_count = [0] + + async def mock_wait_for_resume(*args: Any, **kwargs: Any) -> None: + resume_count[0] += 1 + if resume_count[0] > MAX_RESUME_CALLS: + raise AssertionError(f"Loop detected: {resume_count[0]} resumes") + return None + + bridge = _make_debug_bridge() + cast(Mock, bridge.get_breakpoints).return_value = "*" + cast(AsyncMock, bridge.wait_for_resume).side_effect = mock_wait_for_resume + + debug_runtime = UiPathDebugRuntime(delegate=runtime, debug_bridge=bridge) + + # ---- Stream and collect all events ---- + all_events: list[UiPathRuntimeEvent] = [] + async for event in debug_runtime.stream({"messages": []}): + all_events.append(event) + + # ---- Verify execution completed successfully ---- + results = [e for e in all_events if isinstance(e, UiPathRuntimeResult)] + assert len(results) >= 1, "No UiPathRuntimeResult events found" + final_result = results[-1] + assert final_result.status == UiPathRuntimeStatus.SUCCESSFUL, ( + f"Expected SUCCESSFUL, got {final_result.status}. " + f"Resumes: {resume_count[0]}" + ) + + # ---- Verify breakpoints fired ---- + bp_count = cast(AsyncMock, bridge.emit_breakpoint_hit).await_count + assert bp_count >= 2, ( + f"Expected at least 2 breakpoints (researcher + editor), got {bp_count}" + ) + + # ---- Collect all text chunks from message events ---- + text_chunks: list[str] = [] + for event in all_events: + if not isinstance(event, UiPathRuntimeMessageEvent): + continue + payload = event.payload + cp = getattr(payload, "content_part", None) + if cp is None: + continue + chunk = getattr(cp, "chunk", None) + if chunk is None: + continue + data = getattr(chunk, "data", None) + if data: + text_chunks.append(data) + + all_text = "".join(text_chunks) + + # ---- Assert no duplicate messages ---- + researcher_count = all_text.count(researcher_text) + editor_count = all_text.count(editor_text) + + assert researcher_count == 1, ( + f"Researcher text should appear exactly once, " + f"but appeared {researcher_count} times. " + f"This means breakpoint resume duplicated messages." + ) + assert editor_count == 1, ( + f"Editor text should appear exactly once, " + f"but appeared {editor_count} times. " + f"This means breakpoint resume duplicated messages." + ) + + finally: + await storage.dispose() + os.unlink(tmp_path) diff --git a/packages/uipath-agent-framework/uv.lock b/packages/uipath-agent-framework/uv.lock index 5464ee3..c4f57f8 100644 --- a/packages/uipath-agent-framework/uv.lock +++ b/packages/uipath-agent-framework/uv.lock @@ -2460,7 +2460,7 @@ wheels = [ [[package]] name = "uipath-agent-framework" -version = "0.0.9" +version = "0.0.10" source = { editable = "." } dependencies = [ { name = "agent-framework-core" },