@@ -9,24 +9,29 @@ import monix.execution.atomic.Atomic
99import org .eclipse .jetty .client .HttpClient
1010import org .eclipse .jetty .ee8 .servlet .{ServletContextHandler , ServletHolder }
1111import org .eclipse .jetty .server .{NetworkConnector , Server }
12+ import org .scalatest .BeforeAndAfterEach
1213import org .scalatest .funsuite .AsyncFunSuite
1314
1415import java .net .InetSocketAddress
1516import scala .concurrent .Future
1617import scala .concurrent .duration .{FiniteDuration , IntMult }
1718
18- final class CloseStaleJettyConnectionsOnMonixTimeout extends AsyncFunSuite {
19+ final class CloseStaleJettyConnectionsOnMonixTimeout extends AsyncFunSuite with BeforeAndAfterEach {
1920
20- test(" close connection on monix task timeout" ) {
21- import monix .execution .Scheduler .Implicits .global
21+ import monix .execution .Scheduler .Implicits .global
2222
23- val MaxConnections : Int = 1 // to timeout quickly
24- val Connections : Int = 10 // > MaxConnections
25- val RequestTimeout : FiniteDuration = 1 .hour // no timeout
26- val CallTimeout : FiniteDuration = 300 .millis
23+ private val MaxConnections : Int = 1 // to timeout quickly
24+ private val Connections : Int = 10 // > MaxConnections
25+ private val RequestTimeout : FiniteDuration = 1 .hour // no timeout
26+ private val CallTimeout : FiniteDuration = 300 .millis
2727
28+ private var server : Server = _
29+ private var httpClient : HttpClient = _
30+ private var client : RestApiWithNeverCounter = _
2831
29- val server = new Server (new InetSocketAddress (" localhost" , 0 )) {
32+ override def beforeEach (): Unit = {
33+ super .beforeEach()
34+ server = new Server (new InetSocketAddress (" localhost" , 0 )) {
3035 setHandler(
3136 new ServletContextHandler ().setup(
3237 _.addServlet(
@@ -40,32 +45,49 @@ final class CloseStaleJettyConnectionsOnMonixTimeout extends AsyncFunSuite {
4045 start()
4146 }
4247
43- val httpClient = new HttpClient () {
48+ httpClient = new HttpClient () {
4449 setMaxConnectionsPerDestination(MaxConnections )
4550 setIdleTimeout(RequestTimeout .toMillis)
4651 start()
4752 }
4853
49- val client = JettyRestClient [RestApiWithNeverCounter ](
54+ client = JettyRestClient [RestApiWithNeverCounter ](
5055 client = httpClient,
5156 baseUri = server.getConnectors.head |> { case connector : NetworkConnector => s " http:// ${connector.getHost}: ${connector.getLocalPort}" },
5257 maxResponseLength = Int .MaxValue , // to avoid unnecessary logs
5358 timeout = RequestTimeout ,
5459 )
60+ }
5561
62+ override def afterEach (): Unit = {
63+ RestApiWithNeverCounter .Impl .counter.set(0 )
64+ server.stop()
65+ httpClient.stop()
66+ super .afterEach()
67+ }
68+
69+ test(" close connection on monix task timeout" ) {
5670 Task
5771 .traverse(List .range(0 , Connections ))(_ => Task .fromFuture(client.neverGet).timeout(CallTimeout ).failed)
5872 .timeoutTo(Connections * CallTimeout + 500 .millis, Task (fail(" All connections should have been closed" ))) // + 500 millis just in case
5973 .map(_ => assert(RestApiWithNeverCounter .Impl .counter.get() == Connections )) // neverGet should be called Connections times
60- .guarantee(Task {
61- server.stop()
62- httpClient.stop()
63- })
74+ .runToFuture
75+ }
76+
77+ test(" close connection on monix task cancellation" ) {
78+ Task
79+ .traverse(List .range(0 , Connections )) { i =>
80+ val cancelable = Task .fromFuture(client.neverGet).runAsync(_ => ())
81+ Task .sleep(100 .millis)
82+ .restartUntil(_ => RestApiWithNeverCounter .Impl .counter.get() >= i)
83+ .map(_ => cancelable.cancel())
84+ }
85+ .map(_ => assert(RestApiWithNeverCounter .Impl .counter.get() == Connections ))
6486 .runToFuture
6587 }
6688}
6789
68- object CloseStaleJettyConnectionsOnMonixTimeout {
90+ private object CloseStaleJettyConnectionsOnMonixTimeout {
6991 sealed trait RestApiWithNeverCounter {
7092 final val counter = Atomic (0 )
7193 @ GET def neverGet : Future [Unit ]
0 commit comments