@@ -26,9 +26,16 @@ import io.netty.handler.codec.http.HttpHeaderNames
2626import io.netty.handler.codec.http.HttpRequest
2727import io.netty.handler.codec.http.LastHttpContent
2828import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory
29+ import kotlinx.coroutines.CoroutineExceptionHandler
30+ import kotlinx.coroutines.CoroutineName
31+ import kotlinx.coroutines.CoroutineScope
32+ import kotlinx.coroutines.asCoroutineDispatcher
33+ import kotlinx.coroutines.cancel
34+ import kotlinx.coroutines.launch
2935import net.ccbluex.netty.http.HttpServer.Companion.logger
3036import net.ccbluex.netty.http.middleware.Middleware
3137import net.ccbluex.netty.http.model.RequestContext
38+ import net.ccbluex.netty.http.util.forEachIsInstance
3239import net.ccbluex.netty.http.websocket.WebSocketHandler
3340import java.net.URLDecoder
3441
@@ -40,13 +47,37 @@ import java.net.URLDecoder
4047internal class HttpServerHandler (private val server : HttpServer ) : ChannelInboundHandlerAdapter() {
4148
4249 private val localRequestContext = ThreadLocal <RequestContext >()
50+ private lateinit var channelScope: CoroutineScope
4351
4452 /* *
4553 * Extension property to get the WebSocket URL from an HttpRequest.
4654 */
4755 private val HttpRequest .webSocketUrl: String
4856 get() = " ws://${headers().get(" Host" )}${uri()} "
4957
58+ /* *
59+ * Adds the [CoroutineScope] of current [io.netty.channel.Channel].
60+ */
61+ override fun handlerAdded (ctx : ChannelHandlerContext ) {
62+ super .handlerAdded(ctx)
63+
64+ val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
65+ val ctxName = ctx.name()
66+ val channelId = ctx.channel().id().asLongText()
67+ logger.error(
68+ " Uncaught coroutine error in [ctx: $ctxName , channel: $channelId ]" ,
69+ throwable
70+ )
71+ }
72+
73+ channelScope = CoroutineScope (
74+ ctx.channel().eventLoop().asCoroutineDispatcher()
75+ + CoroutineName (" ${ctx.name()} #${ctx.channel().id().asShortText()} " )
76+ + exceptionHandler
77+ )
78+ ctx.channel().closeFuture().addListener { channelScope.cancel() }
79+ }
80+
5081 /* *
5182 * Reads the incoming messages and processes HTTP requests.
5283 *
@@ -68,11 +99,11 @@ internal class HttpServerHandler(private val server: HttpServer) : ChannelInboun
6899 if (connection.equals(" Upgrade" , ignoreCase = true ) &&
69100 upgrade.equals(" WebSocket" , ignoreCase = true )) {
70101
71- server.middlewares.filterIsInstance <Middleware .OnWebSocketUpgrade >().forEach { middleware ->
102+ server.middlewares.forEachIsInstance <Middleware .OnWebSocketUpgrade > { middleware ->
72103 val response = middleware.invoke(ctx, msg)
73104 if (response != null ) {
74105 ctx.writeAndFlush(response)
75- return
106+ return super .channelRead(ctx, msg)
76107 }
77108 }
78109
@@ -99,15 +130,15 @@ internal class HttpServerHandler(private val server: HttpServer) : ChannelInboun
99130 URLDecoder .decode(msg.uri(), Charsets .UTF_8 ),
100131 msg.headers(),
101132 )
102-
133+
103134 localRequestContext.set(requestContext)
104135 }
105136 }
106137
107138 is HttpContent -> {
108139 val requestContext = localRequestContext.get() ? : run {
109140 logger.warn(" Received HttpContent without HttpRequest" )
110- return
141+ return super .channelRead(ctx, msg)
111142 }
112143
113144 // Append content to the buffer
@@ -117,18 +148,21 @@ internal class HttpServerHandler(private val server: HttpServer) : ChannelInboun
117148 if (msg is LastHttpContent ) {
118149 localRequestContext.remove()
119150
120- server.middlewares.filterIsInstance <Middleware .OnRequest >().forEach { middleware ->
151+ server.middlewares.forEachIsInstance <Middleware .OnRequest > { middleware ->
121152 val response = middleware.invoke(requestContext)
122153 if (response != null ) {
123154 ctx.writeAndFlush(response)
124- return
155+ return super .channelRead(ctx, msg)
125156 }
126157 }
127- var response = server.processRequestContext(requestContext)
128- server.middlewares.filterIsInstance<Middleware .OnResponse >().forEach { middleware ->
129- response = middleware.invoke(requestContext, response)
158+
159+ channelScope.launch {
160+ var response = server.processRequestContext(requestContext)
161+ server.middlewares.forEachIsInstance<Middleware .OnResponse > { middleware ->
162+ response = middleware.invoke(requestContext, response)
163+ }
164+ ctx.writeAndFlush(response)
130165 }
131- ctx.writeAndFlush(response)
132166 }
133167 }
134168
0 commit comments