1919import io .modelcontextprotocol .spec .McpSchema .ClientCapabilities ;
2020import io .modelcontextprotocol .spec .McpSchema .InitializeResult ;
2121import io .modelcontextprotocol .spec .McpSchema .Root ;
22- import org .junit .jupiter .api .Disabled ;
2322import org .junit .jupiter .api .Test ;
2423import reactor .core .publisher .Mono ;
2524
3029class McpAsyncClientResponseHandlerTests {
3130
3231 private InitializeResult initialization (McpAsyncClient asyncMcpClient , MockMcpTransport transport ) {
33-
3432 // Create mock server response
35- McpSchema .ServerCapabilities mockServerCapabilities = McpSchema .ServerCapabilities .builder ()
36- .tools (true )
37- .resources (true , true ) // Enable both resources and resource templates
38- .build ();
39- McpSchema .Implementation mockServerInfo = new McpSchema .Implementation ("test-server" , "1.0.0" );
4033 McpSchema .InitializeResult mockInitResult = new McpSchema .InitializeResult (McpSchema .LATEST_PROTOCOL_VERSION ,
41- mockServerCapabilities , mockServerInfo , "Test instructions" );
42-
43- Mono <McpSchema .InitializeResult > initMono = asyncMcpClient .initialize ();
44-
45- new Thread (new Runnable () {
46- @ Override
47- public void run () {
34+ McpSchema .ServerCapabilities .builder ()
35+ .tools (true )
36+ .resources (true , true ) // Enable both resources and resource templates
37+ .build (),
38+ new McpSchema .Implementation ("test-server" , "1.0.0" ), "Test instructions" );
39+
40+ // Use CountDownLatch to coordinate between threads
41+ java .util .concurrent .CountDownLatch latch = new java .util .concurrent .CountDownLatch (1 );
42+
43+ // Create a Mono that will handle the initialization and response simulation
44+ return asyncMcpClient .initialize ().doOnSubscribe (subscription -> {
45+ // Run in a separate reactive context to avoid blocking the main subscription
46+ Mono .fromRunnable (() -> {
4847 McpSchema .JSONRPCRequest initRequest = transport .getLastSentMessageAsRequest ();
4948 assertThat (initRequest .method ()).isEqualTo (McpSchema .METHOD_INITIALIZE );
5049
5150 // Send mock server response
5251 McpSchema .JSONRPCResponse initResponse = new McpSchema .JSONRPCResponse (McpSchema .JSONRPC_VERSION ,
5352 initRequest .id (), mockInitResult , null );
5453 transport .simulateIncomingMessage (initResponse );
54+ latch .countDown ();
55+ }).subscribeOn (reactor .core .scheduler .Schedulers .boundedElastic ()).subscribe ();
56+ }).doOnTerminate (() -> {
57+ try {
58+ // Wait for the response simulation to complete
59+ latch .await (5 , java .util .concurrent .TimeUnit .SECONDS );
5560 }
56- }).start ();
57-
58- return initMono .block ();
61+ catch (InterruptedException e ) {
62+ Thread .currentThread ().interrupt ();
63+ throw new RuntimeException ("Interrupted while waiting for initialization" , e );
64+ }
65+ }).block ();
5966 }
6067
6168 @ Test
@@ -75,23 +82,32 @@ void testSuccessfulInitialization() {
7582 McpSchema .InitializeResult mockInitResult = new McpSchema .InitializeResult (McpSchema .LATEST_PROTOCOL_VERSION ,
7683 mockServerCapabilities , mockServerInfo , "Test instructions" );
7784
78- // Start initialization
79- Mono < McpSchema . InitializeResult > initMono = asyncMcpClient . initialize ( );
85+ // Use CountDownLatch to coordinate between threads
86+ java . util . concurrent . CountDownLatch latch = new java . util . concurrent . CountDownLatch ( 1 );
8087
81- new Thread (new Runnable () {
82- @ Override
83- public void run () {
88+ // Start initialization with reactive handling
89+ InitializeResult result = asyncMcpClient .initialize ().doOnSubscribe (subscription -> {
90+ // Run in a separate reactive context to avoid blocking the main subscription
91+ Mono .fromRunnable (() -> {
8492 McpSchema .JSONRPCRequest initRequest = transport .getLastSentMessageAsRequest ();
8593 assertThat (initRequest .method ()).isEqualTo (McpSchema .METHOD_INITIALIZE );
8694
8795 // Send mock server response
8896 McpSchema .JSONRPCResponse initResponse = new McpSchema .JSONRPCResponse (McpSchema .JSONRPC_VERSION ,
8997 initRequest .id (), mockInitResult , null );
9098 transport .simulateIncomingMessage (initResponse );
99+ latch .countDown ();
100+ }).subscribeOn (reactor .core .scheduler .Schedulers .boundedElastic ()).subscribe ();
101+ }).doOnTerminate (() -> {
102+ try {
103+ // Wait for the response simulation to complete
104+ latch .await (5 , java .util .concurrent .TimeUnit .SECONDS );
91105 }
92- }).start ();
93-
94- InitializeResult result = initMono .block ();
106+ catch (InterruptedException e ) {
107+ Thread .currentThread ().interrupt ();
108+ throw new RuntimeException ("Interrupted while waiting for initialization" , e );
109+ }
110+ }).block ();
95111
96112 // Verify initialized notification was sent
97113 McpSchema .JSONRPCMessage notificationMessage = transport .getLastSentMessage ();
0 commit comments