Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.AttributeMap;
import modelengine.fit.http.protocol.HttpResponseStatus;
import modelengine.fit.http.server.ErrorResponse;
import modelengine.fit.http.server.HttpClassicServer;
Expand Down Expand Up @@ -88,16 +89,24 @@ public HttpClassicRequestAssembler(HttpClassicServer server, boolean secure, Con
}

private static void setRequest(ChannelHandlerContext ctx, NettyHttpServerRequest serverRequest) {
Attribute<NettyHttpServerRequest> attr = ctx.channel().attr(REQUEST);
Attribute<NettyHttpServerRequest> attr = ((AttributeMap) ctx).attr(REQUEST);
attr.set(serverRequest);
}

private static NettyHttpServerRequest getRequest(ChannelHandlerContext ctx) {
return ctx.channel().attr(REQUEST).get();
return ((AttributeMap) ctx).attr(REQUEST).get();
}

private static void clearRequest(ChannelHandlerContext ctx) {
ctx.channel().attr(REQUEST).set(null);
NettyHttpServerRequest request = getRequest(ctx);
if (request != null) {
((AttributeMap) ctx).attr(REQUEST).set(null);
try {
request.close();
} catch (IOException e) {
log.warn("Failed to close netty http server request, ignored.", e);
}
}
}

@Override
Expand All @@ -117,12 +126,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
this.stopExecution(ctx);
super.channelUnregistered(ctx);
}

private void stopExecution(ChannelHandlerContext ctx) {
NettyHttpServerRequest request = getRequest(ctx);
if (request != null) {
Expand All @@ -135,6 +138,7 @@ private void stopExecution(ChannelHandlerContext ctx) {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
if (msg instanceof HttpRequest) {
clearRequest(ctx);
this.handleHttpRequest(ctx, cast(msg));
return;
}
Expand Down Expand Up @@ -171,6 +175,11 @@ private void doHttpRequest(ChannelHandlerContext ctx, NettyHttpServerRequest req
this.exceptionCaught(ctx, cause, request);
} finally {
request.removeExecuteThread();
try {
request.tryClose();
} catch (IOException e) {
log.warn("Failed to close netty http server request when request finished, ignored.", e);
}
}
}

Expand All @@ -184,15 +193,18 @@ private void handleHttpContent(ChannelHandlerContext ctx, HttpContent content) {
ctx.channel().isOpen());
throw new IllegalStateException(message);
}
this.receiveHttpContent(ctx, request, content);
this.receiveHttpContent(request, content);
}

private void receiveHttpContent(ChannelHandlerContext ctx, NettyHttpServerRequest serverRequest,
HttpContent content) {
private void receiveHttpContent(NettyHttpServerRequest serverRequest, HttpContent content) {
try {
if (content instanceof LastHttpContent) {
serverRequest.receiveLastHttpContent(cast(content));
clearRequest(ctx);
try {
serverRequest.tryClose();
} catch (IOException e) {
log.warn("Failed to close netty http server request when received last http content, ignored.", e);
}
} else {
serverRequest.receiveHttpContent(content);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import modelengine.fit.http.protocol.ServerRequest;
import modelengine.fit.http.protocol.util.HeaderUtils;
import modelengine.fitframework.log.Logger;
import modelengine.fitframework.util.LockUtils;
import modelengine.fitframework.util.ObjectUtils;

import java.io.IOException;
Expand All @@ -32,6 +33,9 @@
import java.net.SocketAddress;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;

/**
* {@link ServerRequest} 的 Netty 实现。
Expand All @@ -51,8 +55,12 @@ public class NettyHttpServerRequest implements ServerRequest, OnHttpContentRecei
private final RequestLine startLine;
private final MessageHeaders headers;
private final NettyReadableMessageBody body;
private boolean isClosed;
private Thread executeThread;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final ReadWriteLock isClosedLock = LockUtils.newReentrantReadWriteLock();
private final AtomicBoolean isComplete = new AtomicBoolean(false);
private final AtomicBoolean isFinished = new AtomicBoolean(false);
private final Lock tryCloseLock = LockUtils.newReentrantLock();
private volatile Thread executeThread;

public NettyHttpServerRequest(HttpRequest request, ChannelHandlerContext ctx, boolean isSecure,
long largeBodySize) {
Expand Down Expand Up @@ -127,6 +135,7 @@ public void receiveLastHttpContent(LastHttpContent content) throws IOException {
this.checkIfClosed();
ByteBuf byteBuf = content.content();
this.body.write(byteBuf, true);
LockUtils.synchronize(this.tryCloseLock, () -> this.isComplete.set(true));
}

@Override
Expand All @@ -152,16 +161,48 @@ public boolean isActive() {
}

private void checkIfClosed() throws IOException {
if (this.isClosed) {
throw new IOException("The netty http server request has already been closed.");
this.isClosedLock.readLock().lock();
try {
if (this.isClosed.get()) {
throw new IOException("The netty http server request has already been closed.");
}
} finally {
this.isClosedLock.readLock().unlock();
}
}

/**
* 尝试关闭。
*
* @throws IOException 当关闭失败时。
*/
void tryClose() throws IOException {
this.tryCloseLock.lock();
try {
if (this.isFinished.get() && this.isComplete.get()) {
this.close();
}
} finally {
this.tryCloseLock.unlock();
}
}

@Override
public void close() throws IOException {
log.info("Netty http request closed. [id={}]", this.ctx.name());
this.isClosed = true;
this.body.close();
if (this.isClosed.get()) {
return;
}
this.isClosedLock.writeLock().lock();
try {
if (this.isClosed.get()) {
return;
}
log.info("Netty http request closed. [id={}]", this.ctx.name());
this.isClosed.set(true);
this.body.close();
} finally {
this.isClosedLock.writeLock().unlock();
}
}

@Override
Expand Down Expand Up @@ -228,6 +269,7 @@ void setExecuteThread(Thread thread) {
*/
void removeExecuteThread() {
this.executeThread = null;
LockUtils.synchronize(this.tryCloseLock, () -> this.isFinished.set(true));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.Attribute;
import io.netty.util.AttributeMap;
import modelengine.fit.http.protocol.HttpRequestMethod;
import modelengine.fit.http.server.HttpHandler;
import modelengine.fit.http.server.netty.support.DefaultNettyServerConfig;
Expand Down Expand Up @@ -72,9 +73,9 @@ void setup() {
this.ctx = mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
when(this.ctx.channel()).thenReturn(channel);
ChannelId channelId = mock(ChannelId.class);
Attribute attribute = mock(Attribute.class);
when(channel.attr(any())).thenReturn(attribute);
when(((AttributeMap) this.ctx).attr(any())).thenReturn(attribute);
ChannelId channelId = mock(ChannelId.class);
when(channelId.asLongText()).thenReturn("requestId");
this.requestAssembler = new HttpClassicRequestAssembler(classicServer,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ private byte[] actualEntityBytes() {

@Override
public void close() throws IOException {
this.serverRequest.close();
if (this.entityLoader.isLoaded()) {
Optional<Entity> entity = this.entityLoader.get();
if (entity.isPresent()) {
Expand Down