diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java index 281769d756d..ee97dcce207 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java @@ -31,6 +31,7 @@ import org.apache.servicecomb.common.rest.filter.HttpServerFilter; import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.core.definition.OperationMeta; +import org.apache.servicecomb.foundation.common.concurrency.SuppressedRunnableWrapper; import org.apache.servicecomb.foundation.common.utils.PartUtils; import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx; import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx; @@ -126,19 +127,15 @@ public void onSubscribe(Subscription s) { @Override public void onNext(Object o) { - try { - writeResponse(responseEx, produceProcessor, o, response).whenComplete((r, e) -> { - if (e != null) { - subscription.cancel(); - result.completeExceptionally(e); - return; - } + writeResponse(responseEx, produceProcessor, o, response).thenApply(r -> { subscription.request(1); + return r; + }) + .exceptionally(e -> { + new SuppressedRunnableWrapper(() -> subscription.cancel()).run(); + new SuppressedRunnableWrapper(() -> result.completeExceptionally(e)).run(); + return response; }); - } catch (Throwable e) { - LOGGER.warn("Failed to subscribe event: {}", o, e); - result.completeExceptionally(e); - } } @Override @@ -158,22 +155,18 @@ private static CompletableFuture writeResponse( HttpServletResponseEx responseEx, ProduceProcessor produceProcessor, Object data, Response response) { try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) { produceProcessor.encodeResponse(output, data); - CompletableFuture result = new CompletableFuture<>(); - responseEx.sendBuffer(output.getBuffer()).whenComplete((v, e) -> { - if (e != null) { - result.completeExceptionally(e); - } - try { - responseEx.flushBuffer(); - } catch (IOException ex) { - LOGGER.warn("Failed to flush buffer for Server Send Events", ex); - } - }); - result.complete(response); - return result; + return responseEx.sendBuffer(output.getBuffer()) + .thenApply(v -> { + try { + responseEx.flushBuffer(); + return response; + } catch (IOException e) { + LOGGER.warn("Failed to flush buffer for Server Send Events", e); + throw new IllegalStateException("Failed to flush buffer for Server Send Events", e); + } + }); } catch (Throwable e) { LOGGER.error("internal service error must be fixed.", e); - responseEx.setStatus(500); return CompletableFuture.failedFuture(e); } } diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java index c5f56bb53d6..c8ec40ebba8 100644 --- a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java +++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java @@ -18,6 +18,7 @@ import org.apache.servicecomb.swagger.invocation.Response; import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity; +import org.reactivestreams.Publisher; import io.reactivex.rxjava3.core.Flowable; import jakarta.ws.rs.core.Response.StatusType; @@ -31,12 +32,14 @@ public PublisherProducerResponseMapper(boolean shouldConstructEntity) { @Override public Response mapResponse(StatusType status, Object result) { + // Unified Flowable conversion to prevent internal typecasting exceptions. + final Flowable flowableResult = result instanceof Flowable ? + (Flowable) result : Flowable.fromPublisher(((Publisher) result)); if (shouldConstructEntity) { - Flowable> responseEntity = ((Flowable) result).map(obj -> - new SseEventResponseEntity<>() - .data(obj)); + Flowable> responseEntity = flowableResult + .map(obj -> new SseEventResponseEntity<>().data(obj)); return Response.create(status, responseEntity); } - return Response.create(status, result); + return Response.create(status, flowableResult); } }