diff --git a/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/NettyHttpServerRequest.java b/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/NettyHttpServerRequest.java index 411f93da..a5c461b3 100644 --- a/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/NettyHttpServerRequest.java +++ b/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/NettyHttpServerRequest.java @@ -1,8 +1,8 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ +/* + * Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + */ package modelengine.fit.http.server.netty; @@ -71,7 +71,7 @@ public NettyHttpServerRequest(HttpRequest request, ChannelHandlerContext ctx, bo this.startLine = this.initStartLine(); this.headers = this.initHeaders(); this.body = this.isLargeBody() ? NettyReadableMessageBody.large() : NettyReadableMessageBody.common(); - log.info("Netty http request initialized. [id={0}, request={1}]", ctx.name(), this.startLine()); + log.debug("Netty http request initialized. [id={0}, request={1}]", ctx.name(), this.startLine()); } private boolean isLargeBody() { @@ -197,7 +197,7 @@ public void close() throws IOException { if (this.isClosed.get()) { return; } - log.info("Netty http request closed. [id={}]", this.ctx.name()); + log.debug("Netty http request closed. [id={}]", this.ctx.name()); this.isClosed.set(true); this.body.close(); } finally { diff --git a/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/NettyHttpServerResponse.java b/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/NettyHttpServerResponse.java index 12e163bc..316908c6 100644 --- a/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/NettyHttpServerResponse.java +++ b/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/NettyHttpServerResponse.java @@ -1,8 +1,8 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ +/* + * Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + */ package modelengine.fit.http.server.netty; @@ -126,6 +126,11 @@ public boolean isActive() { return this.ctx.channel().isActive(); } + @Override + public void closeChannel() { + this.ctx.close(); + } + @Override public void close() throws IOException { this.isClosed = true; diff --git a/framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/main/java/modelengine/fit/http/entity/support/DefaultTextEvent.java b/framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/main/java/modelengine/fit/http/entity/support/DefaultTextEvent.java index e9ecb6e5..3feb24f5 100644 --- a/framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/main/java/modelengine/fit/http/entity/support/DefaultTextEvent.java +++ b/framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/main/java/modelengine/fit/http/entity/support/DefaultTextEvent.java @@ -1,8 +1,8 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ +/* + * Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + */ package modelengine.fit.http.entity.support; @@ -88,7 +88,7 @@ private static void appendComment(StringBuilder sb, String comment) { private static void appendData(StringBuilder sb, ObjectSerializer objectSerializer, Object data) { if (data == null) { - if (sb.length() == 0) { + if (sb.isEmpty()) { sb.append(LF); } sb.append(LF); diff --git a/framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/main/java/modelengine/fit/http/server/support/DefaultHttpClassicServerResponse.java b/framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/main/java/modelengine/fit/http/server/support/DefaultHttpClassicServerResponse.java index 9fd34b41..48687b10 100644 --- a/framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/main/java/modelengine/fit/http/server/support/DefaultHttpClassicServerResponse.java +++ b/framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/main/java/modelengine/fit/http/server/support/DefaultHttpClassicServerResponse.java @@ -1,8 +1,8 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ +/* + * Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + */ package modelengine.fit.http.server.support; @@ -22,6 +22,7 @@ import modelengine.fit.http.entity.Entity; import modelengine.fit.http.entity.FileEntity; import modelengine.fit.http.entity.ReadableBinaryEntity; +import modelengine.fit.http.entity.TextEvent; import modelengine.fit.http.entity.TextEventStreamEntity; import modelengine.fit.http.entity.WritableBinaryEntity; import modelengine.fit.http.entity.support.DefaultWritableBinaryEntity; @@ -36,6 +37,7 @@ import modelengine.fit.http.server.HttpClassicServerResponse; import modelengine.fit.http.server.InternalServerErrorException; import modelengine.fit.http.support.AbstractHttpClassicResponse; +import modelengine.fitframework.flowable.Subscription; import modelengine.fitframework.resource.UrlUtils; import modelengine.fitframework.serialization.ObjectSerializer; import modelengine.fitframework.util.ObjectUtils; @@ -45,8 +47,6 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Optional; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; /** * 表示 {@link HttpClassicServerResponse} 的默认实现。 @@ -148,78 +148,106 @@ public void send() { if (this.entity == null) { this.headers().set(CONTENT_LENGTH, ZERO); this.serverResponse.writeStartLineAndHeaders(); - } else if (this.entity instanceof ReadableBinaryEntity) { - if (this.entity instanceof FileEntity) { - FileEntity actual = cast(this.entity); - this.headers().set(CONTENT_LENGTH, String.valueOf(actual.length())); - } else if (!this.headers().contains(CONTENT_LENGTH)) { - this.headers().set(TRANSFER_ENCODING, CHUNKED); - } - this.serverResponse.writeStartLineAndHeaders(); - ReadableBinaryEntity readableBinaryEntity = cast(this.entity); - byte[] bytes = new byte[512]; - int read; - while ((read = readableBinaryEntity.read(bytes)) > -1) { - this.serverResponse.writeBody(bytes, 0, read); - } - } else if (this.entity instanceof WritableBinaryEntity) { - // WritableBinaryEntity 已经在用户代码层面进行了输出,因此此处什么都不需要处理。 + this.serverResponse.flush(); } else if (this.entity instanceof TextEventStreamEntity) { this.headers().set(CACHE_CONTROL, NO_CACHE); this.headers().set(CONNECTION, KEEP_ALIVE); this.headers().set(TRANSFER_ENCODING, CHUNKED); this.serverResponse.writeStartLineAndHeaders(); - this.sendTextEventStream(cast(this.entity)); + this.sendTextEventStream(cast(this.entity), charset); } else { - byte[] entityBytes = this.entitySerializer().serializeEntity(ObjectUtils.cast(this.entity), charset); - this.headers().set(CONTENT_LENGTH, String.valueOf(entityBytes.length)); - this.serverResponse.writeStartLineAndHeaders(); - this.serverResponse.writeBody(entityBytes); + this.sendDirectly(charset); } - this.serverResponse.flush(); } catch (IOException e) { throw new InternalServerErrorException("Failed to write response.", e); } } - @Override - public boolean isActive() { - return this.serverResponse.isActive(); - } - - private void sendTextEventStream(TextEventStreamEntity eventStreamEntity) throws IOException { + private void sendTextEventStream(TextEventStreamEntity eventStreamEntity, Charset charset) throws IOException { ObjectSerializer objectSerializer = this.jsonSerializer() .orElseThrow(() -> new IllegalStateException("The json serializer cannot be null.")); - AtomicReference exception = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); eventStreamEntity.stream() - .map(sse -> sse.serialize(objectSerializer).getBytes(StandardCharsets.UTF_8)) - .subscribe(null, (subscription, bytes) -> { - try { - this.serverResponse.writeBody(bytes); - } catch (IOException e) { - subscription.cancel(); - exception.set(e); - latch.countDown(); - } - }, subscription -> latch.countDown(), (ignore, e) -> { - exception.set(e); - latch.countDown(); - }); + .map(event -> event.serialize(objectSerializer).getBytes(charset)) + .subscribe(null, + (subscription, bytes) -> this.onSseMessage(subscription, bytes, charset), + subscription -> this.onSseComplete(charset), + (subscription, e) -> this.onSseError(e, charset)); + } + + private void onSseMessage(Subscription subscription, byte[] bytes, Charset charset) { try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new InternalServerErrorException("Failed to execute handler.", e); + this.serverResponse.writeBody(bytes); + } catch (IOException e) { + subscription.cancel(); + this.onSseError(e, charset); } - Exception e = exception.get(); - if (e == null) { - return; + } + + private void onSseComplete(Charset charset) { + try { + this.serverResponse.flush(); + } catch (IOException e) { + this.onSseError(e, charset); + } finally { + try { + this.close0(); + } catch (IOException e) { + // ignore. + } + } + } + + private void onSseError(Throwable throwable, Charset charset) { + try { + TextEvent errorEvent = TextEvent.custom().event("error").data(throwable.getMessage()).build(); + ObjectSerializer objectSerializer = this.jsonSerializer() + .orElseThrow(() -> new IllegalStateException("The json serializer cannot be null.")); + this.serverResponse.writeBody(errorEvent.serialize(objectSerializer).getBytes(charset)); + this.serverResponse.flush(); + } catch (IOException e) { + this.serverResponse.closeChannel(); + InternalServerErrorException internalServerErrorException = + new InternalServerErrorException("Failed to send error response when sse.", e); + internalServerErrorException.addSuppressed(throwable); + throw internalServerErrorException; + } finally { + try { + this.close0(); + } catch (IOException e) { + // ignore. + } } - if (e instanceof IOException) { - throw (IOException) e; + } + + private void sendDirectly(Charset charset) throws IOException { + if (this.entity instanceof ReadableBinaryEntity) { + if (this.entity instanceof FileEntity) { + FileEntity actual = cast(this.entity); + this.headers().set(CONTENT_LENGTH, String.valueOf(actual.length())); + } else if (!this.headers().contains(CONTENT_LENGTH)) { + this.headers().set(TRANSFER_ENCODING, CHUNKED); + } + this.serverResponse.writeStartLineAndHeaders(); + ReadableBinaryEntity readableBinaryEntity = cast(this.entity); + byte[] bytes = new byte[512]; + int read; + while ((read = readableBinaryEntity.read(bytes)) > -1) { + this.serverResponse.writeBody(bytes, 0, read); + } + } else if (this.entity instanceof WritableBinaryEntity) { + // WritableBinaryEntity 已经在用户代码层面进行了输出,因此此处什么都不需要处理。 + } else { + byte[] entityBytes = this.entitySerializer().serializeEntity(ObjectUtils.cast(this.entity), charset); + this.headers().set(CONTENT_LENGTH, String.valueOf(entityBytes.length)); + this.serverResponse.writeStartLineAndHeaders(); + this.serverResponse.writeBody(entityBytes); } - throw new InternalServerErrorException("Failed to execute handler.", e); + this.serverResponse.flush(); + } + + @Override + public boolean isActive() { + return this.serverResponse.isActive(); } @Override @@ -239,6 +267,13 @@ protected void commit() { @Override public void close() throws IOException { + if (this.entity instanceof TextEventStreamEntity) { + return; + } + this.close0(); + } + + private void close0() throws IOException { this.serverResponse.close(); if (this.entity != null) { this.entity.close(); diff --git a/framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/test/java/modelengine/fit/http/server/support/DefaultHttpClassicServerResponseTest.java b/framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/test/java/modelengine/fit/http/server/support/DefaultHttpClassicServerResponseTest.java index bcc14390..d6e4c394 100644 --- a/framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/test/java/modelengine/fit/http/server/support/DefaultHttpClassicServerResponseTest.java +++ b/framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/test/java/modelengine/fit/http/server/support/DefaultHttpClassicServerResponseTest.java @@ -1,8 +1,8 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ +/* + * Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + */ package modelengine.fit.http.server.support; @@ -99,7 +99,7 @@ void shouldThrowExceptionWhenSendTextStreamError() throws IOException { ServerResponse serverResponse = mock(ServerResponse.class); when(serverResponse.startLine()).thenReturn(mock(ConfigurableStatusLine.class)); when(serverResponse.headers()).thenReturn(mock(ConfigurableMessageHeaders.class)); - doThrow(new IOException("Error")).when(serverResponse).writeBody(new byte[1]); + doThrow(new IOException("Error")).when(serverResponse).flush(); Choir mappedChoir = Choir.just(new byte[1], new byte[2]); Choir stream = ObjectUtils.cast(mock(Choir.class)); when(stream.map(Mockito.any())).thenReturn(ObjectUtils.cast(mappedChoir)); diff --git a/framework/fit/java/fit-builtin/services/fit-http-protocol/definition/src/main/java/modelengine/fit/http/protocol/ServerResponse.java b/framework/fit/java/fit-builtin/services/fit-http-protocol/definition/src/main/java/modelengine/fit/http/protocol/ServerResponse.java index c8e886b4..d4125ed9 100644 --- a/framework/fit/java/fit-builtin/services/fit-http-protocol/definition/src/main/java/modelengine/fit/http/protocol/ServerResponse.java +++ b/framework/fit/java/fit-builtin/services/fit-http-protocol/definition/src/main/java/modelengine/fit/http/protocol/ServerResponse.java @@ -1,8 +1,8 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ +/* + * Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + */ package modelengine.fit.http.protocol; @@ -82,4 +82,9 @@ default void writeBody(byte[] bytes) throws IOException { * @return true if the response is active; false otherwise. */ boolean isActive(); + + /** + * Closes the underlying channel associated with the response. + */ + void closeChannel(); }