Skip to content

Commit 759a9a3

Browse files
committed
Implement async tools snippets
1 parent 2df5e7c commit 759a9a3

File tree

6 files changed

+474
-10
lines changed

6 files changed

+474
-10
lines changed
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
"""
2+
Client example showing how to use async tools.
3+
4+
cd to the `examples/snippets` directory and run:
5+
uv run async-tools-client
6+
uv run async-tools-client --protocol=latest # backwards compatible mode
7+
uv run async-tools-client --protocol=next # async tools mode
8+
"""
9+
10+
import asyncio
11+
import os
12+
import sys
13+
14+
from mcp import ClientSession, StdioServerParameters, types
15+
from mcp.client.stdio import stdio_client
16+
17+
# Create server parameters for stdio connection
18+
server_params = StdioServerParameters(
19+
command="uv", # Using uv to run the server
20+
args=["run", "server", "async_tools", "stdio"],
21+
env={"UV_INDEX": os.environ.get("UV_INDEX", "")},
22+
)
23+
24+
25+
async def demonstrate_sync_tool(session: ClientSession):
26+
"""Demonstrate calling a synchronous tool."""
27+
print("\n=== Synchronous Tool Demo ===")
28+
29+
result = await session.call_tool("sync_tool", arguments={"x": 21})
30+
31+
# Print the result
32+
for content in result.content:
33+
if isinstance(content, types.TextContent):
34+
print(f"Sync tool result: {content.text}")
35+
36+
37+
async def demonstrate_async_tool(session: ClientSession):
38+
"""Demonstrate calling an async-only tool."""
39+
print("\n=== Asynchronous Tool Demo ===")
40+
41+
# Call the async tool
42+
result = await session.call_tool("async_only_tool", arguments={"data": "sample dataset"})
43+
44+
if result.operation:
45+
token = result.operation.token
46+
print(f"Async operation started with token: {token}")
47+
48+
# Poll for status updates
49+
while True:
50+
status = await session.get_operation_status(token)
51+
print(f"Status: {status.status}")
52+
53+
if status.status == "completed":
54+
# Get the final result
55+
final_result = await session.get_operation_result(token)
56+
for content in final_result.result.content:
57+
if isinstance(content, types.TextContent):
58+
print(f"Final result: {content.text}")
59+
break
60+
elif status.status == "failed":
61+
print(f"Operation failed: {status.error}")
62+
break
63+
elif status.status in ("canceled", "unknown"):
64+
print(f"Operation ended with status: {status.status}")
65+
break
66+
67+
# Wait before polling again
68+
await asyncio.sleep(1)
69+
else:
70+
# Synchronous result (shouldn't happen for async-only tools)
71+
for content in result.content:
72+
if isinstance(content, types.TextContent):
73+
print(f"Unexpected sync result: {content.text}")
74+
75+
76+
async def demonstrate_hybrid_tool(session: ClientSession):
77+
"""Demonstrate calling a hybrid tool in both modes."""
78+
print("\n=== Hybrid Tool Demo ===")
79+
80+
# Call hybrid tool (will be sync by default for compatibility)
81+
result = await session.call_tool("hybrid_tool", arguments={"message": "hello world"})
82+
83+
for content in result.content:
84+
if isinstance(content, types.TextContent):
85+
print(f"Hybrid tool result: {content.text}")
86+
87+
88+
async def demonstrate_batch_processing(session: ClientSession):
89+
"""Demonstrate batch processing with progress updates."""
90+
print("\n=== Batch Processing Demo ===")
91+
92+
items = ["apple", "banana", "cherry", "date", "elderberry"]
93+
result = await session.call_tool("batch_operation_tool", arguments={"items": items})
94+
95+
if result.operation:
96+
token = result.operation.token
97+
print(f"Batch operation started with token: {token}")
98+
99+
# Poll for status with progress tracking
100+
while True:
101+
status = await session.get_operation_status(token)
102+
print(f"Status: {status.status}")
103+
104+
if status.status == "completed":
105+
# Get the final result
106+
final_result = await session.get_operation_result(token)
107+
108+
# Check for structured result
109+
if final_result.result.structuredContent:
110+
print(f"Structured result: {final_result.result.structuredContent}")
111+
112+
# Also show text content
113+
for content in final_result.result.content:
114+
if isinstance(content, types.TextContent):
115+
print(f"Text result: {content.text}")
116+
break
117+
elif status.status == "failed":
118+
print(f"Operation failed: {status.error}")
119+
break
120+
elif status.status in ("canceled", "unknown"):
121+
print(f"Operation ended with status: {status.status}")
122+
break
123+
124+
# Wait before polling again
125+
await asyncio.sleep(0.5)
126+
else:
127+
print("Unexpected: batch operation returned synchronous result")
128+
129+
130+
async def demonstrate_data_processing(session: ClientSession):
131+
"""Demonstrate complex data processing pipeline."""
132+
print("\n=== Data Processing Pipeline Demo ===")
133+
134+
operations = ["validate", "clean", "transform", "analyze", "export"]
135+
result = await session.call_tool(
136+
"data_processing_tool", arguments={"dataset": "customer_data.csv", "operations": operations}
137+
)
138+
139+
if result.operation:
140+
token = result.operation.token
141+
print(f"Data processing started with token: {token}")
142+
143+
# Poll for completion
144+
while True:
145+
status = await session.get_operation_status(token)
146+
print(f"Status: {status.status}")
147+
148+
if status.status == "completed":
149+
final_result = await session.get_operation_result(token)
150+
151+
# Show structured result if available
152+
if final_result.result.structuredContent:
153+
print("Processing results:")
154+
for op, result_text in final_result.result.structuredContent.items():
155+
print(f" {op}: {result_text}")
156+
break
157+
elif status.status == "failed":
158+
print(f"Processing failed: {status.error}")
159+
break
160+
elif status.status in ("canceled", "unknown"):
161+
print(f"Processing ended with status: {status.status}")
162+
break
163+
164+
await asyncio.sleep(0.8)
165+
166+
167+
async def run():
168+
"""Run all async tool demonstrations."""
169+
# Determine protocol version from command line
170+
protocol_version = "next" # Default to next for async tools
171+
if len(sys.argv) > 1:
172+
if "--protocol=latest" in sys.argv:
173+
protocol_version = "2025-06-18" # Latest stable protocol
174+
elif "--protocol=next" in sys.argv:
175+
protocol_version = "next" # Development protocol version with async tools
176+
177+
print(f"Using protocol version: {protocol_version}")
178+
print()
179+
180+
async with stdio_client(server_params) as (read, write):
181+
# Use configured protocol version
182+
async with ClientSession(read, write, protocol_version=protocol_version) as session:
183+
# Initialize the connection
184+
await session.initialize()
185+
186+
# List available tools to see invocation modes
187+
tools = await session.list_tools()
188+
print("Available tools:")
189+
for tool in tools.tools:
190+
invocation_mode = getattr(tool, "invocationMode", "sync")
191+
print(f" - {tool.name}: {tool.description} (mode: {invocation_mode})")
192+
193+
# Demonstrate different tool types
194+
await demonstrate_sync_tool(session)
195+
await demonstrate_hybrid_tool(session)
196+
await demonstrate_async_tool(session)
197+
await demonstrate_batch_processing(session)
198+
await demonstrate_data_processing(session)
199+
200+
print("\n=== All demonstrations complete! ===")
201+
202+
203+
def main():
204+
"""Entry point for the async tools client."""
205+
if "--help" in sys.argv or "-h" in sys.argv:
206+
print("Usage: async-tools-client [--protocol=latest|next]")
207+
print()
208+
print("Protocol versions:")
209+
print(" --protocol=latest Use stable protocol (only sync/hybrid tools visible)")
210+
print(" --protocol=next Use development protocol (all async tools visible)")
211+
print()
212+
print("Default: --protocol=next")
213+
return
214+
215+
asyncio.run(run())
216+
217+
218+
if __name__ == "__main__":
219+
main()

examples/snippets/pyproject.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ name = "mcp-snippets"
33
version = "0.1.0"
44
description = "MCP Example Snippets"
55
requires-python = ">=3.10"
6-
dependencies = [
7-
"mcp",
8-
]
6+
dependencies = ["mcp"]
97

108
[build-system]
119
requires = ["setuptools", "wheel"]
@@ -21,3 +19,4 @@ completion-client = "clients.completion_client:main"
2119
direct-execution-server = "servers.direct_execution:main"
2220
display-utilities-client = "clients.display_utilities:main"
2321
oauth-client = "clients.oauth_client:run"
22+
async-tools-client = "clients.async_tools_client:main"

examples/snippets/servers/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ def run_server():
2222
print("Usage: server <server-name> [transport]")
2323
print("Available servers: basic_tool, basic_resource, basic_prompt, tool_progress,")
2424
print(" sampling, elicitation, completion, notifications,")
25-
print(" fastmcp_quickstart, structured_output, images")
25+
print(" fastmcp_quickstart, structured_output, images,")
26+
print(" async_tools_example")
2627
print("Available transports: stdio (default), sse, streamable-http")
2728
sys.exit(1)
2829

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
"""
2+
FastMCP async tools example showing different invocation modes.
3+
4+
cd to the `examples/snippets/clients` directory and run:
5+
uv run server async_tools stdio
6+
"""
7+
8+
import asyncio
9+
10+
from mcp.server.fastmcp import Context, FastMCP
11+
12+
# Create an MCP server with async operations support
13+
mcp = FastMCP("Async Tools Demo")
14+
15+
16+
@mcp.tool()
17+
def sync_tool(x: int) -> str:
18+
"""An implicitly-synchronous tool."""
19+
return f"Sync result: {x * 2}"
20+
21+
22+
@mcp.tool(invocation_modes=["async"])
23+
async def async_only_tool(data: str, ctx: Context) -> str: # type: ignore[type-arg]
24+
"""An async-only tool that takes time to complete."""
25+
await ctx.info("Starting long-running analysis...")
26+
27+
# Simulate long-running work with progress updates
28+
for i in range(5):
29+
await asyncio.sleep(0.5)
30+
progress = (i + 1) / 5
31+
await ctx.report_progress(progress, 1.0, f"Processing step {i + 1}/5")
32+
33+
await ctx.info("Analysis complete!")
34+
return f"Async analysis result for: {data}"
35+
36+
37+
@mcp.tool(invocation_modes=["sync", "async"])
38+
def hybrid_tool(message: str, ctx: Context | None = None) -> str: # type: ignore[type-arg]
39+
"""A hybrid tool that works both sync and async."""
40+
if ctx:
41+
# Async mode - we have context for progress reporting
42+
import asyncio
43+
44+
async def async_work():
45+
await ctx.info(f"Processing '{message}' asynchronously...")
46+
await asyncio.sleep(0.5) # Simulate some work
47+
await ctx.debug("Async processing complete")
48+
49+
# Run the async work (this is a bit of a hack for demo purposes)
50+
try:
51+
loop = asyncio.get_event_loop()
52+
loop.create_task(async_work())
53+
except RuntimeError:
54+
pass # No event loop running
55+
56+
# Both sync and async modes return the same result
57+
return f"Hybrid result: {message.upper()}"
58+
59+
60+
@mcp.tool(invocation_modes=["async"])
61+
async def data_processing_tool(dataset: str, operations: list[str], ctx: Context) -> dict[str, str]: # type: ignore[type-arg]
62+
"""Simulate a complex data processing pipeline."""
63+
await ctx.info(f"Starting data processing pipeline for {dataset}")
64+
65+
results: dict[str, str] = {}
66+
total_ops = len(operations)
67+
68+
for i, operation in enumerate(operations):
69+
await ctx.debug(f"Executing operation: {operation}")
70+
71+
# Simulate processing time
72+
processing_time = 0.5 + (i * 0.2) # Increasing complexity
73+
await asyncio.sleep(processing_time)
74+
75+
# Report progress
76+
progress = (i + 1) / total_ops
77+
await ctx.report_progress(progress, 1.0, f"Completed {operation}")
78+
79+
# Store result
80+
results[operation] = f"Result of {operation} on {dataset}"
81+
82+
await ctx.info("Data processing pipeline complete!")
83+
return results
84+
85+
86+
@mcp.tool(invocation_modes=["async"])
87+
async def file_analysis_tool(file_path: str, ctx: Context) -> str: # type: ignore[type-arg]
88+
"""Simulate file analysis with user interaction."""
89+
await ctx.info(f"Analyzing file: {file_path}")
90+
91+
# Simulate initial analysis
92+
await asyncio.sleep(1)
93+
await ctx.report_progress(0.3, 1.0, "Initial scan complete")
94+
95+
# Simulate finding an issue that requires user input
96+
await ctx.warning("Found potential security issue - requires user confirmation")
97+
98+
# In a real implementation, you would use ctx.elicit() here to ask the user
99+
# For this demo, we'll just simulate the decision
100+
await asyncio.sleep(0.5)
101+
await ctx.info("User confirmed - continuing analysis")
102+
103+
# Complete the analysis
104+
await asyncio.sleep(1)
105+
await ctx.report_progress(1.0, 1.0, "Analysis complete")
106+
107+
return f"File analysis complete for {file_path}. No issues found after user review."
108+
109+
110+
@mcp.tool(invocation_modes=["async"])
111+
async def batch_operation_tool(items: list[str], ctx: Context) -> list[str]: # type: ignore[type-arg]
112+
"""Process a batch of items with detailed progress reporting."""
113+
await ctx.info(f"Starting batch operation on {len(items)} items")
114+
115+
results: list[str] = []
116+
117+
for i, item in enumerate(items):
118+
await ctx.debug(f"Processing item {i + 1}: {item}")
119+
120+
# Simulate variable processing time
121+
processing_time = 0.2 + (len(item) * 0.1)
122+
await asyncio.sleep(processing_time)
123+
124+
# Report progress for this item
125+
progress = (i + 1) / len(items)
126+
await ctx.report_progress(progress, 1.0, f"Processed {i + 1}/{len(items)}: {item}")
127+
128+
# Process the item
129+
result = f"PROCESSED_{item.upper()}"
130+
results.append(result)
131+
132+
await ctx.debug(f"Item {i + 1} result: {result}")
133+
134+
await ctx.info(f"Batch operation complete! Processed {len(results)} items")
135+
return results
136+
137+
138+
if __name__ == "__main__":
139+
mcp.run()

0 commit comments

Comments
 (0)