@@ -95,28 +95,30 @@ record Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities s
9595 * blocking code offloading to prevent accidental blocking of the non-blocking
9696 * transport.
9797 * @param syncSpec a potentially blocking, synchronous specification.
98+ * @param immediateExecution when true, do not offload. Only use if you are sure
99+ * you are in a blocking context.
98100 * @return a specification which is protected from blocking calls specified by the
99101 * user.
100102 */
101- static Async fromSync (Sync syncSpec ) {
103+ static Async fromSync (Sync syncSpec , boolean immediateExecution ) {
102104 List <McpServerFeatures .AsyncToolSpecification > tools = new ArrayList <>();
103105 for (var tool : syncSpec .tools ()) {
104- tools .add (AsyncToolSpecification .fromSync (tool ));
106+ tools .add (AsyncToolSpecification .fromSync (tool , immediateExecution ));
105107 }
106108
107109 Map <String , AsyncResourceSpecification > resources = new HashMap <>();
108110 syncSpec .resources ().forEach ((key , resource ) -> {
109- resources .put (key , AsyncResourceSpecification .fromSync (resource ));
111+ resources .put (key , AsyncResourceSpecification .fromSync (resource , immediateExecution ));
110112 });
111113
112114 Map <String , AsyncPromptSpecification > prompts = new HashMap <>();
113115 syncSpec .prompts ().forEach ((key , prompt ) -> {
114- prompts .put (key , AsyncPromptSpecification .fromSync (prompt ));
116+ prompts .put (key , AsyncPromptSpecification .fromSync (prompt , immediateExecution ));
115117 });
116118
117119 Map <McpSchema .CompleteReference , McpServerFeatures .AsyncCompletionSpecification > completions = new HashMap <>();
118120 syncSpec .completions ().forEach ((key , completion ) -> {
119- completions .put (key , AsyncCompletionSpecification .fromSync (completion ));
121+ completions .put (key , AsyncCompletionSpecification .fromSync (completion , immediateExecution ));
120122 });
121123
122124 List <BiFunction <McpAsyncServerExchange , List <McpSchema .Root >, Mono <Void >>> rootChangeConsumers = new ArrayList <>();
@@ -239,15 +241,15 @@ record Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities se
239241 public record AsyncToolSpecification (McpSchema .Tool tool ,
240242 BiFunction <McpAsyncServerExchange , Map <String , Object >, Mono <McpSchema .CallToolResult >> call ) {
241243
242- static AsyncToolSpecification fromSync (SyncToolSpecification tool ) {
244+ static AsyncToolSpecification fromSync (SyncToolSpecification tool , boolean immediate ) {
243245 // FIXME: This is temporary, proper validation should be implemented
244246 if (tool == null ) {
245247 return null ;
246248 }
247- return new AsyncToolSpecification (tool .tool (),
248- ( exchange , map ) -> Mono
249- . fromCallable (() -> tool . call (). apply ( new McpSyncServerExchange ( exchange ), map ))
250- . subscribeOn ( Schedulers . boundedElastic ()) );
249+ return new AsyncToolSpecification (tool .tool (), ( exchange , map ) -> {
250+ var toolResult = Mono . fromCallable (( ) -> tool . call (). apply ( new McpSyncServerExchange ( exchange ), map ));
251+ return immediate ? toolResult : toolResult . subscribeOn ( Schedulers . boundedElastic ());
252+ } );
251253 }
252254 }
253255
@@ -281,15 +283,16 @@ static AsyncToolSpecification fromSync(SyncToolSpecification tool) {
281283 public record AsyncResourceSpecification (McpSchema .Resource resource ,
282284 BiFunction <McpAsyncServerExchange , McpSchema .ReadResourceRequest , Mono <McpSchema .ReadResourceResult >> readHandler ) {
283285
284- static AsyncResourceSpecification fromSync (SyncResourceSpecification resource ) {
286+ static AsyncResourceSpecification fromSync (SyncResourceSpecification resource , boolean immediateExecution ) {
285287 // FIXME: This is temporary, proper validation should be implemented
286288 if (resource == null ) {
287289 return null ;
288290 }
289- return new AsyncResourceSpecification (resource .resource (),
290- (exchange , req ) -> Mono
291- .fromCallable (() -> resource .readHandler ().apply (new McpSyncServerExchange (exchange ), req ))
292- .subscribeOn (Schedulers .boundedElastic ()));
291+ return new AsyncResourceSpecification (resource .resource (), (exchange , req ) -> {
292+ var resourceResult = Mono
293+ .fromCallable (() -> resource .readHandler ().apply (new McpSyncServerExchange (exchange ), req ));
294+ return immediateExecution ? resourceResult : resourceResult .subscribeOn (Schedulers .boundedElastic ());
295+ });
293296 }
294297 }
295298
@@ -327,15 +330,16 @@ static AsyncResourceSpecification fromSync(SyncResourceSpecification resource) {
327330 public record AsyncPromptSpecification (McpSchema .Prompt prompt ,
328331 BiFunction <McpAsyncServerExchange , McpSchema .GetPromptRequest , Mono <McpSchema .GetPromptResult >> promptHandler ) {
329332
330- static AsyncPromptSpecification fromSync (SyncPromptSpecification prompt ) {
333+ static AsyncPromptSpecification fromSync (SyncPromptSpecification prompt , boolean immediateExecution ) {
331334 // FIXME: This is temporary, proper validation should be implemented
332335 if (prompt == null ) {
333336 return null ;
334337 }
335- return new AsyncPromptSpecification (prompt .prompt (),
336- (exchange , req ) -> Mono
337- .fromCallable (() -> prompt .promptHandler ().apply (new McpSyncServerExchange (exchange ), req ))
338- .subscribeOn (Schedulers .boundedElastic ()));
338+ return new AsyncPromptSpecification (prompt .prompt (), (exchange , req ) -> {
339+ var promptResult = Mono
340+ .fromCallable (() -> prompt .promptHandler ().apply (new McpSyncServerExchange (exchange ), req ));
341+ return immediateExecution ? promptResult : promptResult .subscribeOn (Schedulers .boundedElastic ());
342+ });
339343 }
340344 }
341345
@@ -366,14 +370,17 @@ public record AsyncCompletionSpecification(McpSchema.CompleteReference reference
366370 * @return an asynchronous wrapper of the provided sync specification, or
367371 * {@code null} if input is null
368372 */
369- static AsyncCompletionSpecification fromSync (SyncCompletionSpecification completion ) {
373+ static AsyncCompletionSpecification fromSync (SyncCompletionSpecification completion ,
374+ boolean immediateExecution ) {
370375 if (completion == null ) {
371376 return null ;
372377 }
373- return new AsyncCompletionSpecification (completion .referenceKey (),
374- (exchange , request ) -> Mono .fromCallable (
375- () -> completion .completionHandler ().apply (new McpSyncServerExchange (exchange ), request ))
376- .subscribeOn (Schedulers .boundedElastic ()));
378+ return new AsyncCompletionSpecification (completion .referenceKey (), (exchange , request ) -> {
379+ var completionResult = Mono .fromCallable (
380+ () -> completion .completionHandler ().apply (new McpSyncServerExchange (exchange ), request ));
381+ return immediateExecution ? completionResult
382+ : completionResult .subscribeOn (Schedulers .boundedElastic ());
383+ });
377384 }
378385 }
379386
0 commit comments