Skip to content

Commit 9c535a7

Browse files
committed
Simplify simple-task server example using new task API
- Remove manual AppContext, lifespan, TaskGroup, InMemoryTaskStore - Remove manual get_task and get_task_result handlers - Use enable_tasks() for one-line setup - Use run_task(work) for automatic task lifecycle 128 lines → 73 lines, same functionality.
1 parent 800b409 commit 9c535a7

File tree

1 file changed

+15
-70
lines changed
  • examples/servers/simple-task/mcp_simple_task

1 file changed

+15
-70
lines changed

examples/servers/simple-task/mcp_simple_task/server.py

Lines changed: 15 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -2,37 +2,22 @@
22

33
from collections.abc import AsyncIterator
44
from contextlib import asynccontextmanager
5-
from dataclasses import dataclass
65
from typing import Any
76

87
import anyio
98
import click
109
import mcp.types as types
1110
import uvicorn
12-
from anyio.abc import TaskGroup
11+
from mcp.server.experimental.task_context import ServerTaskContext
1312
from mcp.server.lowlevel import Server
1413
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
15-
from mcp.shared.experimental.tasks.helpers import task_execution
16-
from mcp.shared.experimental.tasks.in_memory_task_store import InMemoryTaskStore
1714
from starlette.applications import Starlette
1815
from starlette.routing import Mount
1916

17+
server = Server("simple-task-server")
2018

21-
@dataclass
22-
class AppContext:
23-
task_group: TaskGroup
24-
store: InMemoryTaskStore
25-
26-
27-
@asynccontextmanager
28-
async def lifespan(server: Server[AppContext, Any]) -> AsyncIterator[AppContext]:
29-
store = InMemoryTaskStore()
30-
async with anyio.create_task_group() as tg:
31-
yield AppContext(task_group=tg, store=store)
32-
store.cleanup()
33-
34-
35-
server: Server[AppContext, Any] = Server("simple-task-server", lifespan=lifespan)
19+
# One-line setup: auto-registers get_task, get_task_result, list_tasks, cancel_task
20+
server.experimental.enable_tasks()
3621

3722

3823
@server.list_tools()
@@ -50,61 +35,21 @@ async def list_tools() -> list[types.Tool]:
5035
@server.call_tool()
5136
async def handle_call_tool(name: str, arguments: dict[str, Any]) -> list[types.TextContent] | types.CreateTaskResult:
5237
ctx = server.request_context
53-
app = ctx.lifespan_context
54-
55-
# Validate task mode - raises McpError(-32601) if client didn't use task augmentation
5638
ctx.experimental.validate_task_mode(types.TASK_REQUIRED)
5739

58-
# Create the task
59-
metadata = ctx.experimental.task_metadata
60-
assert metadata is not None
61-
task = await app.store.create_task(metadata)
62-
63-
# Spawn background work
64-
async def do_work() -> None:
65-
async with task_execution(task.taskId, app.store) as task_ctx:
66-
await task_ctx.update_status("Starting work...")
67-
await anyio.sleep(1)
68-
69-
await task_ctx.update_status("Processing step 1...")
70-
await anyio.sleep(1)
71-
72-
await task_ctx.update_status("Processing step 2...")
73-
await anyio.sleep(1)
74-
75-
await task_ctx.complete(
76-
types.CallToolResult(content=[types.TextContent(type="text", text="Task completed!")])
77-
)
78-
79-
app.task_group.start_soon(do_work)
80-
return types.CreateTaskResult(task=task)
81-
82-
83-
@server.experimental.get_task()
84-
async def handle_get_task(request: types.GetTaskRequest) -> types.GetTaskResult:
85-
app = server.request_context.lifespan_context
86-
task = await app.store.get_task(request.params.taskId)
87-
if task is None:
88-
raise ValueError(f"Task {request.params.taskId} not found")
89-
return types.GetTaskResult(
90-
taskId=task.taskId,
91-
status=task.status,
92-
statusMessage=task.statusMessage,
93-
createdAt=task.createdAt,
94-
lastUpdatedAt=task.lastUpdatedAt,
95-
ttl=task.ttl,
96-
pollInterval=task.pollInterval,
97-
)
40+
async def work(task: ServerTaskContext) -> types.CallToolResult:
41+
await task.update_status("Starting work...")
42+
await anyio.sleep(1)
43+
44+
await task.update_status("Processing step 1...")
45+
await anyio.sleep(1)
46+
47+
await task.update_status("Processing step 2...")
48+
await anyio.sleep(1)
9849

50+
return types.CallToolResult(content=[types.TextContent(type="text", text="Task completed!")])
9951

100-
@server.experimental.get_task_result()
101-
async def handle_get_task_result(request: types.GetTaskPayloadRequest) -> types.GetTaskPayloadResult:
102-
app = server.request_context.lifespan_context
103-
result = await app.store.get_result(request.params.taskId)
104-
if result is None:
105-
raise ValueError(f"Result for task {request.params.taskId} not found")
106-
assert isinstance(result, types.CallToolResult)
107-
return types.GetTaskPayloadResult(**result.model_dump())
52+
return await ctx.experimental.run_task(work)
10853

10954

11055
@click.command()

0 commit comments

Comments
 (0)