55 changes: 45 additions & 10 deletions src/llama_stack_client/lib/agents/agent.py
@@ -12,7 +12,9 @@
from llama_stack_client.types.agents.turn_create_params import Document, Toolgroup
from llama_stack_client.types.agents.turn_create_response import AgentTurnResponseStreamChunk


from .client_tool import ClientTool
from .output_parser import OutputParser

DEFAULT_MAX_ITER = 10

@@ -23,14 +25,18 @@ def __init__(
client: LlamaStackClient,
agent_config: AgentConfig,
client_tools: Tuple[ClientTool] = (),
memory_bank_id: Optional[str] = None,
output_parser: Optional[OutputParser] = None,
):
self.client = client
self.agent_config = agent_config
self.agent_id = self._create_agent(agent_config)
self.client_tools = {t.get_name(): t for t in client_tools}
self.sessions = []
self.memory_bank_id = memory_bank_id
self.output_parser = output_parser
self.builtin_tools = {}
for tg in agent_config["toolgroups"]:
for tool in self.client.tools.list(toolgroup_id=tg):
self.builtin_tools[tool.identifier] = tool

def _create_agent(self, agent_config: AgentConfig) -> int:
agentic_system_create_response = self.client.agents.create(
@@ -48,28 +54,56 @@ def create_session(self, session_name: str) -> int:
self.sessions.append(self.session_id)
return self.session_id

def _process_chunk(self, chunk: AgentTurnResponseStreamChunk) -> None:
if chunk.event.payload.event_type != "turn_complete":
return
message = chunk.event.payload.turn.output_message

if self.output_parser:
parsed_message = self.output_parser.parse(message)
message = parsed_message

def _has_tool_call(self, chunk: AgentTurnResponseStreamChunk) -> bool:
if chunk.event.payload.event_type != "turn_complete":
return False
message = chunk.event.payload.turn.output_message
if message.stop_reason == "out_of_tokens":
return False

return len(message.tool_calls) > 0

def _run_tool(self, chunk: AgentTurnResponseStreamChunk) -> ToolResponseMessage:
message = chunk.event.payload.turn.output_message
tool_call = message.tool_calls[0]
if tool_call.tool_name not in self.client_tools:
return ToolResponseMessage(

# custom client tools
if tool_call.tool_name in self.client_tools:
tool = self.client_tools[tool_call.tool_name]
result_messages = tool.run([message])
next_message = result_messages[0]
return next_message

# builtin tools executed by tool_runtime
if tool_call.tool_name in self.builtin_tools:
tool_result = self.client.tool_runtime.invoke_tool(
tool_name=tool_call.tool_name,
kwargs=tool_call.arguments,
)
tool_response_message = ToolResponseMessage(
call_id=tool_call.call_id,
tool_name=tool_call.tool_name,
content=f"Unknown tool `{tool_call.tool_name}` was called.",
role="ipython",
content=tool_result.content,
role="tool",
)
tool = self.client_tools[tool_call.tool_name]
result_messages = tool.run([message])
next_message = result_messages[0]
return next_message
return tool_response_message

# cannot find tools
return ToolResponseMessage(
call_id=tool_call.call_id,
tool_name=tool_call.tool_name,
content=f"Unknown tool `{tool_call.tool_name}` was called.",
role="tool",
)

def create_turn(
self,
@@ -115,6 +149,7 @@ def _create_turn_streaming(
# by default, we stop after the first turn
stop = True
for chunk in response:
self._process_chunk(chunk)
Contributor:
I wonder if it's cleaner to only have the override as get_tool_call(chunk) instead of a generic output parser. This way:

  1. It's clearer what the user is supposed to override.
  2. We can actually simplify the logic below as:

     # the default tool_call_getter just returns `chunk...tool_calls`
     tool_call = self.tool_call_getter.get_tool_call(chunk)
     if not tool_call:
         yield chunk
         return
     else:
         # run tool

  3. Bonus: we can also be more functional and not overwrite chunk with the parsed tool calls.

Contributor:
@ehhuang +100 especially the (3) bonus

Contributor Author:
Some update in: #130

However, we still need to overwrite chunk with the parsed tool calls, as ClientTool.run takes in a message history and expects the ToolCall details in the last message.

if hasattr(chunk, "error"):
yield chunk
return
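To make the new dispatch in _run_tool concrete, here is a minimal sketch of a client-side tool that the custom-tool branch above would pick up. The ClientTool methods shown (get_name, get_description, get_params_definition, run) are assumed from how this diff uses them, the import paths and the GetTimeTool name are hypothetical, and the real base class may require additional methods not shown here.

from datetime import datetime, timezone

from llama_stack_client.lib.agents.client_tool import ClientTool  # path assumed from this diff
from llama_stack_client.types import ToolResponseMessage  # import path assumed


class GetTimeTool(ClientTool):  # hypothetical client tool
    def get_name(self) -> str:
        return "get_time"

    def get_description(self) -> str:
        return "Return the current UTC time as an ISO-8601 string."

    def get_params_definition(self) -> dict:
        return {}  # no parameters

    def run(self, messages):
        # The ToolCall details are read from the last message in the history,
        # which is why the agent keeps the parsed tool calls on the chunk's
        # output_message (see the review thread above).
        tool_call = messages[-1].tool_calls[0]
        return [
            ToolResponseMessage(
                call_id=tool_call.call_id,
                tool_name=tool_call.tool_name,
                content=datetime.now(timezone.utc).isoformat(),
                role="tool",
            )
        ]

An Agent constructed with client_tools=(GetTimeTool(),) would route a matching tool_call to run(); a tool name found in neither client_tools nor builtin_tools falls through to the final "Unknown tool" response above.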
48 changes: 48 additions & 0 deletions src/llama_stack_client/lib/agents/output_parser.py
@@ -0,0 +1,48 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from abc import abstractmethod

from llama_stack_client.types.agents.turn import CompletionMessage


class OutputParser:
"""
Abstract base class for parsing agent responses. Implement this class to customize how
agent outputs are processed and transformed.
This class allows developers to define custom parsing logic for agent responses,
which can be useful for:
- Extracting specific information from the response
- Formatting or structuring the output in a specific way
- Validating or sanitizing the agent's response
To use this class:
1. Create a subclass of OutputParser
2. Implement the `parse` method
3. Pass your parser instance to the Agent's constructor
Example:
class MyCustomParser(OutputParser):
def parse(self, output_message: CompletionMessage) -> CompletionMessage:
# Add your custom parsing logic here
return processed_message
Methods:
parse(output_message: CompletionMessage) -> CompletionMessage:
Abstract method that must be implemented by subclasses to process
the agent's response.
Args:
output_message (CompletionMessage): The response message from agent turn
Returns:
CompletionMessage: The processed/transformed response message
"""

@abstractmethod
def parse(self, output_message: CompletionMessage) -> CompletionMessage:
raise NotImplementedError
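As a concrete illustration of this contract, below is a minimal sketch of a subclass and how it could be handed to the Agent constructor extended in this PR; the StripWhitespaceParser name is hypothetical and the client/agent_config construction is elided.

from llama_stack_client.lib.agents.agent import Agent
from llama_stack_client.lib.agents.output_parser import OutputParser
from llama_stack_client.types.agents.turn import CompletionMessage


class StripWhitespaceParser(OutputParser):
    def parse(self, output_message: CompletionMessage) -> CompletionMessage:
        # Trim stray whitespace from the model's reply before the agent inspects it.
        output_message.content = str(output_message.content).strip()
        return output_message


# agent = Agent(client, agent_config, output_parser=StripWhitespaceParser())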
5 changes: 5 additions & 0 deletions src/llama_stack_client/lib/agents/react/__init__.py
@@ -0,0 +1,5 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
105 changes: 105 additions & 0 deletions src/llama_stack_client/lib/agents/react/agent.py
@@ -0,0 +1,105 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from pydantic import BaseModel
from typing import Dict, Any
from ..agent import Agent
from .output_parser import ReActOutputParser
from ..output_parser import OutputParser
from .prompts import DEFAULT_REACT_AGENT_SYSTEM_PROMPT_TEMPLATE

from typing import Tuple, Optional
from llama_stack_client import LlamaStackClient
from ..client_tool import ClientTool
from llama_stack_client.types.agent_create_params import AgentConfig


class Action(BaseModel):
tool_name: str
tool_params: Dict[str, Any]


class ReActOutput(BaseModel):
thought: str
action: Optional[Action] = None
answer: Optional[str] = None


class ReActAgent(Agent):
"""ReAct agent.

Simple wrapper around Agent that prepares prompts for creating a ReAct agent from a list of tools.
"""

def __init__(
self,
client: LlamaStackClient,
model: str,
builtin_toolgroups: Tuple[str] = (),
client_tools: Tuple[ClientTool] = (),
output_parser: OutputParser = ReActOutputParser(),
json_response_format: bool = False,
custom_agent_config: Optional[AgentConfig] = None,
):
def get_tool_defs():
tool_defs = []
for x in builtin_toolgroups:
tool_defs.extend(
[
{
"name": tool.identifier,
"description": tool.description,
"parameters": tool.parameters,
}
for tool in client.tools.list(toolgroup_id=x)
]
)
tool_defs.extend(
[
{
"name": tool.get_name(),
"description": tool.get_description(),
"parameters": tool.get_params_definition(),
}
for tool in client_tools
]
)
return tool_defs

if custom_agent_config is None:
tool_names, tool_descriptions = "", ""
tool_defs = get_tool_defs()
tool_names = ", ".join([x["name"] for x in tool_defs])
tool_descriptions = "\n".join([f"- {x['name']}: {x}" for x in tool_defs])
instruction = DEFAULT_REACT_AGENT_SYSTEM_PROMPT_TEMPLATE.replace("<<tool_names>>", tool_names).replace(
"<<tool_descriptions>>", tool_descriptions
)

# use default toolgroups
agent_config = AgentConfig(
model=model,
instructions=instruction,
toolgroups=builtin_toolgroups,
client_tools=[client_tool.get_tool_definition() for client_tool in client_tools],
tool_choice="auto",
# TODO: refactor this to use SystemMessageBehaviour.replace
tool_prompt_format="json",
input_shields=[],
output_shields=[],
enable_session_persistence=False,
)

if json_response_format:
agent_config.response_format = {
"type": "json_schema",
"json_schema": ReActOutput.model_json_schema(),
}

super().__init__(
client=client,
agent_config=agent_config,
client_tools=client_tools,
output_parser=output_parser,
)
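A minimal usage sketch of the new ReActAgent follows, assuming a locally running Llama Stack server; the base URL, model id, and builtin::websearch toolgroup are placeholders, and create_turn is assumed to accept user-message dicts and a session_id as in the existing Agent API.

from llama_stack_client import LlamaStackClient
from llama_stack_client.lib.agents.react.agent import ReActAgent

client = LlamaStackClient(base_url="http://localhost:8321")  # placeholder endpoint

agent = ReActAgent(
    client=client,
    model="meta-llama/Llama-3.1-8B-Instruct",  # placeholder model id
    builtin_toolgroups=("builtin::websearch",),  # placeholder toolgroup
    json_response_format=True,  # constrain replies to the ReActOutput schema
)

session_id = agent.create_session("react-demo")
response = agent.create_turn(
    messages=[{"role": "user", "content": "Search for the latest Llama Stack release."}],
    session_id=session_id,
)
for chunk in response:
    print(chunk)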
46 changes: 46 additions & 0 deletions src/llama_stack_client/lib/agents/react/output_parser.py
@@ -0,0 +1,46 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from pydantic import BaseModel, ValidationError
from typing import Dict, Any, Optional
from ..output_parser import OutputParser
from llama_stack_client.types.shared.completion_message import CompletionMessage
from llama_stack_client.types.shared.tool_call import ToolCall

import uuid


class Action(BaseModel):
tool_name: str
tool_params: Dict[str, Any]


class ReActOutput(BaseModel):
thought: str
action: Optional[Action] = None
answer: Optional[str] = None


class ReActOutputParser(OutputParser):
def parse(self, output_message: CompletionMessage) -> CompletionMessage:
response_text = str(output_message.content)
try:
react_output = ReActOutput.model_validate_json(response_text)
except ValidationError as e:
print(f"Error parsing action: {e}")
return output_message
Contributor:
Does the turn just terminate after this point?

Contributor Author (@yanxi0830, Feb 5, 2025):
Yes, the turn will terminate after this point, as there are no tool calls on the message for the Agent to run. We can override the orchestration in ReActAgent to continue the loop and think again when the tool call is not parsed correctly, until an "answer" is reached.


if react_output.answer:
return output_message

if react_output.action:
tool_name = react_output.action.tool_name
tool_params = react_output.action.tool_params
if tool_name and tool_params:
call_id = str(uuid.uuid4())
output_message.tool_calls = [ToolCall(call_id=call_id, tool_name=tool_name, arguments=tool_params)]

return output_message
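To show the output shape this parser expects, here is a small sketch that validates a ReAct-style JSON string against the ReActOutput model defined above; the tool name and parameters are made up.

raw = (
    '{"thought": "I should call a tool.", '
    '"action": {"tool_name": "get_weather", "tool_params": {"city": "Tokyo"}}, '
    '"answer": null}'
)

react_output = ReActOutput.model_validate_json(raw)
assert react_output.action is not None
assert react_output.action.tool_name == "get_weather"
# ReActOutputParser.parse would attach a ToolCall built from this action to the
# CompletionMessage, so that Agent._run_tool can dispatch it.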