Skip to content

Commit 1998f17

Browse files
committed
feat: Handle invalid cursor in streamable transport provider
1 parent 91a30a7 commit 1998f17

File tree

2 files changed

+20
-16
lines changed

2 files changed

+20
-16
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxStreamableServerTransportProvider.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,7 @@
88
import com.fasterxml.jackson.databind.ObjectMapper;
99
import io.modelcontextprotocol.server.DefaultMcpTransportContext;
1010
import io.modelcontextprotocol.server.McpTransportContextExtractor;
11-
import io.modelcontextprotocol.spec.HttpHeaders;
12-
import io.modelcontextprotocol.spec.McpError;
13-
import io.modelcontextprotocol.spec.McpSchema;
14-
import io.modelcontextprotocol.spec.McpStreamableServerSession;
15-
import io.modelcontextprotocol.spec.McpStreamableServerTransport;
16-
import io.modelcontextprotocol.spec.McpStreamableServerTransportProvider;
17-
import io.modelcontextprotocol.spec.ProtocolVersions;
11+
import io.modelcontextprotocol.spec.*;
1812
import io.modelcontextprotocol.server.McpTransportContext;
1913
import io.modelcontextprotocol.util.Assert;
2014
import io.modelcontextprotocol.util.KeepAliveScheduler;
@@ -278,6 +272,19 @@ else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
278272
WebFluxStreamableMcpSessionTransport st = new WebFluxStreamableMcpSessionTransport(sink);
279273
Mono<Void> stream = session.responseStream(jsonrpcRequest, st);
280274
Disposable streamSubscription = stream.onErrorComplete(err -> {
275+
if (err instanceof McpParamsValidationError) {
276+
var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(),
277+
null, new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INVALID_PARAMS,
278+
err.getMessage(), null));
279+
280+
var event = ServerSentEvent.builder()
281+
.event(MESSAGE_EVENT_TYPE)
282+
.data(errorResponse)
283+
.build();
284+
285+
sink.next(event);
286+
return true;
287+
}
281288
sink.error(err);
282289
return true;
283290
}).contextWrite(sink.contextView()).subscribe();

mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcStreamableServerTransportProvider.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.concurrent.ConcurrentHashMap;
1111
import java.util.concurrent.locks.ReentrantLock;
1212

13+
import io.modelcontextprotocol.spec.*;
1314
import org.slf4j.Logger;
1415
import org.slf4j.LoggerFactory;
1516
import org.springframework.http.HttpStatus;
@@ -26,13 +27,6 @@
2627
import io.modelcontextprotocol.server.DefaultMcpTransportContext;
2728
import io.modelcontextprotocol.server.McpTransportContext;
2829
import io.modelcontextprotocol.server.McpTransportContextExtractor;
29-
import io.modelcontextprotocol.spec.HttpHeaders;
30-
import io.modelcontextprotocol.spec.McpError;
31-
import io.modelcontextprotocol.spec.McpSchema;
32-
import io.modelcontextprotocol.spec.McpStreamableServerSession;
33-
import io.modelcontextprotocol.spec.McpStreamableServerTransport;
34-
import io.modelcontextprotocol.spec.McpStreamableServerTransportProvider;
35-
import io.modelcontextprotocol.spec.ProtocolVersions;
3630
import io.modelcontextprotocol.util.Assert;
3731
import io.modelcontextprotocol.util.KeepAliveScheduler;
3832
import reactor.core.publisher.Flux;
@@ -395,8 +389,11 @@ else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
395389
session.responseStream(jsonrpcRequest, sessionTransport)
396390
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext))
397391
.block();
398-
}
399-
catch (Exception e) {
392+
} catch (McpParamsValidationError e) {
393+
var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), null,
394+
new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INVALID_PARAMS, e.getMessage(), null));
395+
sessionTransport.sendMessage(errorResponse).block();
396+
} catch (Exception e) {
400397
logger.error("Failed to handle request stream: {}", e.getMessage());
401398
sseBuilder.error(e);
402399
}

0 commit comments

Comments
 (0)