Skip to content

Commit f439736

Browse files
committed
Revert handling incremental results in WebSocket
1 parent ab9449a commit f439736

File tree

10 files changed

+121
-41
lines changed

10 files changed

+121
-41
lines changed

libraries/apollo-runtime/src/commonMain/kotlin/com/apollographql/apollo/internal/incremental/GraphQL17Alpha2IncrementalResultsMerger.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import com.apollographql.apollo.api.DeferredFragmentIdentifier
44
import okio.BufferedSource
55

66
/**
7-
* Merger for the [com.apollographql.apollo.network.http.HttpNetworkTransport.IncrementalDeliveryProtocol.GraphQL17Alpha2] protocol format.
7+
* Merger for the [com.apollographql.apollo.network.IncrementalDeliveryProtocol.GraphQL17Alpha2] protocol format.
88
*/
99
@Suppress("UNCHECKED_CAST")
1010
internal class GraphQL17Alpha2IncrementalResultsMerger : IncrementalResultsMerger {

libraries/apollo-runtime/src/commonMain/kotlin/com/apollographql/apollo/internal/incremental/GraphQL17Alpha9IncrementalResultsMerger.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import com.apollographql.apollo.api.DeferredFragmentIdentifier
44
import okio.BufferedSource
55

66
/**
7-
* Merger for the [com.apollographql.apollo.network.http.HttpNetworkTransport.IncrementalDeliveryProtocol.GraphQL17Alpha9] protocol format.
7+
* Merger for the [com.apollographql.apollo.network.IncrementalDeliveryProtocol.GraphQL17Alpha9] protocol format.
88
*/
99
@Suppress("UNCHECKED_CAST")
1010
internal class GraphQL17Alpha9IncrementalResultsMerger : IncrementalResultsMerger {

libraries/apollo-runtime/src/commonMain/kotlin/com/apollographql/apollo/internal/incremental/IncrementalDeliveryProtocolImpl.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.apollographql.apollo.internal.incremental
22

3-
import com.apollographql.apollo.network.http.HttpNetworkTransport
3+
import com.apollographql.apollo.network.IncrementalDeliveryProtocol
44

55
internal sealed interface IncrementalDeliveryProtocolImpl {
66
val acceptHeader: String
@@ -22,8 +22,8 @@ internal sealed interface IncrementalDeliveryProtocolImpl {
2222
}
2323
}
2424

25-
internal val HttpNetworkTransport.IncrementalDeliveryProtocol.impl: IncrementalDeliveryProtocolImpl
25+
internal val IncrementalDeliveryProtocol.impl: IncrementalDeliveryProtocolImpl
2626
get() = when (this) {
27-
HttpNetworkTransport.IncrementalDeliveryProtocol.GraphQL17Alpha2 -> IncrementalDeliveryProtocolImpl.GraphQL17Alpha2
28-
HttpNetworkTransport.IncrementalDeliveryProtocol.GraphQL17Alpha9 -> IncrementalDeliveryProtocolImpl.GraphQL17Alpha9
27+
IncrementalDeliveryProtocol.GraphQL17Alpha2 -> IncrementalDeliveryProtocolImpl.GraphQL17Alpha2
28+
IncrementalDeliveryProtocol.GraphQL17Alpha9 -> IncrementalDeliveryProtocolImpl.GraphQL17Alpha9
2929
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.apollographql.apollo.network
2+
3+
import com.apollographql.apollo.annotations.ApolloExperimental
4+
5+
/**
6+
* The protocol to use for incremental delivery (`@defer` and `@stream`).
7+
*/
8+
@ApolloExperimental
9+
enum class IncrementalDeliveryProtocol {
10+
11+
/**
12+
* Newer format as implemented by graphql.js version `17.0.0-alpha.2` and specified in this historical commit:
13+
* https://github.com/graphql/graphql-spec/tree/48cf7263a71a683fab03d45d309fd42d8d9a6659/spec
14+
*
15+
* Only `@defer` is supported with this format.
16+
*
17+
* This is the default.
18+
*/
19+
GraphQL17Alpha2,
20+
21+
/**
22+
* Newer format as implemented by graphql.js version `17.0.0-alpha.9`.
23+
*
24+
* Both `@defer` and `@stream` are supported with this format.
25+
*/
26+
GraphQL17Alpha9
27+
}

libraries/apollo-runtime/src/commonMain/kotlin/com/apollographql/apollo/network/http/HttpNetworkTransport.kt

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import com.apollographql.apollo.internal.isGraphQLResponse
2929
import com.apollographql.apollo.internal.isMultipart
3030
import com.apollographql.apollo.internal.multipartBodyFlow
3131
import com.apollographql.apollo.mpp.currentTimeMillis
32+
import com.apollographql.apollo.network.IncrementalDeliveryProtocol
3233
import com.apollographql.apollo.network.NetworkTransport
3334
import com.benasher44.uuid.Uuid
3435
import com.benasher44.uuid.uuid4
@@ -418,28 +419,4 @@ private constructor(
418419
return chain.proceed(request.newBuilder().addHeaders(headers).build())
419420
}
420421
}
421-
422-
/**
423-
* The protocol to use for incremental delivery (`@defer` and `@stream`).
424-
*/
425-
@ApolloExperimental
426-
enum class IncrementalDeliveryProtocol {
427-
428-
/**
429-
* Newer format as implemented by graphql.js version `17.0.0-alpha.2` and specified in this historical commit:
430-
* https://github.com/graphql/graphql-spec/tree/48cf7263a71a683fab03d45d309fd42d8d9a6659/spec
431-
*
432-
* Only `@defer` is supported with this format.
433-
*
434-
* This is the default.
435-
*/
436-
GraphQL17Alpha2,
437-
438-
/**
439-
* Newer format as implemented by graphql.js version `17.0.0-alpha.9`.
440-
*
441-
* Both `@defer` and `@stream` are supported with this format.
442-
*/
443-
GraphQL17Alpha9
444-
}
445422
}

libraries/apollo-runtime/src/commonMain/kotlin/com/apollographql/apollo/network/websocket/WebSocketNetworkTransport.kt

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ import com.apollographql.apollo.exception.ApolloException
1212
import com.apollographql.apollo.exception.ApolloWebSocketForceCloseException
1313
import com.apollographql.apollo.exception.DefaultApolloException
1414
import com.apollographql.apollo.exception.SubscriptionOperationException
15+
import com.apollographql.apollo.internal.incremental.IncrementalDeliveryProtocolImpl
16+
import com.apollographql.apollo.internal.incremental.impl
17+
import com.apollographql.apollo.network.IncrementalDeliveryProtocol
1518
import com.apollographql.apollo.network.NetworkTransport
1619
import com.apollographql.apollo.network.websocket.internal.OperationListener
1720
import com.apollographql.apollo.network.websocket.internal.WebSocketPool
@@ -113,6 +116,7 @@ class WebSocketNetworkTransport private constructor(
113116
private var pingInterval: Duration? = null
114117
private var idleTimeout: Duration? = null
115118
private var parserFactory: SubscriptionParserFactory? = null
119+
private var incrementalDeliveryProtocol: IncrementalDeliveryProtocol = IncrementalDeliveryProtocol.GraphQL17Alpha2
116120

117121
/**
118122
* @param serverUrl a server url that is called every time a WebSocket
@@ -176,6 +180,15 @@ class WebSocketNetworkTransport private constructor(
176180
this.parserFactory = parserFactory
177181
}
178182

183+
/**
184+
* The incremental delivery protocol to use when using `@defer` and/or `@stream`.
185+
*
186+
* Default: [IncrementalDeliveryProtocol.GraphQL17Alpha2]
187+
*/
188+
@ApolloExperimental
189+
fun incrementalDeliveryProtocol(incrementalDeliveryProtocol: IncrementalDeliveryProtocol) = apply {
190+
this.incrementalDeliveryProtocol = incrementalDeliveryProtocol
191+
}
179192

180193
/**
181194
* Builds the [WebSocketNetworkTransport]
@@ -188,19 +201,25 @@ class WebSocketNetworkTransport private constructor(
188201
wsProtocol = wsProtocol ?: GraphQLWsProtocol { null },
189202
pingInterval = pingInterval,
190203
connectionAcknowledgeTimeout = connectionAcknowledgeTimeout ?: 10.seconds,
191-
parserFactory = parserFactory ?: DefaultSubscriptionParserFactory
204+
parserFactory = parserFactory ?: DefaultSubscriptionParserFactory(incrementalDeliveryProtocol.impl),
192205
)
193206
}
194207
}
195208
}
196209

197-
private object DefaultSubscriptionParserFactory : SubscriptionParserFactory {
210+
private class DefaultSubscriptionParserFactory(
211+
private val incrementalDeliveryProtocolImpl: IncrementalDeliveryProtocolImpl,
212+
) : SubscriptionParserFactory {
198213
override fun <D : Operation.Data> createParser(request: ApolloRequest<D>): SubscriptionParser<D> {
199-
return DefaultSubscriptionParser(request)
214+
return DefaultSubscriptionParser(incrementalDeliveryProtocolImpl, request)
200215
}
201216
}
202217

203-
private class DefaultSubscriptionParser<D : Operation.Data>(private val request: ApolloRequest<D>) : SubscriptionParser<D> {
218+
private class DefaultSubscriptionParser<D : Operation.Data>(
219+
incrementalDeliveryProtocolImpl: IncrementalDeliveryProtocolImpl,
220+
private val request: ApolloRequest<D>,
221+
) : SubscriptionParser<D> {
222+
private val incrementalResultsMerger = incrementalDeliveryProtocolImpl.newIncrementalResultsMerger()
204223
private val requestCustomScalarAdapters = request.executionContext[CustomScalarAdapters] ?: CustomScalarAdapters.Empty
205224

206225
@Suppress("NAME_SHADOWING")
@@ -212,13 +231,28 @@ private class DefaultSubscriptionParser<D : Operation.Data>(private val request:
212231
.exception(DefaultApolloException("Invalid payload")).build()
213232
}
214233

215-
val apolloResponse: ApolloResponse<D> = responseMap.jsonReader().toApolloResponse(
234+
val (payload, deferredFragmentIdentifiers) = if (responseMap.isDeferred()) {
235+
incrementalResultsMerger.merge(responseMap) to incrementalResultsMerger.deferredFragmentIdentifiers
236+
} else {
237+
responseMap to null
238+
}
239+
val apolloResponse: ApolloResponse<D> = payload.jsonReader().toApolloResponse(
216240
operation = request.operation,
217241
requestUuid = request.requestUuid,
218242
customScalarAdapters = requestCustomScalarAdapters,
243+
deferredFragmentIdentifiers = deferredFragmentIdentifiers,
219244
)
220245

221-
return apolloResponse
246+
if (!incrementalResultsMerger.hasNext) {
247+
// Last deferred payload: reset the incrementalResultsMerger for potential subsequent responses
248+
incrementalResultsMerger.reset()
249+
}
250+
251+
return if (incrementalResultsMerger.isEmptyResponse) {
252+
null
253+
} else {
254+
apolloResponse
255+
}
222256
}
223257
}
224258

@@ -253,6 +287,11 @@ private class DefaultOperationListener<D : Operation.Data>(
253287
producerScope.close()
254288
}
255289
}
290+
291+
private fun Map<String, Any?>.isDeferred(): Boolean {
292+
return keys.contains("hasNext")
293+
}
294+
256295
/**
257296
* Closes the websocket connection if the transport is a [WebSocketNetworkTransport].
258297
*

libraries/apollo-runtime/src/commonMain/kotlin/com/apollographql/apollo/network/ws/WebSocketNetworkTransport.kt

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.apollographql.apollo.network.ws
22

3+
import com.apollographql.apollo.annotations.ApolloExperimental
34
import com.apollographql.apollo.api.ApolloRequest
45
import com.apollographql.apollo.api.ApolloResponse
56
import com.apollographql.apollo.api.CustomScalarAdapters
@@ -10,7 +11,10 @@ import com.apollographql.apollo.api.toApolloResponse
1011
import com.apollographql.apollo.exception.ApolloException
1112
import com.apollographql.apollo.exception.ApolloNetworkException
1213
import com.apollographql.apollo.exception.SubscriptionOperationException
14+
import com.apollographql.apollo.internal.incremental.IncrementalDeliveryProtocolImpl
15+
import com.apollographql.apollo.internal.incremental.impl
1316
import com.apollographql.apollo.internal.transformWhile
17+
import com.apollographql.apollo.network.IncrementalDeliveryProtocol
1418
import com.apollographql.apollo.network.NetworkTransport
1519
import com.apollographql.apollo.network.ws.internal.Command
1620
import com.apollographql.apollo.network.ws.internal.ConnectionReEstablished
@@ -38,6 +42,7 @@ import kotlinx.coroutines.flow.Flow
3842
import kotlinx.coroutines.flow.MutableSharedFlow
3943
import kotlinx.coroutines.flow.asSharedFlow
4044
import kotlinx.coroutines.flow.filter
45+
import kotlinx.coroutines.flow.filterNot
4146
import kotlinx.coroutines.flow.map
4247
import kotlinx.coroutines.flow.onCompletion
4348
import kotlinx.coroutines.flow.onSubscription
@@ -59,6 +64,7 @@ private constructor(
5964
private val idleTimeoutMillis: Long = 60_000,
6065
private val protocolFactory: WsProtocol.Factory = SubscriptionWsProtocol.Factory(),
6166
private val reopenWhen: (suspend (Throwable, attempt: Long) -> Boolean)?,
67+
private val incrementalDeliveryProtocolImpl: IncrementalDeliveryProtocolImpl,
6268
) : NetworkTransport {
6369

6470
/**
@@ -259,6 +265,8 @@ private constructor(
259265
override fun <D : Operation.Data> execute(
260266
request: ApolloRequest<D>,
261267
): Flow<ApolloResponse<D>> {
268+
val incrementalResultsMerger = incrementalDeliveryProtocolImpl.newIncrementalResultsMerger()
269+
262270
return events.onSubscription {
263271
messages.send(StartOperation(request))
264272
}.filter {
@@ -295,13 +303,24 @@ private constructor(
295303
}.map { response ->
296304
when (response) {
297305
is OperationResponse -> {
306+
val responsePayload = response.payload
298307
val requestCustomScalarAdapters = request.executionContext[CustomScalarAdapters]!!
299-
val apolloResponse: ApolloResponse<D> = response.payload.jsonReader().toApolloResponse(
308+
val (payload, deferredFragmentIdentifiers) = if (responsePayload.isDeferred()) {
309+
incrementalResultsMerger.merge(responsePayload) to incrementalResultsMerger.deferredFragmentIdentifiers
310+
} else {
311+
responsePayload to null
312+
}
313+
val apolloResponse: ApolloResponse<D> = payload.jsonReader().toApolloResponse(
300314
operation = request.operation,
301315
requestUuid = request.requestUuid,
302316
customScalarAdapters = requestCustomScalarAdapters,
317+
deferredFragmentIdentifiers = deferredFragmentIdentifiers
303318
)
304319

320+
if (!incrementalResultsMerger.hasNext) {
321+
// Last deferred payload: reset the incrementalResultsMerger for potential subsequent responses
322+
incrementalResultsMerger.reset()
323+
}
305324
apolloResponse
306325
}
307326

@@ -311,6 +330,8 @@ private constructor(
311330
// Cannot happen as these events are filtered out upstream
312331
is ConnectionReEstablished, is OperationComplete, is GeneralError -> error("Unexpected event $response")
313332
}
333+
}.filterNot {
334+
incrementalResultsMerger.isEmptyResponse
314335
}.onCompletion {
315336
messages.send(StopOperation(request))
316337
}
@@ -353,6 +374,7 @@ private constructor(
353374
private var idleTimeoutMillis: Long? = null
354375
private var protocolFactory: WsProtocol.Factory? = null
355376
private var reopenWhen: (suspend (Throwable, attempt: Long) -> Boolean)? = null
377+
private var incrementalDeliveryProtocol: IncrementalDeliveryProtocol = IncrementalDeliveryProtocol.GraphQL17Alpha2
356378

357379
/**
358380
* Configure the server URL.
@@ -419,14 +441,25 @@ private constructor(
419441
this.reopenWhen = reopenWhen
420442
}
421443

444+
/**
445+
* The incremental delivery protocol to use when using `@defer` and/or `@stream`.
446+
*
447+
* Default: [IncrementalDeliveryProtocol.GraphQL17Alpha2]
448+
*/
449+
@ApolloExperimental
450+
fun incrementalDeliveryProtocol(incrementalDeliveryProtocol: IncrementalDeliveryProtocol) = apply {
451+
this.incrementalDeliveryProtocol = incrementalDeliveryProtocol
452+
}
453+
422454
fun build(): WebSocketNetworkTransport {
423455
return WebSocketNetworkTransport(
424456
serverUrl = serverUrl ?: error("No serverUrl specified"),
425457
headers = headers,
426458
webSocketEngine = webSocketEngine ?: DefaultWebSocketEngine(),
427459
idleTimeoutMillis = idleTimeoutMillis ?: 60_000,
428460
protocolFactory = protocolFactory ?: SubscriptionWsProtocol.Factory(),
429-
reopenWhen = reopenWhen
461+
reopenWhen = reopenWhen,
462+
incrementalDeliveryProtocolImpl = incrementalDeliveryProtocol.impl,
430463
)
431464
}
432465
}
@@ -441,3 +474,7 @@ fun NetworkTransport.closeConnection(reason: Throwable) {
441474
(this as? WebSocketNetworkTransport
442475
?: throw IllegalArgumentException("'$this' is not an instance of com.apollographql.apollo.ws.WebSocketNetworkTransport")).closeConnection(reason)
443476
}
477+
478+
private fun Map<String, Any?>.isDeferred(): Boolean {
479+
return keys.contains("hasNext")
480+
}

tests/defer/src/commonTest/kotlin/test/DeferGraphQL17Alpha9NormalizedCacheTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import com.apollographql.apollo.exception.ApolloNetworkException
2020
import com.apollographql.apollo.exception.CacheMissException
2121
import com.apollographql.apollo.network.NetworkTransport
2222
import com.apollographql.apollo.network.http.HttpNetworkTransport
23-
import com.apollographql.apollo.network.http.HttpNetworkTransport.IncrementalDeliveryProtocol
23+
import com.apollographql.apollo.network.IncrementalDeliveryProtocol
2424
import com.apollographql.apollo.testing.internal.runTest
2525
import com.apollographql.mockserver.MockServer
2626
import com.apollographql.mockserver.assertNoRequest

tests/defer/src/commonTest/kotlin/test/DeferGraphQL17Alpha9Test.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import com.apollographql.apollo.api.Error.Builder
77
import com.apollographql.apollo.autoPersistedQueryInfo
88
import com.apollographql.apollo.mpp.currentTimeMillis
99
import com.apollographql.apollo.network.http.HttpNetworkTransport
10-
import com.apollographql.apollo.network.http.HttpNetworkTransport.IncrementalDeliveryProtocol
10+
import com.apollographql.apollo.network.IncrementalDeliveryProtocol
1111
import com.apollographql.apollo.testing.Platform
1212
import com.apollographql.apollo.testing.internal.runTest
1313
import com.apollographql.apollo.testing.platform

tests/defer/src/commonTest/kotlin/test/DeferWithApolloServerTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import com.apollographql.apollo.api.ApolloResponse
55
import com.apollographql.apollo.api.Error
66
import com.apollographql.apollo.api.Optional
77
import com.apollographql.apollo.network.http.HttpNetworkTransport
8-
import com.apollographql.apollo.network.http.HttpNetworkTransport.IncrementalDeliveryProtocol
8+
import com.apollographql.apollo.network.IncrementalDeliveryProtocol
99
import com.apollographql.apollo.testing.internal.runTest
1010
import com.benasher44.uuid.uuid4
1111
import defer.CanDeferFragmentsOnTheTopLevelQueryFieldQuery

0 commit comments

Comments
 (0)