Skip to content

Commit 97e6457

Browse files
committed
chore: update sync variants of callToolStream etc. to actually return Stream
1 parent b8b5f20 commit 97e6457

File tree

4 files changed

+25
-18
lines changed

4 files changed

+25
-18
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.time.Duration;
88
import java.util.List;
99
import java.util.function.Supplier;
10+
import java.util.stream.Stream;
1011

1112
import org.slf4j.Logger;
1213
import org.slf4j.LoggerFactory;
@@ -310,8 +311,7 @@ public McpSchema.CreateTaskResult callToolTask(McpSchema.CallToolRequest callToo
310311
* var request = new CallToolRequest("my-tool", Map.of("arg", "value"),
311312
* new TaskMetadata(60000L), null); // Optional task metadata
312313
*
313-
* List<ResponseMessage<CallToolResult>> messages = client.callToolStream(request);
314-
* for (var message : messages) {
314+
* client.callToolStream(request).forEach(message -> {
315315
* switch (message) {
316316
* case TaskCreatedMessage<CallToolResult> tc ->
317317
* System.out.println("Task created: " + tc.task().taskId());
@@ -322,21 +322,21 @@ public McpSchema.CreateTaskResult callToolTask(McpSchema.CallToolRequest callToo
322322
* case ErrorMessage<CallToolResult> e ->
323323
* System.err.println("Error: " + e.error().getMessage());
324324
* }
325-
* }
325+
* });
326326
* }</pre>
327327
* @param callToolRequest The request containing the tool name and arguments. If the
328328
* {@code task} field is set, the call will be task-augmented.
329-
* @return A list of {@link McpSchema.ResponseMessage} instances representing the
329+
* @return A stream of {@link McpSchema.ResponseMessage} instances representing the
330330
* progress and result of the tool call.
331331
* @see McpSchema.ResponseMessage
332332
* @see McpSchema.TaskCreatedMessage
333333
* @see McpSchema.TaskStatusMessage
334334
* @see McpSchema.ResultMessage
335335
* @see McpSchema.ErrorMessage
336336
*/
337-
public List<McpSchema.ResponseMessage<McpSchema.CallToolResult>> callToolStream(
337+
public Stream<McpSchema.ResponseMessage<McpSchema.CallToolResult>> callToolStream(
338338
McpSchema.CallToolRequest callToolRequest) {
339-
return withProvidedContextFlux(this.delegate.callToolStream(callToolRequest)).collectList().block();
339+
return withProvidedContextFlux(this.delegate.callToolStream(callToolRequest)).toStream();
340340
}
341341

342342
/**

mcp-core/src/main/java/io/modelcontextprotocol/server/McpSyncServerExchange.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
1212

1313
import java.util.List;
14+
import java.util.stream.Stream;
1415

1516
/**
1617
* Represents a synchronous exchange with a Model Context Protocol (MCP) client. The
@@ -457,12 +458,12 @@ public McpSchema.CreateTaskResult createMessageTask(McpSchema.CreateMessageReque
457458
* releases.
458459
* @param createMessageRequest The request containing the sampling parameters. If the
459460
* {@code task} field is set, the call will be task-augmented.
460-
* @return A list of {@link McpSchema.ResponseMessage} instances representing the
461+
* @return A stream of {@link McpSchema.ResponseMessage} instances representing the
461462
* progress and result
462463
*/
463-
public List<McpSchema.ResponseMessage<McpSchema.CreateMessageResult>> createMessageStream(
464+
public Stream<McpSchema.ResponseMessage<McpSchema.CreateMessageResult>> createMessageStream(
464465
McpSchema.CreateMessageRequest createMessageRequest) {
465-
return this.exchange.createMessageStream(createMessageRequest).collectList().block();
466+
return this.exchange.createMessageStream(createMessageRequest).toStream();
466467
}
467468

468469
// --------------------------
@@ -506,12 +507,12 @@ public McpSchema.CreateTaskResult createElicitationTask(McpSchema.ElicitRequest
506507
* releases.
507508
* @param elicitRequest The request containing the elicitation parameters. If the
508509
* {@code task} field is set, the call will be task-augmented.
509-
* @return A list of {@link McpSchema.ResponseMessage} instances representing the
510+
* @return A stream of {@link McpSchema.ResponseMessage} instances representing the
510511
* progress and result
511512
*/
512-
public List<McpSchema.ResponseMessage<McpSchema.ElicitResult>> createElicitationStream(
513+
public Stream<McpSchema.ResponseMessage<McpSchema.ElicitResult>> createElicitationStream(
513514
McpSchema.ElicitRequest elicitRequest) {
514-
return this.exchange.createElicitationStream(elicitRequest).collectList().block();
515+
return this.exchange.createElicitationStream(elicitRequest).toStream();
515516
}
516517

517518
}

mcp-core/src/test/java/io/modelcontextprotocol/server/AbstractMcpClientServerIntegrationTests.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.function.BiFunction;
2222
import java.util.function.Function;
2323
import java.util.stream.Collectors;
24+
import java.util.stream.Stream;
2425

2526
import io.modelcontextprotocol.client.McpClient;
2627
import io.modelcontextprotocol.client.McpSyncClient;
@@ -1882,7 +1883,8 @@ protected McpSyncClient createTaskClient(String clientType, String name, ClientC
18821883
}
18831884

18841885
/** Extracts the task ID from a list of response messages. */
1885-
protected String extractTaskId(List<ResponseMessage<CallToolResult>> messages) {
1886+
protected String extractTaskId(Stream<ResponseMessage<CallToolResult>> messageStream) {
1887+
List<ResponseMessage<CallToolResult>> messages = messageStream.toList();
18861888
for (var msg : messages) {
18871889
if (msg instanceof TaskCreatedMessage<CallToolResult> tcm) {
18881890
return tcm.task().taskId();
@@ -1892,7 +1894,8 @@ protected String extractTaskId(List<ResponseMessage<CallToolResult>> messages) {
18921894
}
18931895

18941896
/** Extracts all task statuses from a list of response messages. */
1895-
protected List<TaskStatus> extractTaskStatuses(List<ResponseMessage<CallToolResult>> messages) {
1897+
protected List<TaskStatus> extractTaskStatuses(Stream<ResponseMessage<CallToolResult>> messageStream) {
1898+
List<ResponseMessage<CallToolResult>> messages = messageStream.toList();
18961899
List<TaskStatus> statuses = new ArrayList<>();
18971900
for (var msg : messages) {
18981901
if (msg instanceof TaskCreatedMessage<CallToolResult> tcm) {
@@ -2093,7 +2096,7 @@ void testAutomaticPollingShimWithCreateTaskHandler(String clientType) {
20932096

20942097
// Call tool WITHOUT task metadata - should trigger automatic polling shim
20952098
var request = new McpSchema.CallToolRequest("auto-polling-tool", Map.of("input", "test-value"), null, null);
2096-
var messages = client.callToolStream(request);
2099+
var messages = client.callToolStream(request).toList();
20972100

20982101
// The automatic polling shim should poll and return the final result
20992102
assertThat(messages).as("Should have response messages").isNotEmpty();

mcp-test/src/main/java/io/modelcontextprotocol/AbstractMcpClientServerIntegrationTests.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.function.BiFunction;
2323
import java.util.function.Function;
2424
import java.util.stream.Collectors;
25+
import java.util.stream.Stream;
2526

2627
import io.modelcontextprotocol.client.McpClient;
2728
import io.modelcontextprotocol.client.McpSyncClient;
@@ -1845,7 +1846,8 @@ protected McpSyncClient createTaskClient(String clientType, String name, ClientC
18451846
}
18461847

18471848
/** Extracts the task ID from a list of response messages. */
1848-
protected String extractTaskId(List<ResponseMessage<CallToolResult>> messages) {
1849+
protected String extractTaskId(Stream<ResponseMessage<CallToolResult>> messageStream) {
1850+
List<ResponseMessage<CallToolResult>> messages = messageStream.toList();
18491851
for (var msg : messages) {
18501852
if (msg instanceof TaskCreatedMessage<CallToolResult> tcm) {
18511853
return tcm.task().taskId();
@@ -1855,7 +1857,8 @@ protected String extractTaskId(List<ResponseMessage<CallToolResult>> messages) {
18551857
}
18561858

18571859
/** Extracts all task statuses from a list of response messages. */
1858-
protected List<TaskStatus> extractTaskStatuses(List<ResponseMessage<CallToolResult>> messages) {
1860+
protected List<TaskStatus> extractTaskStatuses(Stream<ResponseMessage<CallToolResult>> messageStream) {
1861+
List<ResponseMessage<CallToolResult>> messages = messageStream.toList();
18591862
List<TaskStatus> statuses = new ArrayList<>();
18601863
for (var msg : messages) {
18611864
if (msg instanceof TaskCreatedMessage<CallToolResult> tcm) {
@@ -2181,7 +2184,7 @@ void testAutomaticPollingShimWithCreateTaskHandler(String clientType) {
21812184

21822185
// Call tool WITHOUT task metadata - should trigger automatic polling shim
21832186
var request = new McpSchema.CallToolRequest("auto-polling-tool", Map.of("input", "test-value"), null, null);
2184-
var messages = client.callToolStream(request);
2187+
var messages = client.callToolStream(request).toList();
21852188

21862189
// The automatic polling shim should poll and return the final result
21872190
assertThat(messages).as("Should have response messages").isNotEmpty();

0 commit comments

Comments
 (0)