11package org .acme .rest ;
22
33import static org .assertj .core .api .Assertions .assertThat ;
4- import static org .awaitility .Awaitility .await ;
54
65import java .net .URI ;
76import java .time .Duration ;
87import java .util .List ;
9- import java .util .concurrent .CopyOnWriteArrayList ;
108
11- import javax .ws .rs .client .Client ;
129import javax .ws .rs .client .ClientBuilder ;
13- import javax .ws .rs .client .WebTarget ;
14- import javax .ws .rs .sse .SseEventSource ;
10+ import javax .ws .rs .core .MediaType ;
1511
12+ import org .jboss .resteasy .reactive .client .impl .MultiInvoker ;
1613import org .junit .jupiter .api .Test ;
1714
1815import io .quarkus .test .common .http .TestHTTPEndpoint ;
1916import io .quarkus .test .common .http .TestHTTPResource ;
2017import io .quarkus .test .junit .QuarkusTest ;
18+ import io .smallrye .mutiny .helpers .test .AssertSubscriber ;
2119
2220@ QuarkusTest
2321class PriceResourceTest {
@@ -26,20 +24,18 @@ class PriceResourceTest {
2624 URI uri ;
2725
2826 @ Test
29- void testPricesEventStream () {
30- Client client = ClientBuilder .newClient ();
31- WebTarget target = client .target (this .uri );
32-
33- List <Double > received = new CopyOnWriteArrayList <>();
34-
35- try (SseEventSource source = SseEventSource .target (target ).build ()) {
36- source .register (inboundSseEvent -> received .add (Double .valueOf (inboundSseEvent .readData ())));
37- source .open ();
38-
39- await ()
40- .atMost (Duration .ofSeconds (20 ))
41- .until (() -> received .size () == 3 );
42- }
27+ public void sseEventStream () {
28+ List <Double > received = ClientBuilder .newClient ()
29+ .target (this .uri )
30+ .request (MediaType .SERVER_SENT_EVENTS )
31+ .rx (MultiInvoker .class )
32+ .get (Double .class )
33+ .select ().first (3 )
34+ .subscribe ().withSubscriber (AssertSubscriber .create (3 ))
35+ .assertSubscribed ()
36+ .awaitItems (3 , Duration .ofSeconds (20 ))
37+ .assertCompleted ()
38+ .getItems ();
4339
4440 assertThat (received )
4541 .hasSize (3 )
0 commit comments