Skip to content

Commit 42aceef

Browse files
committed
refactor(tasks): extract TaskManager from McpAsyncServer for modularity
This refactor splits most Task orchestration logic out of McpAsyncServer, enabling custom task lifecycle handlers and improving maintainability. Key changes: **TaskManager abstraction:** - TaskManager interface defines task lifecycle operations - DefaultTaskManager implements full task orchestration logic - NullTaskManager provides no-op implementation for serverless deployments - TaskManagerHost interface enables server-to-task-manager communication **Custom handler support:** - invokeCustomTaskHandler() allows tools to provide custom getTask/getTaskResult handlers - TaskAwareAsyncToolSpecification.getTaskHandler() for custom task status - TaskAwareAsyncToolSpecification.getTaskResultHandler() for custom results - Handlers integrate seamlessly with existing task infrastructure **Architecture improvements:** - RequestTaskStore manages task-to-request mapping for side-channeling - TaskManagerMessageProcessorAdapter bridges session and task manager - McpSessionMessageProcessor interface for decoupled message processing
1 parent ec28f0a commit 42aceef

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+4651
-2918
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 121 additions & 709 deletions
Large diffs are not rendered by default.

mcp-core/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java

Lines changed: 22 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,22 @@
4949
*/
5050
class McpClientFeatures {
5151

52+
/**
53+
* Build the task capabilities based on the presence of sampling and elicitation
54+
* handlers.
55+
*/
56+
private static McpSchema.ClientCapabilities.ClientTaskCapabilities buildTaskCapabilities(boolean hasSamplingHandler,
57+
boolean hasElicitationHandler) {
58+
var builder = McpSchema.ClientCapabilities.ClientTaskCapabilities.builder().list().cancel();
59+
if (hasSamplingHandler) {
60+
builder.samplingCreateMessage();
61+
}
62+
if (hasElicitationHandler) {
63+
builder.elicitationCreate();
64+
}
65+
return builder.build();
66+
}
67+
5268
/**
5369
* Asynchronous client features specification providing the capabilities and request
5470
* and notification handlers.
@@ -110,7 +126,9 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
110126
!Utils.isEmpty(roots) ? new McpSchema.ClientCapabilities.RootCapabilities(false) : null,
111127
samplingHandler != null ? new McpSchema.ClientCapabilities.Sampling() : null,
112128
elicitationHandler != null ? new McpSchema.ClientCapabilities.Elicitation() : null,
113-
taskStorePresent ? buildTaskCapabilities(samplingHandler, elicitationHandler) : null);
129+
taskStorePresent
130+
? buildTaskCapabilities(samplingHandler != null, elicitationHandler != null)
131+
: null);
114132
this.roots = roots != null ? new ConcurrentHashMap<>(roots) : new ConcurrentHashMap<>();
115133

116134
this.toolsChangeConsumers = toolsChangeConsumers != null ? toolsChangeConsumers : List.of();
@@ -127,23 +145,6 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
127145
this.taskStorePresent = taskStorePresent;
128146
}
129147

130-
/**
131-
* Build the task capabilities based on the presence of sampling and elicitation
132-
* handlers.
133-
*/
134-
private static McpSchema.ClientCapabilities.ClientTaskCapabilities buildTaskCapabilities(
135-
Function<McpSchema.CreateMessageRequest, Mono<McpSchema.CreateMessageResult>> samplingHandler,
136-
Function<McpSchema.ElicitRequest, Mono<McpSchema.ElicitResult>> elicitationHandler) {
137-
var builder = McpSchema.ClientCapabilities.ClientTaskCapabilities.builder().list().cancel();
138-
if (samplingHandler != null) {
139-
builder.samplingCreateMessage();
140-
}
141-
if (elicitationHandler != null) {
142-
builder.elicitationCreate();
143-
}
144-
return builder.build();
145-
}
146-
147148
/**
148149
* @deprecated Only exists for backwards-compatibility purposes.
149150
*/
@@ -293,7 +294,9 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl
293294
!Utils.isEmpty(roots) ? new McpSchema.ClientCapabilities.RootCapabilities(false) : null,
294295
samplingHandler != null ? new McpSchema.ClientCapabilities.Sampling() : null,
295296
elicitationHandler != null ? new McpSchema.ClientCapabilities.Elicitation() : null,
296-
taskStorePresent ? buildTaskCapabilities(samplingHandler, elicitationHandler) : null);
297+
taskStorePresent
298+
? buildTaskCapabilities(samplingHandler != null, elicitationHandler != null)
299+
: null);
297300
this.roots = roots != null ? new HashMap<>(roots) : new HashMap<>();
298301

299302
this.toolsChangeConsumers = toolsChangeConsumers != null ? toolsChangeConsumers : List.of();
@@ -310,23 +313,6 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl
310313
this.taskStorePresent = taskStorePresent;
311314
}
312315

313-
/**
314-
* Build the task capabilities based on the presence of sampling and elicitation
315-
* handlers.
316-
*/
317-
private static McpSchema.ClientCapabilities.ClientTaskCapabilities buildTaskCapabilities(
318-
Function<McpSchema.CreateMessageRequest, McpSchema.CreateMessageResult> samplingHandler,
319-
Function<McpSchema.ElicitRequest, McpSchema.ElicitResult> elicitationHandler) {
320-
var builder = McpSchema.ClientCapabilities.ClientTaskCapabilities.builder().list().cancel();
321-
if (samplingHandler != null) {
322-
builder.samplingCreateMessage();
323-
}
324-
if (elicitationHandler != null) {
325-
builder.elicitationCreate();
326-
}
327-
return builder.build();
328-
}
329-
330316
/**
331317
* @deprecated Only exists for backwards-compatibility purposes.
332318
*/

mcp-core/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.slf4j.LoggerFactory;
1414

1515
import io.modelcontextprotocol.common.McpTransportContext;
16+
import io.modelcontextprotocol.experimental.tasks.TaskManager;
1617
import io.modelcontextprotocol.experimental.tasks.TaskStore;
1718
import io.modelcontextprotocol.json.TypeRef;
1819
import io.modelcontextprotocol.spec.McpSchema;
@@ -508,6 +509,23 @@ public TaskStore<McpSchema.ClientTaskPayloadResult> getTaskStore() {
508509
return this.delegate.getTaskStore();
509510
}
510511

512+
/**
513+
* Returns the task manager for task orchestration operations.
514+
* <p>
515+
* The task manager provides the outbound API for interacting with server-hosted
516+
* tasks, including streaming task results, getting task status, and cancelling tasks.
517+
* It also manages task lifecycle operations and message queuing for side-channel
518+
* communication.
519+
* <p>
520+
* <strong>Warning:</strong> This is an experimental API that may change in future
521+
* releases. Use with caution in production environments.
522+
* @return the task manager (never null; returns NullTaskManager if task support is
523+
* not configured)
524+
*/
525+
public TaskManager taskManager() {
526+
return this.delegate.taskManager();
527+
}
528+
511529
/**
512530
* Retrieves a task previously initiated by the client with the server.
513531
*
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
/*
2+
* Copyright 2024-2026 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.experimental.tasks;
6+
7+
import java.time.Duration;
8+
import java.util.function.BiFunction;
9+
10+
import io.modelcontextprotocol.spec.McpError;
11+
import io.modelcontextprotocol.spec.McpSchema;
12+
import io.modelcontextprotocol.spec.McpSchema.ErrorCodes;
13+
import io.modelcontextprotocol.spec.McpSchema.Notification;
14+
import io.modelcontextprotocol.spec.McpSchema.Request;
15+
import io.modelcontextprotocol.spec.McpSchema.Result;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
import reactor.core.publisher.Mono;
19+
20+
/**
21+
* Shared base class for task handler implementations on both client and server sides.
22+
*
23+
* <p>
24+
* This class encapsulates the common task lifecycle logic that is identical between
25+
* {@code ServerTaskToolHandler} and {@code ClientTaskHandler}:
26+
* <ul>
27+
* <li>TaskManager creation (DefaultTaskManager vs NullTaskManager) and binding</li>
28+
* <li>Handler registration via {@link TaskHandlerRegistry}</li>
29+
* <li>Custom task handler invocation with validation and error handling</li>
30+
* <li>Task store session validation</li>
31+
* <li>Close and graceful close lifecycle</li>
32+
* <li>Accessor methods for TaskStore and TaskManager</li>
33+
* </ul>
34+
*
35+
* <p>
36+
* Subclasses must implement the transport-specific methods {@link #request} and
37+
* {@link #notification} which differ between client (delegates to session) and server
38+
* (broadcasts to all clients or errors).
39+
*
40+
* <p>
41+
* The {@link #findAndInvokeCustomHandler} method is an overridable hook that defaults to
42+
* returning empty. The server overrides this to look up tool-specific custom handlers by
43+
* tool name from the originating request.
44+
*
45+
* <p>
46+
* This is an experimental API that may change in future releases.
47+
*
48+
* @param <S> the task store payload type (e.g., ServerTaskPayloadResult or
49+
* ClientTaskPayloadResult)
50+
* @see TaskManagerHost
51+
* @see TaskManager
52+
*/
53+
abstract class AbstractTaskHandler<S extends Result> implements TaskManagerHost {
54+
55+
private static final Logger logger = LoggerFactory.getLogger(AbstractTaskHandler.class);
56+
57+
/**
58+
* Task store for managing long-running tasks. May be null if tasks are not
59+
* configured.
60+
*/
61+
protected final TaskStore<S> taskStore;
62+
63+
/**
64+
* Task manager for task orchestration. Handles task lifecycle operations, message
65+
* queuing, and handler registration.
66+
*/
67+
protected final TaskManager taskManager;
68+
69+
/**
70+
* Registry for task handlers registered by TaskManager during bind(). These are
71+
* adapted to the appropriate handler type by subclasses.
72+
*/
73+
protected final TaskHandlerRegistry taskHandlerRegistry = new TaskHandlerRegistry();
74+
75+
/**
76+
* Creates a new AbstractTaskHandler.
77+
* @param taskStore the task store for managing task state, or null if tasks are not
78+
* configured
79+
* @param taskOptions the task manager options, or null if tasks are not configured
80+
*/
81+
protected AbstractTaskHandler(TaskStore<S> taskStore, TaskManagerOptions taskOptions) {
82+
this.taskStore = taskStore;
83+
84+
// Initialize TaskManager based on whether TaskOptions are configured
85+
if (taskOptions != null && taskStore != null) {
86+
this.taskManager = new DefaultTaskManager(taskOptions);
87+
}
88+
else {
89+
this.taskManager = NullTaskManager.getInstance();
90+
}
91+
92+
// Bind the TaskManager to this handler so it can register handlers and send
93+
// requests
94+
this.taskManager.bind(this);
95+
}
96+
97+
// --------------------------
98+
// TaskManagerHost Implementation
99+
// --------------------------
100+
101+
@Override
102+
public void registerHandler(String method, TaskRequestHandler handler) {
103+
this.taskHandlerRegistry.registerHandler(method, handler);
104+
}
105+
106+
@Override
107+
public <T extends Result> Mono<T> invokeCustomTaskHandler(String taskId, String method, Request request,
108+
TaskHandlerContext context, Class<T> resultType) {
109+
if (this.taskStore == null) {
110+
return Mono.empty();
111+
}
112+
return getTaskWithSessionValidation(taskId, context.sessionId())
113+
.flatMap(storeResult -> findAndInvokeCustomHandler(storeResult, method, request, context, resultType))
114+
.onErrorResume(e -> {
115+
logger.debug("invokeCustomTaskHandler: task lookup failed for taskId={}, returning empty", taskId, e);
116+
return Mono.empty();
117+
});
118+
}
119+
120+
/**
121+
* Overridable hook for subclass-specific custom handler lookup. The server overrides
122+
* this to look up tool-specific custom handlers by tool name from the originating
123+
* request. The client inherits the default which returns empty.
124+
* @param <T> the result type
125+
* @param storeResult the task store result containing the task and originating
126+
* request
127+
* @param method the request method (e.g., METHOD_TASKS_GET or METHOD_TASKS_RESULT)
128+
* @param request the original request object
129+
* @param context the handler context with session information
130+
* @param resultType the expected result type class
131+
* @return a Mono emitting the handler result, or empty if no custom handler found
132+
*/
133+
protected <T extends Result> Mono<T> findAndInvokeCustomHandler(GetTaskFromStoreResult storeResult, String method,
134+
Request request, TaskHandlerContext context, Class<T> resultType) {
135+
return Mono.empty();
136+
}
137+
138+
// --------------------------
139+
// Session Validation
140+
// --------------------------
141+
142+
/**
143+
* Validates that a task exists and is accessible for the given session.
144+
* @param taskId the task identifier
145+
* @param sessionId the session ID for validation
146+
* @return a Mono emitting the task store result, or error if not found
147+
*/
148+
protected Mono<GetTaskFromStoreResult> getTaskWithSessionValidation(String taskId, String sessionId) {
149+
return this.taskStore.getTask(taskId, sessionId)
150+
.switchIfEmpty(Mono.error(McpError.builder(ErrorCodes.INVALID_PARAMS)
151+
.message("Task not found (may have expired after TTL)")
152+
.data("Task ID: " + taskId)
153+
.build()));
154+
}
155+
156+
// --------------------------
157+
// Handler Context Factory
158+
// --------------------------
159+
160+
/**
161+
* Creates a {@link TaskHandlerContext} that provides session identification and
162+
* message routing capabilities to task request handlers.
163+
*
164+
* <p>
165+
* Both client and server need to create context objects when adapting
166+
* {@link TaskRequestHandler} instances to their respective request handler types.
167+
* This factory centralizes that creation to avoid duplicating the anonymous class
168+
* implementation.
169+
* @param sessionId the session ID for the requesting client, or null if not
170+
* applicable (e.g., client-side handlers)
171+
* @param requestSender sends a request to the remote party; accepts the method name
172+
* and params, returns a Mono of the result
173+
* @param notificationSender sends a notification to the remote party; accepts the
174+
* method name and notification object
175+
* @return a new TaskHandlerContext backed by the provided functions
176+
*/
177+
protected static TaskHandlerContext createTaskHandlerContext(String sessionId,
178+
BiFunction<String, Object, Mono<? extends Result>> requestSender,
179+
BiFunction<String, Notification, Mono<Void>> notificationSender) {
180+
return new TaskHandlerContext() {
181+
@Override
182+
public String sessionId() {
183+
return sessionId;
184+
}
185+
186+
@Override
187+
@SuppressWarnings("unchecked")
188+
public <R extends Result> Mono<R> sendRequest(String reqMethod, Object reqParams, Class<R> resultType) {
189+
return (Mono<R>) requestSender.apply(reqMethod, reqParams);
190+
}
191+
192+
@Override
193+
public Mono<Void> sendNotification(String notifMethod, Notification notification) {
194+
return notificationSender.apply(notifMethod, notification);
195+
}
196+
};
197+
}
198+
199+
// --------------------------
200+
// Lifecycle
201+
// --------------------------
202+
203+
/**
204+
* Cleanup on immediate close. Shuts down the TaskManager and TaskStore.
205+
*/
206+
public void close() {
207+
this.taskManager.onClose();
208+
if (this.taskStore != null) {
209+
this.taskStore.shutdown().block(Duration.ofSeconds(TaskDefaults.TASK_STORE_SHUTDOWN_TIMEOUT_SECONDS));
210+
}
211+
}
212+
213+
/**
214+
* Cleanup on graceful close.
215+
* @return a Mono that completes when cleanup is done
216+
*/
217+
public Mono<Void> closeGracefully() {
218+
this.taskManager.onClose();
219+
return this.taskStore != null ? this.taskStore.shutdown() : Mono.empty();
220+
}
221+
222+
// --------------------------
223+
// Accessors
224+
// --------------------------
225+
226+
/**
227+
* Get the task store used for managing long-running tasks.
228+
* @return the task store, or null if tasks are not configured
229+
*/
230+
public TaskStore<S> getTaskStore() {
231+
return this.taskStore;
232+
}
233+
234+
/**
235+
* Returns the task manager for task orchestration operations.
236+
* @return the task manager (never null; returns NullTaskManager if task support is
237+
* not configured)
238+
*/
239+
public TaskManager taskManager() {
240+
return this.taskManager;
241+
}
242+
243+
}

0 commit comments

Comments
 (0)