From cda8829523d956109c66fc0da73e7409916c023b Mon Sep 17 00:00:00 2001 From: chengyouling Date: Tue, 9 Sep 2025 19:56:40 +0800 Subject: [PATCH 1/3] [#3934] fixed client not support sets accept:text/event-stream problem --- .../rest/definition/RestOperationMeta.java | 8 --- .../filter/inner/ServerRestArgsFilter.java | 6 +++ .../springmvc/client/ReactiveStreamIT.java | 53 +++++++++++++++++++ 3 files changed, 59 insertions(+), 8 deletions(-) diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java index 9da6d06f32d..5061ef783cc 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java @@ -88,8 +88,6 @@ public class RestOperationMeta { // 快速构建URL path private URLPathBuilder pathBuilder; - protected static final String EVENTS_MEDIA_TYPE = MediaType.SERVER_SENT_EVENTS; - public void init(OperationMeta operationMeta) { this.operationMeta = operationMeta; @@ -260,12 +258,6 @@ protected void doCreateProduceProcessors(Class serialViewClass) { ProduceProcessorManager.INSTANCE.getOrCreateAcceptMap(serialViewClass)); } else { for (String produce : produces) { - if (produce.contains(EVENTS_MEDIA_TYPE)) { - // When the produce type is event-stream, the ProduceEventStreamProcessor implementation class corresponding - // to event-stream is not added, and it is set to the default type ProduceJsonProcessor. - // In case of an exception, the response result is parsed. - continue; - } if (produce.contains(";")) { produce = produce.substring(0, produce.indexOf(";")); } diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java index ee97dcce207..f02d3992ff1 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java @@ -26,6 +26,7 @@ import org.apache.servicecomb.common.rest.RestConst; import org.apache.servicecomb.common.rest.codec.RestCodec; import org.apache.servicecomb.common.rest.codec.produce.ProduceEventStreamProcessor; +import org.apache.servicecomb.common.rest.codec.produce.ProduceJsonProcessor; import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor; import org.apache.servicecomb.common.rest.definition.RestOperationMeta; import org.apache.servicecomb.common.rest.filter.HttpServerFilter; @@ -48,6 +49,7 @@ import com.netflix.config.DynamicPropertyFactory; import io.vertx.core.buffer.Buffer; +import jakarta.ws.rs.core.MediaType; public class ServerRestArgsFilter implements HttpServerFilter { private static final boolean enabled = DynamicPropertyFactory.getInstance().getBooleanProperty @@ -90,6 +92,10 @@ public CompletableFuture beforeSendResponseAsync(Invocation invocation, Ht return writeServerSendEvent(invocation, response, produceProcessor, responseEx); } + if (failed && MediaType.SERVER_SENT_EVENTS.equals(produceProcessor.getName())) { + produceProcessor = new ProduceJsonProcessor(); + } + responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8"); CompletableFuture future = new CompletableFuture<>(); try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) { diff --git a/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java index 4830f872cdd..06c61501a29 100644 --- a/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java +++ b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; +import org.apache.servicecomb.provider.springmvc.reference.RestTemplateBuilder; import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity; import org.apache.servicecomb.demo.CategorizedTestCase; import org.apache.servicecomb.demo.TestMgr; @@ -31,7 +32,14 @@ import org.reactivestreams.Subscription; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +import jakarta.ws.rs.core.MediaType; @Component public class ReactiveStreamIT implements CategorizedTestCase { @@ -39,6 +47,8 @@ public class ReactiveStreamIT implements CategorizedTestCase { @Qualifier("reactiveStreamProvider") ReactiveStreamClient reactiveStreamProvider; + private RestTemplate restTemplate = RestTemplateBuilder.create(); + @Override public void testRestTransport() throws Exception { testSseString(reactiveStreamProvider); @@ -46,6 +56,7 @@ public void testRestTransport() throws Exception { testSseModel(reactiveStreamProvider); testSseResponseEntity(reactiveStreamProvider); testSseMultipleData(reactiveStreamProvider); + sseStringWithAccept(); } private void testSseString(ReactiveStreamClient client) throws Exception { @@ -91,6 +102,48 @@ public void onComplete() { return buffer.toString(); } + private void sseStringWithAccept() throws Exception { + HttpHeaders headers = new HttpHeaders(); + headers.add(HttpHeaders.ACCEPT, MediaType.SERVER_SENT_EVENTS); + HttpEntity requestEntity = new HttpEntity<>(headers); + ResponseEntity responseEntity = restTemplate + .exchange("cse://springmvc/sseString", HttpMethod.GET, requestEntity, Publisher.class); + Publisher result = responseEntity.getBody(); + CountDownLatch countDownLatch = new CountDownLatch(1); + StringBuilder buffer = new StringBuilder(); + result.subscribe(new Subscriber<>() { + Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + } + + @Override + public void onNext(Object entity) { + SseEventResponseEntity response = (SseEventResponseEntity) entity; + for (String str : response.getData()) { + buffer.append(str); + } + subscription.request(1); + } + + @Override + public void onError(Throwable t) { + subscription.cancel(); + countDownLatch.countDown(); + } + + @Override + public void onComplete() { + countDownLatch.countDown(); + } + }); + countDownLatch.await(10, TimeUnit.SECONDS); + TestMgr.check("abc", buffer.toString()); + } + private void testSseModel(ReactiveStreamClient client) throws Exception { Publisher result = client.sseModel(); CountDownLatch countDownLatch = new CountDownLatch(1); From 3e4166a899a9843c375e5867caa87c32129fff7d Mon Sep 17 00:00:00 2001 From: chengyouling Date: Fri, 10 Oct 2025 17:45:48 +0800 Subject: [PATCH 2/3] change singleton processor parsing result --- .../common/rest/definition/RestOperationMeta.java | 8 ++++++++ .../common/rest/filter/inner/ServerRestArgsFilter.java | 5 ----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java index 5061ef783cc..be091d8cd09 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java @@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.servicecomb.common.rest.codec.RestObjectMapperFactory; import org.apache.servicecomb.common.rest.codec.param.FormProcessorCreator.PartProcessor; +import org.apache.servicecomb.common.rest.codec.produce.ProduceJsonProcessor; import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor; import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessorManager; import org.apache.servicecomb.common.rest.definition.path.PathRegExp; @@ -258,6 +259,13 @@ protected void doCreateProduceProcessors(Class serialViewClass) { ProduceProcessorManager.INSTANCE.getOrCreateAcceptMap(serialViewClass)); } else { for (String produce : produces) { + if (produce.contains(MediaType.SERVER_SENT_EVENTS)) { + // The event-stream type initializes the ProduceJsonProcessor in memory to handle parsing results + // in exceptional scenarios. If a singleton parsing result is required in normal scenarios, + // adjustments need to be made here. + this.produceProcessorMap.put(produce, new ProduceJsonProcessor()); + continue; + } if (produce.contains(";")) { produce = produce.substring(0, produce.indexOf(";")); } diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java index f02d3992ff1..af2b747436c 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java @@ -26,7 +26,6 @@ import org.apache.servicecomb.common.rest.RestConst; import org.apache.servicecomb.common.rest.codec.RestCodec; import org.apache.servicecomb.common.rest.codec.produce.ProduceEventStreamProcessor; -import org.apache.servicecomb.common.rest.codec.produce.ProduceJsonProcessor; import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor; import org.apache.servicecomb.common.rest.definition.RestOperationMeta; import org.apache.servicecomb.common.rest.filter.HttpServerFilter; @@ -92,10 +91,6 @@ public CompletableFuture beforeSendResponseAsync(Invocation invocation, Ht return writeServerSendEvent(invocation, response, produceProcessor, responseEx); } - if (failed && MediaType.SERVER_SENT_EVENTS.equals(produceProcessor.getName())) { - produceProcessor = new ProduceJsonProcessor(); - } - responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8"); CompletableFuture future = new CompletableFuture<>(); try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) { From 8ad4ee32605e022d422bff7a52ae84442eae33d6 Mon Sep 17 00:00:00 2001 From: chengyouling Date: Fri, 10 Oct 2025 19:23:17 +0800 Subject: [PATCH 3/3] delete import --- .../common/rest/filter/inner/ServerRestArgsFilter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java index af2b747436c..ee97dcce207 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java @@ -48,7 +48,6 @@ import com.netflix.config.DynamicPropertyFactory; import io.vertx.core.buffer.Buffer; -import jakarta.ws.rs.core.MediaType; public class ServerRestArgsFilter implements HttpServerFilter { private static final boolean enabled = DynamicPropertyFactory.getInstance().getBooleanProperty