|
12 | 12 | import org.reactivestreams.FlowAdapters; |
13 | 13 | import org.reactivestreams.Subscription; |
14 | 14 |
|
| 15 | +import io.modelcontextprotocol.spec.McpError; |
15 | 16 | import reactor.core.publisher.BaseSubscriber; |
16 | 17 | import reactor.core.publisher.FluxSink; |
17 | 18 |
|
@@ -135,45 +136,42 @@ protected void hookOnSubscribe(Subscription subscription) { |
135 | 136 |
|
136 | 137 | @Override |
137 | 138 | protected void hookOnNext(String line) { |
138 | | - if (this.responseInfo.statusCode() >= 200 && this.responseInfo.statusCode() < 300) { |
139 | 139 |
|
140 | | - if (line.isEmpty()) { |
141 | | - // Empty line means end of event |
142 | | - if (this.eventBuilder.length() > 0) { |
143 | | - String eventData = this.eventBuilder.toString(); |
144 | | - SseEvent sseEvent = new SseEvent(currentEventId.get(), currentEventType.get(), |
145 | | - eventData.trim()); |
| 140 | + if (line.isEmpty()) { |
| 141 | + // Empty line means end of event |
| 142 | + if (this.eventBuilder.length() > 0) { |
| 143 | + String eventData = this.eventBuilder.toString(); |
| 144 | + SseEvent sseEvent = new SseEvent(currentEventId.get(), currentEventType.get(), eventData.trim()); |
146 | 145 |
|
147 | | - this.sink.next(new SseResponseEvent(responseInfo, sseEvent)); |
148 | | - this.eventBuilder.setLength(0); |
149 | | - } |
| 146 | + this.sink.next(new SseResponseEvent(responseInfo, sseEvent)); |
| 147 | + this.eventBuilder.setLength(0); |
150 | 148 | } |
151 | | - else { |
152 | | - if (line.startsWith("data:")) { |
153 | | - var matcher = EVENT_DATA_PATTERN.matcher(line); |
154 | | - if (matcher.find()) { |
155 | | - this.eventBuilder.append(matcher.group(1).trim()).append("\n"); |
156 | | - } |
| 149 | + } |
| 150 | + else { |
| 151 | + if (line.startsWith("data:")) { |
| 152 | + var matcher = EVENT_DATA_PATTERN.matcher(line); |
| 153 | + if (matcher.find()) { |
| 154 | + this.eventBuilder.append(matcher.group(1).trim()).append("\n"); |
157 | 155 | } |
158 | | - else if (line.startsWith("id:")) { |
159 | | - var matcher = EVENT_ID_PATTERN.matcher(line); |
160 | | - if (matcher.find()) { |
161 | | - this.currentEventId.set(matcher.group(1).trim()); |
162 | | - } |
| 156 | + } |
| 157 | + else if (line.startsWith("id:")) { |
| 158 | + var matcher = EVENT_ID_PATTERN.matcher(line); |
| 159 | + if (matcher.find()) { |
| 160 | + this.currentEventId.set(matcher.group(1).trim()); |
163 | 161 | } |
164 | | - else if (line.startsWith("event:")) { |
165 | | - var matcher = EVENT_TYPE_PATTERN.matcher(line); |
166 | | - if (matcher.find()) { |
167 | | - this.currentEventType.set(matcher.group(1).trim()); |
168 | | - } |
| 162 | + } |
| 163 | + else if (line.startsWith("event:")) { |
| 164 | + var matcher = EVENT_TYPE_PATTERN.matcher(line); |
| 165 | + if (matcher.find()) { |
| 166 | + this.currentEventType.set(matcher.group(1).trim()); |
169 | 167 | } |
170 | 168 | } |
171 | | - } |
172 | | - else { |
173 | | - // If the response is not successful, emit an error |
174 | | - System.out.println("Received non-successful response: " + this.responseInfo.statusCode()); |
175 | | - SseEvent sseEvent = new SseEvent(null, null, null); |
176 | | - this.sink.next(new SseResponseEvent(responseInfo, sseEvent)); |
| 169 | + else { |
| 170 | + // If the response is not successful, emit an error |
| 171 | + this.sink.error(new McpError( |
| 172 | + "Invalid SSE response. Status code: " + this.responseInfo.statusCode() + " Line: " + line)); |
| 173 | + |
| 174 | + } |
177 | 175 | } |
178 | 176 | } |
179 | 177 |
|
|
0 commit comments