diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java index 9da6d06f32d..be091d8cd09 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java @@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.servicecomb.common.rest.codec.RestObjectMapperFactory; import org.apache.servicecomb.common.rest.codec.param.FormProcessorCreator.PartProcessor; +import org.apache.servicecomb.common.rest.codec.produce.ProduceJsonProcessor; import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor; import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessorManager; import org.apache.servicecomb.common.rest.definition.path.PathRegExp; @@ -88,8 +89,6 @@ public class RestOperationMeta { // 快速构建URL path private URLPathBuilder pathBuilder; - protected static final String EVENTS_MEDIA_TYPE = MediaType.SERVER_SENT_EVENTS; - public void init(OperationMeta operationMeta) { this.operationMeta = operationMeta; @@ -260,10 +259,11 @@ protected void doCreateProduceProcessors(Class serialViewClass) { ProduceProcessorManager.INSTANCE.getOrCreateAcceptMap(serialViewClass)); } else { for (String produce : produces) { - if (produce.contains(EVENTS_MEDIA_TYPE)) { - // When the produce type is event-stream, the ProduceEventStreamProcessor implementation class corresponding - // to event-stream is not added, and it is set to the default type ProduceJsonProcessor. - // In case of an exception, the response result is parsed. + if (produce.contains(MediaType.SERVER_SENT_EVENTS)) { + // The event-stream type initializes the ProduceJsonProcessor in memory to handle parsing results + // in exceptional scenarios. If a singleton parsing result is required in normal scenarios, + // adjustments need to be made here. + this.produceProcessorMap.put(produce, new ProduceJsonProcessor()); continue; } if (produce.contains(";")) { diff --git a/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java index 4830f872cdd..06c61501a29 100644 --- a/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java +++ b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; +import org.apache.servicecomb.provider.springmvc.reference.RestTemplateBuilder; import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity; import org.apache.servicecomb.demo.CategorizedTestCase; import org.apache.servicecomb.demo.TestMgr; @@ -31,7 +32,14 @@ import org.reactivestreams.Subscription; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +import jakarta.ws.rs.core.MediaType; @Component public class ReactiveStreamIT implements CategorizedTestCase { @@ -39,6 +47,8 @@ public class ReactiveStreamIT implements CategorizedTestCase { @Qualifier("reactiveStreamProvider") ReactiveStreamClient reactiveStreamProvider; + private RestTemplate restTemplate = RestTemplateBuilder.create(); + @Override public void testRestTransport() throws Exception { testSseString(reactiveStreamProvider); @@ -46,6 +56,7 @@ public void testRestTransport() throws Exception { testSseModel(reactiveStreamProvider); testSseResponseEntity(reactiveStreamProvider); testSseMultipleData(reactiveStreamProvider); + sseStringWithAccept(); } private void testSseString(ReactiveStreamClient client) throws Exception { @@ -91,6 +102,48 @@ public void onComplete() { return buffer.toString(); } + private void sseStringWithAccept() throws Exception { + HttpHeaders headers = new HttpHeaders(); + headers.add(HttpHeaders.ACCEPT, MediaType.SERVER_SENT_EVENTS); + HttpEntity requestEntity = new HttpEntity<>(headers); + ResponseEntity responseEntity = restTemplate + .exchange("cse://springmvc/sseString", HttpMethod.GET, requestEntity, Publisher.class); + Publisher result = responseEntity.getBody(); + CountDownLatch countDownLatch = new CountDownLatch(1); + StringBuilder buffer = new StringBuilder(); + result.subscribe(new Subscriber<>() { + Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + } + + @Override + public void onNext(Object entity) { + SseEventResponseEntity response = (SseEventResponseEntity) entity; + for (String str : response.getData()) { + buffer.append(str); + } + subscription.request(1); + } + + @Override + public void onError(Throwable t) { + subscription.cancel(); + countDownLatch.countDown(); + } + + @Override + public void onComplete() { + countDownLatch.countDown(); + } + }); + countDownLatch.await(10, TimeUnit.SECONDS); + TestMgr.check("abc", buffer.toString()); + } + private void testSseModel(ReactiveStreamClient client) throws Exception { Publisher result = client.sseModel(); CountDownLatch countDownLatch = new CountDownLatch(1);