44
55package io .modelcontextprotocol ;
66
7- import java .util .concurrent .atomic .AtomicInteger ;
7+ import java .util .ArrayList ;
8+ import java .util .List ;
9+ import java .util .function .BiConsumer ;
810import java .util .function .Function ;
911
1012import com .fasterxml .jackson .core .type .TypeReference ;
1113import com .fasterxml .jackson .databind .ObjectMapper ;
1214import io .modelcontextprotocol .spec .ClientMcpTransport ;
1315import io .modelcontextprotocol .spec .McpSchema ;
16+ import io .modelcontextprotocol .spec .ServerMcpTransport ;
1417import io .modelcontextprotocol .spec .McpSchema .JSONRPCNotification ;
1518import io .modelcontextprotocol .spec .McpSchema .JSONRPCRequest ;
16- import io .modelcontextprotocol .spec .ServerMcpTransport ;
17- import reactor .core .publisher .Flux ;
1819import reactor .core .publisher .Mono ;
1920import reactor .core .publisher .Sinks ;
20- import reactor .core .scheduler .Schedulers ;
2121
2222/**
2323 * A mock implementation of the {@link ClientMcpTransport} and {@link ServerMcpTransport}
2424 * interfaces.
2525 */
2626public class MockMcpTransport implements ClientMcpTransport , ServerMcpTransport {
2727
28- private final AtomicInteger inboundMessageCount = new AtomicInteger (0 );
29-
30- private final Sinks .Many <McpSchema .JSONRPCMessage > outgoing = Sinks .many ().multicast ().onBackpressureBuffer ();
31-
3228 private final Sinks .Many <McpSchema .JSONRPCMessage > inbound = Sinks .many ().unicast ().onBackpressureBuffer ();
3329
34- private final Flux <McpSchema .JSONRPCMessage > outboundView = outgoing . asFlux (). cache ( 1 );
30+ private final List <McpSchema .JSONRPCMessage > sent = new ArrayList <>( );
3531
36- // Latch to wait for the next message(s) to be sent in response of simulated incoming
37- // message
38- java .util .concurrent .CountDownLatch latch = new java .util .concurrent .CountDownLatch (1 );
32+ private final BiConsumer <MockMcpTransport , McpSchema .JSONRPCMessage > interceptor ;
3933
40- public void simulateIncomingMessage (McpSchema .JSONRPCMessage message ) {
41- simulateIncomingMessage (message , 1 );
34+ public MockMcpTransport () {
35+ this ((t , msg ) -> {
36+ });
4237 }
4338
44- public void simulateIncomingMessage (McpSchema .JSONRPCMessage message , int expectedResponseMessagesCount ) {
39+ public MockMcpTransport (BiConsumer <MockMcpTransport , McpSchema .JSONRPCMessage > interceptor ) {
40+ this .interceptor = interceptor ;
41+ }
42+
43+ public void simulateIncomingMessage (McpSchema .JSONRPCMessage message ) {
4544 if (inbound .tryEmitNext (message ).isFailure ()) {
46- throw new RuntimeException ("Failed to emit message " + message );
45+ throw new RuntimeException ("Failed to process incoming message " + message );
4746 }
48- inboundMessageCount .incrementAndGet ();
49- latch = new java .util .concurrent .CountDownLatch (expectedResponseMessagesCount );
5047 }
5148
5249 @ Override
5350 public Mono <Void > sendMessage (McpSchema .JSONRPCMessage message ) {
54- if (outgoing .tryEmitNext (message ).isFailure ()) {
55- return Mono .error (new RuntimeException ("Can't emit outgoing message " + message ));
56- }
57- latch .countDown ();
51+ sent .add (message );
52+ interceptor .accept (this , message );
5853 return Mono .empty ();
5954 }
6055
6156 public McpSchema .JSONRPCRequest getLastSentMessageAsRequest () {
6257 return (JSONRPCRequest ) getLastSentMessage ();
6358 }
6459
65- public McpSchema .JSONRPCNotification getLastSentMessageAsNotifiation () {
60+ public McpSchema .JSONRPCNotification getLastSentMessageAsNotification () {
6661 return (JSONRPCNotification ) getLastSentMessage ();
6762 }
6863
6964 public McpSchema .JSONRPCMessage getLastSentMessage () {
70- try {
71- latch .await ();
72- }
73- catch (InterruptedException e ) {
74- e .printStackTrace ();
75- }
76- return outboundView .blockFirst ();
65+ return !sent .isEmpty () ? sent .get (sent .size () - 1 ) : null ;
7766 }
7867
7968 private volatile boolean connected = false ;
@@ -85,7 +74,6 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
8574 }
8675 connected = true ;
8776 return inbound .asFlux ()
88- .publishOn (Schedulers .boundedElastic ())
8977 .flatMap (message -> Mono .just (message ).transform (handler ))
9078 .doFinally (signal -> connected = false )
9179 .then ();
@@ -95,7 +83,6 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
9583 public Mono <Void > closeGracefully () {
9684 return Mono .defer (() -> {
9785 connected = false ;
98- outgoing .tryEmitComplete ();
9986 inbound .tryEmitComplete ();
10087 // Wait for all subscribers to complete
10188 return Mono .empty ();
0 commit comments