Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
/*
* Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*/

package modelengine.fit.http.server.netty;

Expand Down Expand Up @@ -71,7 +71,7 @@ public NettyHttpServerRequest(HttpRequest request, ChannelHandlerContext ctx, bo
this.startLine = this.initStartLine();
this.headers = this.initHeaders();
this.body = this.isLargeBody() ? NettyReadableMessageBody.large() : NettyReadableMessageBody.common();
log.info("Netty http request initialized. [id={0}, request={1}]", ctx.name(), this.startLine());
log.debug("Netty http request initialized. [id={0}, request={1}]", ctx.name(), this.startLine());
}

private boolean isLargeBody() {
Expand Down Expand Up @@ -197,7 +197,7 @@ public void close() throws IOException {
if (this.isClosed.get()) {
return;
}
log.info("Netty http request closed. [id={}]", this.ctx.name());
log.debug("Netty http request closed. [id={}]", this.ctx.name());
this.isClosed.set(true);
this.body.close();
} finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
/*
* Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*/

package modelengine.fit.http.server.netty;

Expand Down Expand Up @@ -126,6 +126,11 @@ public boolean isActive() {
return this.ctx.channel().isActive();
}

@Override
public void closeChannel() {
this.ctx.close();
}

@Override
public void close() throws IOException {
this.isClosed = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
/*
* Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*/

package modelengine.fit.http.entity.support;

Expand Down Expand Up @@ -88,7 +88,7 @@ private static void appendComment(StringBuilder sb, String comment) {

private static void appendData(StringBuilder sb, ObjectSerializer objectSerializer, Object data) {
if (data == null) {
if (sb.length() == 0) {
if (sb.isEmpty()) {
sb.append(LF);
}
sb.append(LF);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
/*
* Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*/

package modelengine.fit.http.server.support;

Expand All @@ -22,6 +22,7 @@
import modelengine.fit.http.entity.Entity;
import modelengine.fit.http.entity.FileEntity;
import modelengine.fit.http.entity.ReadableBinaryEntity;
import modelengine.fit.http.entity.TextEvent;
import modelengine.fit.http.entity.TextEventStreamEntity;
import modelengine.fit.http.entity.WritableBinaryEntity;
import modelengine.fit.http.entity.support.DefaultWritableBinaryEntity;
Expand All @@ -36,6 +37,7 @@
import modelengine.fit.http.server.HttpClassicServerResponse;
import modelengine.fit.http.server.InternalServerErrorException;
import modelengine.fit.http.support.AbstractHttpClassicResponse;
import modelengine.fitframework.flowable.Subscription;
import modelengine.fitframework.resource.UrlUtils;
import modelengine.fitframework.serialization.ObjectSerializer;
import modelengine.fitframework.util.ObjectUtils;
Expand All @@ -45,8 +47,6 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

/**
* 表示 {@link HttpClassicServerResponse} 的默认实现。
Expand Down Expand Up @@ -148,78 +148,106 @@ public void send() {
if (this.entity == null) {
this.headers().set(CONTENT_LENGTH, ZERO);
this.serverResponse.writeStartLineAndHeaders();
} else if (this.entity instanceof ReadableBinaryEntity) {
if (this.entity instanceof FileEntity) {
FileEntity actual = cast(this.entity);
this.headers().set(CONTENT_LENGTH, String.valueOf(actual.length()));
} else if (!this.headers().contains(CONTENT_LENGTH)) {
this.headers().set(TRANSFER_ENCODING, CHUNKED);
}
this.serverResponse.writeStartLineAndHeaders();
ReadableBinaryEntity readableBinaryEntity = cast(this.entity);
byte[] bytes = new byte[512];
int read;
while ((read = readableBinaryEntity.read(bytes)) > -1) {
this.serverResponse.writeBody(bytes, 0, read);
}
} else if (this.entity instanceof WritableBinaryEntity) {
// WritableBinaryEntity 已经在用户代码层面进行了输出,因此此处什么都不需要处理。
this.serverResponse.flush();
} else if (this.entity instanceof TextEventStreamEntity) {
this.headers().set(CACHE_CONTROL, NO_CACHE);
this.headers().set(CONNECTION, KEEP_ALIVE);
this.headers().set(TRANSFER_ENCODING, CHUNKED);
this.serverResponse.writeStartLineAndHeaders();
this.sendTextEventStream(cast(this.entity));
this.sendTextEventStream(cast(this.entity), charset);
} else {
byte[] entityBytes = this.entitySerializer().serializeEntity(ObjectUtils.cast(this.entity), charset);
this.headers().set(CONTENT_LENGTH, String.valueOf(entityBytes.length));
this.serverResponse.writeStartLineAndHeaders();
this.serverResponse.writeBody(entityBytes);
this.sendDirectly(charset);
}
this.serverResponse.flush();
} catch (IOException e) {
throw new InternalServerErrorException("Failed to write response.", e);
}
}

@Override
public boolean isActive() {
return this.serverResponse.isActive();
}

private void sendTextEventStream(TextEventStreamEntity eventStreamEntity) throws IOException {
private void sendTextEventStream(TextEventStreamEntity eventStreamEntity, Charset charset) throws IOException {
ObjectSerializer objectSerializer = this.jsonSerializer()
.orElseThrow(() -> new IllegalStateException("The json serializer cannot be null."));
AtomicReference<Exception> exception = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
eventStreamEntity.stream()
.map(sse -> sse.serialize(objectSerializer).getBytes(StandardCharsets.UTF_8))
.subscribe(null, (subscription, bytes) -> {
try {
this.serverResponse.writeBody(bytes);
} catch (IOException e) {
subscription.cancel();
exception.set(e);
latch.countDown();
}
}, subscription -> latch.countDown(), (ignore, e) -> {
exception.set(e);
latch.countDown();
});
.map(event -> event.serialize(objectSerializer).getBytes(charset))
.subscribe(null,
(subscription, bytes) -> this.onSseMessage(subscription, bytes, charset),
subscription -> this.onSseComplete(charset),
(subscription, e) -> this.onSseError(e, charset));
}

private void onSseMessage(Subscription subscription, byte[] bytes, Charset charset) {
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InternalServerErrorException("Failed to execute handler.", e);
this.serverResponse.writeBody(bytes);
} catch (IOException e) {
subscription.cancel();
this.onSseError(e, charset);
}
Exception e = exception.get();
if (e == null) {
return;
}

private void onSseComplete(Charset charset) {
try {
this.serverResponse.flush();
} catch (IOException e) {
this.onSseError(e, charset);
} finally {
try {
this.close0();
} catch (IOException e) {
// ignore.
}
}
}

private void onSseError(Throwable throwable, Charset charset) {
try {
TextEvent errorEvent = TextEvent.custom().event("error").data(throwable.getMessage()).build();
ObjectSerializer objectSerializer = this.jsonSerializer()
.orElseThrow(() -> new IllegalStateException("The json serializer cannot be null."));
this.serverResponse.writeBody(errorEvent.serialize(objectSerializer).getBytes(charset));
this.serverResponse.flush();
} catch (IOException e) {
this.serverResponse.closeChannel();
InternalServerErrorException internalServerErrorException =
new InternalServerErrorException("Failed to send error response when sse.", e);
internalServerErrorException.addSuppressed(throwable);
throw internalServerErrorException;
} finally {
try {
this.close0();
} catch (IOException e) {
// ignore.
}
}
if (e instanceof IOException) {
throw (IOException) e;
}

private void sendDirectly(Charset charset) throws IOException {
if (this.entity instanceof ReadableBinaryEntity) {
if (this.entity instanceof FileEntity) {
FileEntity actual = cast(this.entity);
this.headers().set(CONTENT_LENGTH, String.valueOf(actual.length()));
} else if (!this.headers().contains(CONTENT_LENGTH)) {
this.headers().set(TRANSFER_ENCODING, CHUNKED);
}
this.serverResponse.writeStartLineAndHeaders();
ReadableBinaryEntity readableBinaryEntity = cast(this.entity);
byte[] bytes = new byte[512];
int read;
while ((read = readableBinaryEntity.read(bytes)) > -1) {
this.serverResponse.writeBody(bytes, 0, read);
}
} else if (this.entity instanceof WritableBinaryEntity) {
// WritableBinaryEntity 已经在用户代码层面进行了输出,因此此处什么都不需要处理。
} else {
byte[] entityBytes = this.entitySerializer().serializeEntity(ObjectUtils.cast(this.entity), charset);
this.headers().set(CONTENT_LENGTH, String.valueOf(entityBytes.length));
this.serverResponse.writeStartLineAndHeaders();
this.serverResponse.writeBody(entityBytes);
}
throw new InternalServerErrorException("Failed to execute handler.", e);
this.serverResponse.flush();
}

@Override
public boolean isActive() {
return this.serverResponse.isActive();
}

@Override
Expand All @@ -239,6 +267,13 @@ protected void commit() {

@Override
public void close() throws IOException {
if (this.entity instanceof TextEventStreamEntity) {
return;
}
this.close0();
}

private void close0() throws IOException {
this.serverResponse.close();
if (this.entity != null) {
this.entity.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
/*
* Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*/

package modelengine.fit.http.server.support;

Expand Down Expand Up @@ -99,7 +99,7 @@ void shouldThrowExceptionWhenSendTextStreamError() throws IOException {
ServerResponse serverResponse = mock(ServerResponse.class);
when(serverResponse.startLine()).thenReturn(mock(ConfigurableStatusLine.class));
when(serverResponse.headers()).thenReturn(mock(ConfigurableMessageHeaders.class));
doThrow(new IOException("Error")).when(serverResponse).writeBody(new byte[1]);
doThrow(new IOException("Error")).when(serverResponse).flush();
Choir<byte[]> mappedChoir = Choir.just(new byte[1], new byte[2]);
Choir<TextEvent> stream = ObjectUtils.cast(mock(Choir.class));
when(stream.map(Mockito.any())).thenReturn(ObjectUtils.cast(mappedChoir));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
/*
* Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*/

package modelengine.fit.http.protocol;

Expand Down Expand Up @@ -82,4 +82,9 @@ default void writeBody(byte[] bytes) throws IOException {
* @return true if the response is active; false otherwise.
*/
boolean isActive();

/**
* Closes the underlying channel associated with the response.
*/
void closeChannel();
}