Skip to content

Commit 90e45ea

Browse files
committed
fix(WS): fix closed socket exceptions
1 parent 0d50f40 commit 90e45ea

File tree

5 files changed

+101
-39
lines changed

5 files changed

+101
-39
lines changed

src/main/kotlin/net/ccbluex/netty/http/HttpServer.kt

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,16 @@ import io.netty.channel.ChannelOption
2525
import io.netty.channel.EventLoopGroup
2626
import io.netty.handler.logging.LogLevel
2727
import io.netty.handler.logging.LoggingHandler
28+
import kotlinx.coroutines.sync.Mutex
29+
import kotlinx.coroutines.sync.withLock
30+
import net.ccbluex.netty.http.coroutines.awaitSuspend
31+
import net.ccbluex.netty.http.coroutines.syncSuspend
2832
import net.ccbluex.netty.http.middleware.Middleware
2933
import net.ccbluex.netty.http.rest.RouteController
3034
import net.ccbluex.netty.http.util.TransportType
3135
import net.ccbluex.netty.http.websocket.WebSocketController
3236
import org.apache.logging.log4j.LogManager
3337
import java.net.InetSocketAddress
34-
import java.util.concurrent.locks.ReentrantLock
35-
import kotlin.concurrent.withLock
3638

3739

3840
/**
@@ -43,15 +45,16 @@ import kotlin.concurrent.withLock
4345
class HttpServer {
4446

4547
val routeController = RouteController()
46-
val webSocketController = WebSocketController()
4748

48-
private val lock = ReentrantLock()
49+
private val lock = Mutex()
4950

5051
internal val middlewares = mutableListOf<Middleware>()
5152

5253
private var bossGroup: EventLoopGroup? = null
5354
private var workerGroup: EventLoopGroup? = null
5455
private var serverChannel: Channel? = null
56+
var webSocketController: WebSocketController? = null
57+
private set
5558

5659
companion object {
5760
internal val logger = LogManager.getLogger("HttpServer")
@@ -69,7 +72,7 @@ class HttpServer {
6972
*
7073
* @return actual port of server.
7174
*/
72-
fun start(port: Int, useNativeTransport: Boolean = true): Int = lock.withLock {
75+
suspend fun start(port: Int, useNativeTransport: Boolean = true): Int = lock.withLock {
7376
val b = ServerBootstrap()
7477

7578
val groups = TransportType.apply(b, useNativeTransport)
@@ -81,8 +84,9 @@ class HttpServer {
8184
b.option(ChannelOption.SO_BACKLOG, 1024)
8285
.handler(LoggingHandler(LogLevel.INFO))
8386
.childHandler(HttpChannelInitializer(this))
84-
val ch = b.bind(port).sync().channel()
87+
val ch = b.bind(port).syncSuspend().channel()
8588
serverChannel = ch
89+
webSocketController = WebSocketController(ch)
8690

8791
logger.info("Netty server started on port $port.")
8892

@@ -98,12 +102,13 @@ class HttpServer {
98102
/**
99103
* Stops the Netty server gracefully.
100104
*/
101-
fun stop() = lock.withLock {
105+
suspend fun stop() = lock.withLock {
102106
logger.info("Shutting down Netty server...")
103107
try {
104-
serverChannel?.close()?.sync()
105-
bossGroup?.shutdownGracefully()?.sync()
106-
workerGroup?.shutdownGracefully()?.sync()
108+
webSocketController?.disconnect()
109+
serverChannel?.close()?.awaitSuspend()
110+
bossGroup?.shutdownGracefully()?.awaitSuspend()
111+
workerGroup?.shutdownGracefully()?.awaitSuspend()
107112
} catch (e: Exception) {
108113
logger.warn("Error during shutdown", e)
109114
} finally {

src/main/kotlin/net/ccbluex/netty/http/HttpServerHandler.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ internal class HttpServerHandler(private val server: HttpServer) : ChannelInboun
7575
+ CoroutineName("${ctx.name()}#${ctx.channel().id().asShortText()}")
7676
+ exceptionHandler
7777
)
78-
ctx.channel().closeFuture().addListener { channelScope.cancel() }
78+
ctx.channel().closeFuture().addListener { channelScope.cancel("Channel closed") }
7979
}
8080

8181
/**
@@ -123,7 +123,7 @@ internal class HttpServerHandler(private val server: HttpServer) : ChannelInboun
123123
handshaker.handshake(ctx.channel(), msg)
124124
}
125125

126-
server.webSocketController.addContext(ctx)
126+
server.webSocketController!!.addContext(ctx)
127127
} else {
128128
val requestContext = RequestContext(
129129
msg.method(),

src/main/kotlin/net/ccbluex/netty/http/coroutines/NettyCoroutine.kt

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,37 +3,67 @@ package net.ccbluex.netty.http.coroutines
33
import io.netty.util.concurrent.Future
44
import io.netty.util.concurrent.GenericFutureListener
55
import kotlinx.coroutines.CancellableContinuation
6-
import kotlinx.coroutines.CancellationException
76
import kotlinx.coroutines.suspendCancellableCoroutine
7+
import java.util.concurrent.CancellationException
88

99
/**
10-
* Suspend until this Netty Future completes.
10+
* Suspend until this Netty Future completes,
11+
* and rethrows the cause of the failure if this future failed.
1112
*
12-
* Returns the Future result. Throws on failure or cancellation.
13+
* Returns the Future itself.
1314
*/
14-
suspend fun <V, F : Future<V>> F.suspend(): V {
15+
suspend fun <V, F : Future<V>> F.syncSuspend(): F {
1516
if (isDone) return unwrapDone().getOrThrow()
1617

1718
return suspendCancellableCoroutine { cont ->
18-
addListener(futureContinuationListener(cont))
19+
addListener(FutureResultContListener(cont))
1920

2021
cont.invokeOnCancellation {
2122
this.cancel(false)
2223
}
2324
}
2425
}
2526

26-
private fun <V, F : Future<V>> futureContinuationListener(
27-
cont: CancellableContinuation<V>
28-
): GenericFutureListener<F> = GenericFutureListener { future ->
29-
if (cont.isActive) {
30-
cont.resumeWith(future.unwrapDone())
27+
/**
28+
* Suspend until this Netty Future completes.
29+
*
30+
* Returns the Future itself.
31+
*/
32+
suspend fun <F : Future<*>> F.awaitSuspend(): F {
33+
if (isDone) return this
34+
35+
return suspendCancellableCoroutine { cont ->
36+
addListener(FutureContListener(cont))
37+
38+
cont.invokeOnCancellation {
39+
this.cancel(false)
40+
}
41+
}
42+
}
43+
44+
private class FutureContListener<V, F : Future<V>>(
45+
private val cont: CancellableContinuation<F>
46+
): GenericFutureListener<F> {
47+
override fun operationComplete(future: F) {
48+
if (cont.isActive) {
49+
cont.resumeWith(Result.success(future))
50+
}
51+
}
52+
}
53+
54+
private class FutureResultContListener<V, F : Future<V>>(
55+
private val cont: CancellableContinuation<F>
56+
): GenericFutureListener<F> {
57+
override fun operationComplete(future: F) {
58+
if (cont.isActive) {
59+
cont.resumeWith(future.unwrapDone())
60+
}
3161
}
3262
}
3363

34-
private fun <V, F : Future<V>> F.unwrapDone(): Result<V> =
64+
private fun <V, F : Future<V>> F.unwrapDone(): Result<F> =
3565
when {
36-
isSuccess -> Result.success(this.now)
66+
isSuccess -> Result.success(this)
3767
isCancelled -> Result.failure(CancellationException("Netty Future was cancelled"))
3868
else -> Result.failure(
3969
this.cause() ?: IllegalStateException("Future failed without cause")

src/main/kotlin/net/ccbluex/netty/http/websocket/WebSocketController.kt

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,42 +19,66 @@
1919
*/
2020
package net.ccbluex.netty.http.websocket
2121

22+
import io.netty.channel.Channel
2223
import io.netty.channel.ChannelHandlerContext
2324
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
24-
import java.util.concurrent.CopyOnWriteArrayList
25+
import kotlinx.coroutines.CoroutineScope
26+
import kotlinx.coroutines.Job
27+
import kotlinx.coroutines.SupervisorJob
28+
import kotlinx.coroutines.asCoroutineDispatcher
29+
import kotlinx.coroutines.cancel
30+
import kotlinx.coroutines.joinAll
31+
import kotlinx.coroutines.launch
32+
import net.ccbluex.netty.http.coroutines.awaitSuspend
33+
import net.ccbluex.netty.http.coroutines.syncSuspend
34+
import java.util.concurrent.CopyOnWriteArraySet
2535
import java.util.function.BiConsumer
2636

2737
/**
2838
* Controller for handling websocket connections.
2939
*/
30-
class WebSocketController {
40+
class WebSocketController(
41+
private val serverChannel: Channel,
42+
) {
43+
44+
private val scope = CoroutineScope(
45+
serverChannel.eventLoop().asCoroutineDispatcher() + SupervisorJob()
46+
)
47+
48+
init {
49+
serverChannel.closeFuture().addListener { scope.cancel("Channel closed") }
50+
}
3151

3252
/**
3353
* Keeps track of all connected websocket connections to the server.
3454
* This is used to broadcast messages to all connected clients.
3555
*/
36-
private val activeContexts = CopyOnWriteArrayList<ChannelHandlerContext>()
56+
private val activeContexts = CopyOnWriteArraySet<ChannelHandlerContext>()
3757

3858
/**
3959
* Broadcasts a message to all connected clients.
4060
*
4161
* @param text The message to broadcast.
4262
* @param onFailure The action to take if a failure occurs.
4363
*/
44-
fun broadcast(text: String, onFailure: BiConsumer<ChannelHandlerContext, Throwable>? = null) {
45-
val frame = TextWebSocketFrame(text)
46-
for (handlerContext in activeContexts) {
47-
val channelFuture = handlerContext.channel().writeAndFlush(frame.retainedDuplicate())
48-
if (onFailure != null) {
49-
channelFuture.addListener {
50-
if (!it.isSuccess) {
51-
onFailure.accept(handlerContext, it.cause())
64+
fun broadcast(text: String, onFailure: BiConsumer<ChannelHandlerContext, Throwable>? = null): Job =
65+
scope.launch {
66+
val frame = TextWebSocketFrame(text)
67+
68+
activeContexts.map { handlerContext ->
69+
launch {
70+
try {
71+
handlerContext.channel()
72+
.writeAndFlush(frame.retainedDuplicate())
73+
.syncSuspend()
74+
} catch (e: Exception) {
75+
onFailure?.accept(handlerContext, e)
5276
}
5377
}
54-
}
78+
}.joinAll()
79+
80+
frame.release()
5581
}
56-
frame.release()
57-
}
5882

5983
/**
6084
* Closes all active contexts.
@@ -74,6 +98,9 @@ class WebSocketController {
7498
*/
7599
fun addContext(context: ChannelHandlerContext) {
76100
activeContexts.add(context)
101+
context.channel().closeFuture().addListener {
102+
removeContext(context)
103+
}
77104
}
78105

79106
/**

src/main/kotlin/net/ccbluex/netty/http/websocket/WebSocketHandler.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ internal class WebSocketHandler(private val server: HttpServer) : ChannelInbound
5353
.writeAndFlush(msg.retainedDuplicate())
5454
.addListener(ChannelFutureListener.CLOSE)
5555

56-
server.webSocketController.removeContext(ctx)
56+
server.webSocketController!!.removeContext(ctx)
5757
logger.debug("WebSocket closed due to ${msg.reasonText()} (${msg.statusCode()})")
5858
}
5959
else -> logger.error("Unknown WebSocketFrame type: ${msg.javaClass.name}")

0 commit comments

Comments
 (0)