From c8c84e7550b3e214c6eb84beda81eb06efa99b67 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 27 Aug 2025 18:53:40 +0530 Subject: [PATCH 1/5] [ECO-5056] use server-provided GC grace period for garbage collection - Add support for server-configured objectsGCGracePeriod from connection details, with fallback to default 24-hour period. Updates ObjectsPool and all LiveObjects - Implementations to use the server-provided value for tombstone cleanup timing. --- .../java/io/ably/lib/objects/Adapter.java | 6 ++-- .../io/ably/lib/objects/ObjectsAdapter.java | 8 ++--- .../ably/lib/transport/ConnectionManager.java | 2 ++ .../io/ably/lib/types/ConnectionDetails.java | 8 +++++ .../kotlin/io/ably/lib/objects/Helpers.kt | 20 ++++++++++--- .../kotlin/io/ably/lib/objects/ObjectsPool.kt | 18 ++++++++++-- .../lib/objects/type/BaseRealtimeObject.kt | 29 +++++++++++++++---- .../type/livecounter/DefaultLiveCounter.kt | 2 +- .../objects/type/livemap/DefaultLiveMap.kt | 4 +-- .../lib/objects/type/livemap/LiveMapEntry.kt | 5 ++-- .../io/ably/lib/objects/unit/HelpersTest.kt | 20 +++++++++---- .../lib/objects/unit/ObjectMessageSizeTest.kt | 18 +++++++++--- 12 files changed, 106 insertions(+), 34 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/objects/Adapter.java b/lib/src/main/java/io/ably/lib/objects/Adapter.java index e9a084ae7..76c35cc37 100644 --- a/lib/src/main/java/io/ably/lib/objects/Adapter.java +++ b/lib/src/main/java/io/ably/lib/objects/Adapter.java @@ -2,7 +2,7 @@ import io.ably.lib.realtime.AblyRealtime; import io.ably.lib.realtime.ChannelBase; -import io.ably.lib.transport.ConnectionManager; +import io.ably.lib.realtime.Connection; import io.ably.lib.types.AblyException; import io.ably.lib.types.ClientOptions; import io.ably.lib.types.ErrorInfo; @@ -23,8 +23,8 @@ public Adapter(@NotNull AblyRealtime ably) { } @Override - public @NotNull ConnectionManager getConnectionManager() { - return ably.connection.connectionManager; + public @NotNull Connection getConnection() { + return ably.connection; } @Override diff --git a/lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java b/lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java index 21262942a..b6054e71a 100644 --- a/lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java +++ b/lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java @@ -1,7 +1,7 @@ package io.ably.lib.objects; import io.ably.lib.realtime.ChannelBase; -import io.ably.lib.transport.ConnectionManager; +import io.ably.lib.realtime.Connection; import io.ably.lib.types.AblyException; import io.ably.lib.types.ClientOptions; import org.jetbrains.annotations.Blocking; @@ -18,13 +18,13 @@ public interface ObjectsAdapter { @NotNull ClientOptions getClientOptions(); /** - * Retrieves the connection manager for handling connection state and operations. + * Retrieves the connection instance for handling connection state and operations. * Used to check connection status, obtain error information, and manage * message transmission across the Ably connection. * - * @return the connection manager instance + * @return the connection instance */ - @NotNull ConnectionManager getConnectionManager(); + @NotNull Connection getConnection(); /** * Retrieves the current time in milliseconds from the Ably server. diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java index 89107d91e..2b0a14663 100644 --- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java +++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java @@ -100,6 +100,7 @@ public class ConnectionManager implements ConnectListener { * This field is initialized only if the LiveObjects plugin is present in the classpath. */ private final LiveObjectsPlugin liveObjectsPlugin; + public Long objectsGCGracePeriod = null; /** * Methods on the channels map owned by the {@link AblyRealtime} instance @@ -1297,6 +1298,7 @@ private synchronized void onConnected(ProtocolMessage message) { maxIdleInterval = connectionDetails.maxIdleInterval; connectionStateTtl = connectionDetails.connectionStateTtl; maxMessageSize = connectionDetails.maxMessageSize; + objectsGCGracePeriod = connectionDetails.objectsGCGracePeriod; /* set the clientId resolved from token, if any */ String clientId = connectionDetails.clientId; diff --git a/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java b/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java index 0977a2350..8bf91cf78 100644 --- a/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java +++ b/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java @@ -74,6 +74,11 @@ public class ConnectionDetails { */ public Long connectionStateTtl; + /** + * The duration in milliseconds used to retain tombstoned objects at client side. + */ + public Long objectsGCGracePeriod; + ConnectionDetails() { maxIdleInterval = Defaults.maxIdleInterval; connectionStateTtl = Defaults.connectionStateTtl; @@ -114,6 +119,9 @@ ConnectionDetails readMsgpack(MessageUnpacker unpacker) throws IOException { case "connectionStateTtl": connectionStateTtl = unpacker.unpackLong(); break; + case "objectsGCGracePeriod": + objectsGCGracePeriod = unpacker.unpackLong(); + break; default: Log.v(TAG, "Unexpected field: " + fieldName); unpacker.skipValue(); diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index 7b169ff8f..fd09008b0 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -2,6 +2,7 @@ package io.ably.lib.objects import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.CompletionListener +import io.ably.lib.realtime.ConnectionEvent import io.ably.lib.types.ChannelMode import io.ably.lib.types.ErrorInfo import io.ably.lib.types.ProtocolMessage @@ -15,7 +16,7 @@ import kotlin.coroutines.resumeWithException */ internal suspend fun ObjectsAdapter.sendAsync(message: ProtocolMessage) = suspendCancellableCoroutine { continuation -> try { - connectionManager.send(message, clientOptions.queueMessages, object : CompletionListener { + connection.connectionManager.send(message, clientOptions.queueMessages, object : CompletionListener { override fun onSuccess() { continuation.resume(Unit) } @@ -45,6 +46,17 @@ internal suspend fun ObjectsAdapter.attachAsync(channelName: String) = suspendCa } } +internal fun ObjectsAdapter.retrieveObjectsGCGracePeriod(block : (Long?) -> Unit) { + val connectionManager = connection.connectionManager + if (connectionManager.objectsGCGracePeriod != null) { + block(connectionManager.objectsGCGracePeriod) + return + } + connection.once(ConnectionEvent.connected) { + block(connectionManager.objectsGCGracePeriod) + } +} + /** * Retrieves the channel modes for a specific channel. * This method returns the modes that are set for the specified channel. @@ -76,7 +88,7 @@ internal fun ObjectsAdapter.getChannelModes(channelName: String): Array) { - val maximumAllowedSize = connectionManager.maxMessageSize + val maximumAllowedSize = connection.connectionManager.maxMessageSize val objectsTotalMessageSize = objectMessages.sumOf { it.size() } if (objectsTotalMessageSize > maximumAllowedSize) { throw ablyException("ObjectMessages size $objectsTotalMessageSize exceeds maximum allowed size of $maximumAllowedSize bytes", @@ -131,8 +143,8 @@ internal fun ObjectsAdapter.throwIfInvalidWriteApiConfiguration(channelName: Str } internal fun ObjectsAdapter.throwIfUnpublishableState(channelName: String) { - if (!connectionManager.isActive) { - throw ablyException(connectionManager.stateErrorInfo) + if (!connection.connectionManager.isActive) { + throw ablyException(connection.connectionManager.stateErrorInfo) } throwIfInChannelState(channelName, arrayOf(ChannelState.failed, ChannelState.suspended)) } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt index 28ee839e0..43cab31c2 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt @@ -14,6 +14,9 @@ import java.util.concurrent.ConcurrentHashMap internal object ObjectsPoolDefaults { const val GC_INTERVAL_MS = 1000L * 60 * 5 // 5 minutes /** + * The SDK will attempt to use the `objectsGCGracePeriod` value provided by the server in the `connectionDetails` + * object of the `CONNECTED` event. + * If the server does not provide this value, the SDK will fall back to this default value. * Must be > 2 minutes to ensure we keep tombstones long enough to avoid the possibility of receiving an operation * with an earlier serial that would not have been applied if the tombstone still existed. * @@ -49,10 +52,19 @@ internal class ObjectsPool( private val gcScope = CoroutineScope(Dispatchers.Default + SupervisorJob()) private var gcJob: Job // Job for the garbage collection coroutine + @Volatile + private var gcGracePeriod = ObjectsPoolDefaults.GC_GRACE_PERIOD_MS + init { // RTO3b - Initialize pool with root object pool[ROOT_OBJECT_ID] = DefaultLiveMap.zeroValue(ROOT_OBJECT_ID, realtimeObjects) - // Start garbage collection coroutine + // Start garbage collection coroutine with server-provided grace period if available + realtimeObjects.adapter.retrieveObjectsGCGracePeriod { period -> + period?.let { + gcGracePeriod = it + Log.i(tag, "Using objectsGCGracePeriod from server: $gcGracePeriod ms") + } ?: Log.i(tag, "Server did not provide objectsGCGracePeriod, using default: $gcGracePeriod ms") + } gcJob = startGCJob() } @@ -123,9 +135,9 @@ internal class ObjectsPool( */ private fun onGCInterval() { pool.entries.removeIf { (_, obj) -> - if (obj.isEligibleForGc()) { true } // Remove from pool + if (obj.isEligibleForGc(gcGracePeriod)) { true } // Remove from pool else { - obj.onGCInterval() + obj.onGCInterval(gcGracePeriod) false // Keep in pool } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt index fa94e0a59..852ab1639 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt @@ -3,7 +3,6 @@ package io.ably.lib.objects.type import io.ably.lib.objects.ObjectMessage import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectState -import io.ably.lib.objects.ObjectsPoolDefaults import io.ably.lib.objects.objectError import io.ably.lib.objects.type.livecounter.noOpCounterUpdate import io.ably.lib.objects.type.livemap.noOpMapUpdate @@ -136,10 +135,20 @@ internal abstract class BaseRealtimeObject( /** * Checks if the object is eligible for garbage collection. + * + * An object is eligible for garbage collection if it has been tombstoned and + * the time since tombstoning exceeds the specified grace period. + * + * @param gcGracePeriod The grace period in milliseconds that tombstoned objects + * should be kept before being eligible for collection. + * This value is retrieved from the server's connection details + * or defaults to 24 hours if not provided by the server. + * @return true if the object is tombstoned and the grace period has elapsed, + * false otherwise */ - internal fun isEligibleForGc(): Boolean { + internal fun isEligibleForGc(gcGracePeriod: Long): Boolean { val currentTime = System.currentTimeMillis() - return isTombstoned && tombstonedAt?.let { currentTime - it >= ObjectsPoolDefaults.GC_GRACE_PERIOD_MS } == true + return isTombstoned && tombstonedAt?.let { currentTime - it >= gcGracePeriod } == true } /** @@ -195,12 +204,22 @@ internal abstract class BaseRealtimeObject( /** * Called during garbage collection intervals to clean up expired entries. * + * This method is invoked periodically (every 5 minutes) by the ObjectsPool + * to perform cleanup of tombstoned data that has exceeded the grace period. + * * This method should identify and remove entries that: * - Have been marked as tombstoned - * - Have a tombstone timestamp older than the configured grace period + * - Have a tombstone timestamp older than the specified grace period + * + * @param gcGracePeriod The grace period in milliseconds that tombstoned entries + * should be kept before being eligible for removal. + * This value is retrieved from the server's connection details + * or defaults to 24 hours if not provided by the server. + * Must be greater than 2 minutes to ensure proper operation + * ordering and avoid issues with delayed operations. * * Implementations typically use single-pass removal techniques to * efficiently clean up expired data without creating temporary collections. */ - abstract fun onGCInterval() + abstract fun onGCInterval(gcGracePeriod: Long) } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt index b34188b62..aed7859b7 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt @@ -109,7 +109,7 @@ internal class DefaultLiveCounter private constructor( liveCounterManager.notify(update as LiveCounterUpdate) } - override fun onGCInterval() { + override fun onGCInterval(gcGracePeriod: Long) { // Nothing to GC for a counter object return } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt index 8c2da8e6a..7a2979f2f 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt @@ -182,8 +182,8 @@ internal class DefaultLiveMap private constructor( liveMapManager.notify(update as LiveMapUpdate) } - override fun onGCInterval() { - data.entries.removeIf { (_, entry) -> entry.isEligibleForGc() } + override fun onGCInterval(gcGracePeriod: Long) { + data.entries.removeIf { (_, entry) -> entry.isEligibleForGc(gcGracePeriod) } } companion object { diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapEntry.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapEntry.kt index 4c32366e1..e1824aad8 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapEntry.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapEntry.kt @@ -3,7 +3,6 @@ package io.ably.lib.objects.type.livemap import io.ably.lib.objects.* import io.ably.lib.objects.ObjectData import io.ably.lib.objects.ObjectsPool -import io.ably.lib.objects.ObjectsPoolDefaults import io.ably.lib.objects.type.BaseRealtimeObject import io.ably.lib.objects.type.ObjectType import io.ably.lib.objects.type.counter.LiveCounter @@ -61,9 +60,9 @@ internal fun LiveMapEntry.getResolvedValue(objectsPool: ObjectsPool): LiveMapVal /** * Extension function to check if a LiveMapEntry is expired and ready for garbage collection */ -internal fun LiveMapEntry.isEligibleForGc(): Boolean { +internal fun LiveMapEntry.isEligibleForGc(gcGracePeriod: Long): Boolean { val currentTime = System.currentTimeMillis() - return isTombstoned && tombstonedAt?.let { currentTime - it >= ObjectsPoolDefaults.GC_GRACE_PERIOD_MS } == true + return isTombstoned && tombstonedAt?.let { currentTime - it >= gcGracePeriod } == true } private fun fromObjectValue(objValue: ObjectValue): LiveMapValue { diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt index 4b6662636..feba041cb 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt @@ -25,7 +25,9 @@ class HelpersTest { val connManager = mockk(relaxed = true) val clientOptions = ClientOptions().apply { queueMessages = false } - every { adapter.connectionManager } returns connManager + every { adapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } answers { @@ -48,7 +50,9 @@ class HelpersTest { val connManager = mockk(relaxed = true) val clientOptions = ClientOptions() - every { adapter.connectionManager } returns connManager + every { adapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } answers { @@ -69,7 +73,9 @@ class HelpersTest { val connManager = mockk(relaxed = true) val clientOptions = ClientOptions() - every { adapter.connectionManager } returns connManager + every { adapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } throws RuntimeException("send failed hard") @@ -346,7 +352,9 @@ class HelpersTest { fun testThrowIfUnpublishableStateInactiveConnection() { val adapter = mockk(relaxed = true) val connManager = mockk(relaxed = true) - every { adapter.connectionManager } returns connManager + every { adapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } every { connManager.isActive } returns false every { connManager.stateErrorInfo } returns serverError("not active").errorInfo @@ -359,7 +367,9 @@ class HelpersTest { fun testThrowIfUnpublishableStateChannelFailed() { val adapter = mockk(relaxed = true) val connManager = mockk(relaxed = true) - every { adapter.connectionManager } returns connManager + every { adapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } every { connManager.isActive } returns true val channel = mockk(relaxed = true) every { adapter.getChannel("ch") } returns channel diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt index 32a51069a..37c34cfb1 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt @@ -10,8 +10,10 @@ import io.ably.lib.objects.ObjectOperationAction import io.ably.lib.objects.ObjectValue import io.ably.lib.objects.ensureMessageSizeWithinLimit import io.ably.lib.objects.size +import io.ably.lib.transport.ConnectionManager import io.ably.lib.transport.Defaults import io.ably.lib.types.AblyException +import io.mockk.every import io.mockk.mockk import kotlinx.coroutines.test.runTest import org.junit.Test @@ -23,8 +25,12 @@ class ObjectMessageSizeTest { @Test fun testObjectMessageSizeWithinLimit() = runTest { val mockAdapter = mockk(relaxed = true) - mockAdapter.connectionManager.maxMessageSize = Defaults.maxMessageSize // 64 kb - assertEquals(65536, mockAdapter.connectionManager.maxMessageSize) + val connManager = mockk(relaxed = true) + every { mockAdapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } + connManager.maxMessageSize = Defaults.maxMessageSize // 64 kb + assertEquals(65536, connManager.maxMessageSize) // ObjectMessage with all size-contributing fields val objectMessage = ObjectMessage( @@ -148,8 +154,12 @@ class ObjectMessageSizeTest { @Test fun testObjectMessageSizeAboveLimit() = runTest { val mockAdapter = mockk(relaxed = true) - mockAdapter.connectionManager.maxMessageSize = Defaults.maxMessageSize // 64 kb - assertEquals(65536, mockAdapter.connectionManager.maxMessageSize) + val connManager = mockk(relaxed = true) + every { mockAdapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } + connManager.maxMessageSize = Defaults.maxMessageSize // 64 kb + assertEquals(65536, connManager.maxMessageSize) // Create ObjectMessage with dummy data that results in size 60kb val objectMessage1 = ObjectMessage( From cd9490be931a5235a1a2304a3b3043046c0d14da Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 28 Aug 2025 14:44:49 +0530 Subject: [PATCH 2/5] [ECO-5056] Updated test helper getMockObjectsAdapter with static mocck 1. Encouraged use of global mocck for Adapter 2. Updated failing tests accordingly 3. Added unit tests covering all cases for retrieveObjectsGCGracePeriod --- .../kotlin/io/ably/lib/objects/Helpers.kt | 19 ++- .../io/ably/lib/objects/unit/HelpersTest.kt | 128 +++++++++++------- .../lib/objects/unit/ObjectMessageSizeTest.kt | 24 +--- .../io/ably/lib/objects/unit/TestHelpers.kt | 10 +- .../objects/unit/objects/ObjectsPoolTest.kt | 8 +- 5 files changed, 109 insertions(+), 80 deletions(-) diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index fd09008b0..18e87e944 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -3,6 +3,7 @@ package io.ably.lib.objects import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.CompletionListener import io.ably.lib.realtime.ConnectionEvent +import io.ably.lib.realtime.ConnectionState import io.ably.lib.types.ChannelMode import io.ably.lib.types.ErrorInfo import io.ably.lib.types.ProtocolMessage @@ -11,12 +12,14 @@ import kotlinx.coroutines.suspendCancellableCoroutine import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException +internal val ObjectsAdapter.connectionManager get() = connection.connectionManager + /** * Spec: RTO15g */ internal suspend fun ObjectsAdapter.sendAsync(message: ProtocolMessage) = suspendCancellableCoroutine { continuation -> try { - connection.connectionManager.send(message, clientOptions.queueMessages, object : CompletionListener { + connectionManager.send(message, clientOptions.queueMessages, object : CompletionListener { override fun onSuccess() { continuation.resume(Unit) } @@ -47,8 +50,12 @@ internal suspend fun ObjectsAdapter.attachAsync(channelName: String) = suspendCa } internal fun ObjectsAdapter.retrieveObjectsGCGracePeriod(block : (Long?) -> Unit) { - val connectionManager = connection.connectionManager - if (connectionManager.objectsGCGracePeriod != null) { + connectionManager.objectsGCGracePeriod?.let { + block(it) + return + } + // If already connected, no further `connected` event is guaranteed; return immediately. + if (connection.state == ConnectionState.connected) { block(connectionManager.objectsGCGracePeriod) return } @@ -88,7 +95,7 @@ internal fun ObjectsAdapter.getChannelModes(channelName: String): Array) { - val maximumAllowedSize = connection.connectionManager.maxMessageSize + val maximumAllowedSize = connectionManager.maxMessageSize val objectsTotalMessageSize = objectMessages.sumOf { it.size() } if (objectsTotalMessageSize > maximumAllowedSize) { throw ablyException("ObjectMessages size $objectsTotalMessageSize exceeds maximum allowed size of $maximumAllowedSize bytes", @@ -143,8 +150,8 @@ internal fun ObjectsAdapter.throwIfInvalidWriteApiConfiguration(channelName: Str } internal fun ObjectsAdapter.throwIfUnpublishableState(channelName: String) { - if (!connection.connectionManager.isActive) { - throw ablyException(connection.connectionManager.stateErrorInfo) + if (!connectionManager.isActive) { + throw ablyException(connectionManager.stateErrorInfo) } throwIfInChannelState(channelName, arrayOf(ChannelState.failed, ChannelState.suspended)) } diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt index feba041cb..fc81be8fd 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt @@ -5,12 +5,10 @@ import io.ably.lib.realtime.Channel import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.ChannelStateListener import io.ably.lib.realtime.CompletionListener -import io.ably.lib.transport.ConnectionManager +import io.ably.lib.realtime.ConnectionEvent +import io.ably.lib.realtime.ConnectionStateListener import io.ably.lib.types.* -import io.mockk.every -import io.mockk.mockk -import io.mockk.slot -import io.mockk.verify +import io.mockk.* import kotlinx.coroutines.test.runTest import org.junit.Assert.* import org.junit.Test @@ -21,13 +19,10 @@ class HelpersTest { // sendAsync @Test fun testSendAsyncShouldQueueAccordingToClientOptions() = runTest { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager val clientOptions = ClientOptions().apply { queueMessages = false } - every { adapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } answers { @@ -46,13 +41,10 @@ class HelpersTest { @Test fun testSendAsyncErrorPropagatesAblyException() = runTest { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager val clientOptions = ClientOptions() - every { adapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } answers { @@ -67,15 +59,59 @@ class HelpersTest { assertEquals(40000, ex.errorInfo.code) } + @Test + fun testRetrieveObjectsGCGracePeriodImmediateInvokesBlock() { + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager + connManager.setPrivateField("objectsGCGracePeriod", 123L) + + var value: Long? = null + adapter.retrieveObjectsGCGracePeriod { v -> value = v } + + assertEquals(123L, value) + verify(exactly = 0) { adapter.connection.once(ConnectionEvent.connected, any()) } + } + + @Test + fun testRetrieveObjectsGCGracePeriodDeferredInvokesOnConnectedWithValue() { + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager + + var value: Long? = null + every { adapter.connection.once(ConnectionEvent.connected, any()) } answers { + val listener = secondArg() + connManager.setPrivateField("objectsGCGracePeriod", 456L) + listener.onConnectionStateChanged(mockk(relaxed = true)) + } + + adapter.retrieveObjectsGCGracePeriod { v -> value = v } + + assertEquals(456L, value) + verify(exactly = 1) { adapter.connection.once(ConnectionEvent.connected, any()) } + } + + @Test + fun testRetrieveObjectsGCGracePeriodDeferredInvokesOnConnectedWithNull() { + val adapter = getMockObjectsAdapter() + + var value: Long? = null + every { adapter.connection.once(ConnectionEvent.connected, any()) } answers { + val listener = secondArg() + listener.onConnectionStateChanged(mockk(relaxed = true)) + } + + adapter.retrieveObjectsGCGracePeriod { v -> value = v } + + assertNull(value) + verify(exactly = 1) { adapter.connection.once(ConnectionEvent.connected, any()) } + } + @Test fun testSendAsyncThrowsWhenConnectionManagerThrows() = runTest { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager val clientOptions = ClientOptions() - every { adapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } throws RuntimeException("send failed hard") @@ -251,25 +287,25 @@ class HelpersTest { verify(exactly = 1) { channel.once(any()) } } - @Test - fun testEnsureAttachedAttachingButReceivesNonAttachedEmitsError() = runTest { - val adapter = mockk(relaxed = true) - val channel = mockk(relaxed = true) - every { adapter.getChannel("ch") } returns channel - channel.state = ChannelState.attaching - every { channel.once(any()) } answers { - val listener = firstArg() - val stateChange = mockk(relaxed = true) { - setPrivateField("current", ChannelState.suspended) - setPrivateField("reason", clientError("Not attached").errorInfo) - } - listener.onChannelStateChanged(stateChange) - } - val ex = assertFailsWith { adapter.ensureAttached("ch") } - assertEquals(ErrorCode.ChannelStateError.code, ex.errorInfo.code) - assertTrue(ex.errorInfo.message.contains("Not attached")) - verify(exactly = 1) { channel.once(any()) } - } + @Test + fun testEnsureAttachedAttachingButReceivesNonAttachedEmitsError() = runTest { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + channel.state = ChannelState.attaching + every { channel.once(any()) } answers { + val listener = firstArg() + val stateChange = mockk(relaxed = true) { + setPrivateField("current", ChannelState.suspended) + setPrivateField("reason", clientError("Not attached").errorInfo) + } + listener.onChannelStateChanged(stateChange) + } + val ex = assertFailsWith { adapter.ensureAttached("ch") } + assertEquals(ErrorCode.ChannelStateError.code, ex.errorInfo.code) + assertTrue(ex.errorInfo.message.contains("Not attached")) + verify(exactly = 1) { channel.once(any()) } + } @Test fun testEnsureAttachedThrowsForInvalidState() = runTest { @@ -350,11 +386,8 @@ class HelpersTest { // throwIfUnpublishableState @Test fun testThrowIfUnpublishableStateInactiveConnection() { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) - every { adapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager every { connManager.isActive } returns false every { connManager.stateErrorInfo } returns serverError("not active").errorInfo @@ -365,11 +398,8 @@ class HelpersTest { @Test fun testThrowIfUnpublishableStateChannelFailed() { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) - every { adapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager every { connManager.isActive } returns true val channel = mockk(relaxed = true) every { adapter.getChannel("ch") } returns channel diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt index 37c34cfb1..12a78b550 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt @@ -10,27 +10,19 @@ import io.ably.lib.objects.ObjectOperationAction import io.ably.lib.objects.ObjectValue import io.ably.lib.objects.ensureMessageSizeWithinLimit import io.ably.lib.objects.size -import io.ably.lib.transport.ConnectionManager import io.ably.lib.transport.Defaults import io.ably.lib.types.AblyException -import io.mockk.every -import io.mockk.mockk import kotlinx.coroutines.test.runTest import org.junit.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith class ObjectMessageSizeTest { - @Test fun testObjectMessageSizeWithinLimit() = runTest { - val mockAdapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) - every { mockAdapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } - connManager.maxMessageSize = Defaults.maxMessageSize // 64 kb - assertEquals(65536, connManager.maxMessageSize) + val mockAdapter = getMockObjectsAdapter() + mockAdapter.connectionManager.maxMessageSize = Defaults.maxMessageSize // 64 kb + assertEquals(65536, mockAdapter.connectionManager.maxMessageSize) // ObjectMessage with all size-contributing fields val objectMessage = ObjectMessage( @@ -153,13 +145,9 @@ class ObjectMessageSizeTest { @Test fun testObjectMessageSizeAboveLimit() = runTest { - val mockAdapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) - every { mockAdapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } - connManager.maxMessageSize = Defaults.maxMessageSize // 64 kb - assertEquals(65536, connManager.maxMessageSize) + val mockAdapter = getMockObjectsAdapter() + mockAdapter.connectionManager.maxMessageSize = Defaults.maxMessageSize // 64 kb + assertEquals(65536, mockAdapter.connectionManager.maxMessageSize) // Create ObjectMessage with dummy data that results in size 60kb val objectMessage1 = ObjectMessage( diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt index 94354fcf9..b482b418d 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt @@ -11,11 +11,13 @@ import io.ably.lib.objects.type.livemap.LiveMapManager import io.ably.lib.realtime.AblyRealtime import io.ably.lib.realtime.Channel import io.ably.lib.realtime.ChannelState +import io.ably.lib.transport.ConnectionManager import io.ably.lib.types.ChannelMode import io.ably.lib.types.ChannelOptions import io.ably.lib.types.ClientOptions import io.mockk.every import io.mockk.mockk +import io.mockk.mockkStatic import io.mockk.spyk internal fun getMockRealtimeChannel( @@ -45,9 +47,11 @@ internal fun getMockRealtimeChannel( } internal fun getMockObjectsAdapter(): ObjectsAdapter { - val mockkAdapter = mockk(relaxed = true) - every { mockkAdapter.getChannel(any()) } returns getMockRealtimeChannel("testChannelName") - return mockkAdapter + mockkStatic("io.ably.lib.objects.HelpersKt") + return mockk(relaxed = true) { + every { getChannel(any()) } returns getMockRealtimeChannel("testChannelName") + every { connectionManager } returns mockk(relaxed = true) + } } internal fun getMockObjectsPool(): ObjectsPool { diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsPoolTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsPoolTest.kt index 656b1e7c1..aff4f9d1a 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsPoolTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsPoolTest.kt @@ -19,7 +19,7 @@ class ObjectsPoolTest { @Test fun `(RTO3, RTO3a, RTO3b) An internal ObjectsPool should be used to maintain the list of objects present on a channel`() { - val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", mockk(relaxed = true)) + val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", getMockObjectsAdapter()) val objectsPool = defaultRealtimeObjects.objectsPool assertNotNull(objectsPool) @@ -44,7 +44,7 @@ class ObjectsPoolTest { @Test fun `(RTO6) ObjectsPool should create zero-value objects if not exists`() { - val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", mockk(relaxed = true)) + val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", getMockObjectsAdapter()) val objectsPool = spyk(defaultRealtimeObjects.objectsPool) assertEquals(1, objectsPool.size(), "RTO3 - Should only contain the root object initially") @@ -78,7 +78,7 @@ class ObjectsPoolTest { @Test fun `(RTO4b1, RTO4b2) ObjectsPool should reset to initial pool retaining original root map`() { - val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", mockk(relaxed = true)) + val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", getMockObjectsAdapter()) val objectsPool = defaultRealtimeObjects.objectsPool assertEquals(1, objectsPool.size()) val rootMap = objectsPool.get(ROOT_OBJECT_ID) as DefaultLiveMap @@ -107,7 +107,7 @@ class ObjectsPoolTest { @Test fun `(RTO5c2, RTO5c2a) ObjectsPool should delete extra object IDs`() { - val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", mockk(relaxed = true)) + val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", getMockObjectsAdapter()) val objectsPool = defaultRealtimeObjects.objectsPool // Add some objects From d39929d33de1875a1d6edcd09c3a64152db20dbc Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 28 Aug 2025 16:25:01 +0530 Subject: [PATCH 3/5] [ECO-5056] Added test for getObjects when liveobjects plugin is not installed --- .../test/realtime/RealtimeChannelTest.java | 24 +++++++++++++++++++ .../io/ably/lib/objects/unit/HelpersTest.kt | 10 ++++---- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java index d9824da31..6bd006f50 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java @@ -46,6 +46,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -2578,6 +2579,29 @@ public void connect_should_not_rewrite_immediate_attach() throws AblyException { } } + @Test + public void channel_get_objects_throws_exception() throws AblyException { + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + try (AblyRealtime ably = new AblyRealtime(opts)) { + + /* wait until connected */ + new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected); + assertEquals("Verify connected state reached", ably.connection.state, ConnectionState.connected); + + /* create a channel and attach */ + final Channel channel = ably.channels.get("channel"); + channel.attach(); + new ChannelWaiter(channel).waitFor(ChannelState.attached); + assertEquals("Verify attached state reached", channel.state, ChannelState.attached); + + AblyException exception = assertThrows(AblyException.class, channel::getObjects); + assertNotNull(exception); + assertEquals(40019, exception.errorInfo.code); + assertEquals(400, exception.errorInfo.statusCode); + assertTrue(exception.errorInfo.message.contains("LiveObjects plugin hasn't been installed")); + } + } + static class DetachingProtocolListener implements DebugOptions.RawProtocolListener { public Channel theChannel; diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt index fc81be8fd..ba86f81a8 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt @@ -76,9 +76,10 @@ class HelpersTest { fun testRetrieveObjectsGCGracePeriodDeferredInvokesOnConnectedWithValue() { val adapter = getMockObjectsAdapter() val connManager = adapter.connectionManager + val connection = adapter.connection var value: Long? = null - every { adapter.connection.once(ConnectionEvent.connected, any()) } answers { + every { connection.once(ConnectionEvent.connected, any()) } answers { val listener = secondArg() connManager.setPrivateField("objectsGCGracePeriod", 456L) listener.onConnectionStateChanged(mockk(relaxed = true)) @@ -87,15 +88,16 @@ class HelpersTest { adapter.retrieveObjectsGCGracePeriod { v -> value = v } assertEquals(456L, value) - verify(exactly = 1) { adapter.connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } } @Test fun testRetrieveObjectsGCGracePeriodDeferredInvokesOnConnectedWithNull() { val adapter = getMockObjectsAdapter() + val connection = adapter.connection var value: Long? = null - every { adapter.connection.once(ConnectionEvent.connected, any()) } answers { + every { connection.once(ConnectionEvent.connected, any()) } answers { val listener = secondArg() listener.onConnectionStateChanged(mockk(relaxed = true)) } @@ -103,7 +105,7 @@ class HelpersTest { adapter.retrieveObjectsGCGracePeriod { v -> value = v } assertNull(value) - verify(exactly = 1) { adapter.connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } } @Test From 544c079ae33e0cc64b9f9e5bc2fb2f5453a88795 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 29 Aug 2025 16:32:43 +0530 Subject: [PATCH 4/5] [ECO-5056] Refactored HelpersTest.kt assertions with concrete types --- .../io/ably/lib/objects/unit/HelpersTest.kt | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt index ba86f81a8..4e69d0d8a 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt @@ -69,7 +69,7 @@ class HelpersTest { adapter.retrieveObjectsGCGracePeriod { v -> value = v } assertEquals(123L, value) - verify(exactly = 0) { adapter.connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 0) { adapter.connection.once(ConnectionEvent.connected, any()) } } @Test @@ -79,7 +79,7 @@ class HelpersTest { val connection = adapter.connection var value: Long? = null - every { connection.once(ConnectionEvent.connected, any()) } answers { + every { connection.once(ConnectionEvent.connected, any()) } answers { val listener = secondArg() connManager.setPrivateField("objectsGCGracePeriod", 456L) listener.onConnectionStateChanged(mockk(relaxed = true)) @@ -88,7 +88,7 @@ class HelpersTest { adapter.retrieveObjectsGCGracePeriod { v -> value = v } assertEquals(456L, value) - verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } } @Test @@ -97,7 +97,7 @@ class HelpersTest { val connection = adapter.connection var value: Long? = null - every { connection.once(ConnectionEvent.connected, any()) } answers { + every { connection.once(ConnectionEvent.connected, any()) } answers { val listener = secondArg() listener.onConnectionStateChanged(mockk(relaxed = true)) } @@ -105,7 +105,7 @@ class HelpersTest { adapter.retrieveObjectsGCGracePeriod { v -> value = v } assertNull(value) - verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } } @Test @@ -277,7 +277,7 @@ class HelpersTest { every { adapter.getChannel("ch") } returns channel channel.state = ChannelState.attaching - every { channel.once(any()) } answers { + every { channel.once(any()) } answers { val listener = firstArg() val stateChange = mockk(relaxed = true) { setPrivateField("current", ChannelState.attached) @@ -286,7 +286,7 @@ class HelpersTest { } adapter.ensureAttached("ch") - verify(exactly = 1) { channel.once(any()) } + verify(exactly = 1) { channel.once(any()) } } @Test @@ -295,7 +295,7 @@ class HelpersTest { val channel = mockk(relaxed = true) every { adapter.getChannel("ch") } returns channel channel.state = ChannelState.attaching - every { channel.once(any()) } answers { + every { channel.once(any()) } answers { val listener = firstArg() val stateChange = mockk(relaxed = true) { setPrivateField("current", ChannelState.suspended) @@ -306,7 +306,7 @@ class HelpersTest { val ex = assertFailsWith { adapter.ensureAttached("ch") } assertEquals(ErrorCode.ChannelStateError.code, ex.errorInfo.code) assertTrue(ex.errorInfo.message.contains("Not attached")) - verify(exactly = 1) { channel.once(any()) } + verify(exactly = 1) { channel.once(any()) } } @Test From 9f8263ffd98fe9747b96e64a630a0d8c73efee0b Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 25 Sep 2025 17:57:13 +0530 Subject: [PATCH 5/5] [ECO-5056] Updated retrieveObjectsGCGracePeriod to return gcperiod every time CONNECTED message is received, updated tests for the same --- .../kotlin/io/ably/lib/objects/Helpers.kt | 19 ++++++---------- .../kotlin/io/ably/lib/objects/ObjectsPool.kt | 7 +++--- .../io/ably/lib/objects/unit/HelpersTest.kt | 22 +++++++++---------- 3 files changed, 22 insertions(+), 26 deletions(-) diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index 18e87e944..77e3cd2ab 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -3,7 +3,7 @@ package io.ably.lib.objects import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.CompletionListener import io.ably.lib.realtime.ConnectionEvent -import io.ably.lib.realtime.ConnectionState +import io.ably.lib.realtime.ConnectionStateListener import io.ably.lib.types.ChannelMode import io.ably.lib.types.ErrorInfo import io.ably.lib.types.ProtocolMessage @@ -49,19 +49,14 @@ internal suspend fun ObjectsAdapter.attachAsync(channelName: String) = suspendCa } } -internal fun ObjectsAdapter.retrieveObjectsGCGracePeriod(block : (Long?) -> Unit) { - connectionManager.objectsGCGracePeriod?.let { - block(it) - return - } - // If already connected, no further `connected` event is guaranteed; return immediately. - if (connection.state == ConnectionState.connected) { - block(connectionManager.objectsGCGracePeriod) - return - } - connection.once(ConnectionEvent.connected) { +internal fun ObjectsAdapter.onGCGracePeriodUpdated(block : (Long?) -> Unit) : ObjectsSubscription { + connectionManager.objectsGCGracePeriod?.let { block(it) } + // Return new objectsGCGracePeriod whenever connection state changes to connected + val listener: (_: ConnectionStateListener.ConnectionStateChange) -> Unit = { block(connectionManager.objectsGCGracePeriod) } + connection.on(ConnectionEvent.connected, listener) + return ObjectsSubscription { connection.off(listener) } } /** diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt index 43cab31c2..224cd606f 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt @@ -52,14 +52,14 @@ internal class ObjectsPool( private val gcScope = CoroutineScope(Dispatchers.Default + SupervisorJob()) private var gcJob: Job // Job for the garbage collection coroutine - @Volatile - private var gcGracePeriod = ObjectsPoolDefaults.GC_GRACE_PERIOD_MS + @Volatile private var gcGracePeriod = ObjectsPoolDefaults.GC_GRACE_PERIOD_MS + private var gcPeriodSubscription: ObjectsSubscription init { // RTO3b - Initialize pool with root object pool[ROOT_OBJECT_ID] = DefaultLiveMap.zeroValue(ROOT_OBJECT_ID, realtimeObjects) // Start garbage collection coroutine with server-provided grace period if available - realtimeObjects.adapter.retrieveObjectsGCGracePeriod { period -> + gcPeriodSubscription = realtimeObjects.adapter.onGCGracePeriodUpdated { period -> period?.let { gcGracePeriod = it Log.i(tag, "Using objectsGCGracePeriod from server: $gcGracePeriod ms") @@ -164,6 +164,7 @@ internal class ObjectsPool( * Should be called when the pool is no longer needed. */ fun dispose() { + gcPeriodSubscription.unsubscribe() gcJob.cancel() gcScope.cancel() pool.clear() diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt index 4e69d0d8a..e8edfdb79 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt @@ -60,52 +60,52 @@ class HelpersTest { } @Test - fun testRetrieveObjectsGCGracePeriodImmediateInvokesBlock() { + fun testOnGCGracePeriodImmediateInvokesBlock() { val adapter = getMockObjectsAdapter() val connManager = adapter.connectionManager connManager.setPrivateField("objectsGCGracePeriod", 123L) var value: Long? = null - adapter.retrieveObjectsGCGracePeriod { v -> value = v } + adapter.onGCGracePeriodUpdated { v -> value = v } assertEquals(123L, value) - verify(exactly = 0) { adapter.connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { adapter.connection.on(ConnectionEvent.connected, any()) } } @Test - fun testRetrieveObjectsGCGracePeriodDeferredInvokesOnConnectedWithValue() { + fun testOnGCGracePeriodDeferredInvokesOnConnectedWithValue() { val adapter = getMockObjectsAdapter() val connManager = adapter.connectionManager val connection = adapter.connection var value: Long? = null - every { connection.once(ConnectionEvent.connected, any()) } answers { + every { connection.on(ConnectionEvent.connected, any()) } answers { val listener = secondArg() connManager.setPrivateField("objectsGCGracePeriod", 456L) listener.onConnectionStateChanged(mockk(relaxed = true)) } - adapter.retrieveObjectsGCGracePeriod { v -> value = v } + adapter.onGCGracePeriodUpdated { v -> value = v } assertEquals(456L, value) - verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { connection.on(ConnectionEvent.connected, any()) } } @Test - fun testRetrieveObjectsGCGracePeriodDeferredInvokesOnConnectedWithNull() { + fun testOnGCGracePeriodDeferredInvokesOnConnectedWithNull() { val adapter = getMockObjectsAdapter() val connection = adapter.connection var value: Long? = null - every { connection.once(ConnectionEvent.connected, any()) } answers { + every { connection.on(ConnectionEvent.connected, any()) } answers { val listener = secondArg() listener.onConnectionStateChanged(mockk(relaxed = true)) } - adapter.retrieveObjectsGCGracePeriod { v -> value = v } + adapter.onGCGracePeriodUpdated { v -> value = v } assertNull(value) - verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { connection.on(ConnectionEvent.connected, any()) } } @Test