Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class DefaultMcpClient implements McpClient {
private volatile String sessionId;
private volatile ServerSchema serverSchema;
private volatile boolean initialized = false;
private volatile boolean closed = false;
private final List<Tool> tools = new ArrayList<>();
private final Object initializedLock = LockUtils.newSynchronizedLock();
private final Object toolsLock = LockUtils.newSynchronizedLock();
Expand Down Expand Up @@ -99,6 +100,9 @@ public DefaultMcpClient(ObjectSerializer jsonSerializer, HttpClassicClient clien

@Override
public void initialize() {
if (this.closed) {
throw new IllegalStateException("The MCP client is closed.");
}
HttpClassicClientRequest request =
this.client.createRequest(HttpRequestMethod.GET, this.baseUri + this.sseEndpoint);
Choir<TextEvent> messages = this.client.exchangeStream(request, TextEvent.class);
Expand All @@ -116,8 +120,8 @@ public void initialize() {
.build();
messages.subscribeOn(threadPool).subscribe(subscription -> {
log.info("Prepare to create SSE channel.");
subscription.request(Long.MAX_VALUE);
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
},
(subscription, textEvent) -> this.consumeTextEvent(textEvent),
subscription -> log.info("SSE channel is completed."),
Expand Down Expand Up @@ -242,6 +246,9 @@ private void recordServerSchema(JsonRpc.Response<Long> response) {

@Override
public List<Tool> getTools() {
if (this.closed) {
throw new IllegalStateException("The MCP client is closed.");
}
if (this.isNotInitialized()) {
throw new IllegalStateException("MCP client is not initialized. Please wait a moment.");
}
Expand Down Expand Up @@ -278,6 +285,9 @@ private void getTools0(JsonRpc.Response<Long> response) {

@Override
public Object callTool(String name, Map<String, Object> arguments) {
if (this.closed) {
throw new IllegalStateException("The MCP client is closed.");
}
if (this.isNotInitialized()) {
throw new IllegalStateException("MCP client is not initialized. Please wait a moment.");
}
Expand Down Expand Up @@ -383,9 +393,14 @@ private boolean waitInitialized() {

@Override
public void close() throws IOException {
this.subscription.cancel();
this.closed = true;
if (this.subscription != null) {
this.subscription.cancel();
}
try {
this.pingScheduler.shutdown();
if (this.pingScheduler != null) {
this.pingScheduler.shutdown();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
Expand Down