@@ -108,7 +108,7 @@ public interface NotificationHandler {
108108 public McpClientSession (Duration requestTimeout , McpClientTransport transport ,
109109 Map <String , RequestHandler <?>> requestHandlers , Map <String , NotificationHandler > notificationHandlers ) {
110110
111- Assert .notNull (requestTimeout , "The requstTimeout can not be null" );
111+ Assert .notNull (requestTimeout , "The requestTimeout can not be null" );
112112 Assert .notNull (transport , "The transport can not be null" );
113113 Assert .notNull (requestHandlers , "The requestHandlers can not be null" );
114114 Assert .notNull (notificationHandlers , "The notificationHandlers can not be null" );
@@ -123,33 +123,41 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
123123 // Observation associated with the individual message - it can be used to
124124 // create child Observation and emit it together with the message to the
125125 // consumer
126- this .connection = this .transport .connect (mono -> mono .doOnNext (message -> {
126+ this .connection = this .transport .connect (mono -> mono .doOnNext ((msg ) -> handle (msg ).subscribe ())).subscribe ();
127+ }
128+
129+ public Mono <Void > handle (McpSchema .JSONRPCMessage message ) {
130+ return Mono .defer (() -> {
127131 if (message instanceof McpSchema .JSONRPCResponse response ) {
128132 logger .debug ("Received Response: {}" , response );
129133 var sink = pendingResponses .remove (response .id ());
130134 if (sink == null ) {
131- logger .warn ("Unexpected response for unkown id {}" , response .id ());
135+ logger .warn ("Unexpected response for unknown id {}" , response .id ());
132136 }
133137 else {
134138 sink .success (response );
135139 }
140+ return Mono .empty ();
136141 }
137142 else if (message instanceof McpSchema .JSONRPCRequest request ) {
138143 logger .debug ("Received request: {}" , request );
139- handleIncomingRequest (request ). flatMap ( transport :: sendMessage ).onErrorResume (error -> {
144+ return handleIncomingRequest (request ).onErrorResume (error -> {
140145 var errorResponse = new McpSchema .JSONRPCResponse (McpSchema .JSONRPC_VERSION , request .id (), null ,
141146 new McpSchema .JSONRPCResponse .JSONRPCError (McpSchema .ErrorCodes .INTERNAL_ERROR ,
142147 error .getMessage (), null ));
143- return transport .sendMessage (errorResponse );
144- }).subscribe ();
145-
148+ return this .transport .sendMessage (errorResponse ).then (Mono .empty ());
149+ }).flatMap (this .transport ::sendMessage );
146150 }
147151 else if (message instanceof McpSchema .JSONRPCNotification notification ) {
148152 logger .debug ("Received notification: {}" , notification );
149- handleIncomingNotification (notification ). subscribe ( null ,
150- error -> logger .error ("Error handling notification: {}" , error .getMessage ()));
153+ return handleIncomingNotification (notification )
154+ . doOnError ( error -> logger .error ("Error handling notification: {}" , error .getMessage ()));
151155 }
152- })).subscribe ();
156+ else {
157+ logger .warn ("Received unknown message type: {}" , message );
158+ return Mono .empty ();
159+ }
160+ });
153161 }
154162
155163 /**
0 commit comments