Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 217 additions & 0 deletions tests/agent/react/test_json_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
"""Tests for json_utils.py — JSONPath extraction from Pydantic models."""

from typing import Optional

import pytest
from pydantic import BaseModel

from uipath_langchain.agent.react.json_utils import (
_create_type_matcher,
_is_pydantic_model,
_unwrap_optional,
extract_values_by_paths,
get_json_paths_by_type,
)

# --- Test models ---


class Attachment(BaseModel):
id: str
filename: str


class NestedContainer(BaseModel):
attachment: Attachment
label: str


class ModelWithSimpleField(BaseModel):
attachment: Attachment
name: str


class ModelWithArrayField(BaseModel):
attachments: list[Attachment]
count: int


class ModelWithOptionalField(BaseModel):
attachment: Optional[Attachment] = None
name: str


class ModelWithNestedModel(BaseModel):
container: NestedContainer
title: str


class ModelWithArrayOfNested(BaseModel):
containers: list[NestedContainer]


class ModelWithNoTargetType(BaseModel):
name: str
value: int


class ModelWithMixedFields(BaseModel):
single: Attachment
multiple: list[Attachment]
label: str


# --- Tests for get_json_paths_by_type ---


class TestGetJsonPathsByType:
"""Tests for get_json_paths_by_type."""

def test_simple_field(self) -> None:
paths = get_json_paths_by_type(ModelWithSimpleField, "Attachment")
assert paths == ["$.attachment"]

def test_array_field(self) -> None:
paths = get_json_paths_by_type(ModelWithArrayField, "Attachment")
assert paths == ["$.attachments[*]"]

def test_optional_field_unwrapped(self) -> None:
paths = get_json_paths_by_type(ModelWithOptionalField, "Attachment")
assert paths == ["$.attachment"]

def test_nested_model_field(self) -> None:
paths = get_json_paths_by_type(ModelWithNestedModel, "Attachment")
assert paths == ["$.container.attachment"]

def test_array_of_nested_models(self) -> None:
paths = get_json_paths_by_type(ModelWithArrayOfNested, "Attachment")
assert paths == ["$.containers[*].attachment"]

def test_no_matching_type_returns_empty(self) -> None:
paths = get_json_paths_by_type(ModelWithNoTargetType, "Attachment")
assert paths == []

def test_mixed_simple_and_array(self) -> None:
paths = get_json_paths_by_type(ModelWithMixedFields, "Attachment")
assert "$.single" in paths
assert "$.multiple[*]" in paths
assert len(paths) == 2


# --- Tests for extract_values_by_paths ---


class TestExtractValuesByPaths:
"""Tests for extract_values_by_paths."""

def test_extract_from_dict_simple_path(self) -> None:
obj = {"attachment": {"id": "123", "filename": "doc.pdf"}, "name": "test"}
result = extract_values_by_paths(obj, ["$.attachment"])
assert result == [{"id": "123", "filename": "doc.pdf"}]

def test_extract_from_dict_array_path(self) -> None:
obj = {
"attachments": [
{"id": "1", "filename": "a.pdf"},
{"id": "2", "filename": "b.pdf"},
],
"count": 2,
}
result = extract_values_by_paths(obj, ["$.attachments[*]"])
assert len(result) == 2
assert result[0]["id"] == "1"
assert result[1]["id"] == "2"

def test_extract_from_basemodel(self) -> None:
obj = ModelWithSimpleField(
attachment=Attachment(id="456", filename="img.png"),
name="test",
)
result = extract_values_by_paths(obj, ["$.attachment"])
assert len(result) == 1
assert result[0]["id"] == "456"

def test_extract_multiple_paths(self) -> None:
obj = {
"single": {"id": "s1", "filename": "s.pdf"},
"multiple": [
{"id": "m1", "filename": "m1.pdf"},
{"id": "m2", "filename": "m2.pdf"},
],
"label": "test",
}
result = extract_values_by_paths(obj, ["$.single", "$.multiple[*]"])
assert len(result) == 3

def test_extract_no_match_returns_empty(self) -> None:
obj = {"name": "test"}
result = extract_values_by_paths(obj, ["$.nonexistent"])
assert result == []

def test_extract_empty_paths_returns_empty(self) -> None:
obj = {"name": "test"}
result = extract_values_by_paths(obj, [])
assert result == []

def test_extract_nested_path(self) -> None:
obj = {
"container": {
"attachment": {"id": "nested", "filename": "n.pdf"},
"label": "c",
},
"title": "t",
}
result = extract_values_by_paths(obj, ["$.container.attachment"])
assert len(result) == 1
assert result[0]["id"] == "nested"


# --- Tests for helper functions ---


class TestUnwrapOptional:
"""Tests for _unwrap_optional."""

@pytest.mark.parametrize(
"input_type,expected",
[
(Optional[str], str),
(str, str),
(Optional[Attachment], Attachment),
],
ids=["optional-str", "plain-str", "optional-basemodel"],
)
def test_unwraps_correctly(self, input_type: type, expected: type) -> None:
assert _unwrap_optional(input_type) is expected


class TestIsPydanticModel:
"""Tests for _is_pydantic_model."""

@pytest.mark.parametrize(
"value,expected",
[
(Attachment, True),
(str, False),
(None, False),
(Attachment(id="1", filename="f"), False),
],
ids=["basemodel-class", "builtin-type", "none", "instance"],
)
def test_identifies_pydantic_models(self, value: object, expected: bool) -> None:
assert _is_pydantic_model(value) is expected


class TestCreateTypeMatcher:
"""Tests for _create_type_matcher."""

def test_matches_by_class_and_string_annotation(self) -> None:
matcher = _create_type_matcher("Attachment", None)
assert matcher(Attachment) is True
assert matcher("Attachment") is True
assert matcher(str) is False
assert matcher("OtherType") is False

def test_matches_by_target_type(self) -> None:
matcher = _create_type_matcher("Attachment", Attachment)
assert matcher(Attachment) is True
33 changes: 33 additions & 0 deletions tests/agent/react/test_merge_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest
from pydantic import BaseModel

from uipath_langchain.agent.react.reducers import merge_dicts as reducer_merge_dicts
from uipath_langchain.agent.react.reducers import merge_objects


Expand Down Expand Up @@ -184,3 +185,35 @@ def test_invalid_field_names_ignored(self):
assert result.name == "updated"
# Invalid field should not exist
assert not hasattr(result, "invalid_field")


class TestMergeDicts:
"""Test merge_dicts reducer from reducers.py."""

def test_empty_right_returns_left(self):
left = {"a": 1, "b": 2}
result = reducer_merge_dicts(left, {})
assert result is left

def test_empty_left_returns_right(self):
right = {"a": 1}
result = reducer_merge_dicts({}, right)
assert result is right

def test_disjoint_keys_merged(self):
left = {"a": 1}
right = {"b": 2}
result = reducer_merge_dicts(left, right)
assert result == {"a": 1, "b": 2}

def test_overlapping_keys_right_wins(self):
left = {"a": 1, "b": 2}
right = {"b": 3, "c": 4}
result = reducer_merge_dicts(left, right)
assert result == {"a": 1, "b": 3, "c": 4}

def test_none_values_in_right_override(self):
left = {"a": 1}
right = {"a": None}
result = reducer_merge_dicts(left, right)
assert result == {"a": None}
88 changes: 88 additions & 0 deletions tests/agent/react/test_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,91 @@ def test_empty_ai_response_raises_exception(self, route_function_no_limit):
match="Agent produced empty response without tool calls",
):
route_function_no_limit(state)


class TestRouteAgentMultipleToolCallSequencing:
"""Test sequential dispatching of multiple tool calls in a single AI message."""

def test_three_tools_dispatched_sequentially(self):
"""Router dispatches each tool in order when multiple tool calls exist."""
route_func = create_route_agent(thinking_messages_limit=0)

ai_message = AIMessage(
content="Using three tools",
tool_calls=[
{"name": "tool_a", "args": {}, "id": "call_a"},
{"name": "tool_b", "args": {}, "id": "call_b"},
{"name": "tool_c", "args": {}, "id": "call_c"},
],
)

# Step 1: No tool results yet — route to first tool
state_0 = MockAgentGraphState(
messages=[HumanMessage(content="query"), ai_message]
)
assert route_func(state_0) == "tool_a"

# Step 2: First tool done — route to second
state_1 = MockAgentGraphState(
messages=[
HumanMessage(content="query"),
ai_message,
ToolMessage(content="result_a", tool_call_id="call_a"),
]
)
assert route_func(state_1) == "tool_b"

# Step 3: Two tools done — route to third
state_2 = MockAgentGraphState(
messages=[
HumanMessage(content="query"),
ai_message,
ToolMessage(content="result_a", tool_call_id="call_a"),
ToolMessage(content="result_b", tool_call_id="call_b"),
]
)
assert route_func(state_2) == "tool_c"

# Step 4: All done — route back to agent
state_3 = MockAgentGraphState(
messages=[
HumanMessage(content="query"),
ai_message,
ToolMessage(content="result_a", tool_call_id="call_a"),
ToolMessage(content="result_b", tool_call_id="call_b"),
ToolMessage(content="result_c", tool_call_id="call_c"),
]
)
assert route_func(state_3) == AgentGraphNode.AGENT

def test_flow_control_tool_among_multiple_terminates(self):
"""Router should terminate when the next tool is a flow control tool."""
route_func = create_route_agent(thinking_messages_limit=0)

ai_message = AIMessage(
content="Using tools then ending",
tool_calls=[
{"name": "regular_tool", "args": {}, "id": "call_1"},
{
"name": END_EXECUTION_TOOL.name,
"args": {"reason": "done"},
"id": "call_2",
},
],
)

# First tool is regular
state_0 = MockAgentGraphState(
messages=[HumanMessage(content="query"), ai_message]
)
assert route_func(state_0) == "regular_tool"

# After first tool done, next is flow control — terminate
state_1 = MockAgentGraphState(
messages=[
HumanMessage(content="query"),
ai_message,
ToolMessage(content="done", tool_call_id="call_1"),
]
)
assert route_func(state_1) == AgentGraphNode.TERMINATE
Loading
Loading