55import io .modelcontextprotocol .spec .DefaultMcpTransportContext ;
66import io .modelcontextprotocol .spec .McpError ;
77import io .modelcontextprotocol .spec .McpSchema ;
8- import io .modelcontextprotocol .spec .McpServerTransport ;
98import io .modelcontextprotocol .spec .McpStreamableServerSession ;
9+ import io .modelcontextprotocol .spec .McpStreamableServerTransport ;
1010import io .modelcontextprotocol .spec .McpStreamableServerTransportProvider ;
1111import io .modelcontextprotocol .spec .McpTransportContext ;
1212import io .modelcontextprotocol .util .Assert ;
@@ -45,6 +45,8 @@ public class WebFluxStreamableServerTransportProvider implements McpStreamableSe
4545
4646 private final String mcpEndpoint ;
4747
48+ private final boolean disallowDelete ;
49+
4850 private final RouterFunction <?> routerFunction ;
4951
5052 private McpStreamableServerSession .Factory sessionFactory ;
@@ -70,7 +72,7 @@ public class WebFluxStreamableServerTransportProvider implements McpStreamableSe
7072 * @throws IllegalArgumentException if either parameter is null
7173 */
7274 public WebFluxStreamableServerTransportProvider (ObjectMapper objectMapper , String mcpEndpoint ) {
73- this (objectMapper , DEFAULT_BASE_URL , mcpEndpoint );
75+ this (objectMapper , DEFAULT_BASE_URL , mcpEndpoint , false );
7476 }
7577
7678 /**
@@ -83,17 +85,20 @@ public WebFluxStreamableServerTransportProvider(ObjectMapper objectMapper, Strin
8385 * setup. Must not be null.
8486 * @throws IllegalArgumentException if either parameter is null
8587 */
86- public WebFluxStreamableServerTransportProvider (ObjectMapper objectMapper , String baseUrl , String mcpEndpoint ) {
88+ public WebFluxStreamableServerTransportProvider (ObjectMapper objectMapper , String baseUrl , String mcpEndpoint ,
89+ boolean disallowDelete ) {
8790 Assert .notNull (objectMapper , "ObjectMapper must not be null" );
8891 Assert .notNull (baseUrl , "Message base path must not be null" );
8992 Assert .notNull (mcpEndpoint , "Message endpoint must not be null" );
9093
9194 this .objectMapper = objectMapper ;
9295 this .baseUrl = baseUrl ;
9396 this .mcpEndpoint = mcpEndpoint ;
97+ this .disallowDelete = disallowDelete ;
9498 this .routerFunction = RouterFunctions .route ()
9599 .GET (this .mcpEndpoint , this ::handleGet )
96100 .POST (this .mcpEndpoint , this ::handlePost )
101+ .DELETE (this .mcpEndpoint , this ::handleDelete )
97102 .build ();
98103 }
99104
@@ -306,7 +311,37 @@ else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
306311 }).contextWrite (ctx -> ctx .put (McpTransportContext .KEY , transportContext ));
307312 }
308313
309- private class WebFluxStreamableMcpSessionTransport implements McpServerTransport {
314+ private Mono <ServerResponse > handleDelete (ServerRequest request ) {
315+ if (isClosing ) {
316+ return ServerResponse .status (HttpStatus .SERVICE_UNAVAILABLE ).bodyValue ("Server is shutting down" );
317+ }
318+
319+ McpTransportContext transportContext = this .contextExtractor .apply (request );
320+
321+ return Mono .defer (() -> {
322+ if (!request .headers ().asHttpHeaders ().containsKey ("mcp-session-id" )) {
323+ return ServerResponse .badRequest ().build (); // TODO: say we need a session
324+ // id
325+ }
326+
327+ // TODO: The user can configure whether deletions are permitted
328+ if (this .disallowDelete ) {
329+ return ServerResponse .status (HttpStatus .METHOD_NOT_ALLOWED ).build ();
330+ }
331+
332+ String sessionId = request .headers ().asHttpHeaders ().getFirst ("mcp-session-id" );
333+
334+ McpStreamableServerSession session = this .sessions .get (sessionId );
335+
336+ if (session == null ) {
337+ return ServerResponse .notFound ().build ();
338+ }
339+
340+ return session .delete ().then (ServerResponse .ok ().build ());
341+ }).contextWrite (ctx -> ctx .put (McpTransportContext .KEY , transportContext ));
342+ }
343+
344+ private class WebFluxStreamableMcpSessionTransport implements McpStreamableServerTransport {
310345
311346 private final FluxSink <ServerSentEvent <?>> sink ;
312347
@@ -316,6 +351,11 @@ public WebFluxStreamableMcpSessionTransport(FluxSink<ServerSentEvent<?>> sink) {
316351
317352 @ Override
318353 public Mono <Void > sendMessage (McpSchema .JSONRPCMessage message ) {
354+ return this .sendMessage (message , null );
355+ }
356+
357+ @ Override
358+ public Mono <Void > sendMessage (McpSchema .JSONRPCMessage message , String messageId ) {
319359 return Mono .fromSupplier (() -> {
320360 try {
321361 return objectMapper .writeValueAsString (message );
@@ -325,6 +365,7 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
325365 }
326366 }).doOnNext (jsonText -> {
327367 ServerSentEvent <Object > event = ServerSentEvent .builder ()
368+ .id (messageId )
328369 .event (MESSAGE_EVENT_TYPE )
329370 .data (jsonText )
330371 .build ();
@@ -419,7 +460,7 @@ public WebFluxStreamableServerTransportProvider build() {
419460 Assert .notNull (objectMapper , "ObjectMapper must be set" );
420461 Assert .notNull (mcpEndpoint , "Message endpoint must be set" );
421462
422- return new WebFluxStreamableServerTransportProvider (objectMapper , baseUrl , mcpEndpoint );
463+ return new WebFluxStreamableServerTransportProvider (objectMapper , baseUrl , mcpEndpoint , false );
423464 }
424465
425466 }
0 commit comments