@@ -401,93 +401,6 @@ else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
401401 });
402402 }
403403
404- public Mono <Void > connect2 (Function <Mono <JSONRPCMessage >, Mono <JSONRPCMessage >> handler ) {
405-
406- return Mono .create (sink -> {
407-
408- HttpRequest request = requestBuilder .copy ()
409- .uri (Utils .resolveUri (this .baseUri , this .sseEndpoint ))
410- .header ("Accept" , "text/event-stream" )
411- .header ("Cache-Control" , "no-cache" )
412- .GET ()
413- .build ();
414-
415- Flux <ResponseSubscribers .SseResponseEvent > bla = Flux .<ResponseEvent >create (sseSink -> this .httpClient
416- .sendAsync (request , responseInfo -> ResponseSubscribers .sseToBodySubscriber (responseInfo , sseSink ))
417- .exceptionallyCompose (e -> {
418- sseSink .error (e );
419- return CompletableFuture .failedFuture (e );
420- })).map (responseEvent -> (ResponseSubscribers .SseResponseEvent ) responseEvent );
421-
422- Disposable connection = Flux .<ResponseEvent >create (sseSink -> this .httpClient
423- .sendAsync (request , responseInfo -> ResponseSubscribers .sseToBodySubscriber (responseInfo , sseSink ))
424- .exceptionallyCompose (e -> {
425- sseSink .error (e );
426- return CompletableFuture .failedFuture (e );
427- }))
428- .map (responseEvent -> (ResponseSubscribers .SseResponseEvent ) responseEvent )
429- .flatMap (responseEvent -> {
430- if (isClosing ) {
431- return Mono .empty ();
432- }
433-
434- int statusCode = responseEvent .responseInfo ().statusCode ();
435-
436- if (statusCode >= 200 && statusCode < 300 ) {
437- try {
438- if (ENDPOINT_EVENT_TYPE .equals (responseEvent .sseEvent ().event ())) {
439- String messageEndpointUri = responseEvent .sseEvent ().data ();
440- if (this .messageEndpointSink .tryEmitValue (messageEndpointUri ).isSuccess ()) {
441- sink .success ();
442- return Flux .empty (); // No further processing needed
443- }
444- else {
445- sink .error (new McpError ("Failed to handle SSE endpoint event" ));
446- }
447- }
448- else if (MESSAGE_EVENT_TYPE .equals (responseEvent .sseEvent ().event ())) {
449- JSONRPCMessage message = McpSchema .deserializeJsonRpcMessage (objectMapper ,
450- responseEvent .sseEvent ().data ());
451- sink .success ();
452- return Flux .just (message );
453- }
454- else {
455- logger .error ("Received unrecognized SSE event type: {}" ,
456- responseEvent .sseEvent ().event ());
457- sink .error (new McpError (
458- "Received unrecognized SSE event type: " + responseEvent .sseEvent ().event ()));
459- }
460- }
461- catch (IOException e ) {
462- logger .error ("Error processing SSE event" , e );
463- sink .error (new McpError ("Error processing SSE event" ));
464- }
465- }
466- return Flux .<McpSchema .JSONRPCMessage >error (
467- new RuntimeException ("Failed to send message: " + responseEvent ));
468-
469- })
470- .flatMap (jsonRpcMessage -> handler .apply (Mono .just (jsonRpcMessage )))
471- .onErrorComplete (t -> {
472- if (!isClosing ) {
473- logger .warn ("SSE stream observed an error" , t );
474- sink .error (t );
475- }
476- return true ;
477- })
478- .doFinally (s -> {
479- Disposable ref = this .sseSubscription .getAndSet (null );
480- if (ref != null && !ref .isDisposed ()) {
481- ref .dispose ();
482- }
483- })
484- .contextWrite (sink .contextView ())
485- .subscribe ();
486-
487- this .sseSubscription .set (connection );
488- });
489- }
490-
491404 /**
492405 * Sends a JSON-RPC message to the server.
493406 *
0 commit comments