Skip to content

Commit 255dee9

Browse files
committed
add response parsing
1 parent 2b32da4 commit 255dee9

File tree

23 files changed

+165
-87
lines changed

23 files changed

+165
-87
lines changed

common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ protected void onExecuteHttpServerFiltersFinish(Response response, Throwable e)
351351

352352
if (!(response.getResult() instanceof ServerWebSocket)) {
353353
try {
354-
responseEx.flushBuffer();
354+
responseEx.endResponse();
355355
} catch (Throwable flushException) {
356356
LOGGER.error("Failed to flush rest response, operation:{}, request uri:{}",
357357
getMicroserviceQualifiedName(), requestEx.getRequestURI(), flushException);

common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,24 +53,24 @@ protected Invocation sendCreateInvocationException(Throwable throwable) {
5353
requestEx.getRequestURI(), e);
5454
}
5555

56-
flushResponse("UNKNOWN_OPERATION");
56+
endResponse("UNKNOWN_OPERATION");
5757
return null;
5858
}
5959

6060
@Override
6161
protected void sendResponse(Invocation invocation, Response response) {
6262
if (isDownloadFileResponseType(invocation, response)) {
6363
responseEx.sendPart(PartUtils.getSinglePart(null, response.getResult()))
64-
.whenComplete((r, e) -> flushResponse(invocation.getMicroserviceQualifiedName()));
64+
.whenComplete((r, e) -> endResponse(invocation.getMicroserviceQualifiedName()));
6565
return;
6666
}
6767

68-
flushResponse(invocation.getMicroserviceQualifiedName());
68+
endResponse(invocation.getMicroserviceQualifiedName());
6969
}
7070

71-
private void flushResponse(String operationName) {
71+
private void endResponse(String operationName) {
7272
try {
73-
responseEx.flushBuffer();
73+
responseEx.endResponse();
7474
} catch (Throwable flushException) {
7575
LOGGER.error("Failed to flush rest response, operation:{}, request uri:{}",
7676
operationName, requestEx.getRequestURI(), flushException);

common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/produce/ProduceEventStreamProcessor.java

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,16 @@
2020
import java.io.InputStream;
2121
import java.io.OutputStream;
2222
import java.nio.charset.StandardCharsets;
23-
import java.util.Arrays;
2423

2524
import org.apache.commons.lang3.StringUtils;
2625
import org.apache.servicecomb.common.rest.codec.RestObjectMapperFactory;
27-
import org.slf4j.Logger;
28-
import org.slf4j.LoggerFactory;
26+
import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
2927

3028
import com.fasterxml.jackson.databind.JavaType;
3129

3230
import jakarta.ws.rs.core.MediaType;
3331

3432
public class ProduceEventStreamProcessor implements ProduceProcessor {
35-
private static final Logger LOGGER = LoggerFactory.getLogger(ProduceEventStreamProcessor.class);
36-
3733
private int writeIndex = 0;
3834

3935
@Override
@@ -65,15 +61,6 @@ public void doEncodeResponse(OutputStream output, Object result) throws Exceptio
6561
@Override
6662
public Object doDecodeResponse(InputStream input, JavaType type) throws Exception {
6763
String buffer = new String(input.readAllBytes(), StandardCharsets.UTF_8);
68-
LOGGER.info("=========doDecodeResponse buffer===================>" + buffer + "stack: {}", Arrays.toString(
69-
new Exception().getStackTrace()));
70-
if (isResponseEntity(type)) {
71-
return parseAsSseEventResponseEntity(buffer, type);
72-
}
73-
return parseOriginObject(buffer, type);
74-
}
75-
76-
private Object parseAsSseEventResponseEntity(String buffer, JavaType type) throws Exception {
7764
SseEventResponseEntity<?> responseEntity = new SseEventResponseEntity<>();
7865
for (String line : buffer.split("\n")) {
7966
if (line.startsWith("eventId: ")) {
@@ -90,30 +77,12 @@ private Object parseAsSseEventResponseEntity(String buffer, JavaType type) throw
9077
}
9178
if (line.startsWith("data: ")) {
9279
responseEntity.data(RestObjectMapperFactory.getRestObjectMapper()
93-
.readValue(line.substring(6), getParsObjectType(type)));
80+
.readValue(line.substring(6), type));
9481
}
9582
}
9683
return responseEntity;
9784
}
9885

99-
private Object parseOriginObject(String buffer, JavaType type) throws Exception {
100-
for (String line : buffer.split("\n")) {
101-
if (line.startsWith("data: ")) {
102-
return RestObjectMapperFactory.getRestObjectMapper()
103-
.readValue(line.substring(6), type);
104-
}
105-
}
106-
return null;
107-
}
108-
109-
private boolean isResponseEntity(JavaType type) {
110-
return type.getRawClass().getName().equals(SseEventResponseEntity.class.getName());
111-
}
112-
113-
private JavaType getParsObjectType(JavaType type) {
114-
return type.getBindings().getBoundType(0);
115-
}
116-
11786
@Override
11887
public void refreshEventId(int index) {
11988
this.writeIndex = index;

common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ private static CompletableFuture<Response> writeResponse(
169169
result.completeExceptionally(e);
170170
}
171171
try {
172-
responseEx.flushStreamBuffer();
172+
responseEx.flushBuffer();
173173
} catch (IOException ex) {
174174
LOGGER.warn("Failed to flush buffer for Server Send Events", ex);
175175
}

common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -916,7 +916,7 @@ public void setStatus(int sc) {
916916
}
917917

918918
@Override
919-
public void flushBuffer() {
919+
public void endResponse() {
920920
endCount.value = endCount.value + 1;
921921
}
922922

@@ -956,7 +956,7 @@ public void setStatus(int sc) {
956956
}
957957

958958
@Override
959-
public void flushBuffer() {
959+
public void endResponse() {
960960
endCount.value = endCount.value + 1;
961961
}
962962

demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.util.concurrent.TimeUnit;
2222

2323
import org.apache.commons.lang3.StringUtils;
24-
import org.apache.servicecomb.common.rest.codec.produce.SseEventResponseEntity;
24+
import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
2525
import org.apache.servicecomb.demo.CategorizedTestCase;
2626
import org.apache.servicecomb.demo.TestMgr;
2727
import org.apache.servicecomb.demo.model.Model;
@@ -75,21 +75,18 @@ public void onSubscribe(Subscription s) {
7575

7676
@Override
7777
public void onNext(String s) {
78-
LOGGER.info("=========buildBufferString result===================>" + s);
7978
buffer.append(s);
8079
subscription.request(1);
8180
}
8281

8382
@Override
8483
public void onError(Throwable t) {
85-
LOGGER.info("=========buildBufferString onError===================>");
8684
subscription.cancel();
8785
countDownLatch.countDown();
8886
}
8987

9088
@Override
9189
public void onComplete() {
92-
LOGGER.info("=========buildBufferString onComplete===================>");
9390
countDownLatch.countDown();
9491
}
9592
});
@@ -113,20 +110,17 @@ public void onSubscribe(Subscription s) {
113110
@Override
114111
public void onNext(Model s) {
115112
buffer.append(s.getName()).append(s.getAge());
116-
LOGGER.info("=========testSseModel result===================>" + buffer);
117113
subscription.request(1);
118114
}
119115

120116
@Override
121117
public void onError(Throwable t) {
122-
LOGGER.info("=========testSseModel error===================>");
123118
subscription.cancel();
124119
countDownLatch.countDown();
125120
}
126121

127122
@Override
128123
public void onComplete() {
129-
LOGGER.info("=========testSseModel onComplete===================>");
130124
countDownLatch.countDown();
131125
}
132126
});

demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ThirdSvcConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.servicecomb.springboot.springmvc.client;
1919

20-
import org.apache.servicecomb.common.rest.codec.produce.SseEventResponseEntity;
20+
import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
2121
import org.apache.servicecomb.demo.model.Model;
2222
import org.apache.servicecomb.provider.pojo.Invoker;
2323
import org.reactivestreams.Publisher;

demo/demo-spring-boot-transport/demo-spring-boot-springmvc-server/src/main/java/org/apache/servicecomb/springboot/springmvc/server/ReactiveStreamController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import java.util.concurrent.TimeUnit;
2020
import java.util.concurrent.atomic.AtomicInteger;
2121

22-
import org.apache.servicecomb.common.rest.codec.produce.SseEventResponseEntity;
22+
import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
2323
import org.apache.servicecomb.demo.model.Model;
2424
import org.apache.servicecomb.provider.rest.common.RestSchema;
2525
import org.reactivestreams.Publisher;

foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/AbstractHttpServletResponse.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,4 +228,9 @@ public CompletableFuture<Void> sendPart(Part body) {
228228
public CompletableFuture<Void> sendBuffer(Buffer buffer) {
229229
throw new Error("not supported method");
230230
}
231+
232+
@Override
233+
public void endResponse() throws IOException {
234+
throw new Error("not supported method");
235+
}
231236
}

foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,5 +47,5 @@ default Flowable<Buffer> getFlowableBuffer() {
4747
return null;
4848
}
4949

50-
default void flushStreamBuffer() throws IOException {};
50+
void endResponse() throws IOException;
5151
}

0 commit comments

Comments
 (0)