Skip to content

Commit ec28f0a

Browse files
committed
fix: add related-task metadata to task-related messages
1 parent 20eb848 commit ec28f0a

File tree

4 files changed

+360
-6
lines changed

4 files changed

+360
-6
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,7 @@ private RequestHandler<McpSchema.Result> samplingCreateMessageHandler() {
661661
}
662662

663663
// Non-task-augmented request - execute directly
664-
return this.samplingHandler.apply(request).map(result -> (McpSchema.Result) result);
664+
return this.samplingHandler.apply(request).map(result -> processClientResult(request.meta(), result));
665665
};
666666
}
667667

@@ -681,10 +681,59 @@ private RequestHandler<McpSchema.Result> elicitationCreateHandler() {
681681
}
682682

683683
// Non-task-augmented request - execute directly
684-
return this.elicitationHandler.apply(request).map(result -> (McpSchema.Result) result);
684+
return this.elicitationHandler.apply(request).map(result -> processClientResult(request.meta(), result));
685685
};
686686
}
687687

688+
/**
689+
* Processes a client result before returning it to the server. Echoes related-task
690+
* metadata from the request to the response, which is necessary for the server to
691+
* associate elicitation/sampling responses with their originating task during
692+
* side-channeling.
693+
* @param requestMeta the request's _meta field
694+
* @param result the handler's result
695+
* @return the processed result with related-task metadata echoed (if present in
696+
* request)
697+
*/
698+
private McpSchema.Result processClientResult(Map<String, Object> requestMeta, McpSchema.Result result) {
699+
if (requestMeta == null || !requestMeta.containsKey(McpSchema.RELATED_TASK_META_KEY)) {
700+
return result;
701+
}
702+
703+
Object relatedTask = requestMeta.get(McpSchema.RELATED_TASK_META_KEY);
704+
Map<String, Object> newMeta = mergeRelatedTaskMetadata(relatedTask, result.meta());
705+
706+
// Client-side task payloads are ElicitResult or CreateMessageResult
707+
// (per ClientTaskPayloadResult sealed interface)
708+
if (result instanceof McpSchema.ElicitResult elicitResult) {
709+
return new McpSchema.ElicitResult(elicitResult.action(), elicitResult.content(), newMeta);
710+
}
711+
else if (result instanceof McpSchema.CreateMessageResult messageResult) {
712+
return new McpSchema.CreateMessageResult(messageResult.role(), messageResult.content(),
713+
messageResult.model(), messageResult.stopReason(), newMeta);
714+
}
715+
716+
// For other result types, return as-is (shouldn't happen for client-side task
717+
// payloads)
718+
return result;
719+
}
720+
721+
/**
722+
* Merges related-task metadata with existing metadata.
723+
* @param relatedTask the related-task object to include
724+
* @param existingMeta the existing metadata (may be null)
725+
* @return a new map containing both the related-task metadata and any existing
726+
* metadata
727+
*/
728+
private Map<String, Object> mergeRelatedTaskMetadata(Object relatedTask, Map<String, Object> existingMeta) {
729+
Map<String, Object> newMeta = new HashMap<>();
730+
newMeta.put(McpSchema.RELATED_TASK_META_KEY, relatedTask);
731+
if (existingMeta != null) {
732+
newMeta.putAll(existingMeta);
733+
}
734+
return newMeta;
735+
}
736+
688737
// --------------------------
689738
// Client-Side Task Hosting
690739
// --------------------------

mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1664,7 +1664,7 @@ private Mono<Void> processMessage(McpAsyncServerExchange exchange, QueuedMessage
16641664

16651665
// Handle Notification messages (no response expected)
16661666
if (msg instanceof QueuedMessage.Notification notif) {
1667-
return sendNotificationToClient(exchange, notif);
1667+
return sendNotificationToClient(exchange, notif, taskId);
16681668
}
16691669

16701670
// Response messages should never be returned by dequeue() - but handle gracefully
@@ -1711,8 +1711,58 @@ private TypeRef<? extends McpSchema.Result> getResultTypeRef(String method) {
17111711
/**
17121712
* Sends a notification to the client without waiting for a response.
17131713
*/
1714-
private Mono<Void> sendNotificationToClient(McpAsyncServerExchange exchange, QueuedMessage.Notification notif) {
1715-
return exchange.getSession().sendNotification(notif.method(), notif.notification());
1714+
private Mono<Void> sendNotificationToClient(McpAsyncServerExchange exchange, QueuedMessage.Notification notif,
1715+
String taskId) {
1716+
McpSchema.Notification notification = addRelatedTaskMetadataToNotification(taskId, notif.notification());
1717+
return exchange.getSession().sendNotification(notif.method(), notification);
1718+
}
1719+
1720+
/**
1721+
* Adds related-task metadata to a notification. Task status notifications are
1722+
* excluded as they already contain the taskId in their params.
1723+
* @param taskId the task ID to include in the metadata
1724+
* @param notification the notification to augment
1725+
* @return the notification with related-task metadata added
1726+
*/
1727+
private McpSchema.Notification addRelatedTaskMetadataToNotification(String taskId,
1728+
McpSchema.Notification notification) {
1729+
// Handle all Notification subtypes (sealed interface guarantees exhaustiveness)
1730+
if (notification instanceof McpSchema.TaskStatusNotification tsn) {
1731+
// Already has taskId in params - spec says SHOULD NOT include metadata
1732+
return tsn;
1733+
}
1734+
else if (notification instanceof McpSchema.ProgressNotification pn) {
1735+
Map<String, Object> newMeta = mergeRelatedTaskMetadata(taskId, pn.meta());
1736+
return new McpSchema.ProgressNotification(pn.progressToken(), pn.progress(), pn.total(), pn.message(),
1737+
newMeta);
1738+
}
1739+
else if (notification instanceof McpSchema.LoggingMessageNotification ln) {
1740+
Map<String, Object> newMeta = mergeRelatedTaskMetadata(taskId, ln.meta());
1741+
return new McpSchema.LoggingMessageNotification(ln.level(), ln.logger(), ln.data(), newMeta);
1742+
}
1743+
else if (notification instanceof McpSchema.ResourcesUpdatedNotification rn) {
1744+
Map<String, Object> newMeta = mergeRelatedTaskMetadata(taskId, rn.meta());
1745+
return new McpSchema.ResourcesUpdatedNotification(rn.uri(), newMeta);
1746+
}
1747+
1748+
// This should never happen due to sealed interface, but satisfies compiler
1749+
throw new IllegalStateException("Unexpected notification type: " + notification.getClass().getName());
1750+
}
1751+
1752+
/**
1753+
* Merges related-task metadata with existing metadata.
1754+
* @param taskId the task ID to include
1755+
* @param existingMeta the existing metadata (may be null)
1756+
* @return a new map containing both the related-task metadata and any existing
1757+
* metadata
1758+
*/
1759+
private Map<String, Object> mergeRelatedTaskMetadata(String taskId, Map<String, Object> existingMeta) {
1760+
Map<String, Object> newMeta = new HashMap<>();
1761+
newMeta.put(McpSchema.RELATED_TASK_META_KEY, Map.of("taskId", taskId));
1762+
if (existingMeta != null) {
1763+
newMeta.putAll(existingMeta);
1764+
}
1765+
return newMeta;
17161766
}
17171767

17181768
/**
@@ -1750,13 +1800,33 @@ private Mono<McpSchema.Result> pollAndProcessUntilTerminal(McpAsyncServerExchang
17501800
@SuppressWarnings("unchecked")
17511801
private Mono<McpSchema.Result> fetchTaskResult(String taskId, String sessionId) {
17521802
return this.taskStore.getTaskResult(taskId, sessionId)
1753-
.map(result -> (McpSchema.Result) result)
1803+
.map(result -> addRelatedTaskMetadata(taskId, (McpSchema.Result) result))
17541804
.switchIfEmpty(Mono.error(McpError.builder(ErrorCodes.INVALID_PARAMS)
17551805
.message("Task result not available")
17561806
.data("Task ID: " + taskId)
17571807
.build()));
17581808
}
17591809

1810+
/**
1811+
* Adds the related-task metadata to a server task result. The tasks/result operation
1812+
* MUST include this metadata in its response, as the result structure itself does not
1813+
* contain the task ID.
1814+
* @param taskId the task ID to include in the metadata
1815+
* @param result the result to add metadata to
1816+
* @return the result with related-task metadata added
1817+
*/
1818+
private McpSchema.Result addRelatedTaskMetadata(String taskId, McpSchema.Result result) {
1819+
// Server-side tasks only produce CallToolResult (per ServerTaskPayloadResult
1820+
// sealed interface)
1821+
if (result instanceof McpSchema.CallToolResult ctr) {
1822+
Map<String, Object> newMeta = mergeRelatedTaskMetadata(taskId, ctr.meta());
1823+
return new McpSchema.CallToolResult(ctr.content(), ctr.isError(), ctr.structuredContent(), newMeta);
1824+
}
1825+
1826+
// For non-task results (e.g., direct tool calls), return as-is
1827+
return result;
1828+
}
1829+
17601830
/**
17611831
* Returns the effective automatic polling timeout, using the configured value or the
17621832
* default if not configured.

mcp-core/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,202 @@ void testElicitationCreateRequestHandlingWithNullHandler() {
530530
.hasMessage("Elicitation handler must not be null when client capabilities include elicitation");
531531
}
532532

533+
@Test
534+
void testElicitationResponseIncludesRelatedTaskMetadata() {
535+
MockMcpClientTransport transport = initializationEnabledTransport();
536+
537+
// Create a test elicitation handler that returns a simple response
538+
Function<McpSchema.ElicitRequest, Mono<McpSchema.ElicitResult>> elicitationHandler = request -> Mono
539+
.just(new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT, Map.of("value", "42"), null));
540+
541+
// Create client with elicitation capability and handler
542+
McpAsyncClient asyncMcpClient = McpClient.async(transport)
543+
.capabilities(ClientCapabilities.builder().elicitation().build())
544+
.elicitation(elicitationHandler)
545+
.build();
546+
547+
assertThat(asyncMcpClient.initialize().block()).isNotNull();
548+
549+
// Create elicitation request WITH related-task metadata (simulating
550+
// side-channeling)
551+
String taskId = "test-task-123";
552+
Map<String, Object> relatedTaskMeta = Map.of("taskId", taskId);
553+
Map<String, Object> requestMeta = Map.of(McpSchema.RELATED_TASK_META_KEY, relatedTaskMeta);
554+
555+
var elicitRequest = new McpSchema.ElicitRequest("What is your favorite number?",
556+
Map.of("type", "object", "properties", Map.of("value", Map.of("type", "string"))), null, requestMeta);
557+
558+
// Simulate incoming request
559+
McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
560+
McpSchema.METHOD_ELICITATION_CREATE, "test-id", elicitRequest);
561+
transport.simulateIncomingMessage(request);
562+
563+
// Verify response
564+
McpSchema.JSONRPCMessage sentMessage = transport.getLastSentMessage();
565+
assertThat(sentMessage).isInstanceOf(McpSchema.JSONRPCResponse.class);
566+
567+
McpSchema.JSONRPCResponse response = (McpSchema.JSONRPCResponse) sentMessage;
568+
assertThat(response.id()).isEqualTo("test-id");
569+
assertThat(response.error()).isNull();
570+
571+
McpSchema.ElicitResult result = transport.unmarshalFrom(response.result(), new TypeRef<>() {
572+
});
573+
assertThat(result).isNotNull();
574+
assertThat(result.action()).isEqualTo(McpSchema.ElicitResult.Action.ACCEPT);
575+
assertThat(result.content()).isEqualTo(Map.of("value", "42"));
576+
577+
// Verify related-task metadata was echoed back
578+
assertThat(result.meta()).isNotNull();
579+
assertThat(result.meta()).containsKey(McpSchema.RELATED_TASK_META_KEY);
580+
@SuppressWarnings("unchecked")
581+
Map<String, Object> echoedRelatedTask = (Map<String, Object>) result.meta()
582+
.get(McpSchema.RELATED_TASK_META_KEY);
583+
assertThat(echoedRelatedTask.get("taskId")).isEqualTo(taskId);
584+
585+
asyncMcpClient.closeGracefully();
586+
}
587+
588+
@Test
589+
void testElicitationResponseWithoutRelatedTaskMetadata() {
590+
MockMcpClientTransport transport = initializationEnabledTransport();
591+
592+
// Create a test elicitation handler that returns a response with custom meta
593+
Map<String, Object> handlerMeta = Map.of("custom-key", "custom-value");
594+
Function<McpSchema.ElicitRequest, Mono<McpSchema.ElicitResult>> elicitationHandler = request -> Mono
595+
.just(new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT, Map.of("value", "42"), handlerMeta));
596+
597+
// Create client with elicitation capability and handler
598+
McpAsyncClient asyncMcpClient = McpClient.async(transport)
599+
.capabilities(ClientCapabilities.builder().elicitation().build())
600+
.elicitation(elicitationHandler)
601+
.build();
602+
603+
assertThat(asyncMcpClient.initialize().block()).isNotNull();
604+
605+
// Create elicitation request WITHOUT related-task metadata
606+
var elicitRequest = new McpSchema.ElicitRequest("What is your favorite number?",
607+
Map.of("type", "object", "properties", Map.of("value", Map.of("type", "string"))));
608+
609+
// Simulate incoming request
610+
McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
611+
McpSchema.METHOD_ELICITATION_CREATE, "test-id", elicitRequest);
612+
transport.simulateIncomingMessage(request);
613+
614+
// Verify response
615+
McpSchema.JSONRPCMessage sentMessage = transport.getLastSentMessage();
616+
McpSchema.JSONRPCResponse response = (McpSchema.JSONRPCResponse) sentMessage;
617+
618+
McpSchema.ElicitResult result = transport.unmarshalFrom(response.result(), new TypeRef<>() {
619+
});
620+
assertThat(result).isNotNull();
621+
622+
// Verify handler's meta is preserved and no related-task was added
623+
assertThat(result.meta()).isEqualTo(handlerMeta);
624+
assertThat(result.meta()).doesNotContainKey(McpSchema.RELATED_TASK_META_KEY);
625+
626+
asyncMcpClient.closeGracefully();
627+
}
628+
629+
@Test
630+
void testSamplingResponseIncludesRelatedTaskMetadata() {
631+
MockMcpClientTransport transport = initializationEnabledTransport();
632+
633+
// Create a test sampling handler that returns a simple response
634+
Function<McpSchema.CreateMessageRequest, Mono<McpSchema.CreateMessageResult>> samplingHandler = request -> Mono
635+
.just(new McpSchema.CreateMessageResult(McpSchema.Role.ASSISTANT, new McpSchema.TextContent("Response"),
636+
"test-model", McpSchema.CreateMessageResult.StopReason.END_TURN, null));
637+
638+
// Create client with sampling capability and handler
639+
McpAsyncClient asyncMcpClient = McpClient.async(transport)
640+
.capabilities(ClientCapabilities.builder().sampling().build())
641+
.sampling(samplingHandler)
642+
.build();
643+
644+
assertThat(asyncMcpClient.initialize().block()).isNotNull();
645+
646+
// Create sampling request WITH related-task metadata (simulating side-channeling)
647+
String taskId = "test-task-456";
648+
Map<String, Object> relatedTaskMeta = Map.of("taskId", taskId);
649+
Map<String, Object> requestMeta = Map.of(McpSchema.RELATED_TASK_META_KEY, relatedTaskMeta);
650+
651+
var messageRequest = new McpSchema.CreateMessageRequest(
652+
List.of(new McpSchema.SamplingMessage(McpSchema.Role.USER, new McpSchema.TextContent("Test message"))),
653+
null, "Test system prompt", McpSchema.CreateMessageRequest.ContextInclusionStrategy.NONE, 0.7, 100,
654+
null, null, null, requestMeta);
655+
656+
// Simulate incoming request
657+
McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
658+
McpSchema.METHOD_SAMPLING_CREATE_MESSAGE, "test-id", messageRequest);
659+
transport.simulateIncomingMessage(request);
660+
661+
// Verify response
662+
McpSchema.JSONRPCMessage sentMessage = transport.getLastSentMessage();
663+
assertThat(sentMessage).isInstanceOf(McpSchema.JSONRPCResponse.class);
664+
665+
McpSchema.JSONRPCResponse response = (McpSchema.JSONRPCResponse) sentMessage;
666+
assertThat(response.id()).isEqualTo("test-id");
667+
assertThat(response.error()).isNull();
668+
669+
McpSchema.CreateMessageResult result = transport.unmarshalFrom(response.result(), new TypeRef<>() {
670+
});
671+
assertThat(result).isNotNull();
672+
assertThat(result.role()).isEqualTo(McpSchema.Role.ASSISTANT);
673+
674+
// Verify related-task metadata was echoed back
675+
assertThat(result.meta()).isNotNull();
676+
assertThat(result.meta()).containsKey(McpSchema.RELATED_TASK_META_KEY);
677+
@SuppressWarnings("unchecked")
678+
Map<String, Object> echoedRelatedTask = (Map<String, Object>) result.meta()
679+
.get(McpSchema.RELATED_TASK_META_KEY);
680+
assertThat(echoedRelatedTask.get("taskId")).isEqualTo(taskId);
681+
682+
asyncMcpClient.closeGracefully();
683+
}
684+
685+
@Test
686+
void testElicitationResponseMergesHandlerMetaWithRelatedTask() {
687+
MockMcpClientTransport transport = initializationEnabledTransport();
688+
689+
// Create a test elicitation handler that returns a response with custom meta
690+
Map<String, Object> handlerMeta = Map.of("custom-key", "custom-value");
691+
Function<McpSchema.ElicitRequest, Mono<McpSchema.ElicitResult>> elicitationHandler = request -> Mono
692+
.just(new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT, Map.of("value", "42"), handlerMeta));
693+
694+
// Create client with elicitation capability and handler
695+
McpAsyncClient asyncMcpClient = McpClient.async(transport)
696+
.capabilities(ClientCapabilities.builder().elicitation().build())
697+
.elicitation(elicitationHandler)
698+
.build();
699+
700+
assertThat(asyncMcpClient.initialize().block()).isNotNull();
701+
702+
// Create elicitation request WITH related-task metadata
703+
String taskId = "test-task-789";
704+
Map<String, Object> relatedTaskMeta = Map.of("taskId", taskId);
705+
Map<String, Object> requestMeta = Map.of(McpSchema.RELATED_TASK_META_KEY, relatedTaskMeta);
706+
707+
var elicitRequest = new McpSchema.ElicitRequest("Test?",
708+
Map.of("type", "object", "properties", Map.of("value", Map.of("type", "string"))), null, requestMeta);
709+
710+
// Simulate incoming request
711+
McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
712+
McpSchema.METHOD_ELICITATION_CREATE, "test-id", elicitRequest);
713+
transport.simulateIncomingMessage(request);
714+
715+
// Verify response
716+
McpSchema.JSONRPCResponse response = (McpSchema.JSONRPCResponse) transport.getLastSentMessage();
717+
McpSchema.ElicitResult result = transport.unmarshalFrom(response.result(), new TypeRef<>() {
718+
});
719+
720+
// Verify both handler's meta AND related-task are present (merged)
721+
assertThat(result.meta()).isNotNull();
722+
assertThat(result.meta()).containsKey("custom-key");
723+
assertThat(result.meta().get("custom-key")).isEqualTo("custom-value");
724+
assertThat(result.meta()).containsKey(McpSchema.RELATED_TASK_META_KEY);
725+
726+
asyncMcpClient.closeGracefully();
727+
}
728+
533729
@Test
534730
void testPingMessageRequestHandling() {
535731
MockMcpClientTransport transport = initializationEnabledTransport();

0 commit comments

Comments
 (0)