44
55package io .modelcontextprotocol .server .transport ;
66
7- import java .io .BufferedReader ;
8- import java .io .IOException ;
9- import java .io .PrintWriter ;
10-
11- import org .slf4j .Logger ;
12- import org .slf4j .LoggerFactory ;
13-
147import com .fasterxml .jackson .databind .ObjectMapper ;
15-
16- import io .modelcontextprotocol .server .DefaultMcpTransportContext ;
178import io .modelcontextprotocol .server .McpStatelessServerHandler ;
189import io .modelcontextprotocol .server .McpTransportContext ;
1910import io .modelcontextprotocol .server .McpTransportContextExtractor ;
11+ import io .modelcontextprotocol .server .StatelessMcpTransportContext ;
2012import io .modelcontextprotocol .spec .McpError ;
2113import io .modelcontextprotocol .spec .McpSchema ;
2214import io .modelcontextprotocol .spec .McpStatelessServerTransport ;
2618import jakarta .servlet .http .HttpServlet ;
2719import jakarta .servlet .http .HttpServletRequest ;
2820import jakarta .servlet .http .HttpServletResponse ;
21+ import org .slf4j .Logger ;
22+ import org .slf4j .LoggerFactory ;
2923import reactor .core .publisher .Mono ;
3024
25+ import java .io .BufferedReader ;
26+ import java .io .IOException ;
27+ import java .io .PrintWriter ;
28+ import java .util .concurrent .atomic .AtomicBoolean ;
29+ import java .util .concurrent .atomic .AtomicInteger ;
30+ import java .util .function .BiConsumer ;
31+
3132/**
3233 * Implementation of an HttpServlet based {@link McpStatelessServerTransport}.
3334 *
@@ -123,11 +124,16 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
123124 return ;
124125 }
125126
126- McpTransportContext transportContext = this .contextExtractor .extract (request , new DefaultMcpTransportContext ());
127+ AtomicInteger nextId = new AtomicInteger (0 );
128+ AtomicBoolean upgradedToSse = new AtomicBoolean (false );
129+ BiConsumer <String , Object > notificationHandler = buildNotificationHandler (response , upgradedToSse , nextId );
130+ McpTransportContext transportContext = this .contextExtractor .extract (request ,
131+ new StatelessMcpTransportContext (notificationHandler ));
127132
128133 String accept = request .getHeader (ACCEPT );
129134 if (accept == null || !(accept .contains (APPLICATION_JSON ) && accept .contains (TEXT_EVENT_STREAM ))) {
130- this .responseError (response , HttpServletResponse .SC_BAD_REQUEST ,
135+ this .responseError (response , HttpServletResponse .SC_BAD_REQUEST , null , upgradedToSse .get (),
136+ nextId .getAndIncrement (),
131137 new McpError ("Both application/json and text/event-stream required in Accept header" ));
132138 return ;
133139 }
@@ -149,18 +155,24 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
149155 .contextWrite (ctx -> ctx .put (McpTransportContext .KEY , transportContext ))
150156 .block ();
151157
152- response .setContentType (APPLICATION_JSON );
153- response .setCharacterEncoding (UTF_8 );
154- response .setStatus (HttpServletResponse .SC_OK );
155-
156158 String jsonResponseText = objectMapper .writeValueAsString (jsonrpcResponse );
157- PrintWriter writer = response .getWriter ();
158- writer .write (jsonResponseText );
159- writer .flush ();
159+ if (upgradedToSse .get ()) {
160+ sendEvent (response .getWriter (), jsonResponseText , nextId .getAndIncrement ());
161+ }
162+ else {
163+ response .setContentType (APPLICATION_JSON );
164+ response .setCharacterEncoding (UTF_8 );
165+ response .setStatus (HttpServletResponse .SC_OK );
166+
167+ PrintWriter writer = response .getWriter ();
168+ writer .write (jsonResponseText );
169+ writer .flush ();
170+ }
160171 }
161172 catch (Exception e ) {
162173 logger .error ("Failed to handle request: {}" , e .getMessage ());
163- this .responseError (response , HttpServletResponse .SC_INTERNAL_SERVER_ERROR ,
174+ this .responseError (response , HttpServletResponse .SC_INTERNAL_SERVER_ERROR , jsonrpcRequest .id (),
175+ upgradedToSse .get (), nextId .getAndIncrement (),
164176 new McpError ("Failed to handle request: " + e .getMessage ()));
165177 }
166178 }
@@ -173,41 +185,53 @@ else if (message instanceof McpSchema.JSONRPCNotification jsonrpcNotification) {
173185 }
174186 catch (Exception e ) {
175187 logger .error ("Failed to handle notification: {}" , e .getMessage ());
176- this .responseError (response , HttpServletResponse .SC_INTERNAL_SERVER_ERROR ,
188+ this .responseError (response , HttpServletResponse .SC_INTERNAL_SERVER_ERROR , null ,
189+ upgradedToSse .get (), nextId .getAndIncrement (),
177190 new McpError ("Failed to handle notification: " + e .getMessage ()));
178191 }
179192 }
180193 else {
181- this .responseError (response , HttpServletResponse .SC_BAD_REQUEST ,
182- new McpError ("The server accepts either requests or notifications" ));
194+ this .responseError (response , HttpServletResponse .SC_BAD_REQUEST , null , upgradedToSse . get (),
195+ nextId . getAndIncrement (), new McpError ("The server accepts either requests or notifications" ));
183196 }
184197 }
185198 catch (IllegalArgumentException | IOException e ) {
186199 logger .error ("Failed to deserialize message: {}" , e .getMessage ());
187- this .responseError (response , HttpServletResponse .SC_BAD_REQUEST , new McpError ("Invalid message format" ));
200+ this .responseError (response , HttpServletResponse .SC_BAD_REQUEST , null , upgradedToSse .get (),
201+ nextId .getAndIncrement (), new McpError ("Invalid message format" ));
188202 }
189203 catch (Exception e ) {
190204 logger .error ("Unexpected error handling message: {}" , e .getMessage ());
191- this .responseError (response , HttpServletResponse .SC_INTERNAL_SERVER_ERROR ,
192- new McpError ("Unexpected error: " + e .getMessage ()));
205+ this .responseError (response , HttpServletResponse .SC_INTERNAL_SERVER_ERROR , null , upgradedToSse . get (),
206+ nextId . getAndIncrement (), new McpError ("Unexpected error: " + e .getMessage ()));
193207 }
194208 }
195209
196210 /**
197211 * Sends an error response to the client.
198212 * @param response The HTTP servlet response
199213 * @param httpCode The HTTP status code
214+ * @param upgradedToSse true if the response is upgraded to SSE, false otherwise
215+ * @param eventIdIfNeeded if upgradedToSse, the event ID to use, otherwise ignored
200216 * @param mcpError The MCP error to send
201217 * @throws IOException If an I/O error occurs
202218 */
203- private void responseError (HttpServletResponse response , int httpCode , McpError mcpError ) throws IOException {
204- response .setContentType (APPLICATION_JSON );
205- response .setCharacterEncoding (UTF_8 );
206- response .setStatus (httpCode );
207- String jsonError = objectMapper .writeValueAsString (mcpError );
208- PrintWriter writer = response .getWriter ();
209- writer .write (jsonError );
210- writer .flush ();
219+ private void responseError (HttpServletResponse response , int httpCode , Object requestId , boolean upgradedToSse ,
220+ int eventIdIfNeeded , McpError mcpError ) throws IOException {
221+ if (upgradedToSse ) {
222+ String jsonError = objectMapper .writeValueAsString (new McpSchema .JSONRPCResponse (McpSchema .JSONRPC_VERSION ,
223+ requestId , null , mcpError .getJsonRpcError ()));
224+ sendEvent (response .getWriter (), jsonError , eventIdIfNeeded );
225+ }
226+ else {
227+ response .setContentType (APPLICATION_JSON );
228+ response .setCharacterEncoding (UTF_8 );
229+ response .setStatus (httpCode );
230+ PrintWriter writer = response .getWriter ();
231+ String jsonError = objectMapper .writeValueAsString (mcpError );
232+ writer .write (jsonError );
233+ writer .flush ();
234+ }
211235 }
212236
213237 /**
@@ -303,4 +327,43 @@ public HttpServletStatelessServerTransport build() {
303327
304328 }
305329
330+ private BiConsumer <String , Object > buildNotificationHandler (HttpServletResponse response ,
331+ AtomicBoolean upgradedToSse , AtomicInteger nextId ) {
332+ AtomicBoolean responseInitialized = new AtomicBoolean (false );
333+
334+ return (notificationMethod , params ) -> {
335+ if (responseInitialized .compareAndSet (false , true )) {
336+ response .setContentType (TEXT_EVENT_STREAM );
337+ response .setCharacterEncoding (UTF_8 );
338+ response .setStatus (HttpServletResponse .SC_OK );
339+ }
340+
341+ upgradedToSse .set (true );
342+
343+ McpSchema .JSONRPCNotification notification = new McpSchema .JSONRPCNotification (McpSchema .JSONRPC_VERSION ,
344+ notificationMethod , params );
345+ try {
346+ sendEvent (response .getWriter (), objectMapper .writeValueAsString (notification ),
347+ nextId .getAndIncrement ());
348+ }
349+ catch (IOException e ) {
350+ logger .error ("Failed to handle notification: {}" , e .getMessage ());
351+ throw new McpError (new McpSchema .JSONRPCResponse .JSONRPCError (McpSchema .ErrorCodes .INTERNAL_ERROR ,
352+ e .getMessage (), null ));
353+ }
354+ };
355+ }
356+
357+ private void sendEvent (PrintWriter writer , String data , int id ) throws IOException {
358+ // tested with MCP inspector. Event must consist of these two fields and only
359+ // these two fields
360+ writer .write ("id: " + id + "\n " );
361+ writer .write ("data: " + data + "\n \n " );
362+ writer .flush ();
363+
364+ if (writer .checkError ()) {
365+ throw new IOException ("Client disconnected" );
366+ }
367+ }
368+
306369}
0 commit comments