|
17 | 17 | import reactor.core.Disposable; |
18 | 18 | import reactor.core.publisher.Mono; |
19 | 19 | import reactor.core.publisher.MonoSink; |
| 20 | +import reactor.core.scheduler.Schedulers; |
20 | 21 |
|
21 | 22 | /** |
22 | 23 | * Default implementation of the MCP (Model Context Protocol) session that manages |
@@ -135,13 +136,13 @@ public DefaultMcpSession(Duration requestTimeout, McpTransport transport, |
135 | 136 | } |
136 | 137 | else if (message instanceof McpSchema.JSONRPCRequest request) { |
137 | 138 | logger.debug("Received request: {}", request); |
138 | | - handleIncomingRequest(request).subscribe(response -> transport.sendMessage(response).subscribe(), |
139 | | - error -> { |
140 | | - var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), |
141 | | - null, new McpSchema.JSONRPCResponse.JSONRPCError( |
142 | | - McpSchema.ErrorCodes.INTERNAL_ERROR, error.getMessage(), null)); |
143 | | - transport.sendMessage(errorResponse).subscribe(); |
144 | | - }); |
| 139 | + handleIncomingRequest(request).flatMap(transport::sendMessage).onErrorResume(error -> { |
| 140 | + var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), null, |
| 141 | + new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR, |
| 142 | + error.getMessage(), null)); |
| 143 | + return transport.sendMessage(errorResponse); |
| 144 | + }).subscribe(); |
| 145 | + |
145 | 146 | } |
146 | 147 | else if (message instanceof McpSchema.JSONRPCNotification notification) { |
147 | 148 | logger.debug("Received notification: {}", notification); |
|
0 commit comments