Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.servicecomb.common.rest.codec.RestObjectMapperFactory;
import org.apache.servicecomb.common.rest.codec.param.FormProcessorCreator.PartProcessor;
import org.apache.servicecomb.common.rest.codec.produce.ProduceJsonProcessor;
import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor;
import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessorManager;
import org.apache.servicecomb.common.rest.definition.path.PathRegExp;
Expand Down Expand Up @@ -88,8 +89,6 @@ public class RestOperationMeta {
// 快速构建URL path
private URLPathBuilder pathBuilder;

protected static final String EVENTS_MEDIA_TYPE = MediaType.SERVER_SENT_EVENTS;

public void init(OperationMeta operationMeta) {
this.operationMeta = operationMeta;

Expand Down Expand Up @@ -260,10 +259,11 @@ protected void doCreateProduceProcessors(Class<?> serialViewClass) {
ProduceProcessorManager.INSTANCE.getOrCreateAcceptMap(serialViewClass));
} else {
for (String produce : produces) {
if (produce.contains(EVENTS_MEDIA_TYPE)) {
// When the produce type is event-stream, the ProduceEventStreamProcessor implementation class corresponding
// to event-stream is not added, and it is set to the default type ProduceJsonProcessor.
// In case of an exception, the response result is parsed.
if (produce.contains(MediaType.SERVER_SENT_EVENTS)) {
// The event-stream type initializes the ProduceJsonProcessor in memory to handle parsing results
// in exceptional scenarios. If a singleton parsing result is required in normal scenarios,
// adjustments need to be made here.
this.produceProcessorMap.put(produce, new ProduceJsonProcessor());
continue;
}
if (produce.contains(";")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.servicecomb.provider.springmvc.reference.RestTemplateBuilder;
import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
import org.apache.servicecomb.demo.CategorizedTestCase;
import org.apache.servicecomb.demo.TestMgr;
Expand All @@ -31,21 +32,31 @@
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import jakarta.ws.rs.core.MediaType;

@Component
public class ReactiveStreamIT implements CategorizedTestCase {
@Autowired
@Qualifier("reactiveStreamProvider")
ReactiveStreamClient reactiveStreamProvider;

private RestTemplate restTemplate = RestTemplateBuilder.create();

@Override
public void testRestTransport() throws Exception {
testSseString(reactiveStreamProvider);
testSseStringWithParam(reactiveStreamProvider);
testSseModel(reactiveStreamProvider);
testSseResponseEntity(reactiveStreamProvider);
testSseMultipleData(reactiveStreamProvider);
sseStringWithAccept();
}

private void testSseString(ReactiveStreamClient client) throws Exception {
Expand Down Expand Up @@ -91,6 +102,48 @@ public void onComplete() {
return buffer.toString();
}

private void sseStringWithAccept() throws Exception {
HttpHeaders headers = new HttpHeaders();
headers.add(HttpHeaders.ACCEPT, MediaType.SERVER_SENT_EVENTS);
HttpEntity<?> requestEntity = new HttpEntity<>(headers);
ResponseEntity<Publisher> responseEntity = restTemplate
.exchange("cse://springmvc/sseString", HttpMethod.GET, requestEntity, Publisher.class);
Publisher result = responseEntity.getBody();
CountDownLatch countDownLatch = new CountDownLatch(1);
StringBuilder buffer = new StringBuilder();
result.subscribe(new Subscriber<>() {
Subscription subscription;

@Override
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
}

@Override
public void onNext(Object entity) {
SseEventResponseEntity<String> response = (SseEventResponseEntity<String>) entity;
for (String str : response.getData()) {
buffer.append(str);
}
subscription.request(1);
}

@Override
public void onError(Throwable t) {
subscription.cancel();
countDownLatch.countDown();
}

@Override
public void onComplete() {
countDownLatch.countDown();
}
});
countDownLatch.await(10, TimeUnit.SECONDS);
TestMgr.check("abc", buffer.toString());
}

private void testSseModel(ReactiveStreamClient client) throws Exception {
Publisher<Model> result = client.sseModel();
CountDownLatch countDownLatch = new CountDownLatch(1);
Expand Down
Loading