
Commit e450650

cristipufu and claude committed

fix: support structured output extraction for sequential workflows

The runtime and schema extraction only checked output_executors for response_format, which missed sequential workflows where the output executor is _EndWithConversation (not an AgentExecutor). Add a fallback that scans all workflow executors and picks the last agent's response_format. Includes e2e streaming tests and a sequential-structured-output sample.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

1 parent: ad4b73e

File tree: 11 files changed, +495 −22 lines
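The fallback the commit message describes can be sketched on its own: scan every executor and let the last Pydantic `response_format` win. This is a simplified illustration, not the runtime's actual code; the `FakeExecutor` stand-in and its `default_options` shape are assumptions made for the demo.

```python
from pydantic import BaseModel


class CityInfo(BaseModel):
    city: str
    country: str


class FakeExecutor:
    """Stand-in for an agent executor whose agent may declare a response_format."""

    def __init__(self, response_format=None):
        self.default_options = (
            {"response_format": response_format} if response_format else {}
        )


def last_response_format(executors):
    """Scan every executor; the last BaseModel response_format wins.

    Mirrors the commit's fallback: when the output executor is a
    framework-internal adapter (e.g. _EndWithConversation), the schema
    must come from the last agent in the pipeline instead.
    """
    result = None
    for ex in executors:
        rf = getattr(ex, "default_options", {}).get("response_format")
        if isinstance(rf, type) and issubclass(rf, BaseModel):
            result = rf
    return result


# The researcher declares no schema; the editor does, so the editor's wins.
executors = [FakeExecutor(), FakeExecutor(response_format=CityInfo)]
assert last_response_format(executors) is CityInfo
```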

packages/uipath-agent-framework/samples/README.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -8,6 +8,7 @@ Sample agents built with [Agent Framework](https://github.com/microsoft/agent-fr
 |--------|-------------|
 | [quickstart-workflow](./quickstart-workflow/) | Single workflow agent with tool calling: fetches live weather data for any location |
 | [structured-output](./structured-output/) | Structured output workflow: extracts city information and returns it as a typed Pydantic model |
+| [sequential-structured-output](./sequential-structured-output/) | Sequential pipeline with structured output: researcher and editor agents produce a typed Pydantic city profile |
 | [hitl-workflow](./hitl-workflow/) | Human-in-the-loop workflow: customer support with approval-gated billing and refund operations |
 | [sequential](./sequential/) | Sequential pipeline: writer, reviewer, and editor agents process a task one after another |
 | [concurrent](./concurrent/) | Concurrent orchestration: sentiment, topic extraction, and summarization agents analyze text in parallel |
```
Lines changed: 46 additions & 0 deletions (new file: README.md for the sequential-structured-output sample)

# Sequential + Structured Output

A sequential pipeline that combines multi-agent processing with structured output. A researcher gathers facts about a city, then an editor organizes them into a well-defined Pydantic model (`CityInfo`). The final output is a typed JSON object — not free-form text.

## Agent Graph

```mermaid
flowchart TB
    __start__(__start__)
    __end__(__end__)
    input-conversation(input-conversation)
    researcher(researcher)
    editor(editor)
    end_(end)
    __start__ --> |input|input-conversation
    input-conversation --> researcher
    researcher --> editor
    editor --> end_
    end_ --> |output|__end__
```

Internally, the sequential orchestration chains:

- **researcher** — gathers key facts about the city (country, population, landmarks, cultural significance)
- **editor** — organizes the research into a structured `CityInfo` schema with `response_format`

Each agent sees the full conversation history from previous agents. The last agent's `response_format` determines the output schema.

## Prerequisites

Authenticate with UiPath to configure your `.env` file:

```bash
uipath auth
```

## Run

```
uipath run agent '{"messages": [{"contentParts": [{"data": {"inline": "Tell me about Tokyo"}}], "role": "user"}]}'
```

## Debug

```
uipath dev web
```
Lines changed: 12 additions & 0 deletions (new file: agent graph definition; `end` is a reserved word in mermaid flowcharts, so the node id is `end_` as in the README's graph)

```mermaid
flowchart TB
    __start__(__start__)
    __end__(__end__)
    input-conversation(input-conversation)
    researcher(researcher)
    editor(editor)
    end_(end)
    __start__ --> |input|input-conversation
    input-conversation --> researcher
    researcher --> editor
    editor --> end_
    end_ --> |output|__end__
```
Lines changed: 5 additions & 0 deletions (new file: agents manifest)

```json
{
  "agents": {
    "agent": "main.py:agent"
  }
}
```
Lines changed: 44 additions & 0 deletions (new file: main.py)

```python
from agent_framework.orchestrations import SequentialBuilder
from pydantic import BaseModel

from uipath_agent_framework.chat import UiPathOpenAIChatClient


class CityInfo(BaseModel):
    """Structured output for city information."""

    city: str
    country: str
    description: str
    population_estimate: str
    famous_for: list[str]


client = UiPathOpenAIChatClient(model="gpt-5-mini-2025-08-07")

researcher = client.as_agent(
    name="researcher",
    description="Researches factual information about a city.",
    instructions=(
        "You are a thorough researcher. Given a city name, gather key facts "
        "including its country, population, notable landmarks, cultural "
        "significance, and what it is famous for. Present your findings clearly."
    ),
)

editor = client.as_agent(
    name="editor",
    description="Edits research into a structured city profile.",
    instructions=(
        "You are a precise editor. Take the researcher's findings and organize "
        "them into a well-structured city profile. Ensure all facts are accurate "
        "and the description is concise and informative."
    ),
    default_options={"response_format": CityInfo},
)

workflow = SequentialBuilder(
    participants=[researcher, editor],
).build()

agent = workflow.as_agent(name="sequential_structured_output_workflow")
```
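Because the editor declares `response_format=CityInfo`, its final reply is JSON text that validates against the schema. A minimal sketch of that validation step, using only the sample's Pydantic model; the `raw` payload below is an illustrative value, not real workflow output:

```python
from pydantic import BaseModel


class CityInfo(BaseModel):
    """Mirror of the sample's structured-output schema."""

    city: str
    country: str
    description: str
    population_estimate: str
    famous_for: list[str]


# Illustrative JSON, standing in for the editor agent's final reply.
raw = (
    '{"city": "Tokyo", "country": "Japan",'
    ' "description": "Capital of Japan and one of the largest metro areas.",'
    ' "population_estimate": "about 14 million (city proper)",'
    ' "famous_for": ["Shibuya Crossing", "sushi", "anime culture"]}'
)

# Pydantic parses and type-checks the payload in one call.
profile = CityInfo.model_validate_json(raw)
```

Any missing or mistyped field raises a `ValidationError` instead of silently producing free-form text.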
Lines changed: 25 additions & 0 deletions (new file: pyproject.toml)

```toml
[project]
name = "sequential-structured-output"
version = "0.0.1"
description = "Sequential + structured output: agents process a task in a pipeline, the last agent returns data in a well-defined Pydantic schema"
authors = [{ name = "John Doe" }]
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
    "uipath",
    "uipath-agent-framework",
    "agent-framework-core>=1.0.0rc1",
    "agent-framework-orchestrations>=1.0.0b260219",
]

[dependency-groups]
dev = [
    "uipath-dev",
]

[tool.uv]
prerelease = "allow"

[tool.uv.sources]
uipath-dev = { path = "../../../../../uipath-dev-python", editable = true }
uipath-agent-framework = { path = "../../", editable = true }
```
Lines changed: 14 additions & 0 deletions (new file: UiPath runtime configuration)

```json
{
  "$schema": "https://cloud.uipath.com/draft/2024-12/uipath",
  "runtimeOptions": {
    "isConversational": false
  },
  "packOptions": {
    "fileExtensionsIncluded": [],
    "filesIncluded": [],
    "filesExcluded": [],
    "directoriesExcluded": [],
    "includeUvLock": true
  },
  "functions": {}
}
```

packages/uipath-agent-framework/samples/structured-output/pyproject.toml

Lines changed: 0 additions & 1 deletion (removes a trailing blank line)

```diff
@@ -18,4 +18,3 @@ dev = [
 
 [tool.uv]
 prerelease = "allow"
-
```
packages/uipath-agent-framework/src/uipath_agent_framework/runtime/runtime.py

Lines changed: 132 additions & 13 deletions

```diff
@@ -614,6 +614,22 @@ async def _stream_workflow(
         # (COMPLETED) is only in executor_completed.
         executor_tool_phases: dict[str, set[UiPathRuntimeStatePhase]] = {}
 
+        # Determine which executors are output executors so we can emit
+        # intermediate message events from non-output agent executors on
+        # completion. This enables per-agent streaming for orchestrations
+        # like SequentialBuilder where output_executors=[end] and
+        # intermediate agent outputs are filtered from "output" events.
+        output_executor_ids: set[str] = set()
+        try:
+            for ex in workflow.get_output_executors():
+                output_executor_ids.add(ex.id)
+        except Exception:
+            pass
+        # Track executors that already emitted message events so we don't
+        # duplicate when the same data appears in both executor_completed
+        # and "output" events.
+        executors_with_messages: set[str] = set()
+
         # Emit an early STARTED event for the start executor so the graph
         # visualization shows it immediately rather than after it finishes.
         # The framework's _run_workflow_with_tracing awaits the entire start
@@ -679,6 +695,33 @@ async def _stream_workflow(
                             tool_event.node_name
                         )
                     yield tool_event
+
+                    # For non-output AgentExecutor instances, extract
+                    # message events from executor_completed data.
+                    # This provides intermediate streaming for
+                    # orchestrations (e.g. sequential) where agent
+                    # output events are filtered by output_executors.
+                    # Only AgentExecutors produce meaningful chat
+                    # messages; framework-internal executors like
+                    # input-conversation would echo user input.
+                    executor = workflow.executors.get(event.executor_id)
+                    if (
+                        isinstance(executor, AgentExecutor)
+                        and event.executor_id not in output_executor_ids
+                        and event.executor_id not in executors_with_messages
+                    ):
+                        completed_msg_events = self._extract_workflow_messages(
+                            self._filter_completed_data(event.data)
+                        )
+                        if completed_msg_events:
+                            # Close prior message so each agent gets a
+                            # separate bubble in the UI.
+                            for close_evt in self.chat.close_message():
+                                yield UiPathRuntimeMessageEvent(payload=close_evt)
+                            for msg_event in completed_msg_events:
+                                yield UiPathRuntimeMessageEvent(payload=msg_event)
+                            executors_with_messages.add(event.executor_id)
+
                     yield UiPathRuntimeStateEvent(
                         payload=self._serialize_event_data(
                             self._filter_completed_data(event.data)
@@ -702,8 +745,15 @@ async def _stream_workflow(
                 elif tool_event.phase == UiPathRuntimeStatePhase.COMPLETED:
                     self._pending_tool_nodes.discard(tool_event.node_name)
                     yield tool_event
-                for msg_event in self._extract_workflow_messages(event.data):
-                    yield UiPathRuntimeMessageEvent(payload=msg_event)
+
+                # When intermediate agents already emitted message
+                # events via executor_completed, skip the final
+                # orchestration output to avoid duplicating text.
+                if not executors_with_messages:
+                    for msg_event in self._extract_workflow_messages(
+                        event.data, assistant_only=True
+                    ):
+                        yield UiPathRuntimeMessageEvent(payload=msg_event)
 
                 # Detect workflow suspension via state
                 if (
@@ -899,9 +949,31 @@ def _extract_contents(data: Any) -> list[Any]:
             contents.extend(UiPathAgentFrameworkRuntime._extract_contents(item))
         return contents
 
-    def _extract_workflow_messages(self, data: Any) -> list[Any]:
-        """Extract UiPath conversation message events from workflow output data."""
+    def _extract_workflow_messages(
+        self, data: Any, *, assistant_only: bool = False
+    ) -> list[Any]:
+        """Extract UiPath conversation message events from workflow output data.
+
+        Args:
+            data: Workflow output data (AgentResponse, Message, list[Message], etc.)
+            assistant_only: When True, only extract content from assistant-role
+                messages. Used for orchestration outputs (e.g. sequential
+                workflow full-conversation lists) to avoid echoing the user's
+                input back as AI output.
+        """
         events: list[Any] = []
+
+        if assistant_only and isinstance(data, list):
+            for item in data:
+                if isinstance(item, Message) and item.role != "assistant":
+                    continue
+                for content in self._extract_contents(item):
+                    if isinstance(content, Content):
+                        if content.type == "function_approval_request":
+                            continue
+                        events.extend(self.chat.map_streaming_content(content))
+            return events
+
         for content in self._extract_contents(data):
             if isinstance(content, Content):
                 # Skip HITL approval requests — handled by the suspension mechanism
@@ -964,7 +1036,12 @@ def _try_parse_structured_output(self, text: str) -> dict[str, Any] | None:
         return None
 
     def _get_output_response_format(self) -> type[BaseModel] | None:
-        """Get the response_format from the workflow's output executors."""
+        """Get the response_format from the workflow's output executors.
+
+        For orchestrations (e.g. SequentialBuilder) where output executors are
+        framework-internal adapters, falls back to scanning all workflow
+        executors and returns the response_format from the last AgentExecutor.
+        """
         try:
             output_executors = self.agent.workflow.get_output_executors()
         except Exception:
@@ -976,34 +1053,76 @@ def _get_output_response_format(self) -> type[BaseModel] | None:
             if not isinstance(inner_agent, Agent):
                 continue
             response_format = inner_agent.default_options.get("response_format")
-            if response_format is not None and isinstance(response_format, type) and issubclass(response_format, BaseModel):
+            if (
+                response_format is not None
+                and isinstance(response_format, type)
+                and issubclass(response_format, BaseModel)
+            ):
                 return response_format
-        return None
+
+        # Fallback: scan all workflow executors for the last AgentExecutor
+        # with a response_format. Needed for orchestrations like sequential
+        # where the output executor is an internal adapter (e.g. _EndWithConversation).
+        try:
+            all_executors = list(self.agent.workflow.executors.values())
+        except Exception:
+            return None
+        result: type[BaseModel] | None = None
+        for executor in all_executors:
+            if not isinstance(executor, AgentExecutor):
+                continue
+            inner_agent = executor._agent
+            if not isinstance(inner_agent, Agent):
+                continue
+            response_format = inner_agent.default_options.get("response_format")
+            if (
+                response_format is not None
+                and isinstance(response_format, type)
+                and issubclass(response_format, BaseModel)
+            ):
+                result = response_format
+        return result
 
     @staticmethod
     def _extract_text_from_data(data: Any) -> str:
-        """Extract text from any workflow data type."""
+        """Extract text from any workflow data type.
+
+        For list[Message] data (e.g. sequential workflow full-conversation
+        output), only the last assistant message is used. The full
+        conversation includes intermediate agent turns but the workflow
+        result should be the final agent's output, not the concatenation
+        of every participant.
+        """
         if isinstance(data, (AgentResponseUpdate, AgentResponse)):
             return data.text or ""
         if isinstance(data, Message):
+            if data.role != "assistant":
+                return ""
             return "".join(
                 c.text for c in (data.contents or []) if hasattr(c, "text") and c.text
             )
         if isinstance(data, str):
             return data
         if isinstance(data, list):
-            parts: list[str] = []
+            # Collect assistant message texts, then return only the last
+            # one. For single-agent workflows there is typically only one
+            # assistant message so this is equivalent to the old behavior.
+            # For multi-agent conversations (sequential, group-chat) the
+            # last assistant message is the final agent's output.
+            last_text: str = ""
             for item in data:
                 if isinstance(item, Message):
+                    if item.role != "assistant":
+                        continue
                     text = "".join(
                         c.text
                         for c in (item.contents or [])
                         if hasattr(c, "text") and c.text
                     )
                     if text:
-                        parts.append(text)
+                        last_text = text
                 elif isinstance(item, str):
-                    parts.append(item)
+                    last_text = item
                 elif isinstance(item, list):
                     for inner in item:
                         if isinstance(inner, Message) and inner.role == "assistant":
@@ -1013,8 +1132,8 @@ def _extract_text_from_data(data: Any) -> str:
                                 if hasattr(c, "text") and c.text
                             )
                             if text:
-                                parts.append(text)
-        return "".join(parts)
+                                last_text = text
+        return last_text
         return ""
 
     def _prepare_input(self, input: dict[str, Any] | None) -> str:
```
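The behavioral change in `_extract_text_from_data` (last assistant message wins, instead of concatenating every turn) can be shown standalone. The `Message`/`TextContent` classes below are simplified stand-ins for illustration, not the framework's real types:

```python
from dataclasses import dataclass, field


@dataclass
class TextContent:
    text: str


@dataclass
class Message:
    role: str
    contents: list = field(default_factory=list)


def extract_text(data: list) -> str:
    """Simplified sketch of the patched rule for list[Message] data.

    Non-assistant turns are skipped, and each assistant turn overwrites
    last_text, so a sequential conversation yields only the final
    agent's output rather than a concatenation of every participant.
    """
    last_text = ""
    for item in data:
        if isinstance(item, Message):
            if item.role != "assistant":
                continue
            text = "".join(
                c.text for c in (item.contents or []) if getattr(c, "text", "")
            )
            if text:
                last_text = text
        elif isinstance(item, str):
            last_text = item
    return last_text


# A sequential run: user input, researcher turn, then the editor's final JSON.
conversation = [
    Message("user", [TextContent("Tell me about Tokyo")]),
    Message("assistant", [TextContent("Research notes on Tokyo...")]),
    Message("assistant", [TextContent('{"city": "Tokyo", "country": "Japan"}')]),
]
```

With the pre-patch concatenation, the researcher's free-form notes would have been glued onto the editor's JSON, breaking structured-output parsing; here only the editor's reply survives.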
