diff --git a/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/HttpClassicRequestAssembler.java b/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/HttpClassicRequestAssembler.java index deeb2936..5928e23e 100644 --- a/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/HttpClassicRequestAssembler.java +++ b/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/HttpClassicRequestAssembler.java @@ -32,6 +32,7 @@ import io.netty.handler.codec.http.LastHttpContent; import io.netty.util.Attribute; import io.netty.util.AttributeKey; +import io.netty.util.AttributeMap; import modelengine.fit.http.protocol.HttpResponseStatus; import modelengine.fit.http.server.ErrorResponse; import modelengine.fit.http.server.HttpClassicServer; @@ -88,16 +89,24 @@ public HttpClassicRequestAssembler(HttpClassicServer server, boolean secure, Con } private static void setRequest(ChannelHandlerContext ctx, NettyHttpServerRequest serverRequest) { - Attribute attr = ctx.channel().attr(REQUEST); + Attribute attr = ((AttributeMap) ctx).attr(REQUEST); attr.set(serverRequest); } private static NettyHttpServerRequest getRequest(ChannelHandlerContext ctx) { - return ctx.channel().attr(REQUEST).get(); + return ((AttributeMap) ctx).attr(REQUEST).get(); } private static void clearRequest(ChannelHandlerContext ctx) { - ctx.channel().attr(REQUEST).set(null); + NettyHttpServerRequest request = getRequest(ctx); + if (request != null) { + ((AttributeMap) ctx).attr(REQUEST).set(null); + try { + request.close(); + } catch (IOException e) { + log.warn("Failed to close netty http server request, ignored.", e); + } + } } @Override @@ -117,12 +126,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); } - @Override - public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { - this.stopExecution(ctx); - super.channelUnregistered(ctx); - } - private void stopExecution(ChannelHandlerContext ctx) { NettyHttpServerRequest request = getRequest(ctx); if (request != null) { @@ -135,6 +138,7 @@ private void stopExecution(ChannelHandlerContext ctx) { @Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) { if (msg instanceof HttpRequest) { + clearRequest(ctx); this.handleHttpRequest(ctx, cast(msg)); return; } @@ -171,6 +175,11 @@ private void doHttpRequest(ChannelHandlerContext ctx, NettyHttpServerRequest req this.exceptionCaught(ctx, cause, request); } finally { request.removeExecuteThread(); + try { + request.tryClose(); + } catch (IOException e) { + log.warn("Failed to close netty http server request when request finished, ignored.", e); + } } } @@ -184,15 +193,18 @@ private void handleHttpContent(ChannelHandlerContext ctx, HttpContent content) { ctx.channel().isOpen()); throw new IllegalStateException(message); } - this.receiveHttpContent(ctx, request, content); + this.receiveHttpContent(request, content); } - private void receiveHttpContent(ChannelHandlerContext ctx, NettyHttpServerRequest serverRequest, - HttpContent content) { + private void receiveHttpContent(NettyHttpServerRequest serverRequest, HttpContent content) { try { if (content instanceof LastHttpContent) { serverRequest.receiveLastHttpContent(cast(content)); - clearRequest(ctx); + try { + serverRequest.tryClose(); + } catch (IOException e) { + log.warn("Failed to close netty http server request when received last http content, ignored.", e); + } } else { serverRequest.receiveHttpContent(content); } diff --git a/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/NettyHttpServerRequest.java b/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/NettyHttpServerRequest.java index 74259f2b..411f93da 100644 --- a/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/NettyHttpServerRequest.java +++ b/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/NettyHttpServerRequest.java @@ -24,6 +24,7 @@ import modelengine.fit.http.protocol.ServerRequest; import modelengine.fit.http.protocol.util.HeaderUtils; import modelengine.fitframework.log.Logger; +import modelengine.fitframework.util.LockUtils; import modelengine.fitframework.util.ObjectUtils; import java.io.IOException; @@ -32,6 +33,9 @@ import java.net.SocketAddress; import java.util.List; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; /** * {@link ServerRequest} 的 Netty 实现。 @@ -51,8 +55,12 @@ public class NettyHttpServerRequest implements ServerRequest, OnHttpContentRecei private final RequestLine startLine; private final MessageHeaders headers; private final NettyReadableMessageBody body; - private boolean isClosed; - private Thread executeThread; + private final AtomicBoolean isClosed = new AtomicBoolean(false); + private final ReadWriteLock isClosedLock = LockUtils.newReentrantReadWriteLock(); + private final AtomicBoolean isComplete = new AtomicBoolean(false); + private final AtomicBoolean isFinished = new AtomicBoolean(false); + private final Lock tryCloseLock = LockUtils.newReentrantLock(); + private volatile Thread executeThread; public NettyHttpServerRequest(HttpRequest request, ChannelHandlerContext ctx, boolean isSecure, long largeBodySize) { @@ -127,6 +135,7 @@ public void receiveLastHttpContent(LastHttpContent content) throws IOException { this.checkIfClosed(); ByteBuf byteBuf = content.content(); this.body.write(byteBuf, true); + LockUtils.synchronize(this.tryCloseLock, () -> this.isComplete.set(true)); } @Override @@ -152,16 +161,48 @@ public boolean isActive() { } private void checkIfClosed() throws IOException { - if (this.isClosed) { - throw new IOException("The netty http server request has already been closed."); + this.isClosedLock.readLock().lock(); + try { + if (this.isClosed.get()) { + throw new IOException("The netty http server request has already been closed."); + } + } finally { + this.isClosedLock.readLock().unlock(); + } + } + + /** + * 尝试关闭。 + * + * @throws IOException 当关闭失败时。 + */ + void tryClose() throws IOException { + this.tryCloseLock.lock(); + try { + if (this.isFinished.get() && this.isComplete.get()) { + this.close(); + } + } finally { + this.tryCloseLock.unlock(); } } @Override public void close() throws IOException { - log.info("Netty http request closed. [id={}]", this.ctx.name()); - this.isClosed = true; - this.body.close(); + if (this.isClosed.get()) { + return; + } + this.isClosedLock.writeLock().lock(); + try { + if (this.isClosed.get()) { + return; + } + log.info("Netty http request closed. [id={}]", this.ctx.name()); + this.isClosed.set(true); + this.body.close(); + } finally { + this.isClosedLock.writeLock().unlock(); + } } @Override @@ -228,6 +269,7 @@ void setExecuteThread(Thread thread) { */ void removeExecuteThread() { this.executeThread = null; + LockUtils.synchronize(this.tryCloseLock, () -> this.isFinished.set(true)); } /** diff --git a/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/test/java/modelengine/fit/http/server/netty/HttpClassicRequestAssemblerTest.java b/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/test/java/modelengine/fit/http/server/netty/HttpClassicRequestAssemblerTest.java index 00db07e8..e5db71a5 100644 --- a/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/test/java/modelengine/fit/http/server/netty/HttpClassicRequestAssemblerTest.java +++ b/framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/test/java/modelengine/fit/http/server/netty/HttpClassicRequestAssemblerTest.java @@ -22,6 +22,7 @@ import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpVersion; import io.netty.util.Attribute; +import io.netty.util.AttributeMap; import modelengine.fit.http.protocol.HttpRequestMethod; import modelengine.fit.http.server.HttpHandler; import modelengine.fit.http.server.netty.support.DefaultNettyServerConfig; @@ -72,9 +73,9 @@ void setup() { this.ctx = mock(ChannelHandlerContext.class); Channel channel = mock(Channel.class); when(this.ctx.channel()).thenReturn(channel); - ChannelId channelId = mock(ChannelId.class); Attribute attribute = mock(Attribute.class); - when(channel.attr(any())).thenReturn(attribute); + when(((AttributeMap) this.ctx).attr(any())).thenReturn(attribute); + ChannelId channelId = mock(ChannelId.class); when(channelId.asLongText()).thenReturn("requestId"); this.requestAssembler = new HttpClassicRequestAssembler(classicServer, false, diff --git a/framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/main/java/modelengine/fit/http/server/support/DefaultHttpClassicServerRequest.java b/framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/main/java/modelengine/fit/http/server/support/DefaultHttpClassicServerRequest.java index 4804c61b..78a5c2d9 100644 --- a/framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/main/java/modelengine/fit/http/server/support/DefaultHttpClassicServerRequest.java +++ b/framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/main/java/modelengine/fit/http/server/support/DefaultHttpClassicServerRequest.java @@ -117,7 +117,6 @@ private byte[] actualEntityBytes() { @Override public void close() throws IOException { - this.serverRequest.close(); if (this.entityLoader.isLoaded()) { Optional entity = this.entityLoader.get(); if (entity.isPresent()) {