77import java .io .IOException ;
88import java .time .Duration ;
99import java .util .List ;
10- import java .util .UUID ;
1110import java .util .concurrent .ConcurrentHashMap ;
1211import java .util .concurrent .locks .ReentrantLock ;
1312
13+ import io .modelcontextprotocol .common .McpTransportContext ;
1414import io .modelcontextprotocol .json .McpJsonMapper ;
1515import io .modelcontextprotocol .json .TypeRef ;
16-
17- import io .modelcontextprotocol .common .McpTransportContext ;
1816import io .modelcontextprotocol .server .McpTransportContextExtractor ;
1917import io .modelcontextprotocol .spec .McpError ;
2018import io .modelcontextprotocol .spec .McpSchema ;
19+ import io .modelcontextprotocol .spec .McpServerSession ;
2120import io .modelcontextprotocol .spec .McpServerTransport ;
2221import io .modelcontextprotocol .spec .McpServerTransportProvider ;
2322import io .modelcontextprotocol .spec .ProtocolVersions ;
24- import io .modelcontextprotocol .spec .McpServerSession ;
2523import io .modelcontextprotocol .util .Assert ;
2624import io .modelcontextprotocol .util .KeepAliveScheduler ;
27-
2825import org .slf4j .Logger ;
2926import org .slf4j .LoggerFactory ;
3027import reactor .core .publisher .Flux ;
@@ -255,41 +252,33 @@ private ServerResponse handleSseConnection(ServerRequest request) {
255252 return ServerResponse .status (HttpStatus .SERVICE_UNAVAILABLE ).body ("Server is shutting down" );
256253 }
257254
258- String sessionId = UUID .randomUUID ().toString ();
259- logger .debug ("Creating new SSE connection for session: {}" , sessionId );
260-
261255 // Send initial endpoint event
262- try {
263- return ServerResponse . sse ( sseBuilder -> {
264- sseBuilder . onComplete (() -> {
265- logger . debug ( "SSE connection completed for session: {}" , sessionId );
266- sessions . remove ( sessionId );
267- });
268- sseBuilder . onTimeout (() -> {
269- logger . debug ( "SSE connection timed out for session: {}" , sessionId );
270- sessions . remove ( sessionId );
271- });
272-
273- WebMvcMcpSessionTransport sessionTransport = new WebMvcMcpSessionTransport (sessionId , sseBuilder );
274- McpServerSession session = sessionFactory . create ( sessionTransport );
275- this .sessions .put (sessionId , session );
256+ return ServerResponse . sse ( sseBuilder -> {
257+ WebMvcMcpSessionTransport sessionTransport = new WebMvcMcpSessionTransport ( sseBuilder );
258+ McpServerSession session = sessionFactory . create ( sessionTransport );
259+ String sessionId = session . getId ( );
260+ logger . debug ( "Creating new SSE connection for session: {}" , sessionId );
261+ sseBuilder . onComplete (() -> {
262+ logger . debug ( "SSE connection completed for session: {}" , sessionId );
263+ sessions . remove ( sessionId );
264+ } );
265+ sseBuilder . onTimeout (() -> {
266+ logger . debug ( "SSE connection timed out for session: {}" , sessionId );
267+ sessions . remove (sessionId );
268+ } );
269+ this .sessions .put (sessionId , session );
276270
277- try {
278- sseBuilder .id (sessionId )
279- .event (ENDPOINT_EVENT_TYPE )
280- .data (this .baseUrl + this .messageEndpoint + "?sessionId=" + sessionId );
281- }
282- catch (Exception e ) {
283- logger .error ("Failed to send initial endpoint event: {}" , e .getMessage ());
284- sseBuilder .error (e );
285- }
286- }, Duration .ZERO );
287- }
288- catch (Exception e ) {
289- logger .error ("Failed to send initial endpoint event to session {}: {}" , sessionId , e .getMessage ());
290- sessions .remove (sessionId );
291- return ServerResponse .status (HttpStatus .INTERNAL_SERVER_ERROR ).build ();
292- }
271+ try {
272+ sseBuilder .id (sessionId )
273+ .event (ENDPOINT_EVENT_TYPE )
274+ .data (this .baseUrl + this .messageEndpoint + "?sessionId=" + sessionId );
275+ }
276+ catch (Exception e ) {
277+ logger .error ("Failed to send initial endpoint event: {}" , e .getMessage ());
278+ this .sessions .remove (sessionId );
279+ sseBuilder .error (e );
280+ }
281+ }, Duration .ZERO );
293282 }
294283
295284 /**
@@ -349,8 +338,6 @@ private ServerResponse handleMessage(ServerRequest request) {
349338 */
350339 private class WebMvcMcpSessionTransport implements McpServerTransport {
351340
352- private final String sessionId ;
353-
354341 private final SseBuilder sseBuilder ;
355342
356343 /**
@@ -360,14 +347,11 @@ private class WebMvcMcpSessionTransport implements McpServerTransport {
360347 private final ReentrantLock sseBuilderLock = new ReentrantLock ();
361348
362349 /**
363- * Creates a new session transport with the specified ID and SSE builder.
364- * @param sessionId The unique identifier for this session
350+ * Creates a new session transport with the specified SSE builder.
365351 * @param sseBuilder The SSE builder for sending server events to the client
366352 */
367- WebMvcMcpSessionTransport (String sessionId , SseBuilder sseBuilder ) {
368- this .sessionId = sessionId ;
353+ WebMvcMcpSessionTransport (SseBuilder sseBuilder ) {
369354 this .sseBuilder = sseBuilder ;
370- logger .debug ("Session transport {} initialized with SSE builder" , sessionId );
371355 }
372356
373357 /**
@@ -381,11 +365,10 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
381365 sseBuilderLock .lock ();
382366 try {
383367 String jsonText = jsonMapper .writeValueAsString (message );
384- sseBuilder .id (sessionId ).event (MESSAGE_EVENT_TYPE ).data (jsonText );
385- logger .debug ("Message sent to session {}" , sessionId );
368+ sseBuilder .event (MESSAGE_EVENT_TYPE ).data (jsonText );
386369 }
387370 catch (Exception e ) {
388- logger .error ("Failed to send message to session {} : {}" , sessionId , e .getMessage ());
371+ logger .error ("Failed to send message: {}" , e .getMessage ());
389372 sseBuilder .error (e );
390373 }
391374 finally {
@@ -413,14 +396,12 @@ public <T> T unmarshalFrom(Object data, TypeRef<T> typeRef) {
413396 @ Override
414397 public Mono <Void > closeGracefully () {
415398 return Mono .fromRunnable (() -> {
416- logger .debug ("Closing session transport: {}" , sessionId );
417399 sseBuilderLock .lock ();
418400 try {
419401 sseBuilder .complete ();
420- logger .debug ("Successfully completed SSE builder for session {}" , sessionId );
421402 }
422403 catch (Exception e ) {
423- logger .warn ("Failed to complete SSE builder for session {} : {}" , sessionId , e .getMessage ());
404+ logger .warn ("Failed to complete SSE builder: {}" , e .getMessage ());
424405 }
425406 finally {
426407 sseBuilderLock .unlock ();
@@ -436,10 +417,9 @@ public void close() {
436417 sseBuilderLock .lock ();
437418 try {
438419 sseBuilder .complete ();
439- logger .debug ("Successfully completed SSE builder for session {}" , sessionId );
440420 }
441421 catch (Exception e ) {
442- logger .warn ("Failed to complete SSE builder for session {} : {}" , sessionId , e .getMessage ());
422+ logger .warn ("Failed to complete SSE builder: {}" , e .getMessage ());
443423 }
444424 finally {
445425 sseBuilderLock .unlock ();
0 commit comments