Commit 5b7026e

Implement SEP-1686: Tasks
1 parent 6f2cd0c commit 5b7026e

File tree

15 files changed: +2217 -35 lines changed


examples/shared/__init__.py

Whitespace-only changes.
examples/shared/in_memory_task_store.py

Lines changed: 164 additions & 0 deletions
@@ -0,0 +1,164 @@
"""
In-memory implementation of TaskStore for demonstration purposes.

This implementation stores all tasks in memory and provides automatic cleanup
based on the keepAlive duration specified in the task metadata.

Note: This is not suitable for production use as all data is lost on restart.
For production, consider implementing TaskStore with a database or distributed cache.
"""

import asyncio
from dataclasses import dataclass
from typing import Any

from mcp.shared.task import TaskStatus, TaskStore, is_terminal
from mcp.types import Request, RequestId, Result, Task, TaskMetadata


@dataclass
class StoredTask:
    """Internal storage representation of a task."""

    task: Task
    request: Request[Any, Any]
    request_id: RequestId
    result: Result | None = None


class InMemoryTaskStore(TaskStore):
    """
    A simple in-memory implementation of TaskStore for demonstration purposes.

    This implementation stores all tasks in memory and provides automatic cleanup
    based on the keepAlive duration specified in the task metadata.

    Note: This is not suitable for production use as all data is lost on restart.
    For production, consider implementing TaskStore with a database or distributed cache.
    """

    def __init__(self) -> None:
        self._tasks: dict[str, StoredTask] = {}
        self._cleanup_tasks: dict[str, asyncio.Task[None]] = {}

    async def create_task(self, task: TaskMetadata, request_id: RequestId, request: Request[Any, Any]) -> None:
        """Create a new task with the given metadata and original request."""
        task_id = task.taskId

        if task_id in self._tasks:
            raise ValueError(f"Task with ID {task_id} already exists")

        task_obj = Task(
            taskId=task_id,
            status="submitted",
            keepAlive=task.keepAlive,
            pollFrequency=500,  # Default 500ms poll frequency
        )

        self._tasks[task_id] = StoredTask(task=task_obj, request=request, request_id=request_id)

        # Schedule cleanup if keepAlive is specified
        if task.keepAlive is not None:
            self._schedule_cleanup(task_id, task.keepAlive / 1000.0)

    async def get_task(self, task_id: str) -> Task | None:
        """Get the current status of a task."""
        stored = self._tasks.get(task_id)
        if stored is None:
            return None

        # Return a copy to prevent external modification
        return Task(**stored.task.model_dump())

    async def store_task_result(self, task_id: str, result: Result) -> None:
        """Store the result of a completed task."""
        stored = self._tasks.get(task_id)
        if stored is None:
            raise ValueError(f"Task with ID {task_id} not found")

        stored.result = result
        stored.task.status = "completed"

        # Reset cleanup timer to start from now (if keepAlive is set)
        if stored.task.keepAlive is not None:
            self._cancel_cleanup(task_id)
            self._schedule_cleanup(task_id, stored.task.keepAlive / 1000.0)

    async def get_task_result(self, task_id: str) -> Result:
        """Retrieve the stored result of a task."""
        stored = self._tasks.get(task_id)
        if stored is None:
            raise ValueError(f"Task with ID {task_id} not found")

        if stored.result is None:
            raise ValueError(f"Task {task_id} has no result stored")

        return stored.result

    async def update_task_status(self, task_id: str, status: TaskStatus, error: str | None = None) -> None:
        """Update a task's status."""
        stored = self._tasks.get(task_id)
        if stored is None:
            raise ValueError(f"Task with ID {task_id} not found")

        stored.task.status = status
        if error is not None:
            stored.task.error = error

        # If task is in a terminal state and has keepAlive, start cleanup timer
        if is_terminal(status) and stored.task.keepAlive is not None:
            self._cancel_cleanup(task_id)
            self._schedule_cleanup(task_id, stored.task.keepAlive / 1000.0)

    async def list_tasks(self, cursor: str | None = None) -> dict[str, Any]:
        """
        List tasks, optionally starting from a pagination cursor.

        Returns a dict with 'tasks' list and optional 'nextCursor' string.
        """
        PAGE_SIZE = 10
        all_task_ids = list(self._tasks.keys())

        start_index = 0
        if cursor is not None:
            try:
                cursor_index = all_task_ids.index(cursor)
                start_index = cursor_index + 1
            except ValueError:
                raise ValueError(f"Invalid cursor: {cursor}")

        page_task_ids = all_task_ids[start_index : start_index + PAGE_SIZE]
        tasks = [Task(**self._tasks[tid].task.model_dump()) for tid in page_task_ids]

        next_cursor = page_task_ids[-1] if start_index + PAGE_SIZE < len(all_task_ids) and page_task_ids else None

        return {"tasks": tasks, "nextCursor": next_cursor}

    def _schedule_cleanup(self, task_id: str, delay_seconds: float) -> None:
        """Schedule automatic cleanup of a task after the specified delay."""

        async def cleanup() -> None:
            await asyncio.sleep(delay_seconds)
            self._tasks.pop(task_id, None)
            self._cleanup_tasks.pop(task_id, None)

        task = asyncio.create_task(cleanup())
        self._cleanup_tasks[task_id] = task

    def _cancel_cleanup(self, task_id: str) -> None:
        """Cancel any scheduled cleanup for a task."""
        cleanup_task = self._cleanup_tasks.pop(task_id, None)
        if cleanup_task is not None and not cleanup_task.done():
            cleanup_task.cancel()

    def cleanup(self) -> None:
        """Cleanup all timers and tasks (useful for testing or graceful shutdown)."""
        for task in self._cleanup_tasks.values():
            if not task.done():
                task.cancel()
        self._cleanup_tasks.clear()
        self._tasks.clear()

    def get_all_tasks(self) -> list[Task]:
        """Get all tasks (useful for debugging). Returns copies to prevent modification."""
        return [Task(**stored.task.model_dump()) for stored in self._tasks.values()]
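
Not part of the commit: a minimal sketch of how this store might be exercised on its own. The TaskMetadata construction, the "working" status value, and the PingRequest/Result placeholders are assumptions; in a real server the MCP session layer would supply the original request and result.

import asyncio

from examples.shared.in_memory_task_store import InMemoryTaskStore
from mcp.types import PingRequest, Result, TaskMetadata


async def demo() -> None:
    store = InMemoryTaskStore()

    # Assumption: TaskMetadata can be built from just taskId and keepAlive
    # (milliseconds), the only fields the store reads above.
    metadata = TaskMetadata(taskId="task-1", keepAlive=60_000)

    # Placeholder original request; the store never inspects it, it only keeps
    # it alongside the task.
    request = PingRequest(method="ping", params=None)

    await store.create_task(metadata, request_id="req-1", request=request)
    print(await store.get_task("task-1"))  # status == "submitted"

    # Assumption: "working" is a valid intermediate TaskStatus.
    await store.update_task_status("task-1", "working")

    await store.store_task_result("task-1", Result())
    print(await store.get_task("task-1"))  # status == "completed"
    print(await store.get_task_result("task-1"))

    store.cleanup()  # cancel the pending keepAlive cleanup timer


if __name__ == "__main__":
    asyncio.run(demo())

Note that storing a result restarts the keepAlive timer, so a client has the full keepAlive window after completion to fetch the result before it is dropped.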

examples/snippets/servers/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@ def run_server():
         print("Usage: server <server-name> [transport]")
         print("Available servers: basic_tool, basic_resource, basic_prompt, tool_progress,")
         print("                   sampling, elicitation, completion, notifications,")
-        print("                   fastmcp_quickstart, structured_output, images")
+        print("                   fastmcp_quickstart, structured_output, images, task_based_tool")
         print("Available transports: stdio (default), sse, streamable-http")
         sys.exit(1)
examples/snippets/servers/task_based_tool.py

Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
"""Example server demonstrating task-based execution with long-running tools."""

import asyncio

from examples.shared.in_memory_task_store import InMemoryTaskStore
from mcp.server.fastmcp import FastMCP

# Create a task store to enable task-based execution
task_store = InMemoryTaskStore()
mcp = FastMCP(name="Task-Based Tool Example", task_store=task_store)


@mcp.tool()
async def long_running_computation(data: str, delay_seconds: float = 2.0) -> str:
    """
    Simulate a long-running computation that benefits from task-based execution.

    This tool demonstrates the 'call-now, fetch-later' pattern where clients can:
    1. Initiate the task without waiting
    2. Disconnect and reconnect later
    3. Poll for status and retrieve results when ready

    Args:
        data: Input data to process
        delay_seconds: Simulated processing time
    """
    # Simulate long-running work
    await asyncio.sleep(delay_seconds)

    # Return processed result
    result = f"Processed: {data.upper()} (took {delay_seconds}s)"
    return result
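
Not part of the commit: a hedged sketch of running this example outside the snippets runner. The runner change above suggests it is normally started as "server task_based_tool stdio"; the module path below is assumed from that server name, and FastMCP's run() accepts the transports listed in the usage text.

# Run the example server directly over stdio, mirroring what
# "server task_based_tool stdio" would do via the snippets runner.
from examples.snippets.servers import task_based_tool  # assumed module path

if __name__ == "__main__":
    task_based_tool.mcp.run(transport="stdio")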
