diff --git a/src/strands/tools/mcp/mcp_client.py b/src/strands/tools/mcp/mcp_client.py index 6ce591bc5..37b99d021 100644 --- a/src/strands/tools/mcp/mcp_client.py +++ b/src/strands/tools/mcp/mcp_client.py @@ -21,11 +21,20 @@ import anyio from mcp import ClientSession, ListToolsResult from mcp.client.session import ElicitationFnT -from mcp.types import BlobResourceContents, GetPromptResult, ListPromptsResult, TextResourceContents +from mcp.types import ( + BlobResourceContents, + GetPromptResult, + ListPromptsResult, + ListResourcesResult, + ListResourceTemplatesResult, + ReadResourceResult, + TextResourceContents, +) from mcp.types import CallToolResult as MCPCallToolResult from mcp.types import EmbeddedResource as MCPEmbeddedResource from mcp.types import ImageContent as MCPImageContent from mcp.types import TextContent as MCPTextContent +from pydantic import AnyUrl from typing_extensions import Protocol, TypedDict from ...experimental.tools import ToolProvider @@ -449,6 +458,82 @@ async def _get_prompt_async() -> GetPromptResult: return get_prompt_result + def list_resources_sync(self, pagination_token: Optional[str] = None) -> ListResourcesResult: + """Synchronously retrieves the list of available resources from the MCP server. + + This method calls the asynchronous list_resources method on the MCP session + and returns the raw ListResourcesResult with pagination support. + + Args: + pagination_token: Optional token for pagination + + Returns: + ListResourcesResult: The raw MCP response containing resources and pagination info + """ + self._log_debug_with_thread("listing MCP resources synchronously") + if not self._is_session_active(): + raise MCPClientInitializationError(CLIENT_SESSION_NOT_RUNNING_ERROR_MESSAGE) + + async def _list_resources_async() -> ListResourcesResult: + return await cast(ClientSession, self._background_thread_session).list_resources(cursor=pagination_token) + + list_resources_result: ListResourcesResult = self._invoke_on_background_thread(_list_resources_async()).result() + self._log_debug_with_thread("received %d resources from MCP server", len(list_resources_result.resources)) + + return list_resources_result + + def read_resource_sync(self, uri: AnyUrl | str) -> ReadResourceResult: + """Synchronously reads a resource from the MCP server. + + Args: + uri: The URI of the resource to read + + Returns: + ReadResourceResult: The resource content from the MCP server + """ + self._log_debug_with_thread("reading MCP resource synchronously: %s", uri) + if not self._is_session_active(): + raise MCPClientInitializationError(CLIENT_SESSION_NOT_RUNNING_ERROR_MESSAGE) + + async def _read_resource_async() -> ReadResourceResult: + # Convert string to AnyUrl if needed + resource_uri = AnyUrl(uri) if isinstance(uri, str) else uri + return await cast(ClientSession, self._background_thread_session).read_resource(resource_uri) + + read_resource_result: ReadResourceResult = self._invoke_on_background_thread(_read_resource_async()).result() + self._log_debug_with_thread("received resource content from MCP server") + + return read_resource_result + + def list_resource_templates_sync(self, pagination_token: Optional[str] = None) -> ListResourceTemplatesResult: + """Synchronously retrieves the list of available resource templates from the MCP server. + + Resource templates define URI patterns that can be used to access resources dynamically. + + Args: + pagination_token: Optional token for pagination + + Returns: + ListResourceTemplatesResult: The raw MCP response containing resource templates and pagination info + """ + self._log_debug_with_thread("listing MCP resource templates synchronously") + if not self._is_session_active(): + raise MCPClientInitializationError(CLIENT_SESSION_NOT_RUNNING_ERROR_MESSAGE) + + async def _list_resource_templates_async() -> ListResourceTemplatesResult: + return await cast(ClientSession, self._background_thread_session).list_resource_templates( + cursor=pagination_token + ) + + list_resource_templates_result: ListResourceTemplatesResult = self._invoke_on_background_thread( + _list_resource_templates_async() + ).result() + self._log_debug_with_thread( + "received %d resource templates from MCP server", len(list_resource_templates_result.resourceTemplates) + ) + + return list_resource_templates_result + def call_tool_sync( self, tool_use_id: str, diff --git a/tests/strands/tools/mcp/test_mcp_client.py b/tests/strands/tools/mcp/test_mcp_client.py index f5040de1b..35f11f47f 100644 --- a/tests/strands/tools/mcp/test_mcp_client.py +++ b/tests/strands/tools/mcp/test_mcp_client.py @@ -5,9 +5,21 @@ import pytest from mcp import ListToolsResult from mcp.types import CallToolResult as MCPCallToolResult -from mcp.types import GetPromptResult, ListPromptsResult, Prompt, PromptMessage +from mcp.types import ( + GetPromptResult, + ListPromptsResult, + ListResourcesResult, + ListResourceTemplatesResult, + Prompt, + PromptMessage, + ReadResourceResult, + Resource, + ResourceTemplate, + TextResourceContents, +) from mcp.types import TextContent as MCPTextContent from mcp.types import Tool as MCPTool +from pydantic import AnyUrl from strands.tools.mcp import MCPClient from strands.tools.mcp.mcp_types import MCPToolResult @@ -772,3 +784,143 @@ def test_call_tool_sync_with_meta_and_structured_content(mock_transport, mock_se assert result["metadata"] == metadata assert "structuredContent" in result assert result["structuredContent"] == structured_content + + +# Resource Tests - Sync Methods + + +def test_list_resources_sync(mock_transport, mock_session): + """Test that list_resources_sync correctly retrieves resources.""" + mock_resource = Resource( + uri=AnyUrl("file://documents/test.txt"), name="test.txt", description="A test document", mimeType="text/plain" + ) + mock_session.list_resources.return_value = ListResourcesResult(resources=[mock_resource]) + + with MCPClient(mock_transport["transport_callable"]) as client: + result = client.list_resources_sync() + + mock_session.list_resources.assert_called_once_with(cursor=None) + assert len(result.resources) == 1 + assert result.resources[0].name == "test.txt" + assert str(result.resources[0].uri) == "file://documents/test.txt" + assert result.nextCursor is None + + +def test_list_resources_sync_with_pagination_token(mock_transport, mock_session): + """Test that list_resources_sync correctly passes pagination token and returns next cursor.""" + mock_resource = Resource( + uri=AnyUrl("file://documents/test.txt"), name="test.txt", description="A test document", mimeType="text/plain" + ) + mock_session.list_resources.return_value = ListResourcesResult(resources=[mock_resource], nextCursor="next_page") + + with MCPClient(mock_transport["transport_callable"]) as client: + result = client.list_resources_sync(pagination_token="current_page") + + mock_session.list_resources.assert_called_once_with(cursor="current_page") + assert len(result.resources) == 1 + assert result.resources[0].name == "test.txt" + assert result.nextCursor == "next_page" + + +def test_list_resources_sync_session_not_active(): + """Test that list_resources_sync raises an error when session is not active.""" + client = MCPClient(MagicMock()) + + with pytest.raises(MCPClientInitializationError, match="client session is not running"): + client.list_resources_sync() + + +def test_read_resource_sync(mock_transport, mock_session): + """Test that read_resource_sync correctly reads a resource.""" + mock_content = TextResourceContents( + uri=AnyUrl("file://documents/test.txt"), text="Resource content", mimeType="text/plain" + ) + mock_session.read_resource.return_value = ReadResourceResult(contents=[mock_content]) + + with MCPClient(mock_transport["transport_callable"]) as client: + result = client.read_resource_sync("file://documents/test.txt") + + # Verify the session method was called + mock_session.read_resource.assert_called_once() + # Check the URI argument (it will be wrapped as AnyUrl) + call_args = mock_session.read_resource.call_args[0] + assert str(call_args[0]) == "file://documents/test.txt" + + assert len(result.contents) == 1 + assert result.contents[0].text == "Resource content" + + +def test_read_resource_sync_with_anyurl(mock_transport, mock_session): + """Test that read_resource_sync correctly handles AnyUrl input.""" + mock_content = TextResourceContents( + uri=AnyUrl("file://documents/test.txt"), text="Resource content", mimeType="text/plain" + ) + mock_session.read_resource.return_value = ReadResourceResult(contents=[mock_content]) + + with MCPClient(mock_transport["transport_callable"]) as client: + uri = AnyUrl("file://documents/test.txt") + result = client.read_resource_sync(uri) + + mock_session.read_resource.assert_called_once() + call_args = mock_session.read_resource.call_args[0] + assert str(call_args[0]) == "file://documents/test.txt" + + assert len(result.contents) == 1 + assert result.contents[0].text == "Resource content" + + +def test_read_resource_sync_session_not_active(): + """Test that read_resource_sync raises an error when session is not active.""" + client = MCPClient(MagicMock()) + + with pytest.raises(MCPClientInitializationError, match="client session is not running"): + client.read_resource_sync("file://documents/test.txt") + + +def test_list_resource_templates_sync(mock_transport, mock_session): + """Test that list_resource_templates_sync correctly retrieves resource templates.""" + mock_template = ResourceTemplate( + uriTemplate="file://documents/{name}", + name="document_template", + description="Template for documents", + mimeType="text/plain", + ) + mock_session.list_resource_templates.return_value = ListResourceTemplatesResult(resourceTemplates=[mock_template]) + + with MCPClient(mock_transport["transport_callable"]) as client: + result = client.list_resource_templates_sync() + + mock_session.list_resource_templates.assert_called_once_with(cursor=None) + assert len(result.resourceTemplates) == 1 + assert result.resourceTemplates[0].name == "document_template" + assert result.resourceTemplates[0].uriTemplate == "file://documents/{name}" + assert result.nextCursor is None + + +def test_list_resource_templates_sync_with_pagination_token(mock_transport, mock_session): + """Test that list_resource_templates_sync correctly passes pagination token and returns next cursor.""" + mock_template = ResourceTemplate( + uriTemplate="file://documents/{name}", + name="document_template", + description="Template for documents", + mimeType="text/plain", + ) + mock_session.list_resource_templates.return_value = ListResourceTemplatesResult( + resourceTemplates=[mock_template], nextCursor="next_page" + ) + + with MCPClient(mock_transport["transport_callable"]) as client: + result = client.list_resource_templates_sync(pagination_token="current_page") + + mock_session.list_resource_templates.assert_called_once_with(cursor="current_page") + assert len(result.resourceTemplates) == 1 + assert result.resourceTemplates[0].name == "document_template" + assert result.nextCursor == "next_page" + + +def test_list_resource_templates_sync_session_not_active(): + """Test that list_resource_templates_sync raises an error when session is not active.""" + client = MCPClient(MagicMock()) + + with pytest.raises(MCPClientInitializationError, match="client session is not running"): + client.list_resource_templates_sync() diff --git a/tests_integ/mcp/echo_server.py b/tests_integ/mcp/echo_server.py index a23a87b5c..151f913d6 100644 --- a/tests_integ/mcp/echo_server.py +++ b/tests_integ/mcp/echo_server.py @@ -16,12 +16,15 @@ """ import base64 +import json from typing import Literal from mcp.server import FastMCP from mcp.types import BlobResourceContents, CallToolResult, EmbeddedResource, TextContent, TextResourceContents from pydantic import BaseModel +TEST_IMAGE_BASE64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg==" + class EchoResponse(BaseModel): """Response model for echo with structured content.""" @@ -102,6 +105,22 @@ def get_weather(location: Literal["New York", "London", "Tokyo"] = "New York"): ) ] + # Resources + @mcp.resource("test://static-text") + def static_text_resource() -> str: + """A static text resource for testing""" + return "This is the content of the static text resource." + + @mcp.resource("test://static-binary") + def static_binary_resource() -> bytes: + """A static binary resource (image) for testing""" + return base64.b64decode(TEST_IMAGE_BASE64) + + @mcp.resource("test://template/{id}/data") + def template_resource(id: str) -> str: + """A resource template with parameter substitution""" + return json.dumps({"id": id, "templateTest": True, "data": f"Data for ID: {id}"}) + mcp.run(transport="stdio") diff --git a/tests_integ/mcp/test_mcp_resources.py b/tests_integ/mcp/test_mcp_resources.py new file mode 100644 index 000000000..dccf3b808 --- /dev/null +++ b/tests_integ/mcp/test_mcp_resources.py @@ -0,0 +1,130 @@ +""" +Integration tests for MCP client resource functionality. + +This module tests the resource-related methods in MCPClient: +- list_resources_sync() +- read_resource_sync() +- list_resource_templates_sync() + +The tests use the echo server which has been extended with resource functionality. +""" + +import base64 +import json + +import pytest +from mcp import StdioServerParameters, stdio_client +from mcp.shared.exceptions import McpError +from mcp.types import BlobResourceContents, TextResourceContents +from pydantic import AnyUrl + +from strands.tools.mcp.mcp_client import MCPClient + + +def test_mcp_resources_list_and_read(): + """Test listing and reading various types of resources.""" + mcp_client = MCPClient( + lambda: stdio_client(StdioServerParameters(command="python", args=["tests_integ/mcp/echo_server.py"])) + ) + + with mcp_client: + # Test list_resources_sync + resources_result = mcp_client.list_resources_sync() + assert len(resources_result.resources) >= 2 # At least our 2 static resources + + # Verify resource URIs exist (only static resources, not templates) + resource_uris = [str(r.uri) for r in resources_result.resources] + assert "test://static-text" in resource_uris + assert "test://static-binary" in resource_uris + # Template resources are not listed in static resources + + # Test reading text resource + text_resource = mcp_client.read_resource_sync("test://static-text") + assert len(text_resource.contents) == 1 + content = text_resource.contents[0] + assert isinstance(content, TextResourceContents) + assert "This is the content of the static text resource." in content.text + + # Test reading binary resource + binary_resource = mcp_client.read_resource_sync("test://static-binary") + assert len(binary_resource.contents) == 1 + binary_content = binary_resource.contents[0] + assert isinstance(binary_content, BlobResourceContents) + # Verify it's valid base64 encoded data + decoded_data = base64.b64decode(binary_content.blob) + assert len(decoded_data) > 0 + + +def test_mcp_resources_templates(): + """Test listing resource templates and reading from template resources.""" + mcp_client = MCPClient( + lambda: stdio_client(StdioServerParameters(command="python", args=["tests_integ/mcp/echo_server.py"])) + ) + + with mcp_client: + # Test list_resource_templates_sync + templates_result = mcp_client.list_resource_templates_sync() + assert len(templates_result.resourceTemplates) >= 1 + + # Verify template URIs exist + template_uris = [t.uriTemplate for t in templates_result.resourceTemplates] + assert "test://template/{id}/data" in template_uris + + # Test reading from template resource + template_resource = mcp_client.read_resource_sync("test://template/123/data") + assert len(template_resource.contents) == 1 + template_content = template_resource.contents[0] + assert isinstance(template_content, TextResourceContents) + + # Parse the JSON response + parsed_json = json.loads(template_content.text) + assert parsed_json["id"] == "123" + assert parsed_json["templateTest"] is True + assert "Data for ID: 123" in parsed_json["data"] + + +def test_mcp_resources_pagination(): + """Test pagination support for resources.""" + mcp_client = MCPClient( + lambda: stdio_client(StdioServerParameters(command="python", args=["tests_integ/mcp/echo_server.py"])) + ) + + with mcp_client: + # Test with pagination token (should work even if server doesn't implement pagination) + resources_result = mcp_client.list_resources_sync(pagination_token=None) + assert len(resources_result.resources) >= 0 + + # Test resource templates pagination + templates_result = mcp_client.list_resource_templates_sync(pagination_token=None) + assert len(templates_result.resourceTemplates) >= 0 + + +def test_mcp_resources_error_handling(): + """Test error handling for resource operations.""" + mcp_client = MCPClient( + lambda: stdio_client(StdioServerParameters(command="python", args=["tests_integ/mcp/echo_server.py"])) + ) + + with mcp_client: + # Test reading non-existent resource + with pytest.raises(McpError, match="Unknown resource"): + mcp_client.read_resource_sync("test://nonexistent") + + +def test_mcp_resources_uri_types(): + """Test that both string and AnyUrl types work for read_resource_sync.""" + mcp_client = MCPClient( + lambda: stdio_client(StdioServerParameters(command="python", args=["tests_integ/mcp/echo_server.py"])) + ) + + with mcp_client: + # Test with string URI + text_resource_str = mcp_client.read_resource_sync("test://static-text") + assert len(text_resource_str.contents) == 1 + + # Test with AnyUrl URI + text_resource_url = mcp_client.read_resource_sync(AnyUrl("test://static-text")) + assert len(text_resource_url.contents) == 1 + + # Both should return the same content + assert text_resource_str.contents[0].text == text_resource_url.contents[0].text